Spark DataFrame: does groupBy after orderBy maintain that order?
The order may or may not be the same, depending on the number of partitions and the distribution of the data.
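As a quick way to see why the order is not guaranteed, you can look at the physical plan: the grouped aggregation adds a shuffle after the sort. A minimal sketch, assuming a hypothetical DataFrame df with a grouping column "k" and a sort column "x":
import org.apache.spark.sql.functions.{col, count}

// df is a hypothetical DataFrame with a grouping column "k" and a sort column "x".
// orderBy produces a globally sorted result, but the aggregation that follows
// shuffles rows by "k", so the printed plan typically contains an Exchange node
// and the sorted order is not guaranteed to survive it.
df.orderBy("x")
  .groupBy("k")
  .agg(count(col("x")).as("n"))
  .explain()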
One way around this is to use the RDD API itself. For example, I saved the sample data below in a file and loaded it into HDFS:
1,type1,300
2,type1,100
3,type2,400
4,type2,500
5,type1,400
6,type3,560
7,type2,200
8,type3,800
and executed the following command:
sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3).groupBy(_(1)).mapValues(x=>x.toList.sortBy(_(2)).map(_(0)).mkString("~")).collect()
output:
Array[(String, String)] = Array((type3,6~8), (type1,2~1~5), (type2,7~3~4))
That is, we grouped the data by type, then sorted each group by price, and concatenated the ids with "~" as the separator. The above command can be broken down as follows:
val validData = sc.textFile("/spark_test/test.txt")
  .map(x => x.split(","))
  .filter(x => x.length == 3)               // keep only well-formed rows
val groupedData = validData.groupBy(_(1))   // group the rows by type
val sortedJoinedData = groupedData.mapValues(x => {
  val list = x.toList
  val sortedList = list.sortBy(_(2))        // sort each group by price
  val idOnlyList = sortedList.map(_(0))     // keep only the ids
  idOnlyList.mkString("~")
})
sortedJoinedData.collect()
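Note that sortBy(_(2)) compares the price as a string; that happens to give the right order here because every price has the same number of digits, but for a proper numeric sort you would convert first, e.g.:
val sortedList = list.sortBy(_(2).toInt)    // sort numerically instead of lexicographically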
We can then take a particular group using:
sortedJoinedData.filter(_._1=="type1").collect()
output:
Array[(String, String)] = Array((type1,2~1~5))
If you want to work around this, here is the implementation in Java (Scala and Python should be similar):
example.orderBy("hour")
.groupBy("id")
.agg(functions.sort_array(
functions.collect_list(
functions.struct(dataRow.col("hour"),
dataRow.col("count"))),false)
.as("hourly_count"));
I have a case where the order is not always kept: sometimes yes, mostly no. My DataFrame has 200 partitions, running on Spark 1.6.
df_group_sort = data.orderBy(times).groupBy(group_key).agg(
    F.sort_array(F.collect_list(times)),
    F.collect_list(times)
)
To check the ordering, I compare the return values of F.sort_array(F.collect_list(times)) and F.collect_list(times), which gives e.g. (left: sort_array(collect_list()); right: collect_list()):
2016-12-19 08:20:27.172000 2016-12-19 09:57:03.764000
2016-12-19 08:20:30.163000 2016-12-19 09:57:06.763000
2016-12-19 08:20:33.158000 2016-12-19 09:57:09.763000
2016-12-19 08:20:36.158000 2016-12-19 09:57:12.763000
2016-12-19 08:22:27.090000 2016-12-19 09:57:18.762000
2016-12-19 08:22:30.089000 2016-12-19 09:57:33.766000
2016-12-19 08:22:57.088000 2016-12-19 09:57:39.811000
2016-12-19 08:23:03.085000 2016-12-19 09:57:45.770000
2016-12-19 08:23:06.086000 2016-12-19 09:57:57.809000
2016-12-19 08:23:12.085000 2016-12-19 09:59:56.333000
2016-12-19 08:23:15.086000 2016-12-19 10:00:11.329000
2016-12-19 08:23:18.087000 2016-12-19 10:00:14.331000
2016-12-19 08:23:21.085000 2016-12-19 10:00:17.329000
2016-12-19 08:23:24.085000 2016-12-19 10:00:20.326000
The left column is always sorted, while the right column consists only of sorted blocks. For different executions of take(), the order of the blocks in the right column differs.
groupBy after orderBy doesn't maintain order, as others have pointed out. What you want to do is use a Window function, partitioned on id and ordered by hours. You can collect_list over this and then take the max (largest) of the resulting lists, since they grow cumulatively (i.e., the first hour will only have itself in the list, the second hour will have two elements in its list, and so on).
Complete example code:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
val data = Seq(
  ("id1", 0, 12),
  ("id1", 1, 55),
  ("id1", 23, 44),
  ("id2", 0, 12),
  ("id2", 1, 89),
  ("id2", 23, 34)
).toDF("id", "hour", "count")

val mergeList = udf { (strings: Seq[String]) => strings.mkString(":") }

data.withColumn(
    "collected",
    // cumulative list of counts per id, ordered by hour
    collect_list($"count").over(Window.partitionBy("id").orderBy("hour"))
  )
  .groupBy("id")
  // each list is a prefix of the next, so max picks the complete (last-hour) list
  .agg(max($"collected").as("collected"))
  .withColumn("hourly_count", mergeList($"collected"))
  .select("id", "hourly_count")
  .show
This keeps us within the DataFrame world. I also simplified the UDF code the OP was using.
Output:
+---+------------+
| id|hourly_count|
+---+------------+
|id1| 12:55:44|
|id2| 12:89:34|
+---+------------+
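As a side note, on newer Spark versions the mergeList UDF could probably be replaced with a built-in. A sketch assuming Spark 2.4+ (array_join expects an array of strings, hence the cast), reusing the imports from the example above and producing the same hourly_count column:
// Same aggregation as above, but joining the collected list with array_join
// instead of a UDF (array_join requires Spark 2.4 or later).
data.withColumn(
    "collected",
    collect_list($"count").over(Window.partitionBy("id").orderBy("hour"))
  )
  .groupBy("id")
  .agg(max($"collected").as("collected"))
  .withColumn("hourly_count", array_join($"collected".cast("array<string>"), ":"))
  .select("id", "hourly_count")
  .show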