Scala and Spark UDF function
You have to define your function as a UDF.
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
val convertUDF: UserDefinedFunction = udf((time:String) => {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
sdf.format(time1)
})
Next you would apply your UDF on your DataFrame.
// assuming your DataFrame is already defined
dataFrame.withColumn("time", convertUDF(col("time"))) // using the same name replaces existing
Now, as to your actual problem, one reason you are receiving this error could be because your DataFrame contains rows which are nulls. If you filter them out before you apply the UDF, you should be able to continue no problem.
dataFrame.filter(col("time").isNotNull)
I'm curious what else causes a NullPointerException when running a UDF other than it encountering a null, if you found a reason different than my suggestion, I'd be glad to know.
Use udf instead of define a function directly
import org.apache.spark.sql.functions._
val convert = udf[String, String](time => {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
sdf.format(time1)
}
)
A udf's input parameter is Column(or Columns). And the return type is Column.
case class UserDefinedFunction protected[sql] (
f: AnyRef,
dataType: DataType,
inputTypes: Option[Seq[DataType]]) {
def apply(exprs: Column*): Column = {
Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
}
}