When does shuffling occur in Apache Spark?
Question: When is shuffling triggered in Spark?
Answer: Any join, cogroup, or *ByKey operation involves holding objects in hashmaps or in-memory buffers to group or sort. join, cogroup, and groupByKey use these data structures in the tasks for the stages on the fetching side of the shuffles they trigger; reduceByKey and aggregateByKey use them in the tasks for the stages on both sides of the shuffles they trigger.
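This distinction can be sketched in plain Python (an illustration of the idea, not Spark's actual implementation): reduceByKey can combine values in a hashmap on the map side before the shuffle, while groupByKey must buffer every value for a key on the fetching side.

```python
from collections import defaultdict

def map_side_combine(records, func):
    """Per-partition combine in a hashmap, as reduceByKey does before shuffling."""
    acc = {}
    for k, v in records:
        acc[k] = func(acc[k], v) if k in acc else v
    return list(acc.items())

def fetch_side_group(shuffled):
    """Group all values per key in a hashmap, as groupByKey must after fetching."""
    groups = defaultdict(list)
    for k, v in shuffled:
        groups[k].append(v)
    return dict(groups)

partition = [("a", 1), ("b", 2), ("a", 3), ("b", 4)]
# reduceByKey ships one combined record per key per map partition...
print(map_side_combine(partition, lambda x, y: x + y))  # [('a', 4), ('b', 6)]
# ...whereas groupByKey ships every record and buffers all values per key.
print(fetch_side_group(partition))
```

With many duplicate keys per partition, the map-side combine is what makes reduceByKey ship far less data across the network than groupByKey.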
Explanation: How does the shuffle operation work in Spark?
The shuffle operation is implemented differently in Spark than in Hadoop. I don't know whether you are familiar with how it works in Hadoop, but let's focus on Spark for now.
On the map side, each map task in Spark writes out a shuffle file (an OS disk buffer) for every reducer, which corresponds to a logical block in Spark. These files are not intermediary in the sense that Spark does not merge them into larger partitioned ones. Since scheduling overhead in Spark is lower, the number of mappers (M) and reducers (R) is far higher than in Hadoop. Shipping M*R files to the respective reducers can therefore result in significant overhead.
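The file layout can be sketched as follows (plain Python; the in-memory dict stands in for on-disk files, and the naming is illustrative, not Spark's real scheme): each of M map tasks hash-partitions its output into R buckets, one per reducer, yielding M*R shuffle files.

```python
def shuffle_files(map_outputs, num_reducers):
    """Hash-partition each map task's records into one bucket per reducer."""
    files = {}
    for map_id, records in enumerate(map_outputs):
        buckets = [[] for _ in range(num_reducers)]
        for k, v in records:
            # The partitioner decides which reducer owns each key.
            buckets[hash(k) % num_reducers].append((k, v))
        for reduce_id, bucket in enumerate(buckets):
            files[(map_id, reduce_id)] = bucket  # one "file" per (mapper, reducer) pair
    return files

outputs = [[("a", 1), ("b", 2)], [("a", 3)], [("c", 4)]]  # M = 3 map tasks
files = shuffle_files(outputs, num_reducers=4)             # R = 4
print(len(files))  # M * R = 12 shuffle files
```

Even this toy example shows why the file count explodes: the number of files grows with the product M*R, not with the amount of data.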
Similar to Hadoop, Spark also provides a parameter, spark.shuffle.compress, to enable compression of map outputs; the compression library used can be Snappy (the default) or LZF. Snappy uses only 33KB of buffer for each opened file and significantly reduces the risk of encountering out-of-memory errors.
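For example, in spark-defaults.conf (a sketch; spark.shuffle.compress is the on/off switch, while the codec itself is selected through spark.io.compression.codec):

```
# Compress map output files (on by default)
spark.shuffle.compress       true
# Codec used for shuffle compression, e.g. snappy or lzf
spark.io.compression.codec   snappy
```

The same settings can also be passed programmatically via SparkConf or with --conf on spark-submit.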
On the reduce side, Spark requires all shuffled data to fit into the memory of the corresponding reducer task, unlike Hadoop, which has an option to spill it to disk. This would of course happen only in cases where the reducer task demands all shuffled data, for a groupByKey or a reduceByKey operation, for instance. Spark throws an out-of-memory exception in this case, which has proved quite a challenge for developers so far.
Also, with Spark there is no overlapping copy phase, unlike Hadoop, where mappers push data to the reducers even before the map phase is complete. This means that the shuffle is a pull operation in Spark, compared to a push operation in Hadoop. Each reducer also maintains a network buffer to fetch map outputs; the size of this buffer is specified through the parameter spark.reducer.maxMbInFlight (48MB by default).
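A toy illustration of the pull model (plain Python, with made-up block sizes; this is an analogy for the in-flight budget, not Spark's fetch code): the reducer requests map-output blocks itself and caps how many bytes are being fetched concurrently, analogous to spark.reducer.maxMbInFlight.

```python
def fetch_blocks(block_sizes, max_in_flight):
    """Pull map-output blocks in waves, never exceeding max_in_flight bytes at once."""
    waves, current, used = [], [], 0
    for block_id, size in enumerate(block_sizes):
        if used + size > max_in_flight and current:
            waves.append(current)  # in-flight budget exhausted: start a new wave
            current, used = [], 0
        current.append(block_id)
        used += size
    if current:
        waves.append(current)
    return waves

# Four map-output blocks (sizes in MB) fetched with a 48 MB in-flight budget:
print(fetch_blocks([20, 20, 20, 20], max_in_flight=48))  # [[0, 1], [2, 3]]
```

Because the reducer controls the requests, it can throttle itself; in Hadoop's push model the mappers decide when data arrives.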
For more information about shuffling in Apache Spark, I suggest the following readings:
- Optimizing Shuffle Performance in Spark by Aaron Davidson and Andrew Or.
- SPARK-751 JIRA issue and Consolidating Shuffle files by Jason Dai.
It occurs whenever data needs to be moved between executors (worker nodes).