Stream stateful computation: cumulative sums

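You can do this with an AtomicLong that holds the running total. This makes the mapping function stateful, so the result is only correct when the stream is processed sequentially, which is why the method calls sequential(). For example: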

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

public class Accumulator {
    public static LongStream toCumulativeSumStream(IntStream ints){
        AtomicLong sum = new AtomicLong(0);
        return ints.sequential().mapToLong(sum::addAndGet);
    }

    public static void main(String[] args){
        LongStream sums = Accumulator.toCumulativeSumStream(IntStream.range(1, 5));
        sums.forEachOrdered(System.out::println);
    }
}

This outputs:

1
3
6
10

I've used a long to store the sums, because adding ints can easily take the total well over Integer.MAX_VALUE, and a long is far less likely to overflow.
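To illustrate the overflow point, here is a minimal sketch comparing int and long addition. The class and method names (OverflowDemo, sumAsInt, sumAsLong) are hypothetical and only serve the illustration:

```java
class OverflowDemo {
    // Adds in int arithmetic; the result silently wraps around on overflow.
    static int sumAsInt(int a, int b) {
        return a + b;
    }

    // Widens to long before adding, so the sum of any two ints is exact.
    static long sumAsLong(int a, int b) {
        return (long) a + b;
    }

    public static void main(String[] args) {
        System.out.println(sumAsInt(Integer.MAX_VALUE, 1));  // -2147483648
        System.out.println(sumAsLong(Integer.MAX_VALUE, 1)); // 2147483648
    }
}
```

A long running total can still overflow in principle, but only after far more additions than an int total would survive.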


This can also be done with a collector whose result is then turned into a new stream:

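import java.util.List;

class Accumulator {
    // Appends the new running total: the value plus the last total so far.
    public static void accept(List<Integer> list, Integer value) {
        list.add(value + (list.isEmpty() ? 0 : list.get(list.size() - 1)));
    }

    // Merges two partial results, shifting list2's totals by list1's final total.
    public static List<Integer> combine(List<Integer> list1, List<Integer> list2) {
        // An empty left-hand list contributes no offset (avoids get(-1)).
        int total = list1.isEmpty() ? 0 : list1.get(list1.size() - 1);
        list2.stream().map(n -> n + total).forEach(list1::add);
        return list1;
    }
}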

This is used as:

myIntStream.parallel()
    .collect(ArrayList<Integer>::new, Accumulator::accept, Accumulator::combine)
    .stream();

The important property of this collector is that it stays correct even when the stream is parallel: whenever two partial lists are combined, every total in the second list is shifted by the final total of the first.
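As a worked example of that combining step, the sketch below merges two partial results by hand. CombineDemo is a hypothetical class written for this illustration; its combine method follows the same idea as Accumulator.combine and includes a guard for an empty left-hand list:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CombineDemo {
    // Shifts the right-hand partial sums by the final total of the
    // left-hand partial sums, then appends them to the left-hand list.
    static List<Integer> combine(List<Integer> left, List<Integer> right) {
        int offset = left.isEmpty() ? 0 : left.get(left.size() - 1);
        for (int n : right) {
            left.add(n + offset);
        }
        return left;
    }

    public static void main(String[] args) {
        // Partial sums of [1, 2, 3] and of [4, 5], as two worker threads
        // might produce them independently.
        List<Integer> left = new ArrayList<>(Arrays.asList(1, 3, 6));
        List<Integer> right = new ArrayList<>(Arrays.asList(4, 9));
        System.out.println(combine(left, right)); // [1, 3, 6, 10, 15]
    }
}
```

The merged list matches the cumulative sums of 1 to 5 computed in a single pass.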

This is not as efficient as a map operation, because it collects the whole stream before producing a new one. That is not just an implementation detail, though: it is a necessary consequence of the fact that streams are designed to be processed concurrently.

I have tested it with IntStream.range(0, 10000).parallel() and it functions correctly.
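A check along those lines might look like the following sketch, which compares the parallel collector result against a plain loop. ParallelCheck and its helper methods are hypothetical names; the accumulate and combine logic follows the approach above, with a guard for an empty left-hand list:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

class ParallelCheck {
    // Appends the running total for one more element.
    static void accept(List<Integer> list, int value) {
        list.add(value + (list.isEmpty() ? 0 : list.get(list.size() - 1)));
    }

    // Merges two partial results, offsetting the right-hand totals.
    static void combine(List<Integer> left, List<Integer> right) {
        int offset = left.isEmpty() ? 0 : left.get(left.size() - 1);
        for (int n : right) {
            left.add(n + offset);
        }
    }

    // Cumulative sums of 0..n-1 computed with a parallel stream.
    static List<Integer> parallelSums(int n) {
        return IntStream.range(0, n).parallel()
                .collect(ArrayList::new, ParallelCheck::accept, ParallelCheck::combine);
    }

    // Reference result computed with a simple sequential loop.
    static List<Integer> loopSums(int n) {
        List<Integer> sums = new ArrayList<>();
        int total = 0;
        for (int i = 0; i < n; i++) {
            total += i;
            sums.add(total);
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println(parallelSums(10000).equals(loopSums(10000))); // true
    }
}
```

For this range the largest total is 49995000, so Integer sums do not overflow here; larger inputs would need Long totals.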