Spark Window Functions - rangeBetween dates
Spark >= 2.3
Since Spark 2.3 it is possible to use interval objects with the SQL API, but DataFrame API support is still a work in progress.
df.createOrReplaceTempView("df")
spark.sql(
"""SELECT *, mean(some_value) OVER (
PARTITION BY id
ORDER BY CAST(start AS timestamp)
RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
) AS mean FROM df""").show()
## +---+----------+----------+------------------+
## | id| start|some_value| mean|
## +---+----------+----------+------------------+
## | 1|2015-01-01| 20.0| 20.0|
## | 1|2015-01-06| 10.0| 15.0|
## | 1|2015-01-07| 25.0|18.333333333333332|
## | 1|2015-01-12| 30.0|21.666666666666668|
## | 2|2015-01-01| 5.0| 5.0|
## | 2|2015-01-03| 30.0| 17.5|
## | 2|2015-02-01| 20.0| 20.0|
## +---+----------+----------+------------------+
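While the dedicated DataFrame API is missing, the same interval window should also go through F.expr, which accepts a full SQL expression including the OVER clause. A minimal sketch under that assumption, on the same df as above:
from pyspark.sql import functions as F
df.withColumn(
    "mean",
    F.expr("""mean(some_value) OVER (
        PARTITION BY id
        ORDER BY CAST(start AS timestamp)
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
    )""")
).show()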
Spark < 2.3
As far as I know, it is not possible directly in either Spark or Hive. Both require the ORDER BY clause used with RANGE to be numeric. The closest thing I found is converting to a timestamp and operating on seconds. Assuming the start column contains a date type:
from pyspark.sql import Row
from pyspark.sql.functions import col
row = Row("id", "start", "some_value")
df = sc.parallelize([
row(1, "2015-01-01", 20.0),
row(1, "2015-01-06", 10.0),
row(1, "2015-01-07", 25.0),
row(1, "2015-01-12", 30.0),
row(2, "2015-01-01", 5.0),
row(2, "2015-01-03", 30.0),
row(2, "2015-02-01", 20.0)
]).toDF().withColumn("start", col("start").cast("date"))
A small helper and window definition:
from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col
# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400
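For reference, days(7) evaluates to 604800, i.e. a 7-day window expressed in seconds, which is the lower frame bound used in the query below.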
Finally, the query:
w = (Window()
.partitionBy(col("id"))
.orderBy(col("start").cast("timestamp").cast("long"))
.rangeBetween(-days(7), 0))
df.select(col("*"), mean("some_value").over(w).alias("mean")).show()
## +---+----------+----------+------------------+
## | id| start|some_value| mean|
## +---+----------+----------+------------------+
## | 1|2015-01-01| 20.0| 20.0|
## | 1|2015-01-06| 10.0| 15.0|
## | 1|2015-01-07| 25.0|18.333333333333332|
## | 1|2015-01-12| 30.0|21.666666666666668|
## | 2|2015-01-01| 5.0| 5.0|
## | 2|2015-01-03| 30.0| 17.5|
## | 2|2015-02-01| 20.0| 20.0|
## +---+----------+----------+------------------+
Far from pretty but works.
* Hive Language Manual, Types
Fantastic solution, @zero323. If you want to operate with minutes instead of days, as I had to, and you don't need to partition by id, you only have to modify a small part of the code, as shown below:
df.createOrReplaceTempView("df")
spark.sql(
"""SELECT *, sum(total) OVER (
ORDER BY CAST(reading_date AS timestamp)
RANGE BETWEEN INTERVAL 45 minutes PRECEDING AND CURRENT ROW
) AS sum_total FROM df""").show()
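If you prefer the pre-2.3 DataFrame API, the same 45-minute window can be built with the seconds trick from the answer above; a minimal sketch, assuming the reading_date and total columns from the SQL query:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum as sum_
minutes = lambda i: i * 60  # window width expressed in seconds
w = (Window()
     .orderBy(col("reading_date").cast("timestamp").cast("long"))
     .rangeBetween(-minutes(45), 0))
df.select(col("*"), sum_("total").over(w).alias("sum_total")).show()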
Spark 3.3 is released, but...
The answer may be as old as Spark 1.5.0: datediff.
datediff(col_name, '1000') will return the integer difference in days from 1000-01-01 to col_name.
As the first argument, it accepts dates, timestamps and even strings. As the second, it even accepts 1000.
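To illustrate (a quick sanity check, not part of the original answer): both calls below anchor on 1000-01-01, so their difference is the number of days between the two dates, 7 here.
from pyspark.sql import functions as F
spark.range(1).select(
    (F.expr("datediff('2015-01-08', '1000')")
     - F.expr("datediff('2015-01-01', '1000')")).alias("days_apart")
).show()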
The answer
Date difference in days, depending on the data type of the order column (F is pyspark.sql.functions; a complete worked example follows the list):
date
Spark 3.1+
.orderBy(F.expr("unix_date(col_name)")).rangeBetween(-7, 0)
Spark 2.1+
.orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
timestamp
Spark 2.1+
.orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
long - UNIX time in microseconds (e.g. 1672534861000000)
Spark 2.1+
.orderBy(F.col("col_name") / 86400_000000).rangeBetween(-7, 0)
long - UNIX time in milliseconds (e.g. 1672534861000)
Spark 2.1+
.orderBy(F.col("col_name") / 86400_000).rangeBetween(-7, 0)
long - UNIX time in seconds (e.g. 1672534861)
Spark 2.1+
.orderBy(F.col("col_name") / 86400).rangeBetween(-7, 0)
long in format yyyyMMdd
Spark 3.3+
.orderBy(F.expr("unix_date(to_date(col_name, 'yyyyMMdd'))")).rangeBetween(-7, 0)
Spark 3.1+
.orderBy(F.expr("unix_date(to_date(cast(col_name as string), 'yyyyMMdd'))")).rangeBetween(-7, 0)
Spark 2.2+
.orderBy(F.expr("datediff(to_date(cast(col_name as string), 'yyyyMMdd'), '1000')")).rangeBetween(-7, 0)
Spark 2.1+
.orderBy(F.unix_timestamp(F.col("col_name").cast('string'), 'yyyyMMdd') / 86400).rangeBetween(-7, 0)
string in date format of 'yyyy-MM-dd'
Spark 3.1+
.orderBy(F.expr("unix_date(to_date(col_name))")).rangeBetween(-7, 0)
Spark 2.1+
.orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
string in other date format (e.g. 'MM-dd-yyyy')
Spark 3.1+
.orderBy(F.expr("unix_date(to_date(col_name, 'MM-dd-yyyy'))")).rangeBetween(-7, 0)
Spark 2.2+
.orderBy(F.expr("datediff(to_date(col_name, 'MM-dd-yyyy'), '1000')")).rangeBetween(-7, 0)
Spark 2.1+
.orderBy(F.unix_timestamp("col_name", 'MM-dd-yyyy') / 86400).rangeBetween(-7, 0)
string in timestamp format of 'yyyy-MM-dd HH:mm:ss'
Spark 2.1+
.orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
string in other timestamp format (e.g. 'MM-dd-yyyy HH:mm:ss')
Spark 2.2+
.orderBy(F.expr("datediff(to_date(col_name, 'MM-dd-yyyy HH:mm:ss'), '1000')")).rangeBetween(-7, 0)