CodeGen grows beyond 64 KB error when normalizing large PySpark dataframe

One obvious problem is the way you use window functions. The following frame:

Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)    

is a bit useless in practice. Without partition column it reshuffles all data to a single partition first. This method of scaling is useful only to perform scaling in groups.

Spark provides two classes which can be used to scale features:

  • pyspark.ml.feature.StandardScaler
  • pyspark.mllib.feature.StandardScaler

Unfortunately both require Vector data as an input. With ML

from pyspark.ml.feature import StandardScaler as MLScaler, VectorAssembler
from pyspark.ml import Pipeline

scaled = Pipeline(stages=[
    VectorAssembler(inputCols=df.columns, outputCol="features"), 
    MLScaler(withMean=True, inputCol="features", outputCol="scaled")
]).fit(df).transform(df).select("scaled")

This require further expanding of the scaled column if you need the original shape.

With MLlib:

from pyspark.mllib.feature import StandardScaler as MLLibScaler
from pyspark.mllib.linalg import DenseVector

rdd = df.rdd.map(DenseVector)
scaler = MLLibScaler(withMean=True, withStd=True)

scaler.fit(rdd).transform(rdd).map(lambda v: v.array.tolist()).toDF(df.columns)

The latter method can be more useful if there is a codegen issues related to the number of columns.

Another way you can approach this problem to compute global statistics

from pyspark.sql.functions import avg, col, stddev_pop, struct

stats = df.agg(*[struct(avg(c), stddev_pop(c)) for c in df.columns]).first()

and select:

df.select(*[
    ((col(c) - mean) / std).alias(c)
    for (c, (mean, std)) in zip(df.columns, stats)
])

Following your comments the simplest solution you can think can be expressed using NumPy and a few basic transformations:

rdd = df.rdd.map(np.array)  # Convert to RDD of NumPy vectors
stats = rdd.stats()  # Compute mean and std
scaled = rdd.map(lambda v: (v - stats.mean()) / stats.stdev())  # Normalize

and converted back to DataFrame:

scaled.map(lambda x: x.tolist()).toDF(df.columns)