How can I read one line at a time from a trio ReceiveStream?
I ended up writing this. Not properly tested (bugfixes welcome), but it seems to work:
    class LineReader:
        def __init__(self, stream):
            self.stream = stream
            self._line_generator = self.generate_lines()

        @staticmethod
        def generate_lines():
            buf = bytes()
            while True:
                newline_idx = buf.find(b'\n')
                if newline_idx < 0:
                    # no b'\n' found in buf
                    more_data = yield
                else:
                    # b'\n' found in buf so return the line and move up buf
                    line = buf[:newline_idx+1]
                    buf = buf[newline_idx+1:]
                    more_data = yield line

                if more_data is not None:
                    buf += bytes(more_data)

        async def readline(self):
            line = next(self._line_generator)
            while line is None:
                more_data = await self.stream.receive_some(1024)
                if not more_data:
                    return b''  # this is the EOF indication expected by my caller
                line = self._line_generator.send(more_data)
            return line
Then I can wrap the `ReceiveStream` with a `LineReader` and use its `readline` method. Adding `__aiter__()` and `__anext__()` would then be trivial, but I don't need it in my case (I'm porting something to trio that doesn't use `async for` anyway).
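For anyone who does want `async for`, the `__aiter__()`/`__anext__()` part might look roughly like this. It is only a sketch, written as a separate adapter rather than as methods on the class; the name `LineIterator` is made up for illustration, and it assumes nothing more than an object with an async `readline()` that returns `b''` at EOF, as the `LineReader` above does.

```python
class LineIterator:
    """Hypothetical adapter: async-iterate over an object with an async readline()."""

    def __init__(self, reader):
        self._reader = reader

    def __aiter__(self):
        return self

    async def __anext__(self):
        line = await self._reader.readline()
        if not line:
            # readline() signals EOF with b'', which ends the async for loop
            raise StopAsyncIteration
        return line
```

With that in place, `async for line in LineIterator(LineReader(stream)):` should yield one line per iteration until the stream is closed.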
The other flaw with this is that it assumes UTF-8 or a similar encoding where `b'\n'` newlines exist in the encoded bytes object unmodified.
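To make the encoding caveat concrete, here is a small illustration (the character U+010A is just an arbitrary example I picked). In UTF-8 the byte `0x0A` only ever means a newline, but in UTF-16 it can be one half of an unrelated character, so searching for `b'\n'` would split in the wrong place.

```python
# U+010A is an arbitrary example: its UTF-16 code unit has 0x0A as one byte.
text = "\u010a"

utf8 = text.encode("utf-8")
utf16 = text.encode("utf-16-le")

# UTF-8 uses bytes below 0x80 only for ASCII characters,
# so a b'\n' byte in UTF-8 data is always a real newline.
print(b"\n" in utf8)   # False

# In UTF-16 the same byte value shows up inside a non-newline character.
print(b"\n" in utf16)  # True
```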
It'd be nice to rely on library functions to handle this though; other answers appreciated.
You're right, there's no high-level support for this included in Trio currently. There should be something, though I'm not 100% sure what it should look like. I opened an issue to discuss it.
In the meantime, your implementation looks reasonable.
If you want to make it even more robust, you might (1) use a `bytearray` instead of `bytes` for your buffer, to make appending and deleting amortized O(n) instead of O(n^2), (2) put a limit on the maximum line length, so evil peers can't force you to waste infinite memory buffering infinitely long lines, (3) resume each call to `find` at the place where the last one left off instead of restarting from the beginning each time, again to avoid O(n^2) behavior. None of this is super important if you're only dealing with reasonable line-lengths and well-behaved peers, but it doesn't hurt, either.
Here's a tweaked version of your code that tries to incorporate those three ideas:
    class LineReader:
        def __init__(self, stream, max_line_length=16384):
            self.stream = stream
            self._line_generator = self.generate_lines(max_line_length)

        @staticmethod
        def generate_lines(max_line_length):
            buf = bytearray()
            find_start = 0
            while True:
                newline_idx = buf.find(b'\n', find_start)
                if newline_idx < 0:
                    # no b'\n' found in buf
                    if len(buf) > max_line_length:
                        raise ValueError("line too long")
                    # next time, start the search where this one left off
                    find_start = len(buf)
                    more_data = yield
                else:
                    # b'\n' found in buf so return the line and move up buf.
                    # Copy to bytes so callers get the same type as the
                    # b'' EOF marker returned by readline().
                    line = bytes(buf[:newline_idx+1])
                    # Update the buffer in place, to take advantage of bytearray's
                    # optimized delete-from-beginning feature.
                    del buf[:newline_idx+1]
                    # next time, start the search from the beginning
                    find_start = 0
                    more_data = yield line

                if more_data is not None:
                    buf += bytes(more_data)

        async def readline(self):
            line = next(self._line_generator)
            while line is None:
                more_data = await self.stream.receive_some(1024)
                if not more_data:
                    return b''  # this is the EOF indication expected by my caller
                line = self._line_generator.send(more_data)
            return line
(Feel free to use under whatever license you like.)
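Since the question mentions the code isn't properly tested: the line-splitting generator does no I/O of its own, so it can be exercised without a socket or an event loop. Below is a rough sketch of that idea. `generate_lines` is adapted from the class above as a plain function, and `feed` is a hypothetical helper I made up for the test; neither is part of Trio.

```python
def generate_lines(max_line_length):
    # Adapted from LineReader.generate_lines above, as a plain function.
    buf = bytearray()
    find_start = 0
    while True:
        newline_idx = buf.find(b'\n', find_start)
        if newline_idx < 0:
            if len(buf) > max_line_length:
                raise ValueError("line too long")
            find_start = len(buf)
            more_data = yield
        else:
            line = buf[:newline_idx+1]
            del buf[:newline_idx+1]
            find_start = 0
            more_data = yield line
        if more_data is not None:
            buf += bytes(more_data)


def feed(chunks, max_line_length=16384):
    """Hypothetical test helper: push chunks in, collect complete lines out."""
    gen = generate_lines(max_line_length)
    lines = []
    line = next(gen)             # prime the generator; the buffer is empty, so None
    for chunk in chunks:
        line = gen.send(chunk)
        while line is not None:  # drain every complete line now in the buffer
            lines.append(bytes(line))
            line = next(gen)
    return lines


# Lines split across chunk boundaries are reassembled correctly.
print(feed([b'he', b'llo\nwor', b'ld\n\n']))  # [b'hello\n', b'world\n', b'\n']
```

Feeding a chunk longer than `max_line_length` with no newline in it should raise the `ValueError`, which is an easy second case to check the same way.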