Multiple Spark jobs appending Parquet data to the same base path with partitioning
So after much reading about how to tackle this problem, I thought I'd transfer some wisdom back here to wrap things up. Thanks mostly to Tal's comments.
I've additionally found that writing directly to s3://bucket/save/path seems dangerous. If a job is killed and the temporary folder isn't cleaned up at the end of the job, it seems to be left there for the next job, and I've noticed that sometimes a previously killed job's temp files land in s3://bucket/save/path and cause duplication. Totally unreliable.
On top of that, renaming the _temporary folder files to their final S3 locations takes a horrendous amount of time (approx. 1 sec per file), because S3 only supports copy/delete, not rename. Only the driver instance renames these files, using a single thread, so as much as 1/5 of the run time of some jobs with large numbers of files/partitions is spent just waiting for rename operations.
I've ruled out using the DirectOutputCommitter for a number of reasons:
- When used in conjunction with speculation mode, it results in duplication (https://issues.apache.org/jira/browse/SPARK-9899).
- Task failures will leave clutter that would be impossible to find and remove/clean up later.
- Spark 2.0 has removed support for this completely, and no upgrade path exists (https://issues.apache.org/jira/browse/SPARK-10063).
The only safe, performant, and consistent way to execute these jobs is to save their output to a unique temporary folder (unique by applicationId or timestamp) in HDFS first, and copy it to S3 on job completion.
This allows concurrent jobs to run, since each one saves to its own temp folder. There is no need for the DirectOutputCommitter, because the rename operation on HDFS is quicker than on S3, and the saved data is more consistent.
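The staging approach above could be sketched roughly as follows. This is only a minimal illustration, not the code I actually run: the class name StagingPaths, the hdfs:///tmp/spark-staging root, and the example application ID are all made up for the example, and the Spark write and the copy step appear only as comments because they depend on your cluster setup.

```java
import java.util.Objects;

/** Hypothetical helper: derives a per-job staging directory on HDFS. */
final class StagingPaths {
    // Assumed staging root; use whatever HDFS location suits your cluster.
    static final String STAGING_ROOT = "hdfs:///tmp/spark-staging";

    /** Returns a staging path that is unique to one Spark application. */
    static String stagingPath(String applicationId) {
        Objects.requireNonNull(applicationId, "applicationId");
        if (applicationId.isEmpty() || applicationId.contains("/")) {
            throw new IllegalArgumentException("invalid application id: " + applicationId);
        }
        return STAGING_ROOT + "/" + applicationId;
    }

    public static void main(String[] args) {
        // In a real job the ID would come from the SparkContext's applicationId.
        String staging = stagingPath("application_1_0001");
        System.out.println(staging);
        // 1. Write to the staging path instead of S3:
        //      dataFrame.write().partitionBy("eventDate", "category")
        //          .mode(SaveMode.Append).parquet(staging);
        // 2. Only after the job succeeds, copy the staging directory to
        //    s3://bucket/save/path (for example with hadoop distcp),
        //    then delete the staging directory.
    }
}
```

Because every application gets its own directory, a killed job leaves its leftovers in a folder no other job reads from, and nothing reaches S3 unless the copy step is actually run.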
Instead of using partitionBy:
// requires: import org.apache.spark.sql.SaveMode;
dataFrame.write()
    .partitionBy("eventDate", "category")
    .mode(SaveMode.Append)
    .parquet("s3://bucket/save/path");
Alternatively, each job can write directly into its own partition directory.
In job-1, specify the Parquet file path as:
dataFrame.write()
    .mode(SaveMode.Append)
    .parquet("s3://bucket/save/path/eventDate=20160101/channel=billing_events");
And in job-2, specify the Parquet file path as:
dataFrame.write()
    .mode(SaveMode.Append)
    .parquet("s3://bucket/save/path/eventDate=20160101/channel=click_events");
- Both jobs will create a separate _temporary directory under their respective folders, so the concurrency issue is solved.
- Partition discovery will still work, picking up eventDate=20160101 and the channel column from the directory names.
- Disadvantage: even if no rows with channel=click_events exist in the data, a Parquet file for channel=click_events will still be created.
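If the partition values are only known at run time, the per-job path can be assembled with a small helper. The sketch below is just an illustration under my own assumptions: the PartitionPaths class and its partitionPath method are hypothetical names, not part of Spark, and it only builds the string that you would then pass to .parquet(...).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: builds a key=value partition directory path. */
final class PartitionPaths {
    /**
     * Appends one "key=value" directory per map entry to basePath.
     * Entries are appended in the map's iteration order, so pass an
     * ordered map (e.g. LinkedHashMap) to control the directory nesting.
     */
    static String partitionPath(String basePath, Map<String, String> partitions) {
        StringJoiner path = new StringJoiner("/");
        // Drop trailing slashes so the joined path has no empty segment.
        path.add(basePath.replaceAll("/+$", ""));
        for (Map.Entry<String, String> entry : partitions.entrySet()) {
            path.add(entry.getKey() + "=" + entry.getValue());
        }
        return path.toString();
    }

    public static void main(String[] args) {
        Map<String, String> partitions = new LinkedHashMap<>();
        partitions.put("eventDate", "20160101");
        partitions.put("channel", "billing_events");
        // prints s3://bucket/save/path/eventDate=20160101/channel=billing_events
        System.out.println(partitionPath("s3://bucket/save/path", partitions));
    }
}
```

Each job would then call dataFrame.write().mode(SaveMode.Append).parquet(...) with its own result, so that no two concurrent jobs share a _temporary directory.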