How to define partitioning of DataFrame?

Spark >= 2.3.0

SPARK-22614 exposes range partitioning.

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389 exposes external format partitioning in the Data Source API v2.

Spark >= 1.6.0

In Spark >= 1.6 it is possible to use partitioning by column for query and caching. See: SPARK-11410 and SPARK-4849 using repartition method:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

Unlike RDDs Spark Dataset (including Dataset[Row] a.k.a DataFrame) cannot use custom partitioner as for now. You can typically address that by creating an artificial partitioning column but it won't give you the same flexibility.

Spark < 1.6.0:

One thing you can do is to pre-partition input data before you create a DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

Since DataFrame creation from an RDD requires only a simple map phase existing partition layout should be preserved*:

assert(df.rdd.partitions == partitioned.partitions)

The same way you can repartition existing DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

So it looks like it is not impossible. The question remains if it make sense at all. I will argue that most of the time it doesn't:

  1. Repartitioning is an expensive process. In a typical scenario most of the data has to be serialized, shuffled and deserialized. From the other hand number of operations which can benefit from a pre-partitioned data is relatively small and is further limited if internal API is not designed to leverage this property.

    • joins in some scenarios, but it would require an internal support,
    • window functions calls with matching partitioner. Same as above, limited to a single window definition. It is already partitioned internally though, so pre-partitioning may be redundant,
    • simple aggregations with GROUP BY - it is possible to reduce memory footprint of the temporary buffers**, but overall cost is much higher. More or less equivalent to groupByKey.mapValues(_.reduce) (current behavior) vs reduceByKey (pre-partitioning). Unlikely to be useful in practice.
    • data compression with SqlContext.cacheTable. Since it looks like it is using run length encoding, applying OrderedRDDFunctions.repartitionAndSortWithinPartitions could improve compression ratio.
  2. Performance is highly dependent on a distribution of the keys. If it is skewed it will result in a suboptimal resource utilization. In the worst case scenario it will be impossible to finish the job at all.

  3. A whole point of using a high level declarative API is to isolate yourself from a low level implementation details. As already mentioned by @dwysakowicz and @RomiKuntsman an optimization is a job of the Catalyst Optimizer. It is a pretty sophisticated beast and I really doubt you can easily improve on that without diving much deeper into its internals.

Related concepts

Partitioning with JDBC sources:

JDBC data sources support predicates argument. It can be used as follows:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

It creates a single JDBC partition per predicate. Keep in mind that if sets created using individual predicates are not disjoint you'll see duplicates in the resulting table.

partitionBy method in DataFrameWriter:

Spark DataFrameWriter provides partitionBy method which can be used to "partition" data on write. It separates data on write using provided set of columns

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

This enables predicate push down on read for queries based on key:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

but it is not equivalent to DataFrame.repartition. In particular aggregations like:

val cnts = df1.groupBy($"k").sum()

will still require TungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketBy method in DataFrameWriter (Spark >= 2.0):

bucketBy has similar applications as partitionBy but it is available only for tables (saveAsTable). Bucketing information can used to optimize joins:

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* By partition layout I mean only a data distribution. partitioned RDD has no longer a partitioner. ** Assuming no early projection. If aggregation covers only small subset of columns there is probably no gain whatsoever.


In Spark < 1.6 If you create a HiveContext, not the plain old SqlContext you can use the HiveQL DISTRIBUTE BY colX... (ensures each of N reducers gets non-overlapping ranges of x) & CLUSTER BY colX... (shortcut for Distribute By and Sort By) for example;

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

Not sure how this fits in with Spark DF api. These keywords aren't supported in the normal SqlContext (note you dont need to have a hive meta store to use the HiveContext)

EDIT: Spark 1.6+ now has this in the native DataFrame API