Exception on row.getAs[String]: java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined

The reported problem can be avoided by replacing the field-name version of the getAs[T] method (used in the function passed to groupByKey):

groupByKey(row => row.getAs[String]("_1"))

with the field-position version:

groupByKey(row => row.getAs[String](fieldIndexMap("_1")))

where fieldIndexMap maps field names to their corresponding field indexes:

val fieldIndexMap = tranform.schema.fieldNames.zipWithIndex.toMap
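The name-to-index lookup itself is plain Scala. A minimal stdlib-only sketch, with hypothetical field names standing in for tranform.schema.fieldNames:

```scala
object FieldIndexMapDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical field names standing in for tranform.schema.fieldNames
    val fieldNames = Seq("_1", "_2", "_3")

    // zipWithIndex pairs each name with its position; toMap builds the lookup
    val fieldIndexMap: Map[String, Int] = fieldNames.zipWithIndex.toMap

    println(fieldIndexMap("_1")) // 0
    println(fieldIndexMap("_3")) // 2
  }
}
```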

As a side note, your function for flatMapGroups can be simplified to something like the following:

val tranform2 = tranform.groupByKey(_.getAs[String](fieldIndexMap("_1"))).
  flatMapGroups((key, inputItr) => {
    val inputSeq = inputItr.toSeq
    val length = inputSeq.size
    inputSeq.map(r => Row.fromSeq(r.toSeq :+ length))
  })(expr1)
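For completeness, expr1 in the snippet above has to be a Row encoder over the widened schema (one extra Int column for the group size). A sketch, assuming Spark 2.x, the asker's tranform Dataset, and a hypothetical "count" name for the appended field:

```scala
import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Widen the original schema with the appended count column
val newSchema = StructType(tranform.schema.fields :+ StructField("count", IntegerType))
val expr1: Encoder[Row] = RowEncoder(newSchema)
```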

The inconsistent behavior between applying the original groupByKey/flatMapGroups methods to "dataFrame" versus "tranform" is apparently related to how the methods handle a DataFrame versus a Dataset[Row].


Solution received from the Spark project's JIRA: https://issues.apache.org/jira/browse/SPARK-26436

This issue is caused by how you create the row:

listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))

Row.fromSeq creates a GenericRow, and GenericRow's fieldIndex is not implemented because a GenericRow doesn't have a schema.
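A minimal sketch of the failure mode, assuming Spark is on the classpath:

```scala
import org.apache.spark.sql.Row

// Row.fromSeq builds a GenericRow: values only, no schema attached
val r = Row.fromSeq(Seq("a", 1))

r.getAs[String](0)        // positional access works
// r.getAs[String]("_1")  // would throw java.lang.UnsupportedOperationException:
                          // fieldIndex on a Row without schema is undefined
```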

Changing the line to create a GenericRowWithSchema instead solves it:

listBuff += new GenericRowWithSchema((x.toSeq ++ Array[Int](counter)).toArray, newSchema)
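Note that GenericRowWithSchema lives in Spark's internal catalyst package, so the fix also needs the import. A sketch around the asker's x and counter, assuming newSchema describes the widened row:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

// Same values as Row.fromSeq(...), but now carrying the schema,
// so name-based getAs[T]("field") works downstream
val rowWithSchema: Row =
  new GenericRowWithSchema((x.toSeq ++ Array[Int](counter)).toArray, newSchema)
```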