DataFrame-ified zipWithIndex

Since Spark 1.6 there is a function called monotonically_increasing_id().
It generates a new column with a unique, monotonically increasing 64-bit index for each row.
But the values aren't consecutive: each partition starts a new range, so we must calculate an offset for each partition before using them.
Trying to provide an "RDD-free" solution, I ended up with a collect(), but it only collects the offsets, one entry per partition, so it will not cause an OOM.
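
Before the Spark version, here is a minimal plain-Scala sketch of the offset arithmetic only, with no Spark involved. The names OffsetSketch, incId, partitionOffsets and indices are hypothetical, made up for this illustration. It assumes the documented layout of monotonically_increasing_id() (partition id in the upper 31 bits, position within the partition in the lower 33 bits) and that the first row of a partition has position 0. The Spark function below does not rely on that second assumption, because it subtracts the inc_id it actually observes.

```scala
object OffsetSketch {
  // Assumed layout of monotonically_increasing_id(): partition id in the
  // upper 31 bits, position within the partition in the lower 33 bits.
  def incId(partitionId: Int, rowInPartition: Long): Long =
    (partitionId.toLong << 33) + rowInPartition

  // Offset of a partition: number of rows in all earlier partitions,
  // minus the first inc_id of this partition, plus the starting index.
  def partitionOffsets(counts: Seq[Long], start: Long): Seq[Long] = {
    val rowsBefore = counts.scanLeft(0L)(_ + _).init
    rowsBefore.zipWithIndex.map { case (before, pid) =>
      before - incId(pid, 0L) + start
    }
  }

  // Final index of every row, in partition order:
  // the row's inc_id plus the offset of its partition.
  def indices(counts: Seq[Long], start: Long): Seq[Long] = {
    val offsets = partitionOffsets(counts, start)
    for {
      (cnt, pid) <- counts.zipWithIndex
      row <- 0L until cnt
    } yield incId(pid, row) + offsets(pid)
  }
}
```

For example, OffsetSketch.indices(Seq(3L, 2L, 4L), 1L) walks three partitions holding 3, 2 and 4 rows and yields the consecutive indices 1 to 9, even though the raw inc_id values jump by 2^33 between partitions.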

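import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index"): DataFrame = {
    // Tag every row with its partition id and its per-partition monotonic id.
    val dfWithPartitionId = df.withColumn("partition_id", spark_partition_id()).withColumn("inc_id", monotonically_increasing_id())

    // One entry per non-empty partition:
    // (rows in earlier partitions) - (smallest inc_id of this partition) + offset.
    // min is used instead of first because first is not deterministic in an aggregation.
    // Keyed by partition id, so empty partitions cannot shift the lookup.
    val partitionOffsets = dfWithPartitionId
        .groupBy("partition_id")
        .agg(count(lit(1)) as "cnt", min("inc_id") as "inc_id")
        .select(col("partition_id"), sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt")
        .collect()
        .map(row => row.getInt(0) -> row.getLong(1))
        .toMap

    // Typed udf: the return type (Long) is inferred from the Scala function.
    dfWithPartitionId
        .withColumn("partition_offset", udf((partitionId: Int) => partitionOffsets(partitionId)).apply(col("partition_id")))
        .withColumn(indexName, col("partition_offset") + col("inc_id"))
        .drop("partition_id", "partition_offset", "inc_id")
}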

This solution doesn't repack the original rows and doesn't repartition the original huge DataFrame, so it is quite fast in practice: 200 GB of CSV data (43 million rows with 150 columns) was read, indexed and written to Parquet in 2 minutes on 240 cores.
After testing my solution, I ran Kirk Broadhurst's solution and it was 20 seconds slower.
You may or may not want to call dfWithPartitionId.cache(), depending on the task: dfWithPartitionId is evaluated twice, once to compute the offsets and once to build the result.


The following was posted on behalf of David Griffin (edited out of the question).

The all-singing, all-dancing dfZipWithIndex method. You can set the starting offset (which defaults to 1), the index column name (defaults to "id"), and place the column in the front or the back:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.Row


def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String = "id",
  inFront: Boolean = true
) : DataFrame = {
  df.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map(ln =>
      Row.fromSeq(
        (if (inFront) Seq(ln._2 + offset) else Seq())
          ++ ln._1.toSeq ++
        (if (inFront) Seq() else Seq(ln._2 + offset))
      )
    ),
    StructType(
      (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]()) 
        ++ df.schema.fields ++ 
      (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false)))
    )
  ) 
}

As of Spark 1.5, Window expressions are available in Spark. Instead of having to convert the DataFrame to an RDD, you can now use row_number from org.apache.spark.sql.functions together with org.apache.spark.sql.expressions.Window. Note that I found the above dfZipWithIndex to be significantly faster than the algorithm below. But I am posting it because:

  1. Someone else is going to be tempted to try this
  2. Maybe someone can optimize the expressions below

At any rate, here's what works for me:

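import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

// A constant partition key and a constant sort key: one window over all rows.
df.withColumn("row_num", row_number().over(Window.partitionBy(lit(1)).orderBy(lit(1))))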

Note that I use lit(1) for both the partitioning and the ordering. This puts every row in the same partition and seems to preserve the original ordering of the DataFrame, but I suppose it is also what slows it way down.

I tested it on a 4-column DataFrame with 7,000,000 rows, and the speed difference between this and the above dfZipWithIndex is significant (like I said, the RDD functions are much, much faster).
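
To try the constant-key window on something small, here is a minimal self-contained sketch. It assumes spark-sql is on the classpath and uses a local session. The object RowNumberDemo, its members, the "value" column name and the 8 input partitions are all hypothetical choices for illustration. It only checks that each row gets a distinct number from 1 to n; it says nothing about which row gets which number, since the text above only observes that the original order "seems" to be preserved.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

object RowNumberDemo {
  // Hypothetical local session, only for this demonstration.
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("row-number-demo")
    .getOrCreate()

  // The constant-key window from the text, wrapped in a helper.
  def withRowNum(df: DataFrame): DataFrame =
    df.withColumn(
      "row_num",
      row_number().over(Window.partitionBy(lit(1)).orderBy(lit(1))))

  // Builds n rows spread over 8 input partitions, numbers them,
  // and returns the assigned row numbers in ascending order.
  def rowNumbers(n: Long): Seq[Int] =
    withRowNum(spark.range(0, n, 1, 8).toDF("value"))
      .collect()
      .map(_.getAs[Int]("row_num"))
      .sorted
      .toSeq
}
```

Calling RowNumberDemo.rowNumbers(1000L) should return the numbers 1 through 1000 exactly once each, regardless of how the input was partitioned.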