Combine framework: how to process each element of array asynchronously before proceeding
With your latest edit and this comment below:
I literally am asking is there a Combine equivalent of "don't proceed to the next step until this step, involving multiple asynchronous steps, has finished"
I think this pattern can be achieved with .flatMap
to an array publisher (Publishers.Sequence), which emits one-by-one and completes, followed by whatever per-element async processing is needed, and finalized with a .collect
, which waits for all elements to complete before proceeding
So, in code, assuming we have these functions:
func getFoos() -> AnyPublisher<[Foo], Error>
func getPartials(for: Foo) -> AnyPublisher<[Partial], Error>
func getMoreInfo(for: Partial, of: Foo) -> AnyPublisher<MoreInfo, Error>
We can do the following:
getFoos()
.flatMap { fooArr in
fooArr.publisher.setFailureType(to: Error.self)
}
// per-foo element async processing
.flatMap { foo in
getPartials(for: foo)
.flatMap { partialArr in
partialArr.publisher.setFailureType(to: Error.self)
}
// per-partial of foo async processing
.flatMap { partial in
getMoreInfo(for: partial, of: foo)
// build completed partial with more info
.map { moreInfo in
var newPartial = partial
newPartial.moreInfo = moreInfo
return newPartial
}
}
.collect()
// build completed foo with all partials
.map { partialArr in
var newFoo = foo
newFoo.partials = partialArr
return newFoo
}
}
.collect()
(Deleted the old answer)
Using the accepted answer, I wound up with this structure:
head // [Entity]
.flatMap { entities -> AnyPublisher<Entity, Error> in
Publishers.Sequence(sequence: entities).eraseToAnyPublisher()
}.flatMap { entity -> AnyPublisher<Entity, Error> in
self.makeFuture(for: entity) // [Derivative]
.flatMap { derivatives -> AnyPublisher<Derivative, Error> in
Publishers.Sequence(sequence: derivatives).eraseToAnyPublisher()
}
.flatMap { derivative -> AnyPublisher<Derivative2, Error> in
self.makeFuture(for: derivative).eraseToAnyPublisher() // Derivative2
}.collect().map { derivative2s -> Entity in
self.configuredEntity(entity, from: derivative2s)
}.eraseToAnyPublisher()
}.collect()
That has exactly the elegant tightness I was looking for! So the idea is:
We receive an array of something, and we need to process each element asynchronously. The old way would have been a DispatchGroup and a for...in
loop. The Combine equivalent is:
The equivalent of the
for...in
line isflatMap
and Publishers.Sequence.The equivalent of the DispatchGroup (dealing with asynchronousness) is a further
flatMap
(on the individual element) and some publisher. In my case I start with a Future based on the individual element we just received.The equivalent of the right curly brace at the end is
collect()
, waiting for all elements to be processed and putting the array back together again.
So to sum up, the pattern is:
flatMap
the array to a Sequence.flatMap
the individual element to a publisher that launches the asynchronous operation on that element.- Continue the chain from that publisher as needed.
collect
back into an array.
By nesting that pattern, we can take advantage of Swift scoping rules to keep the thing we need to process in scope until we have acquired enough information to produce the processed object.