How to check the schema of a DataFrame?
If I understand your requirement correctly, the following example illustrates how to revert a DataFrame with changed column types to its original version:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.col  // needed for the cast below
val df1 = Seq(
  (1, "a", 100L, 10.0), (2, "b", 200L, 20.0)
).toDF("c1", "c2", "c3", "c4")

val df2 = Seq(
  (1, "a", 100, 10.0f), (2, "b", 200, 20.0f)
).toDF("c1", "c2", "c3", "c4")
df2.printSchema
// root
// |-- c1: integer (nullable = false)
// |-- c2: string (nullable = true)
// |-- c3: integer (nullable = false)
// |-- c4: float (nullable = false)
val fieldsDiffType = (df1.schema.fields zip df2.schema.fields).collect {
  case (a: StructField, b: StructField) if a.dataType != b.dataType =>
    (a.name, a.dataType)
}
// fieldsDiffType: Array[(String, org.apache.spark.sql.types.DataType)] =
// Array((c3,LongType), (c4,DoubleType))
val df2To1 = fieldsDiffType.foldLeft(df2)( (accDF, field) =>
  accDF.withColumn(field._1, col(field._1).cast(field._2))
)
df2To1.printSchema
// root
// |-- c1: integer (nullable = false)
// |-- c2: string (nullable = true)
// |-- c3: long (nullable = false)
// |-- c4: double (nullable = false)
Note that this solution works only if the two DataFrames have the same number of columns in the same order, and it does not cover nested types such as Array or Struct.
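As a quick sanity check (a minimal sketch building on the df1 and df2To1 values above; nullability and metadata are deliberately ignored), you can compare the column names and types of the reverted DataFrame against the original:
// true when every column of df2To1 has the same name and dataType as in df1
val sameTypes = df2To1.schema.fields.map(f => (f.name, f.dataType))
  .sameElements(df1.schema.fields.map(f => (f.name, f.dataType)))
// sameTypes: Boolean = true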
[UPDATE]
If there is concern that the column order might differ, you can first sort df1.schema.fields and df2.schema.fields by name before performing the zip:
df1.schema.fields.sortBy(_.name) zip df2.schema.fields.sortBy(_.name)
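For completeness, here is the same diff computed over the name-sorted fields (a sketch that assumes both DataFrames contain the same set of column names):
val fieldsDiffTypeSorted =
  (df1.schema.fields.sortBy(_.name) zip df2.schema.fields.sortBy(_.name)).collect {
    case (a, b) if a.dataType != b.dataType => (a.name, a.dataType)
  }
// fieldsDiffTypeSorted: Array[(String, org.apache.spark.sql.types.DataType)] =
//   Array((c3,LongType), (c4,DoubleType))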
You can try
> df.printSchema
root
|-- id: string (nullable = true)
|-- val1: double (nullable = true)
|-- val2: double (nullable = true)
This prints the schema in a tree format. Hope this helps.
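If you need that tree representation as a value rather than printed to standard output, the schema's StructType also has a treeString method (df here is whatever DataFrame you are inspecting):
// Same tree-formatted schema, returned as a String instead of printed
val schemaTree: String = df.schema.treeString
println(schemaTree)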
You can get the schema of a DataFrame with the schema method:
df.schema // Or `df.printSchema` if you want to print it nicely on the standard output
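To go beyond printing, here is a small sketch of inspecting the returned StructType programmatically; the column name "id" is only an assumption for illustration:
import org.apache.spark.sql.types.{StringType, StructType}

val schema: StructType = df.schema

// List every column with its type and nullability
schema.fields.foreach { f =>
  println(s"${f.name}: ${f.dataType} (nullable = ${f.nullable})")
}

// Look up one column by name ("id" is a hypothetical column)
val idIsString = schema("id").dataType == StringType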
Define a castColumn method
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, StructField}
def castColumn(df: DataFrame, colName: String, randomDataType: DataType): DataFrame =
  df.withColumn(colName, df.col(colName).cast(randomDataType))
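For a single column, usage would look like this (the column name "amount" and the target type are hypothetical):
import org.apache.spark.sql.types.DoubleType

// Cast the hypothetical "amount" column of df to Double
val dfCasted = castColumn(df, "amount", DoubleType)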
Then apply this method to all the columns you need to cast.
First, get an Array of tuples with the colName and the targeted dataType
// Assume your DataFrames have the same column names; you need to sortBy in case they are not in the same order.
// You can also iterate through dfOrigin.schema only and compare its dataTypes with the target dataTypes instead of zipping.
val differences = (dfOrigin.schema.fields.sortBy { case (x: StructField) => x.name } zip
  dfTarget.schema.fields.sortBy { case (x: StructField) => x.name }).collect {
  case (origin: StructField, target: StructField) if origin.dataType != target.dataType =>
    (origin.name, target.dataType)
}
Then fold over the DataFrame you want to cast (dfOrigin here):
differences.foldLeft(dfOrigin) {
  case (acc, value) => castColumn(acc, value._1, value._2)
}
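Putting the pieces together, here is a minimal, self-contained sketch; dfOrigin and dfTarget are hypothetical two-column DataFrames invented for illustration, and spark.implicits._ is assumed to be in scope (as in spark-shell):
// Hypothetical inputs: dfOrigin holds Int/Float columns, dfTarget the desired Long/Double types
val dfOrigin = Seq((1, 10.0f), (2, 20.0f)).toDF("id", "amount")
val dfTarget = Seq((1L, 10.0), (2L, 20.0)).toDF("id", "amount")

val differences = (dfOrigin.schema.fields.sortBy(_.name) zip dfTarget.schema.fields.sortBy(_.name)).collect {
  case (origin, target) if origin.dataType != target.dataType => (origin.name, target.dataType)
}

val casted = differences.foldLeft(dfOrigin) {
  case (acc, (name, dataType)) => castColumn(acc, name, dataType)
}

casted.printSchema
// root
//  |-- id: long (nullable = false)
//  |-- amount: double (nullable = false)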
Based on the Untyped Dataset Operations section of https://spark.apache.org/docs/2.2.0/sql-programming-guide.html, it should be:
df.printSchema()
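For context, the guide's example looks roughly like this in spark-shell (the JSON path and the resulting columns are illustrative):
// Read a sample JSON file and print its schema as a tree
val df = spark.read.json("examples/src/main/resources/people.json")
df.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)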