Creating Parquet files in Spark with a row-group size of less than 100 rows
While PARQUET-409 is not yet fixed, there are a couple of workarounds to make an application work with the hard-coded minimum of 100 records per row group.
First issue and workaround:
You mentioned that your rows could be as large as 50 MB. With the 100-record minimum, that gives a row-group size of approximately 5 GB.
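For reference, a quick back-of-the-envelope sketch of that estimate (Scala; the 50 MB figure comes from your description):

val maxRowBytes     = 50L * 1024 * 1024      // ~50 MB per row, from your description
val minRowsPerGroup = 100                    // hard-coded minimum (PARQUET-409)
val rowGroupGb      = maxRowBytes.toDouble * minRowsPerGroup / (1024 * 1024 * 1024)
println(f"Worst-case row group: $rowGroupGb%.1f GB")   // prints roughly 4.9 GB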
At the same time, your Spark executors have only 4 GB of memory (spark.executor.memory). Make that significantly bigger than the maximum row-group size.
For row groups this large, I recommend setting spark.executor.memory to 12-20 GB. Experiment with this and see what works for your datasets; most of our production jobs run with executor memory in this range.
For this to work with such large row groups, you may also want to tune spark.executor.cores down to 1, so that each executor process takes only one of these large row groups at a time (at the expense of losing some Spark efficiency). Alternatively, try spark.executor.cores set to 2; that may require increasing spark.executor.memory to the 20-31 GB range. (Try to stay under 32 GB, as the JVM then switches to uncompressed OOPs, which can add as much as 50% memory overhead.)
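As a concrete sketch of these settings together (Spark Scala; the app name and memory figures are assumed values, not ones taken from your job):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only. Variant A: one core per executor, so only one ~5 GB row group
// is processed at a time.
val conf = new SparkConf()
  .setAppName("parquet-large-row-groups")   // hypothetical app name
  .set("spark.executor.memory", "16g")      // assumed value from the 12-20 GB range
  .set("spark.executor.cores", "1")
// Variant B instead: 2 cores with more memory, staying under ~32 GB to keep compressed OOPs:
//   .set("spark.executor.memory", "24g")
//   .set("spark.executor.cores", "2")
val sc = new SparkContext(conf)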
Second issue and workaround: Row groups of 5 GB will most likely be spread across many HDFS blocks, since default HDFS blocks are in the 128-256 MB range. (I assume you use HDFS to store those Parquet files, since you used the "hadoop" tag.) Parquet best practice is for a row group to reside completely within one HDFS block:
Row group size: Larger row groups allow for larger column chunks which makes it possible to do larger sequential IO. Larger groups also require more buffering in the write path (or a two pass write). We recommend large row groups (512MB - 1GB). Since an entire row group might need to be read, we want it to completely fit on one HDFS block. Therefore, HDFS block sizes should also be set to be larger. An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file.
Here's an example of how to change the HDFS block size (set it before you create such Parquet files), in PySpark:
sc._jsc.hadoopConfiguration().set("dfs.block.size", "5g")
or in Spark Scala:
sc.hadoopConfiguration.set("dfs.block.size", "5g")
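Putting the two together, a usage sketch in Spark Scala (e.g. in spark-shell, where sc is predefined; the DataFrame df and the output path are hypothetical stand-ins):

// Sketch: raise the HDFS block size *before* writing, so each ~5 GB row group
// can fit entirely inside a single HDFS block.
sc.hadoopConfiguration.set("dfs.block.size", "5g")

// `df` stands in for your DataFrame of large rows; the output path is made up.
df.write.parquet("hdfs:///tmp/large_row_groups")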
I hope this will be fixed at the Parquet level at some point, but these two workarounds should allow you to operate Parquet with such large row groups.
Unfortunately, I haven't found a way to do so. I reported this issue to have the hard-coded values removed and made configurable. I have a patch for it if you're interested.