How to add multiple columns using a UDF?
In Scala
import spark.implicits._
val df = Seq(("Alive", 4)).toDF("Name", "Number")
Without a UDF
df.
withColumn("OutPlus", $"Number" + 2).
withColumn("OutMinus", $"Number" - 2).
show
+-----+------+-------+--------+
| Name|Number|OutPlus|OutMinus|
+-----+------+-------+--------+
|Alive| 4| 6| 2|
+-----+------+-------+--------+
With a UDF using explode
import org.apache.spark.sql.functions.{explode, udf}
def twoItems(i: Int) = Seq((i + 2, i - 2))
val twoItemsUdf = udf(twoItems(_: Int))
val exploded = df.
withColumn("Out", explode(twoItemsUdf($"Number"))).
withColumn("OutPlus", $"Out._1").
withColumn("OutMinus", $"Out._2")
exploded.printSchema
root
|-- Name: string (nullable = true)
|-- Number: integer (nullable = false)
|-- Out: struct (nullable = true)
| |-- _1: integer (nullable = false)
| |-- _2: integer (nullable = false)
|-- OutPlus: integer (nullable = true)
|-- OutMinus: integer (nullable = true)
exploded.drop("Out").show
+-----+------+-------+--------+
| Name|Number|OutPlus|OutMinus|
+-----+------+-------+--------+
|Alive| 4| 6| 2|
+-----+------+-------+--------+
To return a StructType from a PySpark UDF, just use Row
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql import functions as F
df = spark.createDataFrame([("Alive", 4)], ["Name", "Number"])
def example(n):
    return Row('Out1', 'Out2')(n + 2, n - 2)
schema = StructType([
StructField("Out1", IntegerType(), False),
StructField("Out2", IntegerType(), False)])
example_udf = F.udf(example, schema)
newDF = df.withColumn("Output", example_udf(df["Number"]))
newDF = newDF.select("Name", "Number", "Output.*")
newDF.show(truncate=False)
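For the one-row DataFrame above (4 + 2 = 6, 4 - 2 = 2), the `show` output should look something like:

```
+-----+------+----+----+
|Name |Number|Out1|Out2|
+-----+------+----+----+
|Alive|4     |6   |2   |
+-----+------+----+----+
```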
A better way to solve the problem above is to wrap the output in an array and then explode it:
import pyspark.sql.functions as f
import pyspark.sql.types as t
df = spark.createDataFrame([("Alive", 4)], ["Name", "Number"])
def example(n):
    return t.Row('Out1', 'Out2')(n + 2, n - 2)
schema = t.StructType([
    t.StructField("Out1", t.IntegerType(), False),
    t.StructField("Out2", t.IntegerType(), False)])
example_udf = f.udf(example, schema)
newDF = df.withColumn("Output", f.explode(f.array(example_udf(df["Number"]))))
newDF = newDF.select("Name", "Number", "Output.*")
newDF.show(truncate=False)
newDF.explain()
Look at the output of explain(): with the explode(array(...)) version, the example UDF is invoked only once per row, instead of once per extracted field.
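To see why this matters, here is a plain-Python analogy of the mechanism (no Spark needed). This is a sketch of an assumption about expression duplication, not actual Spark internals: projecting Output.Out1 and Output.Out2 directly from a struct-returning UDF behaves like evaluating the UDF expression once per field, while explode materializes the struct once.

```python
# Counter to track how many times the "UDF" body runs.
calls = 0

def example(n):
    """Stand-in for the Spark UDF; increments a call counter."""
    global calls
    calls += 1
    return (n + 2, n - 2)

# Without explode: each projected field re-evaluates the expression,
# so the UDF body runs once per field.
calls = 0
out1 = example(4)[0]   # first invocation
out2 = example(4)[1]   # second invocation
assert calls == 2

# With explode(array(...)): the result is materialized once and both
# fields are read from the materialized row.
calls = 0
row = example(4)
out1, out2 = row       # no further invocations
assert calls == 1
assert (out1, out2) == (6, 2)
```

This is why the explain() plan for the explode version shows a single invocation of the UDF per input row.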