Encoder error while trying to map dataframe row to updated row
There is nothing unexpected here. You're trying to use code that was written for Spark 1.x and is no longer supported in Spark 2.0:
- in 1.x, `DataFrame.map` is `((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]`
- in 2.x, `Dataset[Row].map` is `((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]`
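For reference, this is the shape of the 1.x-style call that now produces the encoder error in 2.x; a minimal sketch, assuming a `DataFrame` named `df` like the one defined below:

```scala
import org.apache.spark.sql.Row

// compiles on 1.x, but on 2.x fails at compile time with something like:
// "Unable to find encoder for type stored in a Dataset ..."
df.map(row => Row(row(0), "S", row(2)))
```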
To be honest it didn't make much sense in 1.x either. Independent of the version, you can simply use the `DataFrame` API:
```scala
import spark.implicits._  // needed for toDF and the $ column syntax
import org.apache.spark.sql.functions.{when, lower}

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
```
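On the sample data above this yields (an output sketch; only the Tesla row's `make` changes):

```scala
df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make")).show()
// +----+-----+-----+
// |year| make|model|
// +----+-----+-----+
// |2012|    S|    S|
// |1997| Ford| E350|
// |2015|Chevy| Volt|
// +----+-----+-----+
```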
If you really want to use `map` you should use a statically typed `Dataset`:
```scala
import spark.implicits._

case class Record(year: Int, make: String, model: String)

df.as[Record].map {
  case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
  case rec => rec
}
```
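Because `Record` is a case class, `import spark.implicits._` derives its encoder automatically, so no explicit encoder argument is needed; the result is a statically typed `Dataset[Record]` that can be turned back into an untyped frame at any point. A small sketch (the `updated`/`back` names are illustrative):

```scala
import org.apache.spark.sql.{DataFrame, Dataset}

val updated: Dataset[Record] = df.as[Record].map(r =>
  if (r.make.toLowerCase == "tesla") r.copy(make = "S") else r)
val back: DataFrame = updated.toDF()  // column names come from Record's fields
```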
or at least return an object for which an implicit encoder exists:
```scala
df.map {
  case Row(year: Int, make: String, model: String) =>
    (year, if (make.toLowerCase == "tesla") "S" else make, model)
}
```
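One thing to keep in mind with the tuple version: the implicit tuple encoder names the output columns `_1`, `_2`, `_3`, so you may want to restore the original names afterwards, e.g.:

```scala
df.map {
  case Row(year: Int, make: String, model: String) =>
    (year, if (make.toLowerCase == "tesla") "S" else make, model)
}.toDF("year", "make", "model")  // rename _1, _2, _3 back
```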
Finally, if for some completely crazy reason you really want to map over `Dataset[Row]`, you have to provide the required encoder:
```scala
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
  StructField("year", IntegerType),
  StructField("make", StringType),
  StructField("model", StringType)
))

val encoder = RowEncoder(schema)

df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" =>
    Row(year, "S", model)
  case row => row
} (encoder)
```
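As a side note, `RowEncoder.apply` lives in a `catalyst` (internal) package; in newer Spark releases (3.5+, if memory serves) the public way to build the same encoder is `Encoders.row` — a sketch under that assumption:

```scala
import org.apache.spark.sql.{Encoders, Row}

// public replacement for RowEncoder(schema) in newer Spark versions
val rowEnc = Encoders.row(schema)

df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" =>
    Row(year, "S", model)
  case row => row
}(rowEnc)
```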
Just to add a few other important-to-know points in order to better understand the other answers (especially the final point of @zero323's answer about `map` over `Dataset[Row]`):
- First of all, `Dataframe.map` gives you a `Dataset` (more specifically, `Dataset[T]`, rather than `Dataset[Row]`)!
- And `Dataset[T]` always requires an encoder; that's what the sentence "`Dataset[Row].map` is `((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]`" means.
- There are indeed lots of encoders predefined already by Spark (which can be imported by doing `import spark.implicits._`), but that list cannot cover many domain-specific types that developers may create, in which case you need to create encoders yourself.
- In the specific example on this page, `df.map` returns a `Row` type for the `Dataset`, and, hang on a minute, the `Row` type is not within the list of types that have encoders predefined by Spark, hence you are going to create one on your own.
- And I admit that creating an encoder for the `Row` type is a bit different from the approach described in the above link: you have to use `RowEncoder`, which takes a `StructType` as a parameter describing the type of a row, like what @zero323 provides above:
```scala
// this describes the internal type of a row
val schema = StructType(Seq(
  StructField("year", IntegerType),
  StructField("make", StringType),
  StructField("model", StringType)
))

// and this completes the creation of the encoder
// for the type `Row` with the internal schema described above
val encoder = RowEncoder(schema)
```
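And, as the "reuse df.schema" comment in @zero323's snippet hints, when the `DataFrame` already exists you don't have to spell the `StructType` out by hand (the `encoderFromDf` name is just illustrative):

```scala
// equivalent, reusing the schema carried by the existing DataFrame
val encoderFromDf = RowEncoder(df.schema)
```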
For the scenario where the dataframe schema is known in advance, the answer given by @zero323 is the solution, but for scenarios with a dynamic schema, or when passing multiple dataframes to a generic function, the following code worked for us while migrating from 1.6.1 to 2.2.0:
```scala
import spark.implicits._  // needed for toDF
import org.apache.spark.sql.Row

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

val data = df.rdd.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0), make, row(2))
})
```
This code executes on both versions of Spark.

Disadvantage: the optimizations Spark provides on the DataFrame/Dataset API won't be applied.
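Since `df.rdd.map` returns a plain `RDD[Row]`, the schema has to be re-attached to get a `DataFrame` back; a minimal sketch, reusing `df.schema`:

```scala
// re-create a DataFrame from the transformed RDD[Row]
val result = spark.createDataFrame(data, df.schema)
```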