Spark: writing DataFrame as compressed JSON
The following solutions use pyspark, but I assume the code in Scala would be similar.
First option is to set the following when you initialise your SparkConf:
conf = SparkConf()
conf.set("spark.hadoop.mapred.output.compress", "true")
conf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")
With the code above any file you produce using that sparkContext is automatically compressed using gzip.
Second option, if you want to compress only selected files within your context. Lets say "df" is your dataframe and filename your destination:
df_rdd = self.df.toJSON()
df_rdd.saveAsTextFile(filename,compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
Setting compression options on a SparkConf
is NOT a good practice, as the accepted answer. It changed the behaviour globally instead of indicating the settings on a per-file basis. The truth is, explicit is always better than implicit. There are also some cases where users cannot manipulate the context configuration easily, like spark-shell or in codes designed as a submodule of another.
The correct way
Writing DataFrame
with compression is supported since Spark 1.4. Several ways to achieve that:
One
df.write.json("filename.json", compression="gzip")
That's it! Just use DataFrameWriter.json()
as you wish.
The magic is hidden in the code pyspark/sql/readwriter.py
@since(1.4)
def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
"""Saves the content of the :class:`DataFrame` in JSON format
(`JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>`_) at the
specified path.
:param path: the path in any Hadoop supported file system
:param mode: ...
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, bzip2, gzip, lz4,
snappy and deflate).
:param dateFormat: ...
:param timestampFormat: ...
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(
compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
self._jwrite.json(path)
Supported compression formats are bzip2, gzip, lz4, snappy and deflate, case insensitive.
The scala API should be the same.
Another
df.write.options(compression="gzip").json("filename.json")
Similar as above. more options can be suppied as keyword arguments. available since Spark 1.4.
Third
df.write.option("compression", "gzip").json("filename.json")
DataFrameWriter.option()
is added since Spark 1.5. Only one parameter could be added at a time.
With Spark 2.X (and maybe earlier, I did not test) there is a simpler way to write a compressed JSON, which does not require changing the configuration:
val df: DataFrame = ...
df.write.option("compression", "gzip").json("/foo/bar")
This also works for CSV and for Parquet, just use .csv() and .parquet() instead of .json() to write the file after setting the compression option.
The possible codecs are: none, bzip2, deflate, gzip, lz4 and snappy.