How to collect array of emitted values from Observable.from?
In your call to scan you have not specified a seed for the accumulator. In that circumstance, the first value is used as a seed. For example:
Rx.Observable
  .from(["a", "b", "c"])
  .scan((acc, value) => acc + value)
  .subscribe(value => console.log(value));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
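The seeding rule can be tried without RxJS, because Array.prototype.reduce follows the same convention: with no initial value, the first element becomes the accumulator. The sketch below is only an analogy in plain JavaScript (the variable names are made up for illustration), not the operator's implementation.

```javascript
const letters = ["a", "b", "c"];

// No initial value: the first element ("a") becomes the accumulator,
// so the callback only runs for "b" and "c".
const joined = letters.reduce((acc, value) => acc + value);

// The accumulator has the type of the first element. Here that is a string,
// so calling an array method on it throws.
let pushError = null;
try {
  letters.reduce((acc, value) => {
    acc.push(value);
    return acc;
  });
} catch (err) {
  pushError = err;
}

console.log(joined); // "abc"
console.log(pushError instanceof TypeError); // true
```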
In your snippet, the first value is not an array, so you cannot call push on it. To accumulate the values into an array, you can specify an array seed like this:
Rx.Observable
  .from(["a", "b", "c"])
  .concatMap(value => Rx.Observable.of(value))
  .scan((acc, value) => {
    acc.push(value);
    return acc;
  }, []) // Note that an empty array is used as the seed
  .subscribe(value => console.log(JSON.stringify(value)));
Although, for some use cases, it would be preferable to not mutate the array:
Rx.Observable
  .from(["a", "b", "c"])
  .concatMap(value => Rx.Observable.of(value))
  .scan((acc, value) => [...acc, value], [])
  .subscribe(value => console.log(JSON.stringify(value)));
Note that scan emits an array for each value that it receives. If you only want a single array emitted when the observable completes, you can use the toArray operator instead:
Rx.Observable
  .from(["a", "b", "c"])
  .concatMap(value => Rx.Observable.of(value))
  .toArray()
  .subscribe(value => console.log(JSON.stringify(value)));
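To make the difference in emissions concrete, here is a rough plain-JavaScript model. The helpers scanSnapshots and toArrayOnce are hypothetical names invented for this sketch; they work on an ordinary array and only imitate what the two operators would emit for a source that completes.

```javascript
// Hypothetical helper: imitates scan with an array seed by recording
// one snapshot per input value.
function scanSnapshots(values) {
  const snapshots = [];
  let acc = [];
  for (const value of values) {
    acc = [...acc, value]; // new array each time, so earlier snapshots stay intact
    snapshots.push(acc);
  }
  return snapshots;
}

// Hypothetical helper: imitates toArray by producing a single array
// once all values have been seen.
function toArrayOnce(values) {
  return [[...values]];
}

console.log(JSON.stringify(scanSnapshots(["a", "b", "c"])));
// [["a"],["a","b"],["a","b","c"]]
console.log(JSON.stringify(toArrayOnce(["a", "b", "c"])));
// [["a","b","c"]]
```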
Be careful with this code:
const obs = Rx.Observable
  .from(["a", "b", "c"])
  .concatMap(value => Rx.Observable.of(value))
  .scan((acc, value) => {
    acc.push(value);
    return acc;
  }, []);
obs.subscribe(value => console.log(JSON.stringify(value)));
obs.subscribe(value => console.log(JSON.stringify(value)));
The result will be a bit unexpected:
["a"]
["a","b"]
["a","b","c"]
["a","b","c","a"]
["a","b","c","a","b"]
["a","b","c","a","b","c"]
The acc variable holds a reference to the seed array, and every subscription starts from that same array, so each subscriber pushes its values onto the same object. There are several ways to avoid this; one is to create a new array whenever the stream starts emitting again:
const obs = Rx.Observable
  .from(["a", "b", "c"])
  .concatMap(value => Rx.Observable.of(value))
  .scan((acc, value) => {
    // An empty accumulator means we are still holding the shared seed,
    // so swap it for a fresh array before pushing
    if (acc.length === 0) {
      acc = [];
    }
    acc.push(value);
    return acc;
  }, []); // Note that an empty array is used as the seed
obs.subscribe(value => console.log(JSON.stringify(value)));
obs.subscribe(value => console.log(JSON.stringify(value)));
The result is as expected:
["a"]
["a","b"]
["a","b","c"]
["a"]
["a","b"]
["a","b","c"]
I hope this saves someone a lot of time.
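A non-mutating accumulator sidesteps the problem as well, because the shared seed array is never modified. The sketch below models this in plain JavaScript; fromArray, scanWithSharedSeed and collectTwice are hypothetical stand-ins written for this example, not RxJS functions, and they only imitate a cold observable whose seed is captured once.

```javascript
// Hypothetical stand-in for a cold observable: every subscribe call replays the values.
function fromArray(values) {
  return {
    subscribe(next) {
      values.forEach((value) => next(value));
    },
  };
}

// Hypothetical stand-in for scan: the seed is captured once and reused
// as the starting accumulator of every subscription.
function scanWithSharedSeed(source, accumulate, seed) {
  return {
    subscribe(next) {
      let acc = seed;
      source.subscribe((value) => {
        acc = accumulate(acc, value);
        next(acc);
      });
    },
  };
}

// Subscribes twice and records what each subscriber receives.
function collectTwice(observable) {
  const log = [];
  observable.subscribe((value) => log.push(JSON.stringify(value)));
  observable.subscribe((value) => log.push(JSON.stringify(value)));
  return log;
}

const source = fromArray(["a", "b", "c"]);

// Mutating accumulator: both subscriptions push onto the same seed array.
const mutating = collectTwice(
  scanWithSharedSeed(source, (acc, value) => { acc.push(value); return acc; }, [])
);

// Non-mutating accumulator: the seed stays empty, so each subscription starts clean.
const nonMutating = collectTwice(
  scanWithSharedSeed(source, (acc, value) => [...acc, value], [])
);

console.log(mutating[3]);    // ["a","b","c","a"]
console.log(nonMutating[3]); // ["a"]
```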
Another option is bufferCount(count): if you know the length of the input array, you can get a single output containing that number of items. The syntax is cleaner than having to remember how to use scan.
Note: if you don't know the size (although in your example you would), then count acts as a maximum, because a final, shorter buffer is still emitted when the source completes. This may have resource implications, though, so don't just set it to 99999.
const array = source$.pipe(bufferCount(10));
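As a rough model of that grouping, the sketch below chunks a plain array the way bufferCount(count) groups emissions from a source that completes. bufferChunks is a hypothetical helper written for this illustration, not part of RxJS.

```javascript
// Hypothetical helper: imitates bufferCount(count) for a source that completes.
function bufferChunks(values, count) {
  const chunks = [];
  let buffer = [];
  for (const value of values) {
    buffer.push(value);
    if (buffer.length === count) {
      chunks.push(buffer); // a full buffer is emitted immediately
      buffer = [];
    }
  }
  if (buffer.length > 0) {
    chunks.push(buffer); // the remainder is emitted on completion
  }
  return chunks;
}

console.log(JSON.stringify(bufferChunks(["a", "b", "c"], 3)));  // [["a","b","c"]]
console.log(JSON.stringify(bufferChunks(["a", "b", "c"], 10))); // [["a","b","c"]]
console.log(JSON.stringify(bufferChunks(["a", "b", "c"], 2)));  // [["a","b"],["c"]]
```

If count is smaller than the number of values, more than one array comes out, so this approach only yields a single array when count is at least the input length.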
In my case I had a list of 'operations' being executed and I knew the total number of steps in advance, so bufferCount worked quite well. However, be sure to carefully consider error conditions:
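import { of } from 'rxjs';
import { bufferCount, catchError, map } from 'rxjs/operators';

// one approach to handling errors that still returns an output array
// only use this if a single failure shouldn't stop the collection
// note: catchError must return an observable, hence of(...); the source itself still ends at the first error
const array = source$.pipe(
  map((result) => ({ hasError: false, result: result })),
  catchError((err) => of({ hasError: true, error: err })),
  bufferCount(10)
);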
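The shape of the resulting array can be previewed with a plain-JavaScript model. collectOutcomes is a hypothetical helper for this sketch; it assumes, as is the case for observables, that an error ends the sequence, so only the outcomes up to and including the failure are collected.

```javascript
// Hypothetical helper: runs each step in order and wraps its outcome.
// Mirroring observable semantics, the first error ends the sequence.
function collectOutcomes(steps) {
  const outcomes = [];
  for (const step of steps) {
    try {
      outcomes.push({ hasError: false, result: step() });
    } catch (err) {
      outcomes.push({ hasError: true, error: err.message });
      break; // nothing after the failure is collected
    }
  }
  return outcomes;
}

const outcomes = collectOutcomes([
  () => "step 1 done",
  () => { throw new Error("step 2 failed"); },
  () => "step 3 done",
]);

console.log(JSON.stringify(outcomes));
// [{"hasError":false,"result":"step 1 done"},{"hasError":true,"error":"step 2 failed"}]
```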
The decision on how to handle errors will vary greatly based on what your observables actually are, but the point here is to show bufferCount() as an option.
(There are other buffer operations available too.)