Is there a way to dynamically stop Spark Structured Streaming?
Structured Streaming Monitoring Options
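You can use query.lastProgress to get the timestamp of the most recent progress update and build your stop logic around that. Note that it returns null until the first progress update has been reported. Don't forget to save your checkpoint to a durable, persistent, available store.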
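As a rough sketch of the timestamp idea, the helper below parses a progress timestamp and checks how old it is. The names ProgressAge, isStale and maxAge are made up for illustration and are not part of Spark, and what counts as "too old" is an assumption you would tune for your own job. It relies on the timestamp being an ISO-8601 UTC string, which is how StreamingQueryProgress reports it.

```scala
import java.time.{Duration, Instant}

// Hypothetical helper (not part of Spark).
object ProgressAge {
  // True when the progress timestamp is more than maxAge older than `now`.
  def isStale(lastProgressTimestamp: String, now: Instant, maxAge: Duration): Boolean =
    Duration.between(Instant.parse(lastProgressTimestamp), now).compareTo(maxAge) > 0
}

// Possible wiring, assuming `query` is your running StreamingQuery:
//   val p = query.lastProgress   // null until the first progress update
//   if (p != null && ProgressAge.isStale(p.timestamp, Instant.now(), Duration.ofMinutes(5)))
//     query.stop()
```

Passing `now` in as a parameter, rather than reading the clock inside the helper, keeps the check easy to test.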
Putting together a couple pieces of advice:
- As @Michael West pointed out, there are listeners to track progress
- From what I gather, Structured Streaming doesn't yet support graceful shutdown
So one option is to periodically check for query activity, dynamically shutting down depending on a configurable state (when you determine no further progress can/should be made):
// where you configure your spark job...
spark.streams.addListener(shutdownListener(spark))
// your job code starts here by calling "start()" on the stream...
// periodically await termination, checking for your shutdown state
while (!spark.sparkContext.isStopped) {
  if (shutdown) {
    println("Shutting down since first batch has completed...")
    spark.streams.active.foreach(_.stop())
    spark.stop()
  } else {
    // wait 10 seconds before checking again if work is complete
    spark.streams.awaitAnyTermination(10000)
  }
}
Your listener can trigger the shutdown in a variety of ways. For instance, if you're only waiting on a single batch, then just shut down after the first progress update:
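import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// written by the listener thread and read by the polling loop, hence @volatile
@volatile var shutdown = false
def shutdownListener(spark: SparkSession) = new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = println("Query started: " + queryStarted.id)
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = println("Query terminated! " + queryTerminated.id)
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { shutdown = true }
}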
Or, if you need to shut down after more complicated state changes, you could inspect queryProgress.progress (for example by parsing its JSON body) to decide whether or not to shut down each time onQueryProgress fires.
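One way such a state-based check could look is sketched below: request a shutdown after a number of consecutive progress updates that read no input rows. EmptyBatchTracker and maxEmptyBatches are hypothetical names, not Spark APIs, and the threshold is an assumption. The sketch reads the typed numInputRows field instead of parsing the JSON. Whether empty triggers reach onQueryProgress at all may depend on your Spark version, so verify that against your deployment before relying on it.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper (not part of Spark): counts consecutive progress
// updates that read no input rows.
final class EmptyBatchTracker(maxEmptyBatches: Int) {
  private val consecutiveEmpty = new AtomicInteger(0)

  // Record one progress update; returns true once the limit is reached.
  def record(numInputRows: Long): Boolean =
    if (numInputRows > 0) {
      consecutiveEmpty.set(0) // data arrived, so start counting again
      false
    } else {
      consecutiveEmpty.incrementAndGet() >= maxEmptyBatches
    }
}

// Possible wiring inside the listener:
//   val tracker = new EmptyBatchTracker(3)
//   override def onQueryProgress(queryProgress: QueryProgressEvent): Unit =
//     if (tracker.record(queryProgress.progress.numInputRows)) shutdown = true
```

Keeping the decision in a small class that does not depend on Spark means the rule can be tested without starting a streaming query.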