Why does Complete output mode require aggregation?
From the Structured Streaming Programming Guide - other queries (excluding aggregations, mapGroupsWithState
and flatMapGroupsWithState
):
Complete mode not supported as it is infeasible to keep all unaggregated data in the Result Table.
To answer the question:
What would happen if Spark allowed Complete output mode with no aggregations in a streaming query?
Probably OOM.
The puzzling part is why dropDuplicates("id")
is not marked as aggregation.
I think the problem is the output mode. instead of using OutputMode.Complete, use OutputMode.Append as shown below.
scala> val q = ids
.writeStream
.format("memory")
.queryName("dups")
.outputMode(OutputMode.Append)
.trigger(Trigger.ProcessingTime(30.seconds))
.option("checkpointLocation", "checkpoint-dir")
.start