How to write a Spark streaming DataFrame to a Kafka topic
With Spark >= 2.2
Both read and write operations on Kafka are possible using the Structured Streaming API.
Build stream from Kafka topic
// Subscribe to a topic and read messages starting from the earliest offsets
val ds = spark
  .readStream // use `read` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic1")
  .option("startingOffsets", "earliest")
  // "endingOffsets" applies to batch queries (`read`) only, so it is omitted here
  .load()
Read the key and value and cast both to a concrete type. For simplicity, we convert both of them to String.
import spark.implicits._ // provides the encoder needed by .as[(String, String)]
val dsStruc = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
Since dsStruc has a schema, it accepts all SQL-style operations such as filter, agg, select, etc.
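As a rough illustration of such operations, here is a minimal sketch that drops records with an empty value and upper-cases the rest. The object name DatasetOpsSketch, the helper nonEmptyUpper, and the sample data are made up for this example; it runs on a small static Dataset in local mode, on the assumption that the same column operations apply to a streaming Dataset with key and value columns such as dsStruc.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, length, upper}

object DatasetOpsSketch {
  // Hypothetical helper: keeps rows whose value is non-empty (null values are
  // dropped too, because length(null) is null) and upper-cases the value.
  def nonEmptyUpper(ds: Dataset[(String, String)]): Dataset[(String, String)] = {
    import ds.sparkSession.implicits._
    ds.filter(length(col("value")) > 0)
      .select(col("key"), upper(col("value")).as("value"))
      .as[(String, String)]
  }

  def main(args: Array[String]): Unit = {
    // Local session and sample data are assumptions for illustration only.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("dataset-ops-sketch")
      .getOrCreate()
    import spark.implicits._

    val sample = Seq(("k1", "hello"), ("k2", ""), ("k3", "world"))
      .toDF("key", "value")
      .as[(String, String)]

    nonEmptyUpper(sample).show()
    spark.stop()
  }
}
```

The result still has key and value columns, so it can be handed to the Kafka sink in the same way as dsStruc.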
Write stream to Kafka topic
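dsStruc
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  // The Kafka sink needs a checkpoint location; this path is a placeholder
  .option("checkpointLocation", "/path/to/checkpoint-dir")
  .start()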
For more read and write options, see the Structured Streaming + Kafka Integration Guide in the Spark documentation.
Key artifacts to add to the application
"org.apache.spark" % "spark-core_2.11" % "2.2.0",
"org.apache.spark" % "spark-streaming_2.11" % "2.2.0",
"org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.2.0",
My first advice would be to create a new producer instance in foreachPartition and measure whether that is fast enough for your needs (instantiating heavy objects in foreachPartition is what the official documentation suggests).
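A minimal sketch of that first option could look like the following. The object PerPartitionProducerSketch, the helpers sendPartition and newStringProducer, and the String key/value types are assumptions made for this example, not part of any library. The producer factory is passed in as a function so the same code can be exercised with Kafka's MockProducer.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object PerPartitionProducerSketch {
  // Hypothetical helper: creates one producer for the partition, sends every
  // record, and closes the producer. Returns the number of records sent.
  def sendPartition(records: Iterator[String],
                    topic: String,
                    newProducer: () => Producer[String, String]): Int = {
    val producer = newProducer()
    try {
      var sent = 0
      records.foreach { record =>
        producer.send(new ProducerRecord[String, String](topic, record))
        sent += 1
      }
      sent
    } finally {
      // close() waits for buffered records to be sent before returning.
      producer.close()
    }
  }

  // Hypothetical factory with a minimal configuration; adjust to your cluster.
  def newStringProducer(brokers: String): Producer[String, String] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokers)
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[StringSerializer].getName)
    new KafkaProducer[String, String](props)
  }
}

// Usage inside a streaming job (stream is assumed to be a DStream[String]):
// stream.foreachRDD { rdd =>
//   rdd.foreachPartition { part =>
//     PerPartitionProducerSketch.sendPartition(
//       part, "my-output-topic",
//       () => PerPartitionProducerSketch.newStringProducer("broker1:9092"))
//   }
// }
```

This pays the producer start-up cost once per partition per batch, which is why it is worth measuring before moving to anything more elaborate.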
Another option is to use an object pool as illustrated in this example:
https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala
However, I found it hard to implement when using checkpointing.
Another version that works well for me is a factory, as described in the following blog post. Just check whether it provides enough parallelism for your needs (see the comments section of the post):
http://allegro.tech/2015/08/spark-kafka-integration.html
Yes, unfortunately Spark (1.x, 2.x) doesn't make it straightforward to write to Kafka in an efficient manner.
I'd suggest the following approach:
- Use (and re-use) one KafkaProducer instance per executor process/JVM.
Here's the high-level setup for this approach:
- First, you must "wrap" Kafka's KafkaProducer because, as you mentioned, it is not serializable. Wrapping it allows you to "ship" it to the executors. The key idea here is to use a lazy val so that you delay instantiating the producer until its first use, which is effectively a workaround so that you don't need to worry about KafkaProducer not being serializable.
- You "ship" the wrapped producer to each executor by using a broadcast variable.
- Within your actual processing logic, you access the wrapped producer through the broadcast variable, and use it to write processing results back to Kafka.
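To see why the lazy val makes the wrapper safe to ship, here is a small sketch that uses plain Java serialization and no Spark or Kafka at all. FakeClient stands in for the non-serializable KafkaProducer; FakeClient, FakeClientFactory, LazyWrapper, and LazyWrapperDemo are all hypothetical names for this illustration. A named factory class is used instead of a lambda only to keep the sketch independent of how closures are compiled.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Stand-in for KafkaProducer: deliberately NOT Serializable.
class FakeClient(val name: String) {
  def send(msg: String): String = s"$name:$msg"
}

// Serializable factory that knows how to build the client later.
class FakeClientFactory(name: String) extends (() => FakeClient) with Serializable {
  def apply(): FakeClient = new FakeClient(name)
}

// Serializable wrapper: the client is only created on first use.
class LazyWrapper(create: () => FakeClient) extends Serializable {
  lazy val client: FakeClient = create()
  def send(msg: String): String = client.send(msg)
}

object LazyWrapperDemo {
  // Serializes and deserializes an object, roughly like shipping it to an executor.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // The wrapper is serialized before the client has ever been touched,
    // so only the factory travels; the client is built after the round trip.
    val shipped = roundTrip(new LazyWrapper(new FakeClientFactory("executor-client")))
    println(shipped.send("hello")) // prints "executor-client:hello"
  }
}
```

One caveat of this sketch: if the wrapper is used before it is serialized, the already-initialized client becomes part of the object and serialization fails. Marking the field as @transient lazy val is a common variation that avoids this.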
The code snippets below work with Spark Streaming as of Spark 2.0.
Step 1: Wrapping KafkaProducer
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))
}

object MySparkKafkaProducer {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }
      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)
}
Step 2: Use a broadcast variable to give each executor its own wrapped KafkaProducer instance
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc: StreamingContext = {
  val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
  new StreamingContext(sparkConf, Seconds(1))
}
ssc.checkpoint("checkpoint-directory")

val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "broker1:9092")
    p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}
Step 3: Write from Spark Streaming to Kafka, re-using the same wrapped KafkaProducer instance (for each executor)
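import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.spark.streaming.dstream.DStream

val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // toList forces the lazy iterator, so all sends for the partition are
    // issued before we start waiting (a lazy Stream would send and wait
    // one record at a time).
    val metadata: List[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
    }.toList
    // Block until Kafka has acknowledged every record of this partition.
    metadata.foreach { m => m.get() }
  }
}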
Hope this helps.