Exception on row.getAs[String]: java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined
The reported problem can be avoided by replacing the field-name version of the getAs[T] method (used in the function passed to groupByKey):
groupByKey(row => row.getAs[String]("_1"))
with the field-position version:
groupByKey(row => row.getAs[String](fieldIndexMap("_1")))
where fieldIndexMap maps field names to their corresponding field indexes:
val fieldIndexMap = tranform.schema.fieldNames.zipWithIndex.toMap
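To illustrate what fieldIndexMap holds, here is a minimal sketch. The two-column StringType/IntegerType schema below is an assumption standing in for the actual schema of tranform:

```scala
import org.apache.spark.sql.types._

// Hypothetical schema standing in for tranform.schema
val schema = StructType(Seq(
  StructField("_1", StringType),
  StructField("_2", IntegerType)
))

// fieldNames.zipWithIndex pairs each column name with its position
val fieldIndexMap = schema.fieldNames.zipWithIndex.toMap
// Map("_1" -> 0, "_2" -> 1)

// getAs[String](fieldIndexMap("_1")) therefore accesses the row purely
// by position and never calls fieldIndex on the (schema-less) Row itself
```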
As a side note, your function for flatMapGroups can be simplified to something like the following:
val tranform2 = tranform.
  groupByKey(_.getAs[String](fieldIndexMap("_1"))).
  flatMapGroups((key, inputItr) => {
    val inputSeq = inputItr.toSeq
    val length = inputSeq.size
    inputSeq.map(r => Row.fromSeq(r.toSeq :+ length))
  })(expr1)
The inconsistent behavior between applying the original groupByKey/flatMapGroups methods to "dataFrame" versus "tranform" is apparently related to how the methods handle a DataFrame versus a Dataset[Row].
Solution as received from the Spark project's JIRA: https://issues.apache.org/jira/browse/SPARK-26436
This issue is caused by how you create the row:
listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
Row.fromSeq creates a GenericRow, and GenericRow's fieldIndex is not implemented because a GenericRow carries no schema.
Changing the line to create a GenericRowWithSchema instead solves it:
listBuff += new GenericRowWithSchema((x.toSeq ++ Array[Int](counter)).toArray, newSchema)
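A minimal sketch of the difference between the two row types, assuming a hypothetical newSchema whose last column is the appended Int counter (the column names and types here are illustrative, not taken from the original code):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

// Assumed shape of newSchema: the original two columns plus the counter
val newSchema = StructType(Seq(
  StructField("_1", StringType),
  StructField("_2", IntegerType),
  StructField("count", IntegerType)
))

val values: Array[Any] = Array("a", 1, 3)

// GenericRowWithSchema carries the schema, so name-based access works
val withSchema = new GenericRowWithSchema(values, newSchema)
withSchema.fieldIndex("_1")      // 0
withSchema.getAs[String]("_1")   // "a"

// Row.fromSeq produces a schema-less GenericRow; the same calls throw
// java.lang.UnsupportedOperationException:
//   fieldIndex on a Row without schema is undefined
val withoutSchema = Row.fromSeq(values.toSeq)
```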