partitionBy & overwrite strategy in an Azure DataLake using PySpark in Databricks
I saw that you are using Databricks in the Azure stack. I think the most viable and recommended method for you would be to make use of the new Delta Lake project in Databricks:
It provides options for various upserts, merges and ACID transactions to object stores like S3 or Azure Data Lake Storage. It basically brings the management, safety, isolation and upserts/merges offered by data warehouses to data lakes. For one pipeline, Apple actually replaced its data warehouses to run solely on Delta on Databricks because of its functionality and flexibility. For your use case, and for many others who use parquet, it is just a simple change of replacing 'parquet' with 'delta' in order to use its functionality (if you have Databricks). Delta is basically a natural evolution of parquet, and Databricks has done a great job by providing added functionality as well as open sourcing it.
For your case, I would suggest you try the replaceWhere option provided in Delta. Before making this targeted update, the target table has to be of format delta.
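If your curated folder is currently plain partitioned parquet, one way to satisfy that is a one-time, in-place conversion (the Python API for this is also shown further down). A minimal sketch, assuming Year and Week are stored as strings, matching the filter values used below:
# one-time conversion of the existing parquet output to delta;
# partition columns and their types have to be declared explicitly
spark.sql("CONVERT TO DELTA parquet.`\curataed\dataset` PARTITIONED BY (Year STRING, Week STRING)")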
Instead of this:
dataset.repartition(1).write.mode('overwrite')\
.partitionBy('Year','Week').parquet('\curataed\dataset')
From https://docs.databricks.com/delta/delta-batch.html:
'You can selectively overwrite only the data that matches predicates over partition columns'
You could try this:
# replaceWhere restricts the overwrite to Week 1 and Week 2, so Week 3 is not touched
dataset.repartition(1).write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy('Year', 'Week') \
    .option("replaceWhere", "Year == '2019' AND Week >= '01' AND Week <= '02'") \
    .save("\curataed\dataset")
Also, if you wish to bring the number of partitions down to 1, why don't you use coalesce(1)? It avoids a full shuffle.
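For example, the same targeted write with coalesce instead (untested sketch, same path and predicate as above):
# coalesce(1) narrows the dataframe to a single partition without a full shuffle,
# so each Year/Week folder ends up with one file
dataset.coalesce(1).write.format("delta").mode("overwrite") \
    .partitionBy('Year', 'Week') \
    .option("replaceWhere", "Year == '2019' AND Week >= '01' AND Week <= '02'") \
    .save("\curataed\dataset")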
From https://mungingdata.com/delta-lake/updating-partitions-with-replacewhere/:
'replaceWhere is particularly useful when you have to run a computationally expensive algorithm, but only on certain partitions'
Therefore, I personally think that using replaceWhere to manually specify your overwrite will be more targeted and computationally efficient than just relying on:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
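For reference, that route with plain parquet would look like the sketch below (based on your original write): dynamic mode only rewrites the Year/Week partitions present in dataset, but without the transaction log or targeted predicates that delta gives you.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# only the partitions present in `dataset` are overwritten; Week 3 stays untouched
dataset.repartition(1).write.mode('overwrite') \
    .partitionBy('Year', 'Week') \
    .parquet('\curataed\dataset')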
Databricks provides optimizations on delta tables that make it a faster and much more efficient option than parquet (hence a natural evolution), through bin-packing and Z-Ordering (a usage sketch follows the quotes below):
From https://docs.databricks.com/spark/latest/spark-sql/language-manual/optimize.html:
- WHERE (bin-packing)
'Optimize the subset of rows matching the given partition predicate. Only filters involving partition key attributes are supported.'
- ZORDER BY
'Colocate column information in the same set of files. Co-locality is used by Delta Lake data-skipping algorithms to dramatically reduce the amount of data that needs to be read'.
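A sketch of how the two could be used together on the curated delta table; 'SomeId' is just a placeholder for a non-partition column you frequently filter on:
# compact only the freshly written partitions and co-locate data by a frequently filtered column
spark.sql("""
    OPTIMIZE delta.`\curataed\dataset`
    WHERE Year = '2019' AND Week IN ('01', '02')
    ZORDER BY (SomeId)
""")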
Delta Lake on Databricks also gives you:
- Faster query execution with indexing, statistics, and auto-caching support
- Data reliability with rich schema validation and transactional guarantees
- Simplified data pipeline with flexible UPSERT support and unified Structured Streaming + batch processing on a single data source
You could also check out the complete documentation of the open source project: https://docs.delta.io/latest/index.html
I also want to say that I do not work for Databricks/Delta Lake; I have just seen their improvements and functionality benefit me in my work.
UPDATE:
The gist of the question is "replacing data that exists and creating new folders for new data", and doing it in a highly scalable and effective manner.
Using dynamic partition overwrite on parquet does the job, however I feel that the natural evolution of that method is to use delta table merge operations, which were basically created to 'integrate data from Spark DataFrames into the Delta Lake'. They give you extra functionality and optimizations in merging your data based on how you would want that to happen, and they keep a log of all actions on a table so you can roll back to earlier versions if needed.
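That log is queryable and older versions of the table remain readable, so a bad merge can be inspected and rolled back. A minimal sketch, assuming the curated dataset is already a delta table:
# list every operation (write, merge, ...) recorded in the transaction log
spark.sql("DESCRIBE HISTORY delta.`\curataed\dataset`").show(truncate=False)

# read the table as it was at an earlier version (time travel)
previous = spark.read.format("delta").option("versionAsOf", 0).load("\curataed\dataset")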
Delta Lake Python API (for merge): https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder
Databricks optimization: https://kb.databricks.com/delta/delta-merge-into.html#discussion
Using a single merge operation you can specify the condition to merge on, in this case a combination of the year, the week and the id. If the records match (meaning they exist in both your Spark dataframe and the delta table, i.e. week 1 and week 2), they are updated with the data in your Spark dataframe, while other records are left unchanged:
# you can also add an additional condition for matched records, but it is not required
.whenMatchedUpdateAll(condition=None)
In some cases, if nothing matches, you might want to insert and create new rows and partitions; for that you can use:
.whenNotMatchedInsertAll(condition=None)
You can use the convertToDelta operation (https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.convertToDelta) to convert your parquet table to a delta table, so that you can perform delta operations on it using the API.
'You can now convert a Parquet table in place to a Delta Lake table without rewriting any of the data. This is great for converting very large Parquet tables which would be costly to rewrite as a Delta table. Furthermore, this process is reversible'
Your merge case (replacing data where it exists and creating new records where it does not exist) could go like this:
(have not tested; refer to the examples + API for syntax)
%python
from delta.tables import DeltaTable
# one-time, in-place conversion of the partitioned parquet table to delta (partition columns/types must be supplied)
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`\curataed\dataset`", "Year string, Week string")
# add the id to the merge condition if rows within a week need to be matched individually
deltaTable.alias("target").merge(dataset.alias("source"), "target.Year = source.Year AND target.Week = source.Week") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
If the delta table is partitioned correctly (Year, Week) and you used the whenMatched clause correctly, these operations will be highly optimized and could take seconds in your case. It also provides you with consistency, atomicity and data integrity, with the option to roll back.
Some more functionality it provides: you can specify the set of columns to update if a match is made (if you only need to update certain columns). You can also enable spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true"), so that delta uses minimal targeted partitions to carry out the merge (update, delete, create).
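A sketch of what that column-level update could look like; 'Value' is just a placeholder for whichever columns you actually need to refresh:
# update only selected columns on a match, insert everything for new records
deltaTable.alias("target").merge(dataset.alias("source"), "target.Year = source.Year AND target.Week = source.Week") \
    .whenMatchedUpdate(set={"Value": "source.Value"}) \
    .whenNotMatchedInsertAll() \
    .execute()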
Overall, I think using this approach is a very new and innovative way of carrying out targeted updates, as it gives you more control over them while keeping operations highly efficient. Using parquet with dynamic partition overwrite mode will also work fine; however, delta lake features bring a level of data quality to your data lake that is unmatched.
My recommendation:
I would say for now, use dynamic partition overwrite mode for parquet files to do your updates, and you could experiment and try to use the delta merge on just one table with the Databricks optimization of spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true") and .whenMatchedUpdateAll(), and compare the performance of both (your files are small, so I do not think it will be a big difference). The Databricks partition pruning optimization for merges article came out in February, so it is really new and could possibly be a game changer for the overhead that delta merge operations incur (as under the hood they just create new files, but partition pruning could speed it up).
Merge examples in python,scala,sql: https://docs.databricks.com/delta/delta-update.html#merge-examples
https://databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html