PySpark: filtering a DataFrame by date field in range where date is string
I figured out a way to solve my problem by using the SparkSQL API with dates in String format.
Here is an example:

from datetime import datetime, timedelta

last_7_days = (datetime.today() - timedelta(days=7)).strftime('%Y-%m-%d')
new_df = signal1.where(signal1.publication_date >= last_7_days)

You need to import datetime and timedelta for this.
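This works because ISO 8601 date strings ('YYYY-MM-DD') compare lexicographically in the same order as the dates they represent. If you need both ends of a range rather than just a lower bound, here is a minimal sketch along the same lines (the today / ranged_df names are just illustrative):

from datetime import datetime, timedelta

# 'YYYY-MM-DD' strings sort the same way as the dates they encode,
# so plain string comparisons are enough for a range filter.
today = datetime.today().strftime('%Y-%m-%d')
last_7_days = (datetime.today() - timedelta(days=7)).strftime('%Y-%m-%d')

# Keep rows whose publication_date falls within the last seven days (inclusive).
ranged_df = signal1.where(signal1.publication_date.between(last_7_days, today))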
Spark >= 1.5
You can use INTERVAL
from pyspark.sql.functions import col, current_date, expr

df_casted.where(col("dt") >= current_date() - expr("INTERVAL 7 days"))
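If you prefer to keep the whole predicate in SQL syntax, the same filter can be written with a single expr; a rough sketch (df_casted and dt are the names introduced in the Spark < 1.5 section below):

from pyspark.sql.functions import expr

# Same 7-day filter, expressed entirely as a SQL expression string.
df_casted.where(expr("dt >= current_date() - INTERVAL 7 days"))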
Spark < 1.5
You can solve this without using worker-side Python code or switching to RDDs. First of all, since you use ISO 8601 strings, your data can be cast directly to date or timestamp:
from pyspark.sql.functions import col

df = sc.parallelize([
    ('2015-07-02T11:22:21.050Z', ),
    ('2016-03-20T21:00:00.000Z', )
]).toDF(("d_str", ))

# Derive proper date and timestamp columns from the ISO 8601 strings.
df_casted = df.select(
    "*",
    col("d_str").cast("date").alias("dt"),
    col("d_str").cast("timestamp").alias("ts"))
Casting on the JVM side like this saves one roundtrip between the JVM and Python. There are also a few ways you can approach the second part. Date only:
from pyspark.sql.functions import current_date, datediff, unix_timestamp
df_casted.where(datediff(current_date(), col("dt")) < 7)
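Note that this condition also matches dates in the future, because datediff(current_date(), dt) goes negative there; if you want strictly the last seven days, a lower bound can be added, roughly like this:

# Keep only the last 7 days and exclude future dates,
# for which datediff(current_date(), dt) is negative.
df_casted.where(
    (datediff(current_date(), col("dt")) < 7) &
    (datediff(current_date(), col("dt")) >= 0))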
Timestamps:
def days(i: int) -> int:
    return 60 * 60 * 24 * i

df_casted.where(unix_timestamp() - col("ts").cast("long") < days(7))
You can also take a look at current_timestamp and date_sub.
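A rough sketch of how those could be used for the same kind of filter, assuming a Spark version that ships both functions:

from pyspark.sql.functions import current_date, current_timestamp, date_sub

# Date version: push the whole 7-day shift to the JVM side with date_sub.
df_casted.where(col("dt") >= date_sub(current_date(), 7))

# Timestamp version: compare epoch seconds against current_timestamp().
df_casted.where(col("ts").cast("long") >= current_timestamp().cast("long") - days(7))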
Note: I would avoid using DataFrame.map. It is better to use DataFrame.rdd.map instead. It will save you some work when switching to 2.0+.
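If you do end up needing row-level Python logic, a minimal sketch of the rdd.map form (the lambda is just an illustration):

# Go through the underlying RDD explicitly; DataFrame.map is what goes away
# in 2.0+, while df.rdd.map keeps working across versions.
dates_rdd = df_casted.rdd.map(lambda row: row.dt)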