Can concurrent.futures.Future be converted to asyncio.Future?
There is a function called wrap_future
in asyncio.
Wrap a concurrent.futures.Future object in a asyncio.Future object.
See https://docs.python.org/3/library/asyncio-future.html#asyncio.wrap_future
My question is if i can transfer
concurrent.future.Future
toasyncio.Future
(or the opposite)?
If by "transfer" you mean convert one to the other, yes, it's possible, although bridging the impedance mismatch can take some work.
To convert a concurrent.futures.Future
into an asyncio.Future
, you can call asyncio.wrap_future
. The returned asyncio future is awaitable in the asyncio event loop and will complete when the underlying threading future completes. This is effectively how run_in_executor
is implemented.
There is no public functionality to directly convert an asyncio future to concurrent.futures
future, but there is the asyncio.run_coroutine_threadsafe
function, which takes a coroutine, submits it to an event loop, and returns a concurrent future which completes when the asyncio future does. This can be used to effectively convert any asyncio-awaitable future to concurrent future, like this:
def to_concurrent(fut, loop):
async def wait():
await fut
return asyncio.run_coroutine_threadsafe(wait(), loop)
The returned future will behave like you'd expect from a concurrent future, e.g. its result()
method will block, etc. One thing you might want to be careful of is that callbacks added to the concurrent future with add_done_callback
run in the thread that marked the future completed, which in this case is the event loop thread. This means that if you add some done callbacks, you need to be careful not to invoke blocking calls in their implementation lest you block the event loop.
Note that calling run_coroutine_threadsafe
requires the event loop to actually run in some other thread. (For example, you can start a background thread and have it execute loop.run_forever
.)
For the "concurrent future to asyncio future" part, here is an utility I use.
from typing import List, Any
from concurrent.futures.thread import ThreadPoolExecutor
import asyncio
class AsyncThreadPool(ThreadPoolExecutor):
_futures: List[asyncio.Future]
_loop: asyncio.AbstractEventLoop
def __init__(self, max_workers=None):
super().__init__(max_workers)
self._futures = []
def queue(self, fn):
self._loop = asyncio.get_event_loop()
fut = self._loop.create_future()
self._futures.append(fut)
self.submit(self._entry, fn, fut)
def queueAsync(self, coroutine):
def newLoop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(coroutine)
self.queue(newLoop)
def _entry(self, fn, fut: asyncio.Future):
try:
result = fn()
self._loop.call_soon_threadsafe(fut.set_result, result)
except Exception as e:
self._loop.call_soon_threadsafe(fut.set_exception, e)
async def gather(self) -> List[Any]:
return await asyncio.gather(*self._futures)
You can use it like that:
with AsyncThreadPool() as pool:
# Queue some sync function (will be executed on another thread)
pool.queue(someHeavySyncFunction)
# Queue a coroutine that will be executed on a new event loop running on another thread
pool.queue(otherAsyncFunction())
# Gather results (non blocking for your current loop)
res: List[Any] = await pool.gather()