Why so many tasks in my spark job? Getting 200 Tasks By Default
I am having a similar problem, but in my scenario the collection I am parallelizing has fewer elements than the number of tasks scheduled by Spark (which sometimes causes Spark to behave oddly). Forcing the partition number fixed the issue for me.
It was something like this:
collection = range(10) # In the real scenario it was a complex collection
sc.parallelize(collection).map(lambda e: e + 1) # also a more complex operation in the real scenario
Then, I saw in the Spark log:
INFO YarnClusterScheduler: Adding task set 0.0 with 512 tasks
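A minimal sketch of forcing the partition number, assuming the goal is simply never to schedule more tasks than there are elements. The helper names `choose_num_slices` and `parallelize_capped` are hypothetical, made up for illustration; the only PySpark pieces it relies on are `sc.parallelize(c, numSlices)` and `sc.defaultParallelism`.

```python
def choose_num_slices(num_elements, default_parallelism):
    """Cap the partition count at the number of elements (never below 1)."""
    return max(1, min(num_elements, default_parallelism))


def parallelize_capped(sc, collection):
    """Parallelize `collection` without creating more partitions than elements.

    `sc` is assumed to be an existing SparkContext (hypothetical helper).
    """
    items = list(collection)
    num_slices = choose_num_slices(len(items), sc.defaultParallelism)
    return sc.parallelize(items, num_slices)


# Usage, given a live SparkContext `sc`:
# rdd = parallelize_capped(sc, range(10)).map(lambda e: e + 1)
# rdd.getNumPartitions()  # no more than 10, even if the default parallelism is 512
```

With 10 elements and a default parallelism of 512, this would ask for 10 partitions instead of 512.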
This is a classic Spark question.
The two tasks used for reading (Stage Id 0 in the second figure) come from the defaultMinPartitions setting, which is set to 2. You can get this parameter by reading the value of sc.defaultMinPartitions in the REPL. It should also be visible in the Spark UI under the "Environment" tab.
You can take a look at the code on GitHub to see that this is exactly what is happening. If you want more partitions to be used on read, just add it as a parameter, e.g., sc.textFile("a.txt", 20).
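As a rough sketch of the read-side option, the wrapper below passes an optional partition count through to `sc.textFile`. The function name `read_text` is hypothetical; `sc` is assumed to be an existing SparkContext, and `textFile(name, minPartitions)` is the PySpark call being wrapped.

```python
def read_text(sc, path, min_partitions=None):
    """Read a text file, optionally asking Spark for more input partitions.

    When `min_partitions` is None, Spark falls back to sc.defaultMinPartitions.
    """
    if min_partitions is None:
        return sc.textFile(path)
    return sc.textFile(path, min_partitions)


# Usage, given a live SparkContext `sc`:
# rdd = read_text(sc, "a.txt", 20)
# rdd.getNumPartitions()  # check how many partitions Spark actually created
```

The value is a minimum, so it is worth checking `getNumPartitions()` rather than assuming the exact count.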
Now the interesting part is the 200 partitions that show up in the second stage (Stage Id 1 in the second figure). Each time there is a shuffle, Spark needs to decide how many partitions the shuffled RDD will have. For DataFrame and Spark SQL shuffles this is controlled by spark.sql.shuffle.partitions, and as you can guess, its default is 200.
You can change that using:
sqlContext.setConf("spark.sql.shuffle.partitions", "4")
If you run your code with this configuration, you will see that the 200 partitions are gone. How to set this parameter is kind of an art. A reasonable starting point might be 2x the number of cores you have, then adjust from there.
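To make the "2x the number of cores" rule of thumb concrete, here is a small sketch. The helper `suggest_shuffle_partitions` is hypothetical, and the factor of 2 is only the heuristic from this answer, not a Spark recommendation. Note that `os.cpu_count()` only sees the local machine; on a cluster you would plug in the total number of executor cores instead.

```python
import os


def suggest_shuffle_partitions(total_cores, factor=2):
    """Heuristic: a small multiple of the available cores, never below 1."""
    return max(1, total_cores * factor)


# Local-machine estimate; on a cluster, use the total executor cores instead.
cores = os.cpu_count() or 1
num_partitions = suggest_shuffle_partitions(cores)

# Then apply it, given an existing SQLContext `sqlContext`:
# sqlContext.setConf("spark.sql.shuffle.partitions", str(num_partitions))
```

The setting takes a string, hence the `str(...)` conversion when passing the computed value.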
I think Spark 2.0 has a way to automatically infer the best number of partitions for shuffle RDDs. Looking forward to that!
Finally, the number of jobs you get depends on how many RDD actions the optimized DataFrame code resulted in. If you read the Spark docs, they say that each RDD action triggers one job. When your action involves a DataFrame or Spark SQL, the Catalyst optimizer figures out an execution plan and generates some RDD-based code to execute it. It's hard to say exactly why it uses two actions in your case. You may need to look at the optimized query plan to see exactly what it is doing.