Dropping a nested column of a DataFrame with PySpark
An example for PySpark, which rebuilds the struct from the child fields you want to keep:

```python
from pyspark.sql.functions import struct

def drop_col(df, struct_nm, delete_struct_child_col_nm):
    # List the struct's child columns, minus the one to delete.
    fields_to_keep = filter(lambda x: x != delete_struct_child_col_nm,
                            df.select("{}.*".format(struct_nm)).columns)
    # Qualify each surviving field with the struct name, then rebuild the struct.
    fields_to_keep = list(map(lambda x: "{}.{}".format(struct_nm, x),
                              fields_to_keep))
    return df.withColumn(struct_nm, struct(fields_to_keep))
```
Another method I found with PySpark is to first convert the nested column to JSON, then parse the JSON back with a new nested schema that has the unwanted columns filtered out.
Suppose I have the following schema and I want to drop `d`, `e` and `j` (`a.b.d`, `a.e`, `a.h.j`) from the dataframe:
```
root
 |-- a: struct (nullable = true)
 |    |-- b: struct (nullable = true)
 |    |    |-- c: long (nullable = true)
 |    |    |-- d: string (nullable = true)
 |    |-- e: struct (nullable = true)
 |    |    |-- f: long (nullable = true)
 |    |    |-- g: string (nullable = true)
 |    |-- h: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- i: string (nullable = true)
 |    |    |    |-- j: string (nullable = true)
 |-- k: string (nullable = true)
```
I used the following approach:
1. Create a new schema for `a` by excluding `d`, `e` and `j`. A quick way to do this is to manually select the fields you want from `df.select("a").schema` and build a new schema from the selected fields using `StructType`. Alternatively, you can do it programmatically by traversing the schema tree and excluding the unwanted fields, something like:

```python
from pyspark.sql.types import ArrayType, StructField, StructType

def exclude_nested_field(schema, unwanted_fields, parent=""):
    new_schema = []
    for field in schema:
        full_field_name = field.name
        if parent:
            full_field_name = parent + "." + full_field_name
        if full_field_name not in unwanted_fields:
            if isinstance(field.dataType, StructType):
                # Recurse into nested structs.
                inner_schema = exclude_nested_field(field.dataType, unwanted_fields,
                                                    full_field_name)
                new_schema.append(StructField(field.name, inner_schema))
            elif isinstance(field.dataType, ArrayType):
                # Recurse into the array's element struct.
                inner_schema = exclude_nested_field(field.dataType.elementType,
                                                    unwanted_fields, full_field_name)
                new_schema.append(StructField(field.name, ArrayType(inner_schema)))
            else:
                new_schema.append(StructField(field.name, field.dataType))
    return StructType(new_schema)

new_schema = exclude_nested_field(df.schema["a"].dataType, ["b.d", "e", "h.j"])
```
2. Convert the `a` column to JSON (with `F` as `pyspark.sql.functions`):

```python
.withColumn("json", F.to_json("a")).drop("a")
```
3. Parse the JSON-converted `a` column from step 2 with the new schema found in step 1:

```python
.withColumn("a", F.from_json("json", new_schema)).drop("json")
```