DataFrame-ified zipWithIndex
Since Spark 1.6 there is a function called monotonically_increasing_id().
It generates a new column with a unique 64-bit monotonically increasing index for each row.
But the values aren't consecutive: each partition starts a new range, so we must calculate each partition's offset before using it.
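To see why, here is a minimal sketch (assuming a Spark 2+ session named spark; the exact values depend on how your data is partitioned):

import org.apache.spark.sql.functions.monotonically_increasing_id

// Hypothetical tiny example: 6 rows spread over 3 partitions.
spark.range(6).repartition(3)
  .withColumn("inc_id", monotonically_increasing_id())
  .show()
// inc_id comes out in per-partition runs, e.g.
//   0, 1, 8589934592, 8589934593, 17179869184, 17179869185
// (the partition index sits in the upper 31 bits, the record number in the
// lower 33 bits), so the ids are unique and increasing but not consecutive.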
Trying to provide an "rdd-free" solution, I ended up with a collect(), but it only collects the offsets, one value per partition, so it will not cause OOM:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index"): DataFrame = {
  // Tag every row with its partition id and the raw (non-consecutive) monotonic id.
  val dfWithPartitionId = df
    .withColumn("partition_id", spark_partition_id())
    .withColumn("inc_id", monotonically_increasing_id())

  // One value per partition: the offset to add so the final index becomes consecutive.
  val partitionOffsets = dfWithPartitionId
    .groupBy("partition_id")
    .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id")
    .orderBy("partition_id")
    .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt")
    .collect()
    .map(_.getLong(0))

  // Look up each row's partition offset, add it to the raw id, drop the helper columns.
  dfWithPartitionId
    .withColumn("partition_offset", udf((partitionId: Int) => partitionOffsets(partitionId), LongType)(col("partition_id")))
    .withColumn(indexName, col("partition_offset") + col("inc_id"))
    .drop("partition_id", "partition_offset", "inc_id")
}
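A usage sketch (the CSV path is a placeholder; any DataFrame works):

val df = spark.read.option("header", "true").csv("/path/to/huge.csv")  // hypothetical input
val indexed = zipWithIndex(df, offset = 1, indexName = "index")
indexed.select("index").show(5)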
This solution doesn't repack the original rows and doesn't repartition the original huge DataFrame, so it is quite fast in the real world:
200 GB of CSV data (43 million rows with 150 columns) was read, indexed and written to Parquet in 2 minutes on 240 cores.
After testing my solution, I ran Kirk Broadhurst's solution and it was 20 seconds slower.
You may or may not want to use dfWithPartitionId.cache(), depending on the task.
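If you do cache, the natural place is right after dfWithPartitionId is built, since that DataFrame is evaluated twice: once when the offsets are collected and again when the final result is materialized. A sketch of that variant (same function as above, only the relevant lines shown):

// Inside zipWithIndex: cache the tagged DataFrame so the source isn't
// re-read for both the offset aggregation and the final projection.
val dfWithPartitionId = df
  .withColumn("partition_id", spark_partition_id())
  .withColumn("inc_id", monotonically_increasing_id())
  .cache()
// ... compute partitionOffsets and build the indexed result as before ...
// dfWithPartitionId.unpersist()  // optionally release it when done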
The following was posted on behalf of David Griffin (edited out of the question).
The all-singing, all-dancing dfZipWithIndex method. You can set the starting offset (which defaults to 1), the index column name (defaults to "id"), and place the column at the front or the back:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String = "id",
  inFront: Boolean = true
): DataFrame = {
  df.sqlContext.createDataFrame(
    // zipWithIndex pairs each Row with its global index; splice the index
    // into the front or the back of the row.
    df.rdd.zipWithIndex.map(ln =>
      Row.fromSeq(
        (if (inFront) Seq(ln._2 + offset) else Seq()) ++
          ln._1.toSeq ++
          (if (inFront) Seq() else Seq(ln._2 + offset))
      )
    ),
    // Extend the original schema with the new non-nullable long column.
    StructType(
      (if (inFront) Array(StructField(colName, LongType, false)) else Array[StructField]()) ++
        df.schema.fields ++
        (if (inFront) Array[StructField]() else Array(StructField(colName, LongType, false)))
    )
  )
}
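A usage sketch (df is any existing DataFrame; the column name is arbitrary):

// Append the index as the last column, starting from 0.
val withId = dfZipWithIndex(df, offset = 0, colName = "row_id", inFront = false)
withId.printSchema()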
Starting in Spark 1.5, Window expressions were added to Spark. Instead of having to convert the DataFrame to an RDD, you can now use row_number from org.apache.spark.sql.functions together with org.apache.spark.sql.expressions.Window. Note that I found the above dfZipWithIndex to be significantly faster than the algorithm below. But I am posting it because:
- Someone else is going to be tempted to try this
- Maybe someone can optimize the expressions below
At any rate, here's what works for me:
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

df.withColumn("row_num", row_number().over(Window.partitionBy(lit(1)).orderBy(lit(1))))
Note that I use lit(1) for both the partitioning and the ordering. This puts everything into a single partition, and it seems to preserve the original ordering of the DataFrame, but I suppose it is also what slows it way down.
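Since row_number starts at 1, a small tweak (a sketch, not part of the original answer) lines it up with a custom starting offset like dfZipWithIndex uses:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

val offset = 0  // hypothetical starting value
val w = Window.partitionBy(lit(1)).orderBy(lit(1))
df.withColumn("row_num", row_number().over(w) - 1 + offset)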
I tested it on a 4-column DataFrame with 7,000,000 rows and the speed difference is significant between this and the above dfZipWithIndex (like I said, the RDD-based function is much, much faster).