PySpark timeout trying to repartition/write to parquet (Futures timed out after [300 seconds])?
The weakest point of your code is the following:
LIMIT 5000000
If you take a careful look at the execution plan
: +- *GlobalLimit 5000000
: +- Exchange SinglePartition
: +- *LocalLimit 5000000
you'll see that the implementation uses a two-step process, where partial limits are collected to a single partition. With such a large number (LIMIT is simply not designed with this scenario in mind) you can easily overwhelm the corresponding executor.
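You can reproduce this pattern on a toy DataFrame (a sketch; the exact plan text varies between Spark versions):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A LIMIT followed by further transformations is planned as
# LocalLimit -> Exchange SinglePartition -> GlobalLimit, i.e. every
# surviving row is first shuffled onto a single partition.
(spark.range(10000000)
    .limit(5000000)
    .sample(False, 0.001)
    .explain())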
Additionally, the LIMIT in your code is redundant, since you follow it with .sample(False, 0.001).
I'd recommend dropping the LIMIT clause and adjusting the fraction accordingly:
result_full = spark.sql("""
    SELECT
        f.*,
        CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key,
        countryName, cityName, airportName, a.name AS agentName
    FROM flights f
    INNER JOIN agents a
        ON f.agent = a.id
    INNER JOIN airports p
        ON f.querydestinationplace = p.airportId
""")
desired_size = 5000000 * 0.001
fraction = desired_size / result_full.count()
assert 0 < fraction < 1
result_sample = result_full.sample(False, fraction)
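From there you can write the sample out directly (a sketch; the output path and partition count below are hypothetical, adjust them to your environment):
# hypothetical path and partition count
(result_sample
    .repartition(200)
    .write
    .mode("overwrite")
    .parquet("s3://some-bucket/flights-sample/"))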
Additionally, I'd recommend rewriting generate_date_series:
from pyspark.sql.functions import lit
from pyspark.sql import SparkSession

def generate_date_series(start, stop):
    span = (stop - start).days + 1
    return (SparkSession.builder.getOrCreate()
        .range(0, span)
        .withColumn("start", lit(start))
        .selectExpr("date_add(start, id) AS querydatetime"))
(generate_date_series(start, seven_days_ago)
    .createOrReplaceTempView("relaventDates"))
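As a quick sanity check you can call it with plain datetime.date objects (dates below are illustrative; on recent Spark releases you may have to cast id to int before date_add):
from datetime import date

generate_date_series(date(2019, 1, 1), date(2019, 1, 3)).show()
# +-------------+
# |querydatetime|
# +-------------+
# |   2019-01-01|
# |   2019-01-02|
# |   2019-01-03|
# +-------------+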
Finally, I'd strongly recommend replacing the getInterval UDF with a composition of built-in functions* (unused arguments preserved as-is):
from pyspark.sql import Column
from pyspark.sql.functions import concat, floor, lit

def get_interval(num, start, stop, incr):
    assert isinstance(num, Column)
    lower = floor(num / incr).cast("integer") * incr
    upper = lower + incr
    return concat(lit("("), lower, lit(","), upper, lit(")"))
which could later be used as a direct replacement for the UDF, though it is unlikely to contribute directly to your current problems:
from pyspark.sql.functions import hour

...
    .withColumn(
        "out_departure_interval",
        get_interval(hour("outdeparture"), 0, 24, 4))
On a side note, UDFRegistration.register has returned a callable object for a couple of releases now, so you might be able to replace
spark.udf.register("getInterval", getInterval, StringType())
getIntervalUdf = udf(getInterval)
with
getIntervalUdf = spark.udf.register("getInterval", getInterval, StringType())
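The returned object can then be used both from SQL by name and directly on Column expressions. For example (a sketch, assuming getInterval keeps the (num, start, stop, incr) signature and df is your flights DataFrame):
from pyspark.sql.functions import hour, lit

# plain Python literals have to be wrapped in lit() when calling the
# UDF through the DataFrame API
df.withColumn(
    "out_departure_interval",
    getIntervalUdf(hour("outdeparture"), lit(0), lit(24), lit(4)))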
* You can also consider bucketing using the dedicated window function:
Bucketize rows into one or more time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05).
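A sketch of that approach, assuming outdeparture is a timestamp column:
from pyspark.sql.functions import window

# assigns each row to a half-open 4 hour window, returned as a
# struct<start: timestamp, end: timestamp> column
df.withColumn("out_departure_window", window("outdeparture", "4 hours"))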