Spark: get number of cluster cores programmatically
Found this while looking for the answer to pretty much the same question.
I found that:
Dataset ds = ...
ds.coalesce(sc.defaultParallelism());
does exactly what the OP was looking for.
For example, my 5 node x 8 core cluster returns 40 for the defaultParallelism
.
There are ways to get both the number of executors and the number of cores in a cluster from Spark. Here is a bit of Scala utility code that I've used in the past. You should easily be able to adapt it to Java. There are two key ideas:
The number of workers is the number of executors minus one or
sc.getExecutorStorageStatus.length - 1
.The number of cores per worker can be obtained by executing
java.lang.Runtime.getRuntime.availableProcessors
on a worker.
The rest of the code is boilerplate for adding convenience methods to SparkContext
using Scala implicits. I wrote the code for 1.x years ago, which is why it is not using SparkSession
.
One final point: it is often a good idea to coalesce to a multiple of your cores as this can improve performance in the case of skewed data. In practice, I use anywhere between 1.5x and 4x, depending on the size of data and whether the job is running on a shared cluster or not.
import org.apache.spark.SparkContext
import scala.language.implicitConversions
class RichSparkContext(val sc: SparkContext) {
def executorCount: Int =
sc.getExecutorStorageStatus.length - 1 // one is the driver
def coresPerExecutor: Int =
RichSparkContext.coresPerExecutor(sc)
def coreCount: Int =
executorCount * coresPerExecutor
def coreCount(coresPerExecutor: Int): Int =
executorCount * coresPerExecutor
}
object RichSparkContext {
trait Enrichment {
implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
new RichSparkContext(sc)
}
object implicits extends Enrichment
private var _coresPerExecutor: Int = 0
def coresPerExecutor(sc: SparkContext): Int =
synchronized {
if (_coresPerExecutor == 0)
sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
else _coresPerExecutor
}
}
Update
Recently, getExecutorStorageStatus
has been removed. We have switched to using SparkEnv
's blockManager.master.getStorageStatus.length - 1
(the minus one is for the driver again). The normal way to get to it, via env
of SparkContext
is not accessible outside of the org.apache.spark
package. Therefore, we use an encapsulation violation pattern:
package org.apache.spark
object EncapsulationViolator {
def sparkEnv(sc: SparkContext): SparkEnv = sc.env
}