Will parallel stream work fine with distinct operation?
You seem to miss quite a few things from the documentation you provide and the actual example.
Stream pipeline results may be nondeterministic or incorrect if the behavioral parameters to the stream operations are stateful.
In your example, you don't have any stateful operations defined by you. Stateful in the doc means the ones the you define, not the ones that are implemented by jdk
itself - like distinct
in your example. But either way you could define a stateful operation that would be correct, even Stuart Marks - working at Oracle/Java, provides such an example.
So you are more than OK in the examples that you provide, be it parallel or not.
The expensive part of distinct
(in parallel) come from the fact that internally there has to be a thread-safe data structure that would keep distinct elements; in jdk case it is a ConcurrentHashMap
used in case the order does not matter, or a reduction using a LinkedHashSet
when order matters.
distinct
btw is a pretty smart implementation, it looks if your source of the stream is already distinct (in such a case it is a no-op), or looks if your data is sorted, in which case it will do a little smarter traversal of the source (since it knows that if you have seen one element, the next to come is either the same you just seen or a different one), or using a ConcurrentHashMap
internally, etc.
Roughly pointing out the relevant parts of the doc
(Emphasis, mine):
Intermediate operations are further divided into stateless and stateful operations. Stateless operations, such as filter and map, retain no state from previously seen element when processing a new element -- each element can be processed independently of operations on other elements. Stateful operations, such as distinct and sorted, may incorporate state from previously seen elements when processing new elements
Stateful operations may need to process the entire input before producing a result. For example, one cannot produce any results from sorting a stream until one has seen all elements of the stream. As a result, under parallel computation, some pipelines containing stateful intermediate operations may require multiple passes on the data or may need to buffer significant data. Pipelines containing exclusively stateless intermediate operations can be processed in a single pass, whether sequential or parallel, with minimal data buffering
If you read further down (section on ordering):
Streams may or may not have a defined encounter order. Whether or not a stream has an encounter order depends on the source and the intermediate operations. Certain stream sources (such as List or arrays) are intrinsically ordered, whereas others (such as HashSet) are not. Some intermediate operations, such as sorted(), may impose an encounter order on an otherwise unordered stream, and others may render an ordered stream unordered, such as BaseStream.unordered(). Further, some terminal operations may ignore encounter order, such as forEach().
...
For parallel streams, relaxing the ordering constraint can sometimes enable more efficient execution. Certain aggregate operations, such as filtering duplicates (distinct()) or grouped reductions (Collectors.groupingBy()) can be implemented more efficiently if ordering of elements is not relevant. Similarly, operations that are intrinsically tied to encounter order, such as limit(), may require buffering to ensure proper ordering, undermining the benefit of parallelism. In cases where the stream has an encounter order, but the user does not particularly care about that encounter order, explicitly de-ordering the stream with unordered() may improve parallel performance for some stateful or terminal operations. However, most stream pipelines, such as the "sum of weight of blocks" example above, still parallelize efficiently even under ordering constraints.
In conclusion,
- distinct will work fine with parallel streams, but as you may already know, it has to consume the entire stream before continuing and this may use a lot of memory.
- If the source of the items is an unordered collection (such as hashset) or the stream is
unordered()
, thendistinct
is not worried about ordering the output and thus will be efficient
Solution is to add .unordered()
to the stream pipeline if you are not worried about order and would like to see more performance.
List<String> result2 = strList.parallelStream()
.unordered()
.map(String::toLowerCase)
.distinct()
.collect(Collectors.toList());
Alas there is no (available builtin) concurrent hashset in Java (unless they got clever with ConcurrentHashMap
), so I can only leave you with the unfortunate possibility that distinct is implemented in a blocking fashion using a regular Java set. In which case, I don't see any benefit of doing a parallel distinct.
Edit: I spoke too soon. There might be some benefit with using parallel streams with distinct. It looks like distinct
is implemented with more cleverness than I initially thought. See @Eugene's answer.