Why is CompletableFuture join/get faster in separate streams than using one stream
The two approaches are quite different, let me try to explain it clearly
1st approach : In the first approach you are spinning up all Async
requests for all 6 tasks and then calling join
function on each one of them to get the result
2st approach : But in the second approach you are calling the join
immediately after spinning the Async
request for each task. For example after spinning Async
thread for task 1
calling join
, make sure that thread to complete task and then only spin up the second task with Async
thread
Note : Another side if you observe the output clearly, In the 1st approach output appears in random order since the all six tasks were executed asynchronously. But during second approach all tasks were executed sequentially one after the another.
I believe you have an idea how stream map
operation is performed, or you can get more information from here or here
To perform a computation, stream operations are composed into a stream pipeline. A stream pipeline consists of a source (which might be an array, a collection, a generator function, an I/O channel, etc), zero or more intermediate operations (which transform a stream into another stream, such as filter(Predicate)), and a terminal operation (which produces a result or side-effect, such as count() or forEach(Consumer)). Streams are lazy; computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed.
The stream framework does not define the order in which map
operations are executed on stream elements, because it is not intended for use cases in which that might be a relevant issue. As a result, the particular way your second version is executing is equivalent, essentially, to
List<Integer> results = new ArrayList<>();
for (Integer sleepTime : sleepTimes) {
results.add(CompletableFuture
.supplyAsync(() -> sleepTask(sleepTime), executorService2)
.exceptionally(ex -> { ex.printStackTrace(); return -1; }))
.join());
}
...which is itself essentially equivalent to
List<Integer> results = new ArrayList<>()
for (Integer sleepTime : sleepTimes) {
results.add(sleepTask(sleepTime));
}
@Deadpool answered it pretty well, just adding my answer which can help someone understand it better.
I was able to get an answer by adding more printing to both methods.
TLDR
2 stream approach: We are starting up all 6 tasks asynchronously and then calling join function on each one of them to get the result in a separate stream.
1 stream approach: We are calling the join immediately after starting up each task. For example after spinning a thread for task 1, calling join makes sure the thread waits for completion of task 1 and then only spin up the second task with async thread.
Note: Also, if we observe the output clearly, in the 1 stream approach, output appears sequential order since the all six tasks were executed in order. But during second approach all tasks were executed in parallel, hence the random order.
Note 2: If we replace stream()
with parallelStream()
in the 1 stream approach, it will work identically to 2 stream approach.
More proof
I added more printing to the streams which gave the following outputs and confirmed the note above :
1 stream:
List<Integer> results = sleepTimes.stream()
.map(sleepTime -> CompletableFuture.supplyAsync(() -> sleepTask(sleepTime), executorService2)
.exceptionally(ex -> { ex.printStackTrace(); return -1; }))
.map(f -> {
int num = f.join();
System.out.println(String.format("doing join on task %d", num));
return num;
})
.collect(Collectors.toList());
WITH SAME STREAM FOR FUTURE AND JOIN
Task with sleep time 1
doing join on task 1
Task with sleep time 2
doing join on task 2
Task with sleep time 3
doing join on task 3
Task with sleep time 4
doing join on task 4
Task with sleep time 5
doing join on task 5
Task with sleep time 6
doing join on task 6
done in 21 seconds.
[1, 2, 3, 4, 5, 6]
2 streams:
List<CompletableFuture<Integer>> futures = sleepTimes.stream()
.map(sleepTime -> CompletableFuture.supplyAsync(() -> sleepTask(sleepTime), executorService)
.exceptionally(ex -> { ex.printStackTrace(); return -1; }))
.collect(Collectors.toList());
List<Integer> result = futures.stream()
.map(f -> {
int num = f.join();
System.out.println(String.format("doing join on task %d", num));
return num;
})
.collect(Collectors.toList());
WITH SEPARATE STREAMS FOR FUTURE AND JOIN
Task with sleep time 2
Task with sleep time 5
Task with sleep time 3
Task with sleep time 1
Task with sleep time 4
Task with sleep time 6
doing join on task 1
doing join on task 2
doing join on task 3
doing join on task 4
doing join on task 5
doing join on task 6
done in 6 seconds.
[1, 2, 3, 4, 5, 6]