Spark SQL window function with complex condition
This is a refactoring of the other answer to work with PySpark.
In PySpark you can do it as follows.
Create the data frame:
df = sqlContext.createDataFrame(
    [
        ("SirChillingtonIV", "2012-01-04"),
        ("Booooooo99900098", "2012-01-04"),
        ("Booooooo99900098", "2012-01-06"),
        ("OprahWinfreyJr", "2012-01-10"),
        ("SirChillingtonIV", "2012-01-11"),
        ("SirChillingtonIV", "2012-01-14"),
        ("SirChillingtonIV", "2012-08-11")
    ],
    ("user_name", "login_date"))
The above code creates a data frame like this:
+----------------+----------+
| user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
| OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
|SirChillingtonIV|2012-01-14|
|SirChillingtonIV|2012-08-11|
+----------------+----------+
Next, we want to find the logins where the gap since the same user's previous login_date is more than 5 days; each such login starts a new session.
Necessary imports:
from pyspark.sql import functions as f
from pyspark.sql import Window
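# define the windows: per user ordered by date, and per user session
login_window = Window.partitionBy("user_name").orderBy("login_date")
session_window = Window.partitionBy("user_name", "session")
# flag a login as a new session when the gap to the previous login exceeds 5 days
new_session = (f.coalesce(f.datediff("login_date", f.lag("login_date", 1).over(login_window)), f.lit(0)) > 5).cast("int")
# the running sum of the flags numbers the sessions per user
session_df = df.withColumn("session", f.sum(new_session).over(login_window))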
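When we run the above code, datediff is NULL for each user's first login because lag finds no previous row, and the coalesce function replaces that NULL with 0. The running sum of the comparison results then numbers the sessions per user: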
+----------------+----------+-------+
| user_name|login_date|session|
+----------------+----------+-------+
| OprahWinfreyJr|2012-01-10| 0|
|SirChillingtonIV|2012-01-04| 0|
|SirChillingtonIV|2012-01-11| 1|
|SirChillingtonIV|2012-01-14| 1|
|SirChillingtonIV|2012-08-11| 2|
|Booooooo99900098|2012-01-04| 0|
|Booooooo99900098|2012-01-06| 0|
+----------------+----------+-------+
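To make the lag / datediff / coalesce / running-sum chain easier to follow, here is a minimal plain-Python sketch of the same logic. It does not use Spark at all; the function name assign_sessions and the max_gap_days parameter are made up for this illustration and are not part of any API.

```python
from datetime import date
from itertools import groupby

# Same sample rows as the data frame above.
rows = [
    ("SirChillingtonIV", "2012-01-04"),
    ("Booooooo99900098", "2012-01-04"),
    ("Booooooo99900098", "2012-01-06"),
    ("OprahWinfreyJr", "2012-01-10"),
    ("SirChillingtonIV", "2012-01-11"),
    ("SirChillingtonIV", "2012-01-14"),
    ("SirChillingtonIV", "2012-08-11"),
]


def assign_sessions(rows, max_gap_days=5):
    """Return (user_name, login_date, session) tuples (hypothetical helper)."""
    out = []
    # Sorting by (user_name, login_date) mimics partitionBy + orderBy;
    # ISO date strings sort correctly as plain strings.
    ordered = sorted(rows)
    for user, group in groupby(ordered, key=lambda r: r[0]):
        session = 0
        previous = None  # plays the role of lag("login_date", 1)
        for _, login in group:
            current = date.fromisoformat(login)
            # coalesce(datediff(...), 0): the first login has no predecessor
            gap = (current - previous).days if previous is not None else 0
            # running sum of the (gap > max_gap_days) flag
            session += int(gap > max_gap_days)
            out.append((user, login, session))
            previous = current
    return out


sessions = assign_sessions(rows)
for row in sessions:
    print(row)
```

The rows come out sorted by user name rather than in Spark's partition order, but the session numbers per user should match the table above.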
# add the became_active column: the minimum login_date within each (user_name, session) partition created in the previous step
final_df = session_df.withColumn("became_active", f.min("login_date").over(session_window)).drop("session")
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-04| 2012-01-04|
|SirChillingtonIV|2012-01-11| 2012-01-11|
|SirChillingtonIV|2012-01-14| 2012-01-11|
|SirChillingtonIV|2012-08-11| 2012-08-11|
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
+----------------+----------+-------------+
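The final step, taking the minimum login_date over each (user_name, session) partition, can also be sketched in plain Python. This is only an illustration of what the window aggregate computes; add_became_active is a hypothetical helper name, and the input rows are copied from the intermediate table with the session column.

```python
# (user_name, login_date, session) rows, copied from the intermediate table.
session_rows = [
    ("OprahWinfreyJr", "2012-01-10", 0),
    ("SirChillingtonIV", "2012-01-04", 0),
    ("SirChillingtonIV", "2012-01-11", 1),
    ("SirChillingtonIV", "2012-01-14", 1),
    ("SirChillingtonIV", "2012-08-11", 2),
    ("Booooooo99900098", "2012-01-04", 0),
    ("Booooooo99900098", "2012-01-06", 0),
]


def add_became_active(session_rows):
    """Attach the earliest login_date of each (user_name, session) group."""
    earliest = {}
    for user, login, session in session_rows:
        key = (user, session)
        # ISO date strings compare correctly as plain strings.
        if key not in earliest or login < earliest[key]:
            earliest[key] = login
    # Like a window aggregate, every input row is kept; the session
    # column is dropped from the output.
    return [
        (user, login, earliest[(user, session)])
        for user, login, session in session_rows
    ]


result = add_became_active(session_rows)
for row in result:
    print(row)
```

Unlike a groupBy, this keeps one output row per login, which is why the PySpark version uses a window instead of an aggregation.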
Spark >= 3.2
Recent Spark releases provide native support for session windows in both batch and structured streaming queries (see SPARK-10816 and its sub-tasks, especially SPARK-34893).
The official documentation provides a nice usage example.
Spark < 3.2
Here is the trick. Import a bunch of functions:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}
Define windows:
val userWindow = Window.partitionBy("user_name").orderBy("login_date")
val userSessionWindow = Window.partitionBy("user_name", "session")
Find the points where new sessions start:
val newSession = (coalesce(
  datediff($"login_date", lag($"login_date", 1).over(userWindow)),
  lit(0)
) > 5).cast("bigint")
val sessionized = df.withColumn("session", sum(newSession).over(userWindow))
Find the earliest date per session:
val result = sessionized
  .withColumn("became_active", min($"login_date").over(userSessionWindow))
  .drop("session")
With the dataset defined as:
val df = Seq(
  ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"),
  ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"),
  ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"),
  ("SirChillingtonIV", "2012-08-11")
).toDF("user_name", "login_date")
The result is:
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-04| 2012-01-04| <- The first session for user
|SirChillingtonIV|2012-01-11| 2012-01-11| <- The second session for user
|SirChillingtonIV|2012-01-14| 2012-01-11|
|SirChillingtonIV|2012-08-11| 2012-08-11| <- The third session for user
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
+----------------+----------+-------------+