grouping consecutive rows in PySpark Dataframe

Here's one approach:

Gather together rows into groups where a group is a set of rows with the same user_id that are consecutive (start_time matches previous end_time). Then you can use this group to do your aggregation.

A way to get here is by creating intermediate indicator columns to tell you if the user has changed or the time is not consecutive. Then perform a cumulative sum over the indicator column to create the group.

For example:

import pyspark.sql.functions as f
from pyspark.sql import Window

w1 = Window.orderBy("start_time")
df = df.withColumn(
        (f.col("user_id") != f.lag("user_id").over(w1)).cast("int")
        (f.col("start_time") != f.lag("end_time").over(w1)).cast("int")
        subset=["userChange", "timeChange"]
        (~((f.col("userChange") == 0) & (f.col("timeChange")==0))).cast("int")
        f.sum(f.col("indicator")).over(w1.rangeBetween(Window.unboundedPreceding, 0))
#|      1|  19:00:00|19:30:00|      30|         0|         0|        0|    0|
#|      1|  19:30:00|19:40:00|      10|         0|         0|        0|    0|
#|      1|  19:40:00|19:43:00|       3|         0|         0|        0|    0|
#|      2|  20:00:00|20:10:00|      10|         1|         1|        1|    1|
#|      1|  20:05:00|20:15:00|      10|         1|         1|        1|    2|
#|      1|  20:15:00|20:35:00|      20|         0|         0|        0|    2|

Now that we have the group column, we can aggregate as follows to get the desired result:

df.groupBy("user_id", "group")\
#|      1|  19:00:00|19:43:00|      43|
#|      1|  20:05:00|20:35:00|      30|
#|      2|  20:00:00|20:10:00|      10|


