From concurrent.futures to asyncio

In the asyncio model, execution is scheduled and coordinated by an event loop. To cancel a currently suspended task, you essentially just have to not resume it. It works a little differently in practice, but the idea shows why cancelling a suspended task is simple in principle.

Individual timeouts are possible the same way: whenever you suspend a coroutine to wait for a result, you can supply a timeout value. The event loop makes sure to cancel the waiting task once that timeout is reached and the task hasn't completed yet.

Some concrete samples:

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> task = asyncio.ensure_future(asyncio.sleep(5))
>>> task.cancel()
True
>>> loop.run_until_complete(task)
Traceback (most recent call last):
   ...
concurrent.futures._base.CancelledError

In practice, this might be implemented using something like this:

import asyncio

class Foo:
    task = None

    async def sleeper(self):
        # Wrap the coroutine in a Task so it can be cancelled from outside;
        # a bare coroutine object has no cancel() method.
        self.task = asyncio.ensure_future(asyncio.sleep(60))
        try:
            await self.task
        except asyncio.CancelledError:
            # Clean up here, then let the cancellation propagate.
            raise

While this method is asleep, somebody else can call foo.task.cancel() to wake up the coroutine and let it handle the cancellation. Alternatively, whoever calls sleeper() can cancel the sleeper task itself directly, without giving it a chance to clean up.
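Putting the pieces together, here is a self-contained, runnable sketch of that external-cancellation path (the `main` driver and the `events` list are illustrative additions, not part of any API; the `ensure_future` wrapper is what makes the inner sleep a cancellable Task):

```python
import asyncio

events = []  # records the order in which things happen

class Foo:
    task = None

    async def sleeper(self):
        self.task = asyncio.ensure_future(asyncio.sleep(60))
        try:
            await self.task
        except asyncio.CancelledError:
            events.append("sleeper handled the cancellation")
            raise  # let the cancellation propagate to the caller

async def main():
    foo = Foo()
    outer = asyncio.ensure_future(foo.sleeper())
    await asyncio.sleep(0)   # let sleeper() run up to its await
    foo.task.cancel()        # cancel the inner task from outside
    try:
        await outer
    except asyncio.CancelledError:
        events.append("caller saw the cancellation")

asyncio.run(main())
print(events)
```

Note that the caller still observes the `CancelledError` because `sleeper` re-raises it after doing its cleanup; swallowing the exception instead would make the task finish "normally" from the caller's point of view.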

Setting timeouts is similarly easy:

>>> loop.run_until_complete(asyncio.wait_for(asyncio.sleep(60), 5))
[ ... 5 seconds later ... ]
Traceback (most recent call last):
   ...
concurrent.futures._base.TimeoutError
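The same thing outside the REPL: `asyncio.wait_for` cancels the inner coroutine and raises a `TimeoutError` once the deadline passes. A minimal sketch (the function name and timeout values are arbitrary):

```python
import asyncio

async def main():
    try:
        # A 60-second sleep bounded by a 0.1-second timeout.
        await asyncio.wait_for(asyncio.sleep(60), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"
    return "finished"

result = asyncio.run(main())
print(result)  # timed out
```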

For HTTP request timeouts in particular, see how aiohttp uses this pattern:

async def fetch_page(session, url):
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            assert response.status == 200
            return await response.read()

with aiohttp.ClientSession(loop=loop) as session:
    content = loop.run_until_complete(fetch_page(session, 'http://python.org'))

Each call to fetch_page can decide on its own aiohttp.Timeout value, and each individual instance will raise its own exception when its timeout is reached.


Cancelling a task raises asyncio.CancelledError inside it, at the point where the coroutine is suspended.

I use this pattern to catch the cancellation and run cleanup before re-raising:

import asyncio

async def worker():
    try:
        # await some coroutine here, for example:
        await asyncio.sleep(60)
    except asyncio.CancelledError:
        # Clean up, then re-raise so the caller sees the cancellation.
        raise
    except Exception as exc:
        # Handle any other error.
        print(exc)
    finally:
        # This runs even after cancellation; awaiting here is still allowed.
        await asyncio.sleep(2)
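A quick driver makes the ordering visible: when the task is cancelled, the `except asyncio.CancelledError` block runs first, then `finally`, and the caller is still notified because the error was re-raised. This is a self-contained sketch (the `events` list and the short sleeps exist only for observation):

```python
import asyncio

events = []

async def worker():
    try:
        await asyncio.sleep(60)        # stand-in for the real coroutine
    except asyncio.CancelledError:
        events.append("except ran")
        raise                          # re-raise so the caller is notified
    finally:
        events.append("finally ran")
        await asyncio.sleep(0.01)      # cleanup may still await here

async def main():
    task = asyncio.ensure_future(worker())
    await asyncio.sleep(0.05)          # let worker start and suspend
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append("caller notified")

asyncio.run(main())
print(events)
```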