Spark add new column to dataframe with value from previous row
You can use the lag window function as follows:
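from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window
# sc is the SparkContext, as provided by the PySpark shell
df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
# Empty partitionBy(): one global window over all rows, ordered by id
w = Window().partitionBy().orderBy(col("id"))
# lag("num") reads num from the previous row; the first row has no predecessor
# (null), so na.drop() removes it
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()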
## +---+---+-------+
## | id|num|new_col|
## +---+---+-------+
## |  2|3.0|    5.0|
## |  3|7.0|    3.0|
## |  4|9.0|    7.0|
## +---+---+-------+
but there are some important issues:
- if you need a global operation (not partitioned by some other column or columns) it is extremely inefficient, because Spark moves all the data to a single partition.
- you need a natural way to order your data.
While the second issue is almost never a problem, the first one can be a deal-breaker. If this is the case, you should simply convert your DataFrame to an RDD and compute lag manually. See for example:
- How to transform data with sliding window over time series data in Pyspark
- Apache Spark Moving Average (written in Scala, but can be adjusted for PySpark. Be sure to read the comments first).
Other useful links:
- https://github.com/UrbanInstitute/pyspark-tutorials/blob/master/05_moving-average-imputation.ipynb
- Spark Window Functions - rangeBetween dates