From concurrent.futures to asyncio
In the asyncio model, execution is scheduled and coordinated by an event loop. To cancel a currently suspended task, you essentially just have to not resume it. While this works a little differently in practice, it should be clear that this makes cancelling a suspended task simple in theory.
Individual timeouts are possible in the same way: whenever you suspend a coroutine to wait for a result, you can supply a timeout value. The event loop will make sure the waiting task is cancelled if that timeout is reached before the task has completed.
Some concrete samples:
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> task = asyncio.ensure_future(asyncio.sleep(5))
>>> task.cancel()
>>> loop.run_until_complete(task)
Traceback (most recent call last):
...
concurrent.futures._base.CancelledError
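For reference, here is roughly the same experiment as a standalone function instead of a REPL session. This is only a sketch: it assumes Python 3.7 or later (for asyncio.run), and the helper name cancel_sleeping_task is made up for illustration.

```python
import asyncio


async def cancel_sleeping_task():
    # Hypothetical helper: start a task, cancel it while it is suspended,
    # and report whether awaiting it raised CancelledError.
    task = asyncio.ensure_future(asyncio.sleep(5))
    await asyncio.sleep(0)  # yield once so the task can start and suspend
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return task.cancelled()
    return False
```

Calling asyncio.run(cancel_sleeping_task()) returns almost immediately rather than after five seconds, because the sleeping task is never resumed normally.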
In practice, this might be implemented using something like this:
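import asyncio

class Foo:
    task = None

    async def sleeper(self):
        # Wrap the coroutine in a Task so that it can be cancelled from outside.
        self.task = asyncio.ensure_future(asyncio.sleep(60))
        try:
            await self.task
        except asyncio.CancelledError:
            # Placeholder: handle the cancellation here.
            raise NotImplementedError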
While this method is asleep, somebody else can call foo.task.cancel() to wake up the coroutine and let it handle the cancellation. Alternatively, whoever calls sleeper() can cancel it directly without giving it a chance to clean up.
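To make the first of those two options concrete, here is a small sketch in which a second coroutine cancels the inner task while sleeper() is suspended. The class name Sleeper, the events list, and the cancel_inner() driver are hypothetical names introduced only for this illustration.

```python
import asyncio


class Sleeper:
    """Hypothetical stand-in for a class that exposes its inner task."""

    def __init__(self):
        self.task = None
        self.events = []

    async def sleeper(self):
        self.task = asyncio.ensure_future(asyncio.sleep(60))
        try:
            await self.task
        except asyncio.CancelledError:
            # The coroutine is woken up here and gets to react.
            self.events.append("cleanup")
            raise


async def cancel_inner():
    foo = Sleeper()
    outer = asyncio.ensure_future(foo.sleeper())
    await asyncio.sleep(0.01)  # let sleeper() start and suspend
    foo.task.cancel()          # "somebody else" cancels the inner task
    try:
        await outer
    except asyncio.CancelledError:
        pass
    return foo.events
```

The except branch inside sleeper() runs before the cancellation reaches the caller, which is what gives the coroutine its chance to handle it.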
Setting timeouts is similarly easy:
>>> loop.run_until_complete(asyncio.wait_for(asyncio.sleep(60), 5))
[ ... 5 seconds later ... ]
Traceback (most recent call last):
...
concurrent.futures._base.TimeoutError
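What the traceback does not show is that the wrapped coroutine itself gets cancelled when the timeout fires. A minimal sketch of that, assuming Python 3.7 or later; slow() and run_with_timeout() are hypothetical helpers and the delays are arbitrary:

```python
import asyncio


async def slow(log):
    # Hypothetical coroutine that records how it ended.
    try:
        await asyncio.sleep(1)
        log.append("finished")
    except asyncio.CancelledError:
        log.append("cancelled")
        raise


async def run_with_timeout(timeout):
    log = []
    try:
        await asyncio.wait_for(slow(log), timeout)
    except asyncio.TimeoutError:
        # The caller only ever sees the timeout, not the cancellation.
        log.append("timeout")
    return log
```

With a timeout shorter than the sleep, the log records the cancellation inside slow() first and the TimeoutError in the caller second.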
Particularly in the context of HTTP request timeouts, see aiohttp:
async def fetch_page(session, url):
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            assert response.status == 200
            return await response.read()

with aiohttp.ClientSession(loop=loop) as session:
    content = loop.run_until_complete(fetch_page(session, 'http://python.org'))
Obviously each call to fetch_page can decide on its own aiohttp.Timeout value, and each individual instance will throw its own exception when that timeout is reached.
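The per-call independence can be tried without any network access. The sketch below swaps in asyncio.wait_for for the aiohttp timeout and uses fake_fetch() as a stand-in for an HTTP request; both helper names, the URLs, and the delays are assumptions made for the example, not part of aiohttp.

```python
import asyncio


async def fake_fetch(url, delay):
    # Hypothetical stand-in for an HTTP request that takes `delay` seconds.
    await asyncio.sleep(delay)
    return "body of " + url


async def fetch_with_timeout(url, delay, timeout):
    # Each call picks its own timeout and handles its own TimeoutError.
    try:
        return await asyncio.wait_for(fake_fetch(url, delay), timeout)
    except asyncio.TimeoutError:
        return None


async def main():
    # Two concurrent calls with different timeouts; only the second expires.
    return await asyncio.gather(
        fetch_with_timeout("http://python.org", 0.01, 1.0),
        fetch_with_timeout("http://example.com", 1.0, 0.05),
    )
```

The call that times out does not disturb the other one: the first result is the fetched body and the second is None.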
You can re-raise immediately inside the exception handler (for asyncio.CancelledError).
I use the following pattern to catch the cancellation and then let it propagate:
import asyncio

async def worker():
    try:
        # Await some long-running coroutine here (placeholder).
        await asyncio.sleep(60)
    except asyncio.CancelledError:
        # Do stuff, then re-raise so the cancellation propagates.
        raise
    except Exception as exc:
        # Do stuff
        print(exc)
    finally:
        await asyncio.sleep(2)
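To see the order in which things happen when such a worker is cancelled, here is a small sketch. The log list and the main() driver are hypothetical additions, and the sleeps are shortened so it finishes quickly.

```python
import asyncio


async def worker(log):
    try:
        await asyncio.sleep(60)      # placeholder for real work
    except asyncio.CancelledError:
        log.append("cancelled")
        raise                        # let the cancellation propagate
    finally:
        await asyncio.sleep(0.01)    # short asynchronous cleanup
        log.append("cleaned up")


async def main():
    log = []
    task = asyncio.ensure_future(worker(log))
    await asyncio.sleep(0.01)        # let the worker start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("caller saw cancellation")
    return log
```

The handler runs first, then the finally block (which may still await), and only after that does the caller see the CancelledError.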