Spark parquet partitioning: Large number of files
First, I would really avoid using coalesce, as it is often pushed up further in the chain of transformations and may destroy the parallelism of your job (I asked about this issue here: Coalesce reduces parallelism of entire stage (spark)).
Writing 1 file per parquet partition is relatively easy (see Spark dataframe write method writing many small files):
data.repartition($"key").write.partitionBy("key").parquet("/location")
If you want to set an arbitrary number of files (or files which all have the same size), you need to further repartition your data using another attribute (I cannot tell you what this might be in your case):
data.repartition($"key",$"another_key").write.partitionBy("key").parquet("/location")
another_key could be another attribute of your dataset, or a derived attribute using some modulo or rounding operations on existing attributes. You could even use window functions with row_number over key and then round this by something like
data.repartition($"key",floor($"row_number"/N)*N).write.partitionBy("key").parquet("/location")
This would put N records into each parquet file.
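Since the snippet above references a $"row_number" column, here is a minimal sketch of how it could be derived with a window function; the ordering column "id" and the value of N are placeholders you would adapt to your data:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, floor, row_number}

val N = 100000 // rough target of records per file (an assumption)
val w = Window.partitionBy("key").orderBy("id")

data
  .withColumn("row_number", row_number().over(w))
  .repartition(col("key"), floor(col("row_number") / N) * N)
  .write.partitionBy("key")
  .parquet("/location")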
using orderBy
You can also control the number of files without repartitioning by ordering your dataframe accordingly:
data.orderBy($"key").write.partitionBy("key").parquet("/location")
This will lead to a total of (at least, but not much more than) spark.sql.shuffle.partitions files across all partitions (200 by default). It's even beneficial to add a second ordering column after $"key", as Parquet will remember the ordering of the dataframe and will write its statistics accordingly. For example, you can order by an ID:
data.orderBy($"key",$"id").write.partitionBy("key").parquet("/location")
This will not change the number of files, but it will improve the performance when you query your parquet file for a given key and id. See e.g. https://www.slideshare.net/RyanBlue3/parquet-performance-tuning-the-missing-guide and https://db-blog.web.cern.ch/blog/luca-canali/2017-06-diving-spark-and-parquet-workloads-example
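For illustration, assuming a SparkSession named spark, the resulting file count can be tuned by adjusting that setting before the write (the value 50 below is arbitrary):
spark.conf.set("spark.sql.shuffle.partitions", "50")
data.orderBy($"key", $"id").write.partitionBy("key").parquet("/location")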
Spark 2.2+
From Spark 2.2 on, you can also play with the new option maxRecordsPerFile to limit the number of records per file if your files are too large. You will still get at least N files if you have N partitions, but you can split the file written by 1 partition (task) into smaller chunks:
df.write
  .option("maxRecordsPerFile", 10000)
  ...
See e.g. http://www.gatorsmile.io/anticipated-feature-in-spark-2-2-max-records-written-per-file/ and spark write to disk with N files less than N partitions
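For concreteness, one possible complete form of the snippet above might look like this (the partition column "key", the 10000 limit, and the output path are assumptions):
import spark.implicits._ // assuming a SparkSession named spark, for the $"..." column syntax

df.repartition($"key")
  .write
  .option("maxRecordsPerFile", 10000)
  .partitionBy("key")
  .parquet("/location")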
This is working for me very well:
data.repartition(n, "key").write.partitionBy("key").parquet("/location")
It produces N files in each output partition (directory), and is (anecdotally) faster than using coalesce and (again, anecdotally, on my data set) faster than only repartitioning on the output.
If you're working with S3, I also recommend doing everything on local drives (Spark does a lot of file creation/rename/deletion during write-outs) and, once it's all settled, using Hadoop's FileUtil (or just the AWS CLI) to copy everything over:
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.SparkSession
// ...
def copy(
    in: String,
    out: String,
    sparkSession: SparkSession
): Boolean = {
  FileUtil.copy(
    FileSystem.get(new URI(in), sparkSession.sparkContext.hadoopConfiguration),
    new Path(in),
    FileSystem.get(new URI(out), sparkSession.sparkContext.hadoopConfiguration),
    new Path(out),
    false, // do not delete the source after copying
    sparkSession.sparkContext.hadoopConfiguration
  )
}
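For example, a call might look like this (the paths and the name of the SparkSession value are hypothetical):
// Copy the locally staged output up to S3 once the write has finished.
copy("file:///tmp/spark-staging/output", "s3a://my-bucket/output", sparkSession)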
Edit: As per discussion in comments:
You have a dataset with a partition column of YEAR, but each given YEAR has vastly different amounts of data in it. So, one year might have 1 GB of data, but another might have 100 GB.
Here's pseudocode for one way to handle this:
val partitionSize = 10000 // Number of rows you want per output file.
val yearValues = df.select("YEAR").distinct.collect.map(_.getInt(0))

yearValues.foreach { yearVal =>
  val subDf = df.filter(s"YEAR = $yearVal")
  val numPartitionsToUse = math.max(1, (subDf.count / partitionSize).toInt)
  subDf.repartition(numPartitionsToUse).write.parquet(s"$outputPath/year=$yearVal")
}
But I don't actually know whether this will work. It's possible that Spark will have an issue reading in a variable number of files per column partition.
Another way to do it would be to write your own custom partitioner, but I have no idea what's involved in that, so I can't supply any code.
The other answers here are very good but have some problems:
Relying on maxRecordsPerFile to break up large partitions into smaller files is very handy but comes with two caveats. First, if your partitioning columns are heavily skewed, repartitioning by them means potentially moving all the data for the largest data partition into a single DataFrame partition. If that DataFrame partition gets too large, that alone may crash your job. To give a simple example, imagine what repartition("country") would do for a DataFrame that had 1 row for every person in the world. Second, maxRecordsPerFile will ensure that your output files don't exceed a certain number of rows, but only a single task will be able to write out these files serially. One task will have to work through the entire data partition, instead of being able to write out that large data partition with multiple tasks.

repartition(numPartitions, $"some_col", rand) is an elegant solution but does not handle small data partitions well. It will write out numPartitions files for every data partition, even if they are tiny. This may not be a problem in many situations, but if you have a large data lake you know that writing out many small files will kill the performance of your data lake over time.
So one solution doesn't play well with very large data partitions, and the other doesn't play well with very small data partitions.
What we need is a way to dynamically scale the number of output files by the size of the data partition. If it's very large, we want many files. If it's very small, we want just a few files, or even just one file.
The solution is to extend the approach using repartition(..., rand) and dynamically scale the range of rand by the desired number of output files for that data partition.
Here's the essence of the solution I posted on a very similar question:
# In this example, `id` is a column in `skewed_data`.
from pyspark.sql.functions import rand

partition_by_columns = ['id']
desired_rows_per_output_file = 10

partition_count = skewed_data.groupBy(partition_by_columns).count()

partition_balanced_data = (
    skewed_data
    .join(partition_count, on=partition_by_columns)
    .withColumn(
        'repartition_seed',
        (
            rand() * partition_count['count'] / desired_rows_per_output_file
        ).cast('int')
    )
    .repartition(*partition_by_columns, 'repartition_seed')
)
This will balance the size of the output files, regardless of partition skew, and without limiting your parallelism or generating too many small files for small partitions.
If you want to run this code yourself, I've provided a self-contained example, along with proof that the DataFrame partitions are being balanced correctly.
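For readers working in Scala, here is a rough sketch of the same idea, including a final write step that the snippet above leaves out; the column name "id", the 10-rows-per-file target, and the output path are assumptions:
import org.apache.spark.sql.functions.{col, rand}

val desiredRowsPerOutputFile = 10

// Count the rows in each data partition, then derive a bounded random seed from it.
val partitionCounts = skewedData.groupBy("id").count()

skewedData
  .join(partitionCounts, Seq("id"))
  .withColumn("repartition_seed",
    (rand() * col("count") / desiredRowsPerOutputFile).cast("int"))
  .repartition(col("id"), col("repartition_seed"))
  .drop("count", "repartition_seed") // helper columns, not needed in the output
  .write
  .partitionBy("id")
  .parquet("balanced_output")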
Let's expand on Raphael Roth's answer with an additional approach that'll create an upper bound on the number of files each partition can contain, as discussed in this answer:
import org.apache.spark.sql.functions.rand

df.repartition(numPartitions, $"some_col", rand)
  .write.partitionBy("some_col")
  .parquet("partitioned_lake")