How to pivot streaming dataset?
tl;dr pivot
aggregation is not directly supported by Spark Structured Streaming up to and including 2.4.4.
As a workaround, use DataStreamWriter.foreachBatch or more generic DataStreamWriter.foreach.
I use the latest version of Spark 2.4.4 as of now.
scala> spark.version
res0: String = 2.4.4
UnsupportedOperationChecker (that you can find in the stack trace) checks whether (the logical plan of) a streaming query uses supported operations only.
When you execute pivot
you had to groupBy
first as that's the only interface to give you pivot
available.
There are two issues with pivot
:
pivot
wants to know how many columns to generate values for and hence doescollect
which is not possible with streaming Datasets.pivot
is actually another aggregation (besidegroupBy
) that Spark Structured Streaming does not support
Let's look at the issue 1 with no columns to pivot on defined.
val sq = spark
.readStream
.format("rate")
.load
.groupBy("value")
.pivot("timestamp") // <-- pivot with no values
.count
.writeStream
.format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
rate
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:38)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:384)
... 49 elided
The last two lines show the issue, i.e. pivot
does collect
under the covers and hence the issue.
The other issue is that even though you'd specify the values for columns to pivot on you'd then get the other issue due to multiple aggregations (and you can see that it's actually a check for streaming not batch as has happened with the first case).
val sq = spark
.readStream
.format("rate")
.load
.groupBy("value")
.pivot("timestamp", Seq(1)) // <-- pivot with explicit values
.count
.writeStream
.format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;
Project [value#128L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#141[0] AS 1#142L]
+- Aggregate [value#128L], [value#128L, pivotfirst(timestamp#127, count(1) AS `count`#137L, 1000000, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#141]
+- Aggregate [value#128L, timestamp#127], [value#128L, timestamp#127, count(1) AS count(1) AS `count`#137L]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dd63368,rate,List(),None,List(),None,Map(),None), rate, [timestamp#127, value#128L]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:93)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:250)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:326)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
... 49 elided