Executing a collection of futures sequentially
Combine iter_ok and Stream::for_each:
use futures::Stream;
use futures::future::ok;
use futures::stream::iter_ok;
let tasks = vec![ok(()), ok(()), ok(())];
let combined_task = iter_ok::<_, ()>(tasks).for_each(|f| f);
iter_ok produces a stream of the passed items and never yields an error (which is why you sometimes need to specify the error type explicitly, as with iter_ok::<_, ()> above). The closure passed to for_each then returns a Future to be run for each item; here it simply returns the items that were passed in.
for_each drives each returned future to completion before moving on to the next one, as you wanted. It also aborts with the first error it encounters and requires the inner futures to return () on success.
for_each itself returns a Future that either fails (as described above) or returns () on completion.
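The behavior described above (run each future to completion before the next, stop at the first error) can be sketched without the futures 0.1 crate. The following is only an illustration that swaps in std::future and async/await; the names Task, block_on, make_task and run_sequentially are hypothetical and are not part of any library, and the busy-polling executor is an assumption that is only reasonable for a small demo.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical alias: a boxed future that yields () or an error message.
type Task = Pin<Box<dyn Future<Output = Result<(), String>>>>;

// Waker that does nothing; sufficient because block_on below polls in a loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal busy-polling executor (assumption: acceptable for a demo only).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Hypothetical task: records its id when it runs, then succeeds or fails.
fn make_task(id: u32, fail: bool, log: Rc<RefCell<Vec<u32>>>) -> Task {
    Box::pin(async move {
        log.borrow_mut().push(id);
        if fail {
            Err(format!("task {} failed", id))
        } else {
            Ok(())
        }
    })
}

// Runs each task to completion before starting the next one and
// returns early with the first error, skipping the remaining tasks.
async fn run_sequentially(tasks: Vec<Task>) -> Result<(), String> {
    for task in tasks {
        task.await?;
    }
    Ok(())
}
```

If the second of three tasks fails, the log built by make_task records only the first two ids, because the third task is never polled.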
test tests::bench_variant_buffered ... bench: 22,356 ns/iter (+/- 1,816)
test tests::bench_variant_boxed ... bench: 8,575 ns/iter (+/- 1,042)
test tests::bench_variant_for_each ... bench: 4,070 ns/iter (+/- 531)
Stream has a function buffered which allows you to limit how many futures are polled concurrently.
If you have a collection of futures, you can create a stream and use buffered like so:
let tasks = vec![future1, future2];
let stream = ::futures::stream::iter_ok(tasks);
let mut when_result_ready = stream.buffered(1);
when_result_ready will now be a Stream implementation which only polls one future at a time and moves to the next once each future completes.
Update
Based on comments and profiling, it appears that buffered has a large overhead, so another solution is to convert each Future to a Stream and flatten them:
iter_ok(tasks).map(|f| f.into_stream()).flatten()
The documentation for flatten states that "each individual stream will get exhausted before moving on to the next," meaning no Future will be polled before the previous one has completed. In my local profiling, this seems to be ~80% faster than the buffered approach.
Both of my approaches above result in a Stream of results, where each source Future is polled sequentially and its result returned. What the asker actually asked for was a single Future at the end, not the results of each source Future. If that is the case, the answer from Stefan may be more useful and may perform better.
As mentioned in the comments, your types are too concrete.
You can envision the implementation of fold as doing something like this:
let (task0, task1, task2) = (ok(()), ok(()), ok(()));
let mut combined_task = ok(()); // seed
combined_task = combined_task.and_then(|_| task0);
combined_task = combined_task.and_then(|_| task1);
combined_task = combined_task.and_then(|_| task2);
The variable combined_task needs to be updated in place with a new value of the same type. Since we start with ok(()), that's the type each step needs to return. However, the return type of and_then is different; it's an AndThen. In fact, AndThen is a generic type containing the closure and the underlying future, so each step will produce a distinct type with potentially a different size:
FutureResult<()>
AndThen<FutureResult<()>, closure0>
AndThen<AndThen<FutureResult<()>, closure0>, closure1>
AndThen<AndThen<AndThen<FutureResult<()>, closure0>, closure1>, closure2>
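To make the growing types concrete, here is a small sketch using stand-in types rather than the real futures crate. Ready and AndThen below are hypothetical structs that only mimic the shape of FutureResult and AndThen (an inner value plus a closure); the u64 payloads captured by the closures are likewise just an assumption chosen so the closures have a non-zero size.

```rust
use std::mem::size_of_val;

// Hypothetical stand-in for a leaf future such as FutureResult.
#[allow(dead_code)]
struct Ready(u8);

// Hypothetical stand-in for a combinator that stores the inner value and a closure.
#[allow(dead_code)]
struct AndThen<A, F> {
    inner: A,
    next: F,
}

// Wraps `inner` together with `next`; the result type names both of them.
fn and_then<A, F>(inner: A, next: F) -> AndThen<A, F> {
    AndThen { inner, next }
}

// Builds three chain steps and returns the size in bytes of each one.
fn chain_sizes() -> (usize, usize, usize) {
    let (task0, task1) = (1u64, 2u64); // payloads captured by the closures
    let step0 = Ready(0);
    let size0 = size_of_val(&step0);
    // Each call yields a new, larger type that contains the previous step.
    let step1 = and_then(step0, move |_: ()| task0);
    let size1 = size_of_val(&step1);
    let step2 = and_then(step1, move |_: ()| task1);
    let size2 = size_of_val(&step2);
    (size0, size1, size2)
}
```

Because every step has its own type and size, none of them could be assigned back to a single variable declared with the type of the seed.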
Instead, you can create a unified type by producing a boxed trait object at each step:
let (task0, task1, task2) = (ok(()), ok(()), ok(()));
let mut combined_task: Box<Future<Item = (), Error = ()>> = Box::new(ok(())); // seed
combined_task = Box::new(combined_task.and_then(|_| task0));
combined_task = Box::new(combined_task.and_then(|_| task1));
combined_task = Box::new(combined_task.and_then(|_| task2));
Box<Future<Item = (), Error = ()>>
Box<Future<Item = (), Error = ()>>
Box<Future<Item = (), Error = ()>>
Box<Future<Item = (), Error = ()>>
Converting back to the fold syntax:
let combined_task: Box<Future<Item = (), Error = ()>> =
    tasks.into_iter().fold(Box::new(ok(())), |acc, task| {
        Box::new(acc.and_then(|_| task))
    });
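The same boxing idea can be sketched with std::future and async/await instead of futures 0.1. This is only an illustration under that substitution: Task, block_on, chain and demo are hypothetical names, and the busy-polling executor is an assumption suitable for a demo only. The point it shows is that the accumulator of the fold keeps a single type at every step.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical alias: one unified, boxed type for every step of the chain.
type Task = Pin<Box<dyn Future<Output = Result<(), String>>>>;

// Waker that does nothing; sufficient because block_on below polls in a loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal busy-polling executor (assumption: acceptable for a demo only).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Folds all tasks into one boxed future; the accumulator is always a Task.
fn chain(tasks: Vec<Task>) -> Task {
    let seed: Task = Box::pin(async { Ok::<(), String>(()) });
    tasks.into_iter().fold(seed, |acc: Task, task: Task| -> Task {
        Box::pin(async move {
            acc.await?; // finish everything chained so far
            task.await // then run the next task
        })
    })
}

// Hypothetical demo: three tasks that record the order in which they run.
fn demo() -> (Result<(), String>, Vec<u32>) {
    let log = Rc::new(RefCell::new(Vec::<u32>::new()));
    let tasks: Vec<Task> = (1..=3u32)
        .map(|id| {
            let log = Rc::clone(&log);
            Box::pin(async move {
                log.borrow_mut().push(id);
                Ok::<(), String>(())
            }) as Task
        })
        .collect();
    let result = block_on(chain(tasks));
    let order = log.borrow().clone();
    (result, order)
}
```

As with the boxed trait objects above, this trades one heap allocation and one dynamic dispatch per step for a type that fold can carry through every iteration.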
See also:
- Creating Diesel.rs queries with a dynamic number of .and()'s