Show progress of Java 8 stream processing
As others have pointed out, this has some caveats. First of all, streams are not supposed to be used for something like this.
On a more technical level, one could further argue:
- A stream can be infinite
- Even if you know the number of elements: This number might be distorted by operations like filter or flatMap
- For a parallel stream, tracking the progress will enforce a synchronization point
- If there is a terminal operation that is expensive (like the aggregation in your case), then the reported progress might not even sensibly reflect the computation time
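To make the filter caveat concrete, here is a small experiment. The class FilterDistortion and its method countAroundFilter are illustrative names, not from the question. A counter placed before a filter sees every source element, while a counter placed after it sees only the survivors, so a total derived from the source size no longer matches what the later stages actually process.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

public class FilterDistortion
{
    // Counts the elements that pass a point before and after a filter.
    // Returns { countBeforeFilter, countAfterFilter }.
    static long[] countAroundFilter(int size)
    {
        AtomicLong beforeFilter = new AtomicLong();
        AtomicLong afterFilter = new AtomicLong();
        IntStream.range(0, size)
            .map(i -> { beforeFilter.incrementAndGet(); return i; }) // sees every element
            .filter(i -> i % 3 == 0)                                  // keeps multiples of 3
            .map(i -> { afterFilter.incrementAndGet(); return i; })  // sees only survivors
            .sum(); // terminal operation that pushes all elements through the pipeline
        return new long[] { beforeFilter.get(), afterFilter.get() };
    }

    public static void main(String[] args)
    {
        long[] counts = countAroundFilter(100);
        System.out.println("before filter: " + counts[0]
            + ", after filter: " + counts[1]);
    }
}
```

With 100 source elements, the first counter ends at 100 and the second at 34 (the multiples of 3 in 0..99), so a percentage computed against 100 from behind the filter would stop at 34%.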
However, keeping this in mind, one approach that might be reasonable for your use case is this:
You could create a Function<T,T> that is passed to a map call on the stream. (At least, I'd prefer that over using peek on the stream, as suggested in another answer.) This function could keep track of the progress, using an AtomicLong for counting the elements. In order to keep separate things separate, this progress could then simply be forwarded to a LongConsumer (or a Consumer<Long>), which takes care of the presentation.
The "presentation" here refers to printing this progress to the console, normalized or as percentages, relative to a size that is known wherever the consumer is created. But the consumer can then also take care of filtering the output: printing, for example, only every 10th element, or printing a message only if at least 5 seconds have passed since the previous one.
import java.util.Iterator;
import java.util.Locale;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamProgress
{
    public static void main(String[] args)
    {
        int size = 250;
        Stream<Integer> stream = readData(size);

        LongConsumer progressConsumer = progress ->
        {
            // "Filter" the output here: Report only every 10th element
            if (progress % 10 == 0)
            {
                double relative = (double) progress / (size - 1);
                double percent = relative * 100;
                System.out.printf(Locale.ENGLISH,
                    "Progress %8d, relative %2.5f, percent %3.2f%n",
                    progress, relative, percent);
            }
        };

        Integer result = stream
            .map(element -> process(element))
            .map(progressMapper(progressConsumer))
            .reduce(0, (a, b) -> a + b);

        System.out.println("result " + result);
    }

    // Returns an identity function that counts the elements passing through it
    // and forwards the (0-based) count to the given consumer
    private static <T> Function<T, T> progressMapper(
        LongConsumer progressConsumer)
    {
        AtomicLong counter = new AtomicLong(0);
        return t ->
        {
            long n = counter.getAndIncrement();
            progressConsumer.accept(n);
            return t;
        };
    }

    private static Integer process(Integer element)
    {
        return element * 2;
    }

    // Simulates a slow data source by sleeping 10 ms per element
    private static Stream<Integer> readData(int size)
    {
        Iterator<Integer> iterator = new Iterator<Integer>()
        {
            int n = 0;

            @Override
            public Integer next()
            {
                try
                {
                    Thread.sleep(10);
                }
                catch (InterruptedException e)
                {
                    // Restore the interrupt flag instead of swallowing it
                    Thread.currentThread().interrupt();
                }
                return n++;
            }

            @Override
            public boolean hasNext()
            {
                return n < size;
            }
        };
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(
                iterator, Spliterator.ORDERED), false);
    }
}
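The time-based filtering mentioned above (only printing if at least 5 seconds have passed since the previous message) could be factored out into a reusable wrapper around any LongConsumer. This is only a sketch: the helper name throttled and the injected LongSupplier clock are my own illustrative choices, not part of any library. The accept method is synchronized because, with a parallel stream, the consumer is called from several threads.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

public class ThrottledProgress
{
    // Hypothetical helper: forwards a progress value to the delegate only if
    // at least minIntervalNanos have passed since the last forwarded value.
    // The very first value is always forwarded.
    static LongConsumer throttled(LongConsumer delegate,
        long minIntervalNanos, LongSupplier clock)
    {
        return new LongConsumer()
        {
            private boolean reportedOnce = false;
            private long lastReportNanos;

            // synchronized, because a parallel stream may call this concurrently
            @Override
            public synchronized void accept(long progress)
            {
                long now = clock.getAsLong();
                if (!reportedOnce || now - lastReportNanos >= minIntervalNanos)
                {
                    reportedOnce = true;
                    lastReportNanos = now;
                    delegate.accept(progress);
                }
            }
        };
    }

    public static void main(String[] args) throws InterruptedException
    {
        LongConsumer printer = p -> System.out.println("Progress " + p);
        // Report at most once every 50 ms (5 seconds would be too long for a demo)
        LongConsumer consumer = throttled(printer,
            TimeUnit.MILLISECONDS.toNanos(50), System::nanoTime);
        for (long i = 0; i < 20; i++)
        {
            Thread.sleep(10); // simulate 10 ms of work per element
            consumer.accept(i);
        }
    }
}
```

The clock is a parameter so that the throttling can be tested without sleeping; in real use it would be System::nanoTime, and the returned consumer could be passed to progressMapper in place of the plain progressConsumer. Note that the last progress value may be suppressed if it arrives too soon after the previous report.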
First of all, streams are not meant for this kind of task (as opposed to a classic data structure). If you already know how many elements your stream will process, you might go with the following option, which, I repeat, is not what streams are designed for.
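// Needs java.util.Optional and java.util.concurrent.atomic.AtomicInteger.
// MyData, MyStat, readData(), process() and elementsCount are placeholders
// for your own types, methods and element count.
Stream<MyData> myStream = readData();
final AtomicInteger loader = new AtomicInteger();
// At least 1, so that "% fivePercent" cannot divide by zero for fewer than 20 elements
int fivePercent = Math.max(1, elementsCount / 20);
Optional<MyStat> result = myStream
    .map(row -> process(row))
    .peek(stat -> {
        // Read the counter once, so the printed value is the one that was tested
        int done = loader.incrementAndGet();
        if (done % fivePercent == 0) {
            System.out.println(done + " elements of " + elementsCount + " processed");
            System.out.println((100L * done / elementsCount) + "%");
        }
    })
    .reduce(MyStat::aggregate); // reduce without an identity returns an Optional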
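For reference, here is a self-contained variant of the peek approach that can be run as-is. It is a sketch under assumptions: plain int values stand in for MyData/MyStat, row * 2 stands in for process(row), and sum() stands in for reduce(MyStat::aggregate). Keep in mind that the Stream.peek Javadoc describes the method as existing mainly to support debugging.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class PeekProgress {

    // Doubles each value in 0..elementsCount-1 and sums the results,
    // printing a progress message roughly every 5 % of the elements.
    static int processWithProgress(int elementsCount) {
        AtomicInteger loader = new AtomicInteger();
        // At least 1, so that "% fivePercent" never divides by zero
        int fivePercent = Math.max(1, elementsCount / 20);
        return IntStream.range(0, elementsCount)
            .map(row -> row * 2) // stand-in for process(row)
            .peek(stat -> {
                int done = loader.incrementAndGet(); // read the counter only once
                if (done % fivePercent == 0) {
                    System.out.println(done + " of " + elementsCount
                        + " elements processed (" + (100L * done / elementsCount) + "%)");
                }
            })
            .sum(); // stand-in for reduce(MyStat::aggregate)
    }

    public static void main(String[] args) {
        // prints the progress lines, then "result 39800"
        System.out.println("result " + processWithProgress(200));
    }
}
```

Computing the percentage as 100 * done / elementsCount, rather than counting steps of fivePercent, keeps the printed value correct when elementsCount is not a multiple of 20 or is smaller than 20.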