Read from Kafka and write to hdfs in parquet
You already have a couple of good answers on the topic.
I just want to stress one thing: be careful about streaming directly into a Parquet table. Parquet performs best when its row groups are large enough to take advantage of dictionary compression, bloom filters, etc. (for simplicity, assume file sizes should be on the order of 64-256 MB). One Parquet file can contain multiple row groups, and normally does, but a row group can't span multiple Parquet files.
If you stream directly into a Parquet table, you will very likely end up with a lot of tiny Parquet files (depending on the Spark Streaming mini-batch size and the volume of data). Querying such files can be very slow; for example, Parquet may need to read every file's footer to reconcile the schema, which is a big overhead. If this is the case, you will need a separate process that, as a workaround, reads the older files and writes them back out merged. This isn't a simple file-level merge: the process actually has to read all the Parquet data and write out larger Parquet files.
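A compaction pass along those lines could look like the sketch below. It is only an illustration under stated assumptions: the `Compaction` object, the `targetFileCount` helper and the 128 MB target are hypothetical names and values, and the size estimate simply uses the on-disk size of the input directory.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object Compaction {
  // Hypothetical target size per output file; tune it for your cluster.
  val TargetFileBytes: Long = 128L * 1024 * 1024

  // Number of output files so that each one is roughly targetBytes (never less than 1).
  def targetFileCount(totalBytes: Long, targetBytes: Long = TargetFileBytes): Int =
    math.max(1L, (totalBytes + targetBytes - 1) / targetBytes).toInt

  // Reads all Parquet data under inputDir and rewrites it as fewer, larger files in outputDir.
  def compact(spark: SparkSession, inputDir: String, outputDir: String): Unit = {
    val inPath = new Path(inputDir)
    val fs = inPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
    // On-disk size of the input directory, used as a rough estimate of the output size.
    val totalBytes = fs.getContentSummary(inPath).getLength

    spark.read.parquet(inputDir)
      .repartition(targetFileCount(totalBytes)) // full shuffle: one output file per partition
      .write
      .mode("overwrite")
      .parquet(outputDir)
  }
}
```

Writing to a separate `outputDir` keeps readers of the original files unaffected while the job runs; swapping the compacted directory in afterwards is left to whatever process manages the table.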
This workaround may defeat the original purpose of "streaming" the data. You could also look at other technologies, such as Apache Kudu, Apache Kafka, Apache Druid, or Kinesis, that may work better here.
Update: since I posted this answer, a strong new player has appeared here: Delta Lake (https://delta.io/). If you're used to Parquet, you'll find Delta very attractive (Delta is actually built on top of Parquet plus a metadata layer). Delta Lake offers:
- ACID transactions on Spark: Serializable isolation levels ensure that readers never see inconsistent data.
- Scalable metadata handling: Leverages Spark’s distributed processing power to handle all the metadata for petabyte-scale tables with billions of files with ease.
- Streaming and batch unification: A table in Delta Lake is a batch table as well as a streaming source and sink. Streaming data ingest, batch historic backfill, interactive queries all just work out of the box.
- Schema enforcement: Automatically handles schema variations to prevent insertion of bad records during ingestion.
- Time travel: Data versioning enables rollbacks, full historical audit trails, and reproducible machine learning experiments.
- Upserts and deletes: Supports merge, update and delete operations to enable complex use cases like change-data-capture, slowly-changing-dimension (SCD) operations, streaming upserts, and so on.
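To make the "streaming and batch unification" point concrete, here is a minimal sketch of appending a streaming DataFrame to a Delta table and reading the same table back in batch. It assumes the Delta Lake library is on the classpath and configured for your Spark version; the `KafkaToDelta` object, its method names and the `_checkpoints` layout are illustrative choices, not part of Delta's API.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

object KafkaToDelta {
  // Assumed convention: keep each query's checkpoint under the table directory.
  def checkpointDir(tablePath: String, queryName: String): String =
    s"${tablePath.stripSuffix("/")}/_checkpoints/$queryName"

  // Appends a streaming DataFrame (e.g. one read from Kafka) to the Delta table at tablePath.
  def writeStream(events: DataFrame, tablePath: String, queryName: String): StreamingQuery =
    events.writeStream
      .format("delta")
      .outputMode("append")
      .option("checkpointLocation", checkpointDir(tablePath, queryName))
      .start(tablePath)

  // Reads the same table back as an ordinary batch DataFrame.
  def readBatch(spark: SparkSession, tablePath: String): DataFrame =
    spark.read.format("delta").load(tablePath)
}
```

Each streaming query needs its own checkpoint location, which is why the helper takes a query name.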
To read data from Kafka and write it to HDFS in Parquet format with a Spark batch job instead of a streaming job, you can use the Kafka source that ships with Spark Structured Streaming, which also supports batch queries.
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
It comes with Kafka as a built-in source, i.e., we can poll data from Kafka. It’s compatible with Kafka broker versions 0.10.0 or higher.
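As a rough illustration of "the same way you would express a batch computation", the sketch below defines one transformation, `decode`, that works on both static and streaming DataFrames, and applies it to a streaming Kafka read that is written out as Parquet. The object name, HDFS paths and the 10-minute trigger are assumptions made for the example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object KafkaStreamToParquet {
  // One transformation, usable unchanged on a static or a streaming DataFrame.
  def decode(raw: DataFrame): DataFrame =
    raw.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("kafka-stream-to-parquet").getOrCreate()

    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load()

    val query = decode(raw).writeStream
      .format("parquet")
      .option("path", "hdfs://namenode:8020/data/topic1")                      // hypothetical output path
      .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/topic1") // hypothetical checkpoint path
      .trigger(Trigger.ProcessingTime("10 minutes")) // fewer, larger micro-batches
      .start()

    query.awaitTermination()
  }
}
```

A longer trigger interval means each micro-batch writes more data per file, at the cost of higher latency.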
For pulling the data from Kafka in batch mode, you can create a Dataset/DataFrame for a defined range of offsets.

    // Assumes an existing SparkSession named `spark` (as in spark-shell)
    import spark.implicits._ // encoders required by .as[(String, String)]

    // Subscribe to 1 topic; defaults to the earliest and latest offsets
    val df = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load()
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    // Subscribe to multiple topics, specifying explicit Kafka offsets
    val df = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1,topic2")
      .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
      .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
      .load()
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    // Subscribe to a pattern, at the earliest and latest offsets
    val df = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribePattern", "topic.*")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

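In the JSON form of `startingOffsets` and `endingOffsets` used above, -2 stands for the earliest offset and -1 for the latest. For batch queries, the starting offsets may not use -1 and the ending offsets may not use -2. Writing that JSON by hand gets error-prone with many partitions, so here is a small helper sketch that builds it from a map. The `KafkaOffsets` object is a hypothetical helper, not part of Spark, and it does not escape special characters in topic names.

```scala
object KafkaOffsets {
  val Earliest: Long = -2L // sentinel the Kafka source reads as "earliest offset"
  val Latest: Long = -1L   // sentinel the Kafka source reads as "latest offset"

  // Renders Map(topic -> Map(partition -> offset)) as the JSON string expected by
  // the startingOffsets / endingOffsets options. Keys are sorted for a stable result.
  def toJson(offsets: Map[String, Map[Int, Long]]): String =
    offsets.toSeq.sortBy(_._1).map { case (topic, partitions) =>
      val body = partitions.toSeq.sortBy(_._1)
        .map { case (partition, offset) => "\"" + partition + "\":" + offset }
        .mkString(",")
      "\"" + topic + "\":{" + body + "}"
    }.mkString("{", ",", "}")
}
```

For example, `KafkaOffsets.toJson(Map("topic1" -> Map(0 -> 23L, 1 -> KafkaOffsets.Earliest), "topic2" -> Map(0 -> KafkaOffsets.Earliest)))` produces the same `startingOffsets` string as in the second example.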
Each row in the source has the following schema:
| Column | Type |
|:-----------------|--------------:|
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | timestamp |
| timestampType | int |
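Now, to write the data to HDFS in Parquet format, you can use code like the following (the NameNode host, port and path are placeholders for your own values):

    df.write.parquet("hdfs://namenode:8020/path/to/data.parquet")
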
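Putting the read and the write together, a complete batch job might look like the sketch below. It is one possible arrangement, not the only one: the `KafkaBatchToParquet` object, the derived `date` partition column and the HDFS URI are assumptions made for the example.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, to_date}

object KafkaBatchToParquet {
  // Decodes key/value to strings and adds a `date` column derived from the record timestamp.
  def prepare(raw: DataFrame): DataFrame =
    raw.select(
      col("key").cast("string").as("key"),
      col("value").cast("string").as("value"),
      col("topic"), col("partition"), col("offset"), col("timestamp"),
      to_date(col("timestamp")).as("date"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("kafka-batch-to-parquet").getOrCreate()

    val raw = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()

    prepare(raw).write
      .mode(SaveMode.Append)
      .partitionBy("date")                          // one directory per day
      .parquet("hdfs://namenode:8020/data/topic1")  // hypothetical NameNode and path

    spark.stop()
  }
}
```

Note that a batch read does not remember where it stopped: running this job twice with `earliest` would append the same records again, so the job has to track the offsets it has already written and pass them as `startingOffsets` on the next run.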
For more information on Spark Structured Streaming + Kafka, please refer to the Kafka Integration Guide in the Spark documentation.
I hope it helps!