How do I get a SQL row_number equivalent for a Spark RDD?
The row_number() over (partition by ... order by ...)
functionality was added in Spark 1.4. This answer uses PySpark/DataFrames.
Create a test DataFrame:
from pyspark.sql import Row, functions as F
testDF = sc.parallelize(
(Row(k="key1", v=(1,2,3)),
Row(k="key1", v=(1,4,7)),
Row(k="key1", v=(2,2,3)),
Row(k="key2", v=(5,5,5)),
Row(k="key2", v=(5,5,9)),
Row(k="key2", v=(7,5,5))
)
).toDF()
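Add the partitioned row number. F.rowNumber() is the Spark 1.4/1.5 name; from Spark 1.6 on, the function is called F.row_number(). Also note that ordering by the partition key k leaves the order of rows within each partition unspecified, so in practice order by the column that defines your ranking: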
from pyspark.sql.window import Window
(testDF
.select("k", "v",
F.rowNumber()
.over(Window
.partitionBy("k")
.orderBy("k")
)
.alias("rowNum")
)
.show()
)
+----+-------+------+
| k| v|rowNum|
+----+-------+------+
|key1|[1,2,3]| 1|
|key1|[1,4,7]| 2|
|key1|[2,2,3]| 3|
|key2|[5,5,5]| 1|
|key2|[5,5,9]| 2|
|key2|[7,5,5]| 3|
+----+-------+------+
This is an interesting problem you're bringing up. I will answer it in Python but I'm sure you will be able to translate seamlessly to Scala.
Here is how I would tackle it:
1- Simplify your data:
temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))
temp2 is now a "real" key-value pair RDD. It looks like this:
[
((3, 4), (5, 5, 5)),
((3, 4), (5, 5, 9)),
((3, 4), (7, 5, 5)),
((1, 2), (1, 2, 3)),
((1, 2), (1, 4, 7)),
((1, 2), (2, 2, 3))
]
2- Then, use the group-by function to reproduce the effect of the PARTITION BY:
temp3 = temp2.groupByKey()
temp3 is now an RDD with 2 rows, one per key:
[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),
((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]
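Since the ResultIterable repr hides what is inside each group, here is a small local sketch of what groupByKey produces. It is plain Python, not Spark: group_by_key is a hypothetical stand-in I wrote for illustration, and the pairs list is just the temp2 sample from above.

```python
from collections import defaultdict

# Sample data copied from the temp2 listing above: (key, value) pairs.
pairs = [
    ((3, 4), (5, 5, 5)),
    ((3, 4), (5, 5, 9)),
    ((3, 4), (7, 5, 5)),
    ((1, 2), (1, 2, 3)),
    ((1, 2), (1, 4, 7)),
    ((1, 2), (2, 2, 3)),
]

def group_by_key(pairs):
    """Local stand-in for RDD.groupByKey(): collect all values per key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

grouped = group_by_key(pairs)
for key in sorted(grouped):
    print(key, grouped[key])
```

On a real RDD, something like temp3.mapValues(list).collect() should give you a similarly readable view of the groups.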
3- Now, you need to apply a rank function to the values of each key. In Python, I would use the built-in sorted function; enumerate then creates your row_number column (counting from 0 here, whereas SQL's row_number starts at 1):
temp4 = temp3.flatMap(lambda x: [(x[0], (v, i)) for i, v in enumerate(sorted(x[1]))]).take(10)
Note that to implement your particular ordering, you would need to pass the right key argument to sorted. In Python, I would just use a lambda function like this one, which sorts ascending on the first field, descending on the second, and ascending on the third:
lambda t: (t[0], -t[1], t[2])
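To see what such a key function does, here is a quick local check on the values of key (1, 2) from the sample data. The name order_key is mine, chosen for illustration; negating a field only works for numeric fields.

```python
# Values belonging to key (1, 2) in the sample data.
values = [(1, 2, 3), (1, 4, 7), (2, 2, 3)]

def order_key(t):
    # Ascending on field 0, descending on field 1, ascending on field 2.
    return (t[0], -t[1], t[2])

default_order = sorted(values)                # plain tuple comparison
custom_order = sorted(values, key=order_key)  # mixed asc/desc ordering

print(default_order)  # [(1, 2, 3), (1, 4, 7), (2, 2, 3)]
print(custom_order)   # [(1, 4, 7), (1, 2, 3), (2, 2, 3)]
```

With the custom key, (1, 4, 7) moves ahead of (1, 2, 3) because the second field is now sorted in descending order.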
In the end, temp4 (computed without the key argument) looks like this:
[
((1, 2), ((1, 2, 3), 0)),
((1, 2), ((1, 4, 7), 1)),
((1, 2), ((2, 2, 3), 2)),
((3, 4), ((5, 5, 5), 0)),
((3, 4), ((5, 5, 9), 1)),
((3, 4), ((7, 5, 5), 2))
]
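If you want numbering that starts at 1 like SQL's row_number, and an optional custom ordering, the lambda can be pulled out into a small helper. This is only a sketch: number_rows, sort_key and start are names I made up, and the function is tested here on a plain list rather than on an RDD.

```python
def number_rows(kv, sort_key=None, start=1):
    """Turn one (key, values) group into numbered (key, (value, n)) rows.

    sort_key is passed straight to sorted(); start is the first row number.
    """
    key, values = kv
    ordered = sorted(values, key=sort_key)
    return [(key, (value, n)) for n, value in enumerate(ordered, start)]

# Local check on one group, deliberately given out of order.
group = ((1, 2), [(2, 2, 3), (1, 2, 3), (1, 4, 7)])
numbered = number_rows(group)
numbered_custom = number_rows(group, sort_key=lambda t: (t[0], -t[1], t[2]))

for row in numbered:
    print(row)
```

Assuming temp3 is the grouped RDD from step 2, it should plug in as temp3.flatMap(number_rows), or temp3.flatMap(lambda kv: number_rows(kv, sort_key=...)) for a custom ordering. Keep in mind that sorted pulls a whole group into memory on one executor, so this approach suits groups of moderate size.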
Hope that helps!
Good luck.