When are cache and persist executed (since they don't seem like actions)?
Dataset's cache and persist operators are lazy and have no effect until you call an action (and you then wait until the caching has finished, which is the extra price you pay for better performance later on).
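As a minimal sketch of that laziness (q is an assumed name; this assumes a spark-shell session where the implicits are in scope):

// cache is lazy: this line computes nothing and returns immediately
val q = spark.range(1000000).selectExpr("id * 2 AS doubled").cache

// the first action runs the query and populates the cache
q.count   // pays the one-time caching cost

// subsequent actions are served from the in-memory copy
q.count   // fast: no recomputation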
From Spark's official documentation on RDD Persistence:
One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.
You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
That's exactly the reason why some people (and Spark SQL itself!) do the following trick:
rdd2.persist(StorageLevel.MEMORY_AND_DISK).count

to trigger the caching. The count operator is fairly cheap, so the net effect is that the caching is executed almost immediately after this line (there might be a small delay before the caching completes, as it executes asynchronously).
The benefits of this count after persist (see the sketch after the list) are as follows:

1. No action (but the count itself) will "suffer" the extra time for caching
2. The time between this line and the place where the cached rdd2 is used could be enough to fully complete the caching, and hence the time would be used better (without an extra "slowdown" for caching)
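A sketch of the trick in context ("data.txt" is a placeholder path and rdd2 an assumed name):

import org.apache.spark.storage.StorageLevel

val rdd2 = sc.textFile("data.txt").map(_.length)   // placeholder input

// mark the RDD for caching and materialize it right away with a cheap action
rdd2.persist(StorageLevel.MEMORY_AND_DISK).count

// ...much later, the first "real" action finds the cache already populated
rdd2.sum   // no extra caching cost here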
So when you asked:
would persist be considered a transformation or an action or neither?
I'd say it's neither, and I consider it an optimization hint (that may or may not ever be executed or taken into account).
Use the web UI's Storage tab to see which Datasets (as their underlying RDDs) have already been persisted.
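You can also check programmatically; a small sketch, assuming Spark 2.1 or later (df is an assumed name, and range5 refers to the table cached via CACHE TABLE further below):

// Dataset.storageLevel reports the effective storage level
// (StorageLevel.NONE means the Dataset is not cached)
val df = spark.range(5).cache
df.storageLevel   // StorageLevel(disk, memory, deserialized, 1 replicas)

// for catalog tables, e.g. after CACHE TABLE range5 below
spark.catalog.isCached("range5")   // true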
You can also see the effect of the cache or persist operators in a query plan using explain (or simply QueryExecution.optimizedPlan).
val q1 = spark.range(10).groupBy('id % 5).agg(count("*") as "count").cache
scala> q1.explain
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [(id % 5)#120L, count#119L]
+- InMemoryRelation [(id % 5)#120L, count#119L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(2) HashAggregate(keys=[(id#0L % 5)#8L], functions=[count(1)])
+- Exchange hashpartitioning((id#0L % 5)#8L, 200), true, [id=#13]
+- *(1) HashAggregate(keys=[(id#0L % 5) AS (id#0L % 5)#8L], functions=[partial_count(1)])
+- *(1) Range (0, 10, step=1, splits=16)
scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [(id % 5)#5L, count#4L], StorageLevel(disk, memory, deserialized, 1 replicas)
01 +- *(2) HashAggregate(keys=[(id#0L % 5)#8L], functions=[count(1)], output=[(id % 5)#5L, count#4L])
02 +- Exchange hashpartitioning((id#0L % 5)#8L, 200), true, [id=#13]
03 +- *(1) HashAggregate(keys=[(id#0L % 5) AS (id#0L % 5)#8L], functions=[partial_count(1)], output=[(id#0L % 5)#8L, count#10L])
04 +- *(1) Range (0, 10, step=1, splits=16)
Please note that the count above is a standard aggregate function, not an action, so no caching happens. It's just a coincidence that count is both the name of a standard function and a Dataset action.
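To make the distinction concrete, a sketch (q is an assumed name; in spark-shell the required implicits are already in scope):

import org.apache.spark.sql.functions.count

// count("*") is the aggregate function: this only builds a query plan;
// nothing is executed and nothing is cached yet
val q = spark.range(10).groupBy('id % 5).agg(count("*") as "count").cache

// Dataset.count() is the action: it runs the query and, thanks to cache
// above, populates the in-memory cache on this first execution
q.count   // 5: one row per distinct (id % 5) group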
You can cache a table using pure SQL (this is eager!)
// That registers range5 to contain the output of range(5) function
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
val q2 = spark.sql("SELECT * FROM range5")
scala> q2.explain
== Physical Plan ==
*(1) ColumnarToRow
+- Scan In-memory table `range5` [id#51L]
+- InMemoryRelation [id#51L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Range (0, 5, step=1, splits=16)
The InMemoryTableScan physical operator (with an InMemoryRelation logical plan) is how you can tell that the query is cached in memory and hence reused.
Moreover, Spark SQL itself uses the same pattern to trigger DataFrame caching for SQL's CACHE TABLE query (which, unlike RDD caching, is by default eager):
if (!isLazy) {
// Performs eager caching
sparkSession.table(tableIdent).count()
}
That means that, depending on the operator, you may get different results as far as caching is concerned: the cache and persist operators are lazy, while SQL's CACHE TABLE is eager by default.
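For completeness, SQL also offers a lazy variant through the LAZY keyword (the table names below are assumptions):

// eager (the default): the table is materialized while this statement runs
spark.sql("CACHE TABLE eager_range AS SELECT * FROM range(5)")

// LAZY defers materialization to the first use, just like Dataset.cache
spark.sql("CACHE LAZY TABLE lazy_range AS SELECT * FROM range(5)")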
Is it safe to conclude that the map transformation is executed when count (in the last line) is encountered, and not when persist is encountered?
Yes.
Also, at what point is rdd2 actually persisted?
The data is read, mapped, and persisted all at once while executing the count statement.
would persist be considered a transformation or an action or neither?
It's not really either, but in terms of the processing work done, you can consider it like a transformation. Spark is lazy and will only do work when you ask for a result. No result is required when you persist a data frame, so Spark does no work. In that way, persist is like a transformation.