Spark SQL and MySQL - SaveMode.Overwrite not inserting modified data
The problem is in your code. Because you overwrite the table from which you're trying to read, you effectively obliterate all the data before Spark can actually access it.
Remember that Spark is lazy. When you create a `Dataset`, Spark fetches the required metadata but doesn't load the data, so there is no magic cache that will preserve the original content. The data is loaded only when it is actually required, which here means when you execute the `write` action, and by the time writing starts there is no more data to be fetched.
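To make the failure mode concrete, here is a minimal sketch of the pattern that triggers it (the URL, table name, column and credentials are placeholders, not taken from the question, and `spark` is an existing `SparkSession`):

```scala
import java.util.Properties
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit

val url = "jdbc:mysql://localhost:3306/mydb"
val props = new Properties()
props.setProperty("user", "user")
props.setProperty("password", "password")

// Lazy: only metadata is fetched here, no rows are read yet.
val df = spark.read.jdbc(url, "mytable", props)

// SaveMode.Overwrite drops/truncates "mytable" before the rows are pulled,
// so the source is already empty when Spark finally tries to read it.
df.withColumn("modified", lit(true))
  .write
  .mode(SaveMode.Overwrite)
  .jdbc(url, "mytable", props)
```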
What you need is something like this:
- Create a `Dataset`, apply the required transformations, and write the data to an intermediate MySQL table.
- `TRUNCATE` the original input and `INSERT INTO ... SELECT` from the intermediate table, or `DROP` the original table and `RENAME` the intermediate table (see the sketch after this list).
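A minimal sketch of this approach, assuming a staging table named `mytable_staging` and placeholder connection details:

```scala
import java.sql.DriverManager
import java.util.Properties
import org.apache.spark.sql.SaveMode

val url = "jdbc:mysql://localhost:3306/mydb"
val props = new Properties()
props.setProperty("user", "user")
props.setProperty("password", "password")

// Read, transform, and write to the intermediate (staging) table.
val df = spark.read.jdbc(url, "mytable", props)
val transformed = df // apply the required transformations here
transformed.write.mode(SaveMode.Overwrite).jdbc(url, "mytable_staging", props)

// Swap the data over in MySQL once the Spark job has finished.
val conn = DriverManager.getConnection(url, "user", "password")
try {
  val stmt = conn.createStatement()
  stmt.execute("TRUNCATE TABLE mytable")
  stmt.execute("INSERT INTO mytable SELECT * FROM mytable_staging")
  // or: DROP TABLE mytable; RENAME TABLE mytable_staging TO mytable
} finally {
  conn.close()
}
```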
An alternative, but less favorable, approach would be:
- Create a `Dataset`.
- Apply the required transformations and write the data to a persistent Spark table (`df.write.saveAsTable(...)` or equivalent).
- `TRUNCATE` the original input.
- Read the data back and save it (`spark.table(...).write.jdbc(...)`).
- Drop the Spark table. (A sketch of these steps follows below.)
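A sketch of this fallback, again with placeholder names (`mytable_backup` is the persistent Spark table):

```scala
import java.util.Properties
import org.apache.spark.sql.SaveMode

val url = "jdbc:mysql://localhost:3306/mydb"
val props = new Properties()
props.setProperty("user", "user")
props.setProperty("password", "password")

// Transform and park the result in a persistent Spark table.
val transformed = spark.read.jdbc(url, "mytable", props) // plus required transformations
transformed.write.mode(SaveMode.Overwrite).saveAsTable("mytable_backup")

// TRUNCATE the original MySQL table through a plain JDBC connection,
// as in the previous sketch.

// Read the data back from the Spark table into the now-empty MySQL table.
spark.table("mytable_backup")
  .write
  .mode(SaveMode.Append)
  .jdbc(url, "mytable", props)

// Drop the intermediate Spark table.
spark.sql("DROP TABLE mytable_backup")
```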
We cannot stress enough that using Spark `cache` / `persist` is not the way to go. Even with a conservative `StorageLevel` (`MEMORY_AND_DISK_2` / `MEMORY_AND_DISK_SER_2`), cached data can be lost on node failures, leading to silent correctness errors.
I believe all the steps above are unnecessary. Here's what you need to do:
- Create a Dataset `A`, e.g. `val A = spark.read.parquet("....")`.
- Read the table to be updated as DataFrame `B`, and make sure caching is enabled for `B`: `val B = spark.read.jdbc(url, "mytable", props).cache()`.
- Force a `count` on `B`; this triggers execution and caches the table, depending on the chosen `StorageLevel`: `B.count`.
- Now you can do a transformation like `val C = A.union(B)`.
- Then write `C` back to the database: `C.write.mode(SaveMode.Overwrite).jdbc(url, "mytable", props)`. (See the consolidated sketch below.)
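Putting the steps together, a minimal sketch (the URL, connection properties, table name and parquet path are placeholders, and `spark` is an existing `SparkSession`):

```scala
import java.util.Properties
import org.apache.spark.sql.SaveMode

val url = "jdbc:mysql://localhost:3306/mydb"
val props = new Properties()
props.setProperty("user", "user")
props.setProperty("password", "password")

val A = spark.read.parquet("/path/to/new/data")        // new data
val B = spark.read.jdbc(url, "mytable", props).cache() // existing table, cached

B.count() // forces evaluation so B's rows are materialized before the overwrite

val C = A.union(B) // A and B must have the same schema

// NOTE: as the earlier answer warns, if cached partitions are evicted or lost,
// Spark will recompute them from the (already overwritten) source table.
C.write.mode(SaveMode.Overwrite).jdbc(url, "mytable", props)
```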