How do I create a Hive table from a Spark DataFrame using its schema?
Assuming you are using Spark 2.1.0 or later and my_DF is your DataFrame (Java API):
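import java.util.Arrays;
import java.util.stream.Collectors;
import org.apache.spark.sql.types.StructType;

// Build a comma-separated list of "`name` type" pairs from the DataFrame schema.
// simpleString() gives the full SQL type (e.g. array<int>, decimal(10,2)),
// whereas typeName() drops the element type of complex columns (just "array").
// Backticks keep column names that are SQL keywords (e.g. date) parseable.
StructType my_schema = my_DF.schema();
String columns = Arrays.stream(my_schema.fields())
    .map(field -> "`" + field.name() + "` " + field.dataType().simpleString())
    .collect(Collectors.joining(","));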
//drop the table if already created
spark.sql("drop table if exists my_table");
//create the table using the dataframe schema
// Java string literals cannot span lines, so the DDL is concatenated
spark.sql("create table my_table (" + columns + ")"
    + " row format delimited fields terminated by '|'"
    + " location '/my/hdfs/location'");
//write the dataframe data to the hdfs location for the created Hive table
my_DF.write()
.format("csv") // CSV is a built-in data source since Spark 2.0
.option("delimiter","|")
.mode("overwrite")
.save("/my/hdfs/location");
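The string-building step can be checked in isolation. The sketch below is plain Java with no Spark dependency: it assumes the column names and Hive-compatible type strings have already been pulled out of the schema (for example with field.name() and field.dataType().simpleString()), and the class and method names (HiveDdl, createTableDdl) are hypothetical. It produces the same shape of DDL as above and backtick-quotes each column name, doubling any embedded backtick, which is how Spark SQL's parser escapes quoted identifiers.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Spark: assembles a delimited-text
// CREATE TABLE statement from ordered (column name -> type) pairs.
class HiveDdl {

    static String createTableDdl(String table, Map<String, String> columns,
                                 String delimiter, String location) {
        String columnList = columns.entrySet().stream()
                // Quote each name with backticks, doubling any backtick inside it
                .map(e -> "`" + e.getKey().replace("`", "``") + "` " + e.getValue())
                .collect(Collectors.joining(", "));
        return "create table " + table + " (" + columnList + ")"
                + " row format delimited fields terminated by '" + delimiter + "'"
                + " location '" + location + "'";
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves insertion order, mirroring the schema's field order
        Map<String, String> cols = new LinkedHashMap<>();
        cols.put("id", "int");
        cols.put("date", "string");
        cols.put("amount", "decimal(10,2)");
        System.out.println(createTableDdl("my_table", cols, "|", "/my/hdfs/location"));
    }
}
```

Running main prints create table my_table (`id` int, `date` string, `amount` decimal(10,2)) row format delimited fields terminated by '|' location '/my/hdfs/location', which is the statement you would pass to spark.sql.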
The other method uses a temporary view and CREATE TABLE ... AS SELECT:
my_DF.createOrReplaceTempView("my_temp_table");
spark.sql("drop table if exists my_table");
spark.sql("create table my_table as select * from my_temp_table");
From your question, it looks like you want to create a table in Hive using your DataFrame's schema. Since the DataFrame has many columns, writing the DDL by hand is impractical, so there are two options:
- Create the Hive table directly from the DataFrame.
- Take the DataFrame's schema and create the Hive table from it.
Consider this code:
package hive.example

import org.apache.spark.sql.{Row, SparkSession}

// Spark applications should define main() rather than extend scala.App
object checkDFSchema {
  def main(args: Array[String]): Unit = {
    // Hive support is needed so "create table" writes to the Hive metastore
    val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
    val sc = sparkSession.sparkContext

    // First option: create the Hive table directly from the DataFrame
    val DF = sparkSession.sql("select * from salary")
    DF.createOrReplaceTempView("tempTable")
    sparkSession.sql("create table yourtable as select * from tempTable")

    // Second option: create the Hive table from the DataFrame's schema
    val oldDFF = sparkSession.sql("select * from salary")
    // Extract the schema (a StructType) from the DataFrame
    val schema = oldDFF.schema
    // Build an RDD of your data; each Row must match the schema's column order and types
    val rowRDD = sc.parallelize(Seq(Row(100, "a", 123)))
    // Create a new DataFrame from the data and the schema
    val newDFwithSchema = sparkSession.createDataFrame(rowRDD, schema)
    newDFwithSchema.createOrReplaceTempView("tempTable")
    sparkSession.sql("create table FinalTable as select * from tempTable")

    sparkSession.stop()
  }
}