How to combine mergeMap observable calls and return just one value for the whole observable
To get the combined result of all the responses from mergeMap, you can also try this:
return this.request1().pipe(
  mergeMap(res1 => this.request2(res1.id).pipe(
    map(res2 => {
      return {
        res1: res1,
        res2: res2
      }
    })
  ))
)
Q1: You need toArray. It collects every value emitted on the stream into a single array and emits that array once the stream completes.
Q2: To ignore all values on the stream and emit a single value upon completion:
concat(
  source$.pipe(ignoreElements()),
  of(true)
)
See "Emit a value upon source completion" example in a playground
Here's an annotated example to help clarify your questions about the subscription process.
Q1:
As pointed out in another answer, the reduce operator is what you'll want to include in your source pipeline. A key detail with reduce is that it only emits upon completion of the corresponding source observable. If you instead want an emission each time a result from the inner observables arrives, then scan is appropriate. Another difference with scan is that it doesn't require the source to complete.
Q2:
With this question, refer to my example below and think of each argument to the processing pipeline as the lifetime of a single request. Here, completion is implicit. It occurs after the last value of the inner observables is processed.
However, if there's no bound to the inner observables, then knowing when all of them are complete isn't possible. In such a case, you'll find that reduce() won't work, because it never sees a completion and therefore never emits.
const { from, of, Subject } = rxjs;
const { mergeMap, map, tap, reduce, scan } = rxjs.operators;

// Use a subject to simulate processing.
// Think of each argument as a request to the processing pipeline below.
const properties = new Subject();

// Establish processing pipeline
const source = properties.pipe(
  // `mergeMap` here flattens the output value to the combined inner output values
  mergeMap(props =>
    // Each item inside the argument should be piped as separate values
    from(props).pipe(
      // `mergeMap` here flattens the output value to `{ key, value }`
      mergeMap(key =>
        of('test').pipe(
          map(value => ({ key, value })),
        ),
      ),
      // Unlike `scan`, `reduce` only emits upon "completion".
      // Here, "completion" is implicit - it is after the last
      // element of `from(props)` has been processed.
      reduce((a, i) => [...a, i], []),
    )
  ),
);

// Subscribe to the pipeline to observe processing.
source.subscribe(console.log);

// Trigger a processing request with an argument
properties.next(['session', 'user']);

// Trigger another processing request
properties.next(['session', 'user']);
<script src="https://unpkg.com/[email protected]/bundles/rxjs.umd.min.js"></script>