Spark, Scala - determine column type

Row has an easy way to address this: get(i: Int): Any. It maps between Spark SQL types and return types automatically, e.g.:

val fieldIndex = row.fieldIndex("date")
val date = row.get(fieldIndex)

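Since get returns Any, you can then match on the runtime type of the value if you need to handle both cases. A minimal sketch continuing the snippet above, assuming the "date" column holds either a String or a java.sql.Timestamp:

import java.sql.Timestamp

// inspect the runtime type of the value returned by get
row.get(fieldIndex) match {
  case s: String    => println(s"date is a String: $s")      // e.g. "2016-02-01"
  case t: Timestamp => println(s"date is a Timestamp: $t")
  case other        => println(s"unexpected value: $other")
}
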
You can pattern-match on the type of the column (using the DataFrame's schema) to decide whether to parse the String into a Timestamp or just use the Timestamp as is - and use the unix_timestamp function to do the actual conversion:

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import spark.implicits._  // needed for toDF and the $"..." column syntax

// preparing some example data - df1 with String type and df2 with Timestamp type
val df1 = Seq(("a", "2016-02-01"), ("b", "2016-02-02")).toDF("key", "date")
val df2 = Seq(
  ("a", new Timestamp(new SimpleDateFormat("yyyy-MM-dd").parse("2016-02-01").getTime)),
  ("b", new Timestamp(new SimpleDateFormat("yyyy-MM-dd").parse("2016-02-02").getTime))
).toDF("key", "date")

// If column is String, converts it to Timestamp
def normalizeDate(df: DataFrame): DataFrame = {
  df.schema("date").dataType match {
    case StringType => df.withColumn("date", unix_timestamp($"date", "yyyy-MM-dd").cast("timestamp"))
    case _ => df
  }
}

// after "normalizing", you can assume date has Timestamp type - 
// both would print the same thing:
normalizeDate(df1).rdd.map(r => r.getAs[Timestamp]("date")).foreach(println)
normalizeDate(df2).rdd.map(r => r.getAs[Timestamp]("date")).foreach(println)

Here are a few things you can try:

(1) Use the inferSchema option during load if your Spark version supports it. This has Spark infer the data type of each column, although it doesn't work in all scenarios. Also look at the input data: if the values are quoted, I advise adding an extra option to account for the quotes during the load (see the second snippet below).

val inputDF = spark.read.format("csv").option("header","true").option("inferSchema","true").load(fileLocation)
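
If the file does contain quoted values, a sketch of the same load with an explicit quote character (the quote option is part of Spark's CSV reader; adjust the character to match your data):

val inputDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("quote", "\"")   // treat double quotes as the quoting character
  .load(fileLocation)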

(2) To identify the data type of a column you can use the code below; it will place all of the column names and data types into their own Arrays of Strings.

val columnNames: Array[String] = inputDF.columns
val columnDataTypes: Array[String] = inputDF.schema.fields.map(_.dataType.toString)
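
If you would rather have the names and types paired up instead of in two separate arrays, the DataFrame API also exposes dtypes, which returns (name, type) tuples - for example:

// (column name, data type) pairs, e.g. ("date", "StringType")
val columnsWithTypes: Array[(String, String)] = inputDF.dtypes
columnsWithTypes.foreach { case (name, dataType) => println(s"$name: $dataType") }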