Compute size of Spark dataframe - SizeEstimator gives unexpected results
Unfortunately, I was not able to get reliable estimates from SizeEstimator, but I found another strategy: if the dataframe is cached, we can extract its size from queryExecution as follows:
// Materialize the cache so that size statistics become available
df.cache.foreach(_ => ())
// Re-optimize the logical plan; for a cached dataframe the statistics reflect the in-memory size
val catalyst_plan = df.queryExecution.logical
val df_size_in_bytes = spark.sessionState.executePlan(catalyst_plan)
  .optimizedPlan.stats.sizeInBytes
For the example dataframe, this gives exactly 4.8GB (which also corresponds to the file size when writing to an uncompressed Parquet table).
This has the disadvantage that the dataframe needs to be cached, but it is not a problem in my case.
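If you want the figure in a human-readable unit, the BigInt returned by sizeInBytes can be converted, for example to gigabytes (a small convenience sketch reusing the variable from above):
// sizeInBytes is a BigInt; convert to gigabytes for readability
val df_size_in_gb = df_size_in_bytes.toDouble / (1024 * 1024 * 1024)
println(f"Estimated size: $df_size_in_gb%.2f GB")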
EDIT: Replaced df.cache.foreach(_=>_) by df.cache.foreach(_ => ()), thanks to @DavidBenedeki for pointing it out in the comments.
Apart from SizeEstimator, which you have already tried (good insight), below is another option:
RDDInfo[] getRDDStorageInfo()
Return information about what RDDs are cached, whether they are in memory or on disk, how much space they take, etc.
This is actually what the Storage tab in the Spark UI uses (Spark docs).
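A minimal usage sketch (assuming the dataframe has already been cached; the printed fields come from RDDInfo):
// Ask the driver for storage information about all cached RDDs
spark.sparkContext.getRDDStorageInfo.foreach { info =>
  println(s"RDD ${info.id} (${info.name}): " +
    s"${info.numCachedPartitions} cached partitions, " +
    s"memory = ${info.memSize} bytes, disk = ${info.diskSize} bytes")
}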
Below is the implementation from Spark:
/**
 * :: DeveloperApi ::
 * Return information about what RDDs are cached, if they are in mem or on disk, how much space
 * they take, etc.
 */
@DeveloperApi
def getRDDStorageInfo: Array[RDDInfo] = {
  getRDDStorageInfo(_ => true)
}

private[spark] def getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo] = {
  assertNotStopped()
  val rddInfos = persistentRdds.values.filter(filter).map(RDDInfo.fromRdd).toArray
  rddInfos.foreach { rddInfo =>
    val rddId = rddInfo.id
    val rddStorageInfo = statusStore.asOption(statusStore.rdd(rddId))
    rddInfo.numCachedPartitions = rddStorageInfo.map(_.numCachedPartitions).getOrElse(0)
    rddInfo.memSize = rddStorageInfo.map(_.memoryUsed).getOrElse(0L)
    rddInfo.diskSize = rddStorageInfo.map(_.diskUsed).getOrElse(0L)
  }
  rddInfos.filter(_.isCached)
}
yourRDD.toDebugString from RDD also uses this.
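For a persisted RDD, the debug string reports the cached partitions together with their memory and disk size (illustrative one-liner, reusing the yourRDD name from above):
// Prints the RDD lineage; cached RDDs also report CachedPartitions, MemorySize and DiskSize
println(yourRDD.toDebugString)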
General note:
In my opinion, a more sensible way to address this kind of problem is to check that your repartitioning is correct and that the records are uniformly distributed: count the records per partition as shown below, adjust the repartition number, and then measure the size of the partitions.
yourdf.rdd.mapPartitionsWithIndex { case (index, rows) => Iterator((index, rows.size)) }
  .toDF("PartitionNumber", "NumberOfRecordsPerPartition")
  .show
or with existing Spark functions (depending on your Spark version):
import org.apache.spark.sql.functions._
df.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count.show
SizeEstimator
returns the number of bytes an object takes up on the JVM heap. This includes objects referenced by the object; the actual object size will almost always be much smaller.
The discrepancies in the sizes you've observed are because when you create new objects on the JVM, the references take up memory too, and this is being counted.
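A small illustration (the case class and values here are made up for demonstration; only SizeEstimator.estimate is Spark's API):
import org.apache.spark.util.SizeEstimator

// The estimate covers the object plus everything reachable from it on the heap,
// so even a tiny record includes JVM object and reference overhead.
case class Record(id: Int, name: String)
println(SizeEstimator.estimate(Record(1, "example")))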
Check out the docs here:
https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.util.SizeEstimator$