How to optimize partitioning when migrating data from JDBC source?
In my experience there are 4 kinds of memory settings which make a difference:
A) [1] Memory for storing data for processing reasons VS [2] Heap Space for holding the program stack
B) [1] Driver VS [2] executor memory
Up to now, I was always able to get my Spark jobs running successfully by increasing the appropriate kind of memory:
A2-B1 would therefor be the memory available on the driver to hold the program stack. Etc.
The property names are as follows:
A1-B1) executor-memory
A1-B2) driver-memory
A2-B1) spark.yarn.executor.memoryOverhead
A2-B2) spark.yarn.driver.memoryOverhead
Keep in mind that the sum of all *-B1 must be less than the available memory on your workers and the sum of all *-B2 must be less than the memory on your driver node.
My bet would be, that the culprit is one of the boldly marked heap settings.
Determine how many partitions you need given the amount of input data and your cluster resources. As a rule of thumb it is better to keep partition input under 1GB unless strictly necessary. and strictly smaller than the block size limit.
You've previously stated that you migrate 1TB of data values you use in different posts (5 - 70) are likely way to low to ensure smooth process.
Try to use value which won't require further
repartitioning
.Know your data.
Analyze the columns available in the the dataset to determine if there any columns with high cardinality and uniform distribution to be distributed among desired number of partitions. These are good candidates for an import process. Additionally you should determine an exact range of values.
Aggregations with different centrality and skewness measure as well as histograms and basic counts-by-key are good exploration tools. For this part it is better to analyze data directly in the database, instead of fetching it to Spark.
Depending on the RDBMS you might be able to use
width_bucket
(PostgreSQL, Oracle) or equivalent function to get a decent idea how data will be distributed in Spark after loading withpartitionColumn
,lowerBound
,upperBound
,numPartitons
.s"""(SELECT width_bucket($partitionColum, $lowerBound, $upperBound, $numPartitons) AS bucket, COUNT(*) FROM t GROUP BY bucket) as tmp)"""
If there are no columns which satisfy above criteria consider:
- Creating a custom one and exposing it via. a view. Hashes over multiple independent columns are usually good candidates. Please consult your database manual to determine functions that can be used here (
DBMS_CRYPTO
in Oracle,pgcrypto
in PostgreSQL)*. Using a set of independent columns which taken together provide high enough cardinality.
Optionally, if you're going to write to a partitioned Hive table, you should consider including Hive partitioning columns. It might limit the number of files generated later.
- Creating a custom one and exposing it via. a view. Hashes over multiple independent columns are usually good candidates. Please consult your database manual to determine functions that can be used here (
Prepare partitioning arguments
If column selected or created in the previous steps is numeric (or date / timestamp in Spark >= 2.4) provide it directly as the
partitionColumn
and use range values determined before to filllowerBound
andupperBound
.If bound values don't reflect the properties of data (
min(col)
forlowerBound
,max(col)
forupperBound
) it can result in a significant data skew so thread carefully. In the worst case scenario, when bounds don't cover the range of data, all records will be fetched by a single machine, making it no better than no partitioning at all.If column selected in the previous steps is categorical or is a set of columns generate a list of mutually exclusive predicates that fully cover the data, in a form that can be used in a
SQL
where clause.For example if you have a column
A
with values {a1
,a2
,a3
} and columnB
with values {b1
,b2
,b3
}:val predicates = for { a <- Seq("a1", "a2", "a3") b <- Seq("b1", "b2", "b3") } yield s"A = $a AND B = $b"
Double check that conditions don't overlap and all combinations are covered. If these conditions are not satisfied you end up with duplicates or missing records respectively.
Pass data as
predicates
argument tojdbc
call. Note that the number of partitions will be equal exactly to the number of predicates.
Put database in a read-only mode (any ongoing writes can cause data inconsistency. If possible you should lock database before you start the whole process, but if might be not possible, in your organization).
If the number of partitions matches the desired output load data without
repartition
and dump directly to the sink, if not you can try to repartition following the same rules as in the step 1.If you still experience any problems make sure that you've properly configured Spark memory and GC options.
If none of the above works:
Consider dumping your data to a network / distributes storage using tools like
COPY TO
and read it directly from there.Note that or standard database utilities you will typically need a POSIX compliant file system, so HDFS usually won't do.
The advantage of this approach is that you don't need to worry about the column properties, and there is no need for putting data in a read-only mode, to ensure consistency.
Using dedicated bulk transfer tools, like Apache Sqoop, and reshaping data afterwards.
* Don't use pseudocolumns - Pseudocolumn in Spark JDBC.