How to speed up spark df.write jdbc to postgres database?
Spark side tuning => Repartition the DataFrame so that multiple executors write to the DB in parallel.
(df
    .repartition(10)  # No. of concurrent connections from Spark to PostgreSQL
    .write.format('jdbc').options(
        url=psql_url_spark,
        driver=spark_env['PSQL_DRIVER'],
        dbtable="{schema}.{table}".format(schema=schema, table=table),
        user=spark_env['PSQL_USER'],
        password=spark_env['PSQL_PASS'],
        batchsize=2000000,
        queryTimeout=690
    ).mode(mode).save())
PostgreSQL side tuning => The parameters below need to be raised on the PostgreSQL side accordingly.
max_connections determines the maximum number of concurrent connections to the database server. The default is typically 100 connections.
shared_buffers determines how much memory is dedicated to PostgreSQL to use for caching data.
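As a rough sanity check on the connection budget, the sketch below compares the number of writer partitions against max_connections. The function name and the `reserved` headroom (connections kept free for other clients) are assumptions made for illustration, not PostgreSQL settings.

```python
def fits_connection_budget(num_partitions, max_connections=100, reserved=10):
    """Return True if one connection per partition fits under max_connections.

    reserved is a hypothetical headroom for other clients; tune it for your setup.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    return num_partitions + reserved <= max_connections


print(fits_connection_budget(10))  # 10 writers against the typical default of 100
print(fits_connection_budget(95))  # 95 writers plus headroom exceed the default
```

The current value on the server can be read with `SHOW max_connections;`.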
To solve the performance issue, you generally need to resolve the following 2 bottlenecks:
- Make sure the Spark job is writing the data to the DB in parallel. To do this, make sure you have a partitioned dataframe: use "df.repartition(n)" to partition the dataframe so that each partition is written to the DB in parallel. Note: a large number of executors will also lead to slow inserts, so start with 5 partitions and increase the number of partitions by 5 until you get optimal performance.
- Make sure the DB has enough compute, memory and storage for ingesting bulk data.
It is well known that repartitioning the dataframe can improve write performance, but there is an optimal way to choose the number of partitions. Since you are running this process on an EMR cluster, first find out the instance type and the number of cores on each of your slave instances, and set the number of partitions on the dataframe accordingly. In your case you are using m5.xlarge (2 slaves), which has 4 vCPUs each, meaning 4 threads per instance. So 8 partitions should give you a good result when you are dealing with huge data.
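The arithmetic behind that number can be sketched as follows. The helper name is hypothetical, and the rule of one partition per vCPU is simply the one used in this answer, not a Spark requirement.

```python
def suggested_partitions(num_workers, vcpus_per_worker):
    """One partition per available vCPU across all worker instances."""
    return num_workers * vcpus_per_worker


# The case from this answer: 2 m5.xlarge instances with 4 vCPUs each.
print(suggested_partitions(2, 4))  # 8
```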
Note: The number of partitions should be increased or decreased based on your data size.
Note: Batch size is also something you should consider in your writes. A bigger batch size generally gives better performance.
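Because the best partition count depends on the data size, it can help to measure a few candidates rather than guess. Below is a minimal sketch, assuming you wrap your own write in a function that takes the partition count. `benchmark_partitions`, `fastest`, and the stand-in writer are hypothetical names, and the candidate values follow the "start with 5, step by 5" advice given earlier.

```python
import time


def benchmark_partitions(write_fn, candidates=(5, 10, 15, 20)):
    """Time write_fn(n) for each partition count and return {n: seconds}."""
    timings = {}
    for n in candidates:
        start = time.perf_counter()
        write_fn(n)  # expected to run something like df.repartition(n).write...save()
        timings[n] = time.perf_counter() - start
    return timings


def fastest(timings):
    """Return the partition count with the smallest elapsed time."""
    return min(timings, key=timings.get)


# Stand-in writer so the sketch runs without Spark; replace it with a real write.
calls = []
timings = benchmark_partitions(calls.append)
print(sorted(timings), fastest(timings))
```

For a real measurement, write to a scratch table with a representative sample of the data so the runs stay comparable.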
Spark is a distributed data processing engine, so when you are processing your data or saving it to a file system it uses all its executors to perform the task. Spark JDBC is slow because when you establish a JDBC connection, one of the executors establishes a link to the target database, which results in slow speeds and failures.
To overcome this problem and speed up data writes to the database you need to use one of the following approaches:
Approach 1:
In this approach you use the Postgres COPY command to speed up the write operation. This requires the psycopg2 library on your EMR cluster.
The documentation for COPY utility is here
If you want to know the benchmark differences and why copy is faster visit here!
Postgres also suggests using the COPY command for bulk inserts. So how do you bulk insert a Spark dataframe? To implement faster writes, first save your Spark dataframe to the EMR file system in CSV format, and repartition your output so that no file contains more than 100k rows.
# Repartition your dataframe (ideally based on the number of rows in df); maxRecordsPerFile caps each file at 100k rows
df.repartition(10).write.option("maxRecordsPerFile", 100000).mode("overwrite").csv("path/to/save/data")
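The comment above mentions choosing the partition count from the number of rows, while the call hard-codes 10. One way to derive it is sketched below. The helper name is hypothetical, and it assumes rows are spread roughly evenly across partitions.

```python
import math


def partitions_for(row_count, rows_per_file=100_000):
    """Smallest partition count that keeps each file at or under rows_per_file,
    assuming rows are spread evenly across partitions."""
    return max(1, math.ceil(row_count / rows_per_file))


print(partitions_for(1_000_000))  # 10
print(partitions_for(250_001))    # 3
```

With a real dataframe this would be used as `df.repartition(partitions_for(df.count()))`. Keep in mind that `df.count()` triggers a Spark job of its own.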
Now read the files using python and execute copy command for each file.
import psycopg2

# Paths of the files to load; you can also build this list using the os module
file = 'path/to/save/data/part-00000_0.csv'
file1 = 'path/to/save/data/part-00000_1.csv'

# Load one file into the table with COPY. Takes a path rather than an open
# file object so that it can be passed to multiprocessing workers.
def execute_copy(fileName):
    con = psycopg2.connect(database=dbname, user=user, password=password, host=host, port=port)
    try:
        with open(fileName) as f, con.cursor() as cursor:
            cursor.copy_from(f, 'table_name', sep=",")
        con.commit()
    finally:
        con.close()
To gain an additional speed boost, since you are using an EMR cluster, you can leverage Python multiprocessing to copy more than one file at once.
from multiprocessing import Pool, cpu_count

with Pool(cpu_count()) as p:
    print(p.map(execute_copy, [file, file1]))
This is the recommended approach, as Spark JDBC can't be tuned to gain higher write speeds due to connection constraints.
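The file list in the example above is written out by hand. A minimal sketch of collecting it automatically follows, assuming the CSV output sits on a local path visible to the Python process (files on HDFS or S3 would need a different listing method). `list_part_files` is a hypothetical helper name.

```python
import glob
import os
import tempfile


def list_part_files(directory):
    """Return the sorted paths of Spark's part-*.csv output files in directory."""
    return sorted(glob.glob(os.path.join(directory, "part-*.csv")))


# Demo on a throwaway directory so the sketch runs without Spark output.
with tempfile.TemporaryDirectory() as d:
    for name in ("part-00001.csv", "part-00000.csv", "_SUCCESS"):
        open(os.path.join(d, name), "w").close()
    print([os.path.basename(p) for p in list_part_files(d)])
```

The returned list can be passed to the pool in place of the hard-coded file names.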
Approach 2:
Since you are already using an AWS EMR cluster, you can leverage the Hadoop capabilities to perform your table writes faster. Here we will use sqoop export to export the data from EMRFS to the Postgres DB.
#If you are using s3 as your source path
sqoop export --connect jdbc:postgresql://hostname:port/postgresDB --table target_table --export-dir s3://mybucket/myinputfiles/ --driver org.postgresql.Driver --username master --password password --input-null-string '\\N' --input-null-non-string '\\N' --direct -m 16
#If you are using EMRFS as your source path
sqoop export --connect jdbc:postgresql://hostname:port/postgresDB --table target_table --export-dir /path/to/save/data/ --driver org.postgresql.Driver --username master --password password --input-null-string '\\N' --input-null-non-string '\\N' --direct -m 16
Why sqoop? Because sqoop opens multiple connections to the database based on the number of mappers specified. So if you specify -m as 8, there will be 8 concurrent connection streams writing data to Postgres.
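If the export is launched from a Python script, the mapper count can be kept as a single parameter. The sketch below only assembles the argument list and prints it. It uses a subset of the flags from the commands above (password and null-handling flags are left out for brevity), and `build_sqoop_export` is a hypothetical helper, not part of sqoop.

```python
import shlex


def build_sqoop_export(jdbc_url, table, export_dir, username, mappers=8):
    """Assemble a sqoop export argument list; mappers sets the -m value."""
    return [
        "sqoop", "export",
        "--connect", jdbc_url,
        "--table", table,
        "--export-dir", export_dir,
        "--username", username,
        "-m", str(mappers),
    ]


cmd = build_sqoop_export(
    "jdbc:postgresql://hostname:port/postgresDB",
    "target_table",
    "/path/to/save/data/",
    "master",
    mappers=8,
)
print(shlex.join(cmd))
```

The list can be handed to `subprocess.run(cmd, check=True)` once the placeholders are replaced with real values.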
Also, for more information on using sqoop go through this AWS Blog, SQOOP Considerations and SQOOP Documentation.
If you can hack your way around with code, then Approach 1 should give you the performance boost you seek, and if you are comfortable with Hadoop components like SQOOP, then go with the second approach.
Hope it helps!