Combine multiple Kotlin flows in a list without waiting for a first value
I would still like to avoid mapping to an intermediary wrapper type, and as someone mentioned in the comments, the behaviour is slightly wrong (this emits an empty list at first if no arguments emitted anything yet), but this is slightly nicer than the solutions I had in mind when I wrote the question (still really similar) and works with nullable types:
inline fun <reified T> instantCombine(
flows: Iterable<Flow<T>>
): Flow<List<T>> = combine(flows.map { flow ->
flow.map {
@Suppress("USELESS_CAST") // Required for onStart(null)
Holder(it) as Holder<T>?
}
.onStart { emit(null) }
}) {
it.filterNotNull()
.map { holder -> holder.value }
}
And here's a test suite that passes with this implementation:
class InstantCombineTest {
@Test
fun `when no flows are merged, nothing is emitted`() = runBlockingTest {
assertThat(instantCombine(emptyList<Flow<String>>()).toList())
.isEmpty()
}
@Test
fun `intermediate steps are emitted`() = runBlockingTest {
val a = flow {
delay(20)
repeat(3) {
emit("a$it")
delay(100)
}
}
val b = flow {
repeat(3) {
delay(150)
emit("b$it")
}
}
val c = flow {
delay(400)
emit("c")
}
assertThat(instantCombine(a, b, c).toList())
.containsExactly(
emptyList<String>(),
listOf("a0"),
listOf("a1"),
listOf("a1", "b0"),
listOf("a2", "b0"),
listOf("a2", "b1"),
listOf("a2", "b1", "c"),
listOf("a2", "b2", "c")
)
.inOrder()
}
@Test
fun `a single flow is mirrored`() = runBlockingTest {
val a = flow {
delay(20)
repeat(3) {
emit("a$it")
delay(100)
}
}
assertThat(instantCombine(a).toList())
.containsExactly(
emptyList<String>(),
listOf("a0"),
listOf("a1"),
listOf("a2")
)
.inOrder()
}
@Test
fun `null values are kept`() = runBlockingTest {
val a = flow {
emit("a")
emit(null)
emit("b")
}
assertThat(instantCombine(a).toList())
.containsExactly(
emptyList<String?>(),
listOf("a"),
listOf(null),
listOf("b")
)
.inOrder()
}
}
How about this:
inline fun <reified T> instantCombine(vararg flows: Flow<T>) = channelFlow {
val array= Array(flows.size) {
false to (null as T?) // first element stands for "present"
}
flows.forEachIndexed { index, flow ->
launch {
flow.collect { emittedElement ->
array[index] = true to emittedElement
send(array.filter { it.first }.map { it.second })
}
}
}
}
It solves a few problems:
- no need to introduce a new type
[]
is not in the resulting Flow- abstracts away null-handling (or however it is solved) from the call-site, the resulting Flow deals with it itself
So, you won't notice any implementation specific workarounds, because you don't have to deal with it during collection:
runBlocking {
instantCombine(a, b, c).collect {
println(it)
}
}
Output:
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
Try it out here!
Edit: Updated answer to handle Flows which emit null values too.
* The used low-level array is thread-safe. It's as if you are dealing with single variables.