What row is used in dropDuplicates operator?
Unless you also use coalesce(1) before dropDuplicates, you may end up with an unexpected row order before the duplicates are removed. See this answer for a thorough set of worked examples: https://stackoverflow.com/a/54738843/4166885
I was wondering why I sometimes ended up with a DataFrame that removed the "wrong" rows; coalesce(1) solved the problem.
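For illustration, a minimal sketch of that approach, assuming a DataFrame df_s with the same col1 and datestr columns used in the snippet below (the sort order and the dedup key are just assumptions that mirror that snippet): sort into the order you want kept, collapse to a single partition, then deduplicate.

# sketch only: orderBy defines which row counts as "first";
# coalesce(1) keeps everything in one partition so that order
# survives until dropDuplicates runs
deduped = (df_s
    .orderBy('col1', 'datestr')
    .coalesce(1)
    .dropDuplicates(['col1']))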
edit: since coalesce(1) sometimes isn't an option, my favoured solution is this one from the post linked above:
from pyspark.sql import Window
from pyspark.sql.functions import rank, col, monotonically_increasing_id
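# rank rows within each col1 group by datestr, using monotonically_increasing_id()
# as a deterministic tie-break, then keep only the first-ranked row per group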
window = Window.partitionBy('col1').orderBy('datestr', 'tiebreak')
(df_s
.withColumn('tiebreak', monotonically_increasing_id())
.withColumn('rank', rank().over(window))
.filter(col('rank') == 1).drop('rank','tiebreak')
.show()
)
TL;DR Keep First (according to row order)
The dropDuplicates operator in Spark SQL creates a logical plan with a Deduplicate operator.
That Deduplicate operator is translated to a First logical operator by Spark SQL's Catalyst Optimizer, which answers your question nicely (!)
You can see the Deduplicate operator in the logical plan below.
// create a dataset with duplicates
val dups = spark.range(9).map(_ % 3)
val q = dups.dropDuplicates
The following is the logical plan of the q dataset.
scala> println(q.queryExecution.logical.numberedTreeString)
00 Deduplicate [value#64L], false
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
The Deduplicate operator is then translated to a First logical operator (which shows up as an Aggregate operator after optimization).
scala> println(q.queryExecution.optimizedPlan.numberedTreeString)
00 Aggregate [value#64L], [value#64L]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
After spending some time reviewing the Apache Spark source code, I found that the dropDuplicates operator is equivalent to groupBy followed by the first function.
first(columnName: String, ignoreNulls: Boolean): Column
Aggregate function: returns the first value of a column in a group.
import org.apache.spark.sql.functions.first
val firsts = dups.groupBy("value").agg(first("value") as "value")
scala> println(firsts.queryExecution.logical.numberedTreeString)
00 'Aggregate [value#64L], [value#64L, first('value, false) AS value#139]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
scala> firsts.explain
== Physical Plan ==
*HashAggregate(keys=[value#64L], functions=[first(value#64L, false)])
+- Exchange hashpartitioning(value#64L, 200)
+- *HashAggregate(keys=[value#64L], functions=[partial_first(value#64L, false)])
+- *SerializeFromObject [input[0, bigint, false] AS value#64L]
+- *MapElements <function1>, obj#63: bigint
+- *DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
+- *Range (0, 9, step=1, splits=8)
I also think that the dropDuplicates operator may be more performant.
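If you want to check that yourself, here is a minimal PySpark sketch (assuming a SparkSession named spark, as in the pyspark shell) that rebuilds the same toy dataset and prints the physical plan of each approach so you can compare them on your own cluster:

from pyspark.sql.functions import first, col

# same data as the Scala example: values 0, 1, 2 repeated three times
dups = spark.range(9).select((col('id') % 3).alias('value'))

# plan for dropDuplicates: a plain Aggregate over the value column
dups.dropDuplicates().explain()

# plan for groupBy + first: an Aggregate that also computes first(value)
dups.groupBy('value').agg(first('value').alias('value')).explain()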