How to read an Avro file in PySpark
The former solution requires installing a third-party Java dependency, which is not something most Python developers are happy with. But you don't really need an external library if all you want to do is parse your Avro files with a given schema. You can just read the binary files and parse them with your favorite Python Avro package.
For instance, this is how you can load Avro files using fastavro:
from io import BytesIO
import fastavro
schema = {
...
}
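# sc is an existing SparkContext; binaryFiles yields (path, content) pairs,
# and fastavro.reader iterates over the records stored in each file
rdd = sc.binaryFiles("/path/to/dataset/*.avro")\
    .flatMap(lambda args: fastavro.reader(BytesIO(args[1]), reader_schema=schema))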
print(rdd.collect())
Spark >= 2.4.0
You can use built-in Avro support. The API is backwards compatible with the spark-avro package, with a few additions (most notably the from_avro / to_avro functions).
Please note that the module is not bundled with the standard Spark binaries and has to be included using spark.jars.packages or an equivalent mechanism.
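As an illustration, one way to pass the module through spark.jars.packages from Python is sketched below. The helper names (spark_avro_package, read_avro) and the default Scala version are assumptions made for this example, not part of any Spark API; the Scala and Spark versions in the coordinate have to match your own Spark build.

```python
def spark_avro_package(spark_version, scala_version="2.12"):
    """Build the Maven coordinate of the spark-avro module for a given Spark build.

    Hypothetical helper: the coordinate format is
    org.apache.spark:spark-avro_<scala version>:<spark version>.
    """
    return "org.apache.spark:spark-avro_{}:{}".format(scala_version, spark_version)


def read_avro(path, spark_version, scala_version="2.12"):
    """Start a session with the Avro module on the classpath and load `path`.

    Hypothetical helper. PySpark is imported lazily so that
    spark_avro_package() can be used without PySpark installed.
    """
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("avro-example")
        # Only takes effect if it is set before the session (and its JVM) is created.
        .config("spark.jars.packages", spark_avro_package(spark_version, scala_version))
        .getOrCreate()
    )
    # With the built-in module (Spark >= 2.4.0) the short format name "avro" is enough.
    return spark.read.format("avro").load(path)
```

For a Spark 2.4.0 build on Scala 2.11, read_avro("kv.avro", "2.4.0", "2.11") would request org.apache.spark:spark-avro_2.11:2.4.0. The same coordinate can be passed on the command line with the --packages option of spark-submit or pyspark.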
See also Pyspark 2.4.0, read avro from kafka with read stream - Python
Spark < 2.4.0
You can use the spark-avro library. First, let's create an example dataset:
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter  # used by DataFileWriter below
schema_string ='''{"namespace": "example.avro",
"type": "record",
"name": "KeyValue",
"fields": [
{"name": "key", "type": "string"},
{"name": "value", "type": ["int", "null"]}
]
}'''
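schema = avro.schema.parse(schema_string)  # avro.schema.Parse in the legacy avro-python3 package
# Avro container files are binary, so the file has to be opened in "wb" mode
with open("kv.avro", "wb") as f, DataFileWriter(f, DatumWriter(), schema) as wrt:
    wrt.append({"key": "foo", "value": -1})
    wrt.append({"key": "bar", "value": 1})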
Reading it using spark-avro is as simple as this:
df = sqlContext.read.format("com.databricks.spark.avro").load("kv.avro")
df.show()
## +---+-----+
## |key|value|
## +---+-----+
## |foo|   -1|
## |bar|    1|
## +---+-----+
For Spark < 2.4.0, PySpark can create the DataFrame by reading the Avro file together with its schema file (.avsc) without any external Python module, using the "com.databricks.spark.avro" JAR and Python's "subprocess" module.
Below is the solution:
avsc_location = "hdfs://user/test/test.avsc"
avro_location = "hdfs://user/test/test.avro"
# use the subprocess module to read the schema file from HDFS
import subprocess as SP
load_avsc_file = SP.Popen(["hdfs", "dfs", "-cat", avsc_location], stdout=SP.PIPE, stderr=SP.PIPE)
(avsc_file_output, avsc_file_error) = load_avsc_file.communicate()
# communicate() returns bytes on Python 3, while the avroSchema option expects the schema JSON as a string
avro_df = spark.read.format("com.databricks.spark.avro").option("avroSchema", avsc_file_output.decode("utf-8")).load(avro_location)
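The Popen call above does not check whether the hdfs command succeeded, so a missing .avsc file would only surface later as a confusing schema error. A minimal sketch of a stricter variant follows, assuming Python 3; the helper name read_schema_via_command is hypothetical and not part of any library.

```python
import json
import subprocess


def read_schema_via_command(command):
    """Run `command` and return its standard output as a schema string.

    Hypothetical helper: raises subprocess.CalledProcessError if the command
    exits with a non-zero status, and ValueError if the output is not valid JSON.
    """
    result = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=True,  # fail early instead of passing an empty schema to Spark
    )
    schema_text = result.stdout.decode("utf-8")
    json.loads(schema_text)  # an Avro schema is a JSON document; validate it here
    return schema_text
```

With this helper, the schema could be loaded with read_schema_via_command(["hdfs", "dfs", "-cat", avsc_location]) and the returned string passed to the avroSchema option as before.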