Why accesing DataFrame from UDF results in NullPointerException?

You can not use a Dataframe inside of an udf. You will need to join productInformation and dict, and do the udf logic after the join.


Very interesting question. I have to do some search, and here is my though. Hope this will help you a little bit.

When you create Dataset via createDataset, spark will assign this dataset with LocalRelation logical query plan.

def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
    val enc = encoderFor[T]
    val attributes = enc.schema.toAttributes
    val encoded = data.map(d => enc.toRow(d).copy())
    val plan = new LocalRelation(attributes, encoded)
    Dataset[T](self, plan)
  }

Follow this link: LocalRelation is a leaf logical plan that allow functions like collect or take to be executed locally, i.e. without using Spark executors.

And, it's true as isLocal method point out

 /**
   * Returns true if the `collect` and `take` methods can be run locally
   * (without any Spark executors).
   *
   * @group basic
   * @since 1.6.0
   */
  def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation]

Obviously, You can check out your 2 datasets is local.

And, the show method actually call take internally.

private[sql] def showString(_numRows: Int, truncate: Int = 20): String = {
    val numRows = _numRows.max(0)
    val takeResult = toDF().take(numRows + 1)
    val hasMoreData = takeResult.length > numRows
    val data = takeResult.take(numRows)

So, with those envidences, I think the call countDF.show is executed, it will behave simliar as when you call count on dict dataset from driver, number of call times is number of records of targets. And, the dict dataset of course doesn't need to be local for the show on countDF work.

You can try to save countDF, it will give you exception same as first case org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>, array<string>) => bigint)