Spark structured streaming consistency across sinks
One important thing to keep in mind is that the two sinks will be written by two distinct queries, each reading independently from the source. Checkpointing is therefore done per query.
Whenever you call start() on a DataStreamWriter, that results in a new query, and if you set checkpointLocation, each query will have its own checkpoint to track the offsets it has read from the source.
val input = spark.readStream....

val query1 = input.select('colA, 'colB)
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir1")
  .start("/path1")

val query2 = input.select('colA, 'colB)
  .writeStream
  .format("csv")
  .option("checkpointLocation", "path/to/checkpoint/dir2")
  .start("/path2")
So each query reads from the source and tracks offsets independently. That also means the two queries can be at different offsets of the input stream at any given moment, and you can restart either or both without impacting the other.
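To see that the two queries really do advance independently, you can compare the offsets each one last reported. This is only a rough sketch: it assumes query1 and query2 are the StreamingQuery values from the snippet above, and printOffsets is a helper name made up for illustration.

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper: print the source offsets a query most recently processed.
// lastProgress is null until the query has completed its first trigger,
// so it is wrapped in Option before use.
def printOffsets(label: String, query: StreamingQuery): Unit = {
  Option(query.lastProgress) match {
    case Some(progress) =>
      progress.sources.foreach { s =>
        println(s"$label batch=${progress.batchId} start=${s.startOffset} end=${s.endOffset}")
      }
    case None =>
      println(s"$label has not reported progress yet")
  }
}

// Assumes query1 and query2 were started as shown above.
printOffsets("query1", query1)
printOffsets("query2", query2)
```

The batch ids and end offsets will often differ between the two lines of output, which is exactly the per-query checkpointing described here.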
UPDATE
I wanted to make another suggestion now that Databricks Delta has been open sourced (as Delta Lake). A common pattern I've used is landing data from upstream sources directly into an append-only Delta table. Then, with Structured Streaming, you can efficiently subscribe to the table and process the new records incrementally. Delta's internal transaction log is more efficient than the S3 file listings required by the basic file source. It also gives every query the same consistent source of data to read from, whether the data originally arrived from S3 or from Kinesis.
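A minimal sketch of that pattern might look like the following. It assumes the Delta Lake library is on the classpath and that an upstream job is already appending to the table; every path here (/delta/events, the checkpoint and output directories) is a placeholder made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("delta-fanout").getOrCreate()

// Subscribe to the append-only Delta table as a streaming source.
// "/delta/events" is a hypothetical table location.
val events = spark.readStream
  .format("delta")
  .load("/delta/events")

// Two independent queries read the same table; each keeps its own checkpoint.
val toParquet = events.writeStream
  .format("parquet")
  .option("checkpointLocation", "/checkpoints/events-parquet")
  .start("/out/events-parquet")

val toCsv = events.writeStream
  .format("csv")
  .option("checkpointLocation", "/checkpoints/events-csv")
  .start("/out/events-csv")
```

The two queries are still independent and can still be at different positions, but both are reading the same committed table versions rather than hitting the upstream system twice.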
What Silvio has written is absolutely correct. Writing to two sinks starts two streaming queries that run independently of each other (effectively two streaming applications, each reading the same data, processing it, and checkpointing on its own).
I just want to add that if you want both queries to stop at the same time when either one of them fails or is stopped, you can use the awaitAnyTermination() API. It returns (or throws) as soon as any one query terminates, so the driver can then stop the remaining queries or exit, instead of staying blocked on a single query.
Instead of using:
query.start().awaitTermination()
use:
sparkSession.streams.awaitAnyTermination()
Here is an excerpt from the API documentation and source:
/**
* Wait until any of the queries on the associated SQLContext has terminated since the
* creation of the context, or since `resetTerminated()` was called. If any query was terminated
* with an exception, then the exception will be thrown.
*
* If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either
* return immediately (if the query was terminated by `query.stop()`),
* or throw the exception immediately (if the query was terminated with exception). Use
* `resetTerminated()` to clear past terminations and wait for new terminations.
*
* In the case where multiple queries have terminated since `resetTermination()` was called,
* if any query has terminated with exception, then `awaitAnyTermination()` will
* throw any of the exception. For correctly documenting exceptions across multiple queries,
* users need to stop all of them after any of them terminates with exception, and then check the
* `query.exception()` for each query.
*
* @throws StreamingQueryException if any query has terminated with an exception
*
* @since 2.0.0
*/
@throws[StreamingQueryException]
def awaitAnyTermination(): Unit = {
  awaitTerminationLock.synchronized {
    while (lastTerminatedQuery == null) {
      awaitTerminationLock.wait(10)
    }
    if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) {
      throw lastTerminatedQuery.exception.get
    }
  }
}
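Note that awaitAnyTermination() only unblocks the caller; it does not stop the other queries by itself. One way to make both sinks halt together is to stop whatever is still active once it returns or throws. This is a sketch under that assumption, and runUntilAnyStops is a made-up helper name, not part of Spark.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper: block until any query terminates, then stop the rest.
def runUntilAnyStops(spark: SparkSession): Unit = {
  try {
    // Blocks until any active query terminates; if that query failed,
    // its StreamingQueryException is rethrown here.
    spark.streams.awaitAnyTermination()
  } finally {
    // Stop every query that is still running so both sinks halt together.
    // Any exception from awaitAnyTermination() still propagates afterwards.
    spark.streams.active.foreach(_.stop())
  }
}
```

You would call runUntilAnyStops(sparkSession) once, after starting both queries, in place of a per-query awaitTermination().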