Does Spark know the partitioning key of a DataFrame?
I am answering my own question for future reference, documenting what worked.

Following the suggestion of @user8371915, `bucketBy` works! I am saving my DataFrame `df`:
```scala
df.write
  .bucketBy(250, "userid")
  .saveAsTable("myNewTable")
```
Then when I need to load this table:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

val df2 = spark.sql("SELECT * FROM myNewTable")
val w = Window.partitionBy("userid")
val df3 = df2.withColumn("newColumnName", sum(col("someColumn")).over(w))
df3.explain
```
I confirm that when I run window functions on `df2` partitioned by `userid`, there is no shuffle! Thanks @user8371915!
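As a rough sanity check (a sketch of mine, not part of the original recipe), one can assert that the physical plan contains no `Exchange` operator, which is how a shuffle shows up in Spark plans:

```scala
// Sketch: a bucketed scan should already satisfy the window's partitioning
// requirement, so the physical plan should contain no Exchange (shuffle) node.
val plan = df3.queryExecution.executedPlan.toString
assert(!plan.contains("Exchange"), "unexpected shuffle in the plan")
```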
Some things I learned while investigating this:
- `myNewTable` looks like a normal parquet table, but it is not. You could read it normally with `spark.read.format("parquet").load("path/to/myNewTable")`, but the `DataFrame` created this way will not keep the original partitioning! You must use `spark.sql` with a `SELECT` to get a correctly partitioned `DataFrame` (see the sketch after this list).
- You can look inside the table with `spark.sql("describe formatted myNewTable").collect.foreach(println)`. This will tell you which columns were used for bucketing and how many buckets there are.
- Window functions and joins that take advantage of partitioning often also require a sort. You can sort the data in your buckets at write time using `.sortBy()`, and the sort will be preserved in the Hive table as well: `df.write.bucketBy(250, "userid").sortBy("someColumnName").saveAsTable("myNewTable")`
- When working in local mode, the table `myNewTable` is saved to a `spark-warehouse` folder in my local Scala SBT project. When saving in cluster mode with Mesos via `spark-submit`, it is saved to the Hive warehouse. For me it was located in `/user/hive/warehouse`.
- When doing `spark-submit`, you need to add two options to your `SparkSession`: `.config("hive.metastore.uris", "thrift://address-to-your-master:9083")` and `.enableHiveSupport()`. Otherwise the Hive tables you created will not be visible (a sketch of this setup follows after this list).
- If you want to save your table to a specific database, run `spark.sql("USE your_database")` before bucketing.
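To illustrate the first point above, here is a minimal sketch comparing the two ways of loading the table. The path is hypothetical, and `spark.table("myNewTable")` is used as a shorthand for the `SELECT` shown earlier:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

// Reading the files directly: the bucketing metadata is lost.
val viaParquet = spark.read.format("parquet").load("path/to/myNewTable")

// Reading through the metastore: the bucketing information is preserved.
val viaTable = spark.table("myNewTable")

// The same window query shuffles in the first case but not in the second;
// compare the two plans and look for an Exchange node.
val w = Window.partitionBy("userid")
viaParquet.withColumn("s", sum(col("someColumn")).over(w)).explain()
viaTable.withColumn("s", sum(col("someColumn")).over(w)).explain()
```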
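For the `spark-submit` point, a minimal sketch of the `SparkSession` setup; the thrift URI is a placeholder for your cluster's metastore address:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: a SparkSession wired to an external Hive metastore, so that
// bucketed tables created with saveAsTable are visible across jobs.
val spark = SparkSession.builder()
  .appName("bucketed-tables")
  .config("hive.metastore.uris", "thrift://address-to-your-master:9083") // placeholder
  .enableHiveSupport()
  .getOrCreate()
```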
Update 05-02-2018

I encountered some problems with Spark bucketing and the creation of Hive tables. Please refer to the question, answers, and comments in Why is Spark saveAsTable with bucketBy creating thousands of files?
> Does Spark know that the dataframe `df2` is partitioned by column `numerocarte`?
It does not.
> If it does not know, how do I tell Spark the data is already partitioned by the right column?
You don't. Just because you save data which has been shuffled, it does not mean that it will be loaded with the same splits.
> How can I check a partitioning key of DataFrame?

There is no partitioning key once you have loaded the data, but you can check `queryExecution` for the `Partitioner`.
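A minimal sketch of such a check; the path and column names are illustrative:

```scala
import org.apache.spark.sql.functions.col

// A freshly loaded DataFrame reports no particular partitioning:
val df = spark.read.parquet("path/to/data")
println(df.queryExecution.sparkPlan.outputPartitioning) // e.g. UnknownPartitioning

// After an explicit repartition, the plan carries a HashPartitioning:
println(df.repartition(10, col("userid")).queryExecution.sparkPlan.outputPartitioning)
```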
In practice:

- If you want to support efficient pushdowns on the key, use the `partitionBy` method of `DataFrameWriter` (see the sketch below).
- If you want limited support for join optimizations, use `bucketBy` with the metastore and persistent tables.
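A minimal sketch of the first option; the output path is hypothetical, and `date` stands in for any low-cardinality key suited to directory partitioning:

```scala
// One directory per distinct date value; a filter such as
// WHERE date = '2018-01-01' can then prune whole directories at read time.
df.write
  .partitionBy("date")
  .parquet("path/to/output")
```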
See How to define partitioning of DataFrame? for detailed examples.