How to iterate over an asynchronous iterator with a timeout?

AsyncTimedIterable could be the implementation of timeout() in your code:

import asyncio


class _AsyncTimedIterator:

    __slots__ = ('_iterator', '_timeout', '_sentinel')

    def __init__(self, iterable, timeout, sentinel):
        self._iterator = iterable.__aiter__()
        self._timeout = timeout
        self._sentinel = sentinel

    async def __anext__(self):
        try:
            return await asyncio.wait_for(self._iterator.__anext__(), self._timeout)
        except asyncio.TimeoutError:
            return self._sentinel


class AsyncTimedIterable:

    __slots__ = ('_factory', )

    def __init__(self, iterable, timeout=None, sentinel=None):
        self._factory = lambda: _AsyncTimedIterator(iterable, timeout, sentinel)

    def __aiter__(self):
        return self._factory()
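As a rough usage sketch (the `QueueReader` source, the `TIMED_OUT` marker, and the delays are made up for illustration; the class is the one above, condensed so the snippet runs on its own), passing a unique sentinel object lets the loop tell a timeout apart from a real item, including a real `None`:

```python
import asyncio


class _AsyncTimedIterator:
    def __init__(self, iterable, timeout, sentinel):
        self._iterator = iterable.__aiter__()
        self._timeout = timeout
        self._sentinel = sentinel

    async def __anext__(self):
        try:
            return await asyncio.wait_for(self._iterator.__anext__(), self._timeout)
        except asyncio.TimeoutError:
            return self._sentinel


class AsyncTimedIterable:
    def __init__(self, iterable, timeout=None, sentinel=None):
        self._factory = lambda: _AsyncTimedIterator(iterable, timeout, sentinel)

    def __aiter__(self):
        return self._factory()


class QueueReader:
    """Hypothetical source: iterates over a queue until it receives None."""

    def __init__(self, queue):
        self._queue = queue

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is None:
            raise StopAsyncIteration
        return item


TIMED_OUT = object()  # unique marker, cannot collide with a real item


async def demo():
    queue = asyncio.Queue()

    async def producer():
        await queue.put("a")
        await asyncio.sleep(0.5)  # a gap longer than the 0.2 s timeout
        await queue.put("b")
        await queue.put(None)  # end of stream

    producer_task = asyncio.ensure_future(producer())
    seen = []
    timed = AsyncTimedIterable(QueueReader(queue), timeout=0.2, sentinel=TIMED_OUT)
    async for item in timed:
        seen.append("<timeout>" if item is TIMED_OUT else item)
    await producer_task
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The queue-backed source is chosen on purpose: a cancelled `queue.get()` can simply be retried, so a timeout does not end the stream.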

(original answer)

Or use this class to replace your timeout() function:

class AsyncTimedIterable:
    def __init__(self, iterable, timeout=None, sentinel=None):
        class AsyncTimedIterator:
            def __init__(self):
                self._iterator = iterable.__aiter__()

            async def __anext__(self):
                try:
                    return await asyncio.wait_for(self._iterator.__anext__(),
                                                  timeout)
                except asyncio.TimeoutError:
                    return sentinel

        self._factory = AsyncTimedIterator

    def __aiter__(self):
        return self._factory()
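One caveat with both classes: `asyncio.wait_for()` cancels the pending `__anext__()` call when the timeout fires. If the wrapped iterator is an async generator, that cancellation is raised inside the generator and normally finishes it, so iteration ends after the first timeout. A possible variation, sketched here under the assumption that the pending call should survive a timeout, keeps the call running as a task and polls it with `asyncio.wait()`. The names `iter_with_timeout`, `slow_source`, and `TIMED_OUT` are illustrative and not part of the answer above.

```python
import asyncio


async def iter_with_timeout(aiterable, timeout, sentinel=None):
    """Yield items from aiterable; yield sentinel whenever timeout elapses."""
    iterator = aiterable.__aiter__()
    pending = None  # the in-flight __anext__() call, kept across timeouts
    try:
        while True:
            if pending is None:
                pending = asyncio.ensure_future(iterator.__anext__())
            # asyncio.wait() does not cancel the task when the timeout expires.
            done, _ = await asyncio.wait({pending}, timeout=timeout)
            if not done:
                yield sentinel
                continue
            task, pending = pending, None
            try:
                item = task.result()
            except StopAsyncIteration:
                return  # the source is exhausted
            yield item
    finally:
        if pending is not None:
            pending.cancel()  # the consumer stopped early; clean up


async def slow_source():
    """Illustrative source with a gap longer than the timeout."""
    yield "a"
    await asyncio.sleep(0.5)
    yield "b"


TIMED_OUT = object()


async def collect():
    seen = []
    async for item in iter_with_timeout(slow_source(), 0.2, TIMED_OUT):
        seen.append("<timeout>" if item is TIMED_OUT else item)
    return seen


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

Here `"b"` still arrives after the timeouts, because the generator was never cancelled while it was sleeping.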

I needed to do something like this to create a websocket (also an async iterator) that times out if it doesn't receive a message within a certain duration. I settled on the following:

socket_iter = socket.__aiter__()
try:
    while True:
        message = await asyncio.wait_for(
            socket_iter.__anext__(),
            timeout=10
        )
except asyncio.TimeoutError:
    # No message arrived within 10 seconds: treat the stream as finished.
    pass
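The same idea can be wrapped in a small helper. This is only a sketch: `read_until_idle` and `fake_socket` are hypothetical names, and unlike the loop above it also treats a normal end of the stream (`StopAsyncIteration`) as a reason to stop.

```python
import asyncio


async def read_until_idle(source, idle_timeout):
    """Collect messages until the source ends or stays silent too long."""
    iterator = source.__aiter__()
    messages = []
    while True:
        try:
            message = await asyncio.wait_for(iterator.__anext__(), timeout=idle_timeout)
        except asyncio.TimeoutError:
            break  # nothing arrived within idle_timeout seconds
        except StopAsyncIteration:
            break  # the source closed on its own
        messages.append(message)
    return messages


async def fake_socket():
    """Hypothetical stand-in for a websocket: two messages, then silence."""
    yield "hello"
    yield "world"
    await asyncio.sleep(3600)
    yield "never delivered"


if __name__ == "__main__":
    print(asyncio.run(read_until_idle(fake_socket(), idle_timeout=0.1)))
```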

A simple approach is to use an asyncio.Queue, and separate the code into two coroutines:

queue = asyncio.Queue()
async for item in something():
    await queue.put(item)
await queue.put(None)  # tell the consumer that no more items are coming

In another coroutine:

while True:
    try:
        item = await asyncio.wait_for(queue.get(), 60)
    except asyncio.TimeoutError:
        pass
    else:
        if item is None:
            break  # use None or whatever suits you to gracefully exit
        await do_something_useful(item)
    refresh()

Note that the queue will keep growing if the handler do_something_useful() is slower than something() produces items. You can set maxsize on the queue to limit the buffer size; put() then waits until a slot is free.
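Putting the two coroutines together, a minimal runnable sketch could look like the following. The body of `something()` and the handling step are placeholders invented for the example, and `maxsize=2` is an arbitrary value chosen to show the bounded buffer.

```python
import asyncio


async def something():
    """Placeholder source: three items with a short pause before each."""
    for i in range(3):
        await asyncio.sleep(0.01)
        yield i


async def producer(queue):
    async for item in something():
        await queue.put(item)  # waits here while the queue is full
    await queue.put(None)  # sentinel: no more items


async def consumer(queue, timeout):
    handled = []
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            continue  # nothing arrived in time; periodic work could go here
        if item is None:
            break
        handled.append(item)  # stands in for do_something_useful(item)
    return handled


async def main():
    queue = asyncio.Queue(maxsize=2)  # bounded buffer gives backpressure
    _, handled = await asyncio.gather(producer(queue), consumer(queue, timeout=1.0))
    return handled


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the timeout only cancels `queue.get()`, the source iterator itself is never interrupted.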