Why does accessing a DataFrame from a UDF result in a NullPointerException?
You cannot use a DataFrame inside a UDF. You will need to join `productInformation` and `dict`, and do the UDF logic after the join.
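The join-based approach can be sketched as follows. The schemas here (an `id` plus an array column on each side) are my own assumptions for illustration; substitute the actual column names from your data:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("join-instead-of-udf").getOrCreate()
import spark.implicits._

// Hypothetical stand-ins for the question's datasets.
val productInformation = Seq((1, Seq("red", "large")), (2, Seq("blue"))).toDF("id", "tags")
val dict = Seq(Seq("red"), Seq("blue", "small")).toDF("values")

// Instead of looking dict up inside a UDF, explode both sides and join,
// then aggregate. All the work stays in executor-side Spark operators.
val counts = productInformation
  .withColumn("tag", explode($"tags"))
  .join(dict.withColumn("value", explode($"values")), $"tag" === $"value")
  .groupBy("id")
  .count()

val result = counts.collect().map(r => (r.getInt(0), r.getLong(1))).toMap
```

This keeps the lookup as a relational join, which Spark can plan (e.g. as a broadcast join for a small `dict`) instead of failing when a serialized Dataset reference turns out to be null on an executor.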
Very interesting question. I had to do some research, and here are my thoughts. I hope this helps you a little bit.
When you create a Dataset via createDataset, Spark assigns this Dataset a LocalRelation logical query plan:
```scala
def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
  val enc = encoderFor[T]
  val attributes = enc.schema.toAttributes
  val encoded = data.map(d => enc.toRow(d).copy())
  val plan = new LocalRelation(attributes, encoded)
  Dataset[T](self, plan)
}
```
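You can verify this yourself by inspecting the logical plan of a Dataset built from an in-memory Seq. A minimal check (the local-mode session setup is my own):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val spark = SparkSession.builder().master("local[*]").appName("local-relation-check").getOrCreate()
import spark.implicits._

// toDS() on a local Seq goes through createDataset, so the
// resulting logical plan is a LocalRelation.
val ds = Seq(1, 2, 3).toDS()
val plan = ds.queryExecution.logical
```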
As described in this link:

> LocalRelation is a leaf logical plan that allows functions like collect or take to be executed locally, i.e. without using Spark executors.
And it's true, as the isLocal method points out:
```scala
/**
 * Returns true if the `collect` and `take` methods can be run locally
 * (without any Spark executors).
 *
 * @group basic
 * @since 1.6.0
 */
def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation]
```
Obviously, you can check that both of your datasets are local.
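For example (a sketch with a local-mode session of my own): a Dataset created from a Seq is local, while one produced by spark.range is not, because range is planned as a Range leaf node rather than a LocalRelation:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("is-local-check").getOrCreate()
import spark.implicits._

// Backed by a LocalRelation, so collect/take can run on the driver alone.
val localDs = Seq("a", "b").toDS()

// Planned as a Range node, not a LocalRelation, so it is not local.
val rangeDs = spark.range(10)
```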
And the show method actually calls take internally:
```scala
private[sql] def showString(_numRows: Int, truncate: Int = 20): String = {
  val numRows = _numRows.max(0)
  val takeResult = toDF().take(numRows + 1)
  val hasMoreData = takeResult.length > numRows
  val data = takeResult.take(numRows)
  // ...
```
So, with that evidence, I think that when countDF.show is executed, it behaves similarly to calling count on the dict dataset from the driver; the number of such calls is the number of records in targets. And of course the dict dataset doesn't need to be local for show on countDF to work.
If you try to save countDF, it will give you the same exception as in the first case:

```
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>, array<string>) => bigint)
```