How to know which piece of code runs on driver or executor?
There is no simple and straightforward answer here.
As a rule of thumb, everything that is executed inside closures of higher-order functions like `mapPartitions` (`map`, `filter`, `flatMap`) or `combineByKey` should be handled mostly by executor machines. Everything outside these is handled by the driver. But you have to be aware that this is a serious simplification.
Depending on the specific method and language, at least part of the job can be handled by the driver. For example, when you use `combine`-like methods (`reduce`, `aggregate`), the final merging is applied locally on the driver machine. Complex algorithms (like many ML / MLlib tools) can interleave distributed and local processing when needed.
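To make the split in `reduce`-like methods concrete, here is a minimal plain-Python model. It is not Spark code and does not claim to mirror Spark's implementation; the `partitions` list and the helper names `run_on_executor` and `reduce_like` are hypothetical. Each partition is folded independently, which stands in for executor work, and the partial results are then merged in one place, which stands in for the driver.

```python
from functools import reduce
from operator import add


def run_on_executor(partition, op):
    """Stand-in for executor-side work: fold one partition to a single value."""
    return reduce(op, partition)


def reduce_like(partitions, op):
    """Model of a reduce-style action (hypothetical helper, not a Spark API)."""
    # Step 1: each non-empty partition is reduced independently (executor side).
    partials = [run_on_executor(p, op) for p in partitions if p]
    # Step 2: the partial results are merged in a single place (driver side).
    return reduce(op, partials)


partitions = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
total = reduce_like(partitions, add)
print(total)  # 45
```

The point of the model is step 2: however well step 1 parallelises, the last merge still happens in one process, so its cost grows with the number and size of the partial results.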
Moreover, data processing is only a fraction of the whole job. The driver is responsible for bookkeeping, accumulator processing, initial broadcasting and other secondary tasks. It also handles lineage and DAG processing and generates execution plans for higher-level APIs (`Dataset`, `SparkSQL`).
While the whole picture is relatively complex, in practice your choices are relatively limited. You can:
- Avoid collecting data (`collect`, `toLocalIterator`) to process locally.
- Perform more work on the workers with `tree*` (`treeAggregate`, `treeReduce`) methods.
- Avoid unnecessary tasks which increase bookkeeping costs.
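The idea behind the `tree*` methods in the list above can be sketched in plain Python. This is only a model under stated assumptions, not Spark's implementation: `merge_in_levels` and its `fan_in` parameter are hypothetical names, and the 16 integers stand in for per-partition results. Partial results are combined in intermediate levels before the final merge, so the last step receives far fewer values.

```python
from functools import reduce
from operator import add


def merge_in_levels(partials, op, fan_in=2):
    """Combine partial results level by level (hypothetical tree-style model).

    Returns the final value and how many values the last merge received.
    """
    level = list(partials)
    # Intermediate levels stand in for merges performed on the workers.
    while len(level) > fan_in:
        level = [reduce(op, level[i:i + fan_in])
                 for i in range(0, len(level), fan_in)]
    # Whatever is left is merged in one place, standing in for the driver.
    return reduce(op, level), len(level)


partials = list(range(1, 17))        # pretend: 16 per-partition results
flat_total = reduce(add, partials)   # flat merge: one place handles all 16
tree_total, driver_inputs = merge_in_levels(partials, add)
print(flat_total, tree_total, driver_inputs)  # 136 136 2
```

Both approaches give the same result; the difference is that in the tree-style version the final merge only sees two values instead of sixteen, at the cost of extra intermediate rounds.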
Any Spark application consists of a single Driver process and one or more Executor processes. Where the Driver process runs depends on the deploy mode: in client mode it runs on the machine that submits the application, and in cluster mode it runs on a node inside the cluster. The Executor processes run on the Worker nodes. You can increase or decrease the number of Executor processes dynamically depending on your usage, but the Driver process exists throughout the lifetime of your application.
The Driver process is responsible for a lot of things including directing the overall control flow of your application, restarting failed stages and the entire high level direction of how your application will process the data.
Coding your application so that more data is processed by Executors is mostly a matter of optimisation: making it process data faster by using all the resources available to it in the cluster. In practice, you do not really need to worry about making sure that more of your data is being processed by executors.
That being said, some Actions, when triggered, necessarily move data around. If you call the `collect` action on an RDD, all the data is brought to the Driver process, and if the RDD holds a sufficiently large amount of data, the application will fail with an Out Of Memory error, as the single machine running the Driver process will not be able to hold all the data.
Keeping the above in mind, Transformations are lazy and Actions are not. Transformations basically transform one RDD into another. But calling a transformation on an RDD does not actually result in any data being processed anywhere, Driver or Executor. All a transformation does is add a step to the lineage graph (DAG), which is executed when an Action is called.
So the actual processing happens when you call an Action on an RDD. The simplest example is calling `collect`. As soon as an action is called, Spark gets to work and executes the previously recorded DAG computations on the specified RDD, returning the result. Where these computations are executed depends entirely on your application.
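The lazy/eager distinction described above can be illustrated with a toy plain-Python class. This is a sketch, not Spark: `LazyDataset`, `double` and the `calls` list are hypothetical and exist only to show that transformations merely record steps, while the action is what finally runs them.

```python
class LazyDataset:
    """Toy model of lazy transformations (hypothetical class, not a Spark API)."""

    def __init__(self, data, lineage=()):
        self._data = data
        self._lineage = lineage  # recorded steps; nothing is executed yet

    def map(self, fn):
        # Transformation: only record the step and return a new dataset.
        return LazyDataset(self._data, self._lineage + (("map", fn),))

    def filter(self, pred):
        # Transformation: again, only the lineage grows.
        return LazyDataset(self._data, self._lineage + (("filter", pred),))

    def collect(self):
        # Action: now the recorded steps are actually run, in order.
        items = iter(self._data)
        for kind, fn in self._lineage:
            items = map(fn, items) if kind == "map" else filter(fn, items)
        return list(items)


calls = []


def double(x):
    calls.append(x)  # record that real work happened
    return x * 2


ds = LazyDataset([1, 2, 3, 4]).map(double).filter(lambda x: x > 4)
calls_before_action = len(calls)  # nothing has run yet
result = ds.collect()             # the action triggers the recorded steps
print(calls_before_action, result, len(calls))  # 0 [6, 8] 4
```

Building `ds` calls `double` zero times; only `collect` causes the four calls, which mirrors how a chain of transformations costs nothing until an action is invoked.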