Transpose a DataFrame Without Aggregation in Spark with Scala
You can do this using pivot, although pivot still requires an aggregation. Here each COLUMN_NAME has exactly one VALUE, so first picks it unambiguously. But what if you have multiple values for a COLUMN_NAME?
import spark.implicits._
import org.apache.spark.sql.functions.first

val df = Seq(
  ("col1", "val1"),
  ("col2", "val2"),
  ("col3", "val3"),
  ("col4", "val4"),
  ("col5", "val5")
).toDF("COLUMN_NAME", "VALUE")

df
  .groupBy()
  .pivot("COLUMN_NAME")
  .agg(first("VALUE"))
  .show()
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
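To answer the question above: with several values per COLUMN_NAME, first("VALUE") silently keeps only one of them. A hedged sketch of one way around that, collecting every value per column and joining them with concat_ws (the "," separator and the multi DataFrame are illustrative assumptions, not part of the original example):

```scala
import spark.implicits._
import org.apache.spark.sql.functions.{collect_list, concat_ws}

// hypothetical input where col1 appears twice
val multi = Seq(
  ("col1", "val1a"),
  ("col1", "val1b"),
  ("col2", "val2")
).toDF("COLUMN_NAME", "VALUE")

multi
  .groupBy()
  .pivot("COLUMN_NAME")
  // keep all values instead of dropping duplicates
  .agg(concat_ws(",", collect_list("VALUE")))
  .show()
```

Note that collect_list gives no ordering guarantee, so col1 could end up as either "val1a,val1b" or "val1b,val1a".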
EDIT:
If your DataFrame is really as small as in your example, you can collect it as a Map:
val map = df.as[(String, String)].collect().toMap
and then build a single-row DataFrame from that Map.
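A minimal sketch of that last step, assuming the map fits on the driver (the sort is my addition, purely to make the column order deterministic):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType}

val map = df.as[(String, String)].collect().toMap

// sort entries so the resulting column order is deterministic
val entries = map.toSeq.sortBy(_._1)
// one StringType column per map key
val schema  = StructType(entries.map { case (name, _) => StructField(name, StringType) })
// a single row holding the map values
val row     = Row.fromSeq(entries.map(_._2))

spark.createDataFrame(spark.sparkContext.parallelize(Seq(row)), schema).show(false)
```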
If your DataFrame is small enough, as in the question, you can collect COLUMN_NAME to form the schema and collect VALUE to form the single row, then create a new DataFrame:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType, StringType}
import org.apache.spark.sql.Row

// creating schema from the existing dataframe's COLUMN_NAME values
val schema = StructType(
  df.select(collect_list("COLUMN_NAME")).first().getAs[Seq[String]](0)
    .map(x => StructField(x, StringType))
)
// creating RDD[Row] with a single row holding all VALUEs
val values = sc.parallelize(Seq(
  Row.fromSeq(df.select(collect_list("VALUE")).first().getAs[Seq[String]](0))
))
// new dataframe creation
sqlContext.createDataFrame(values, schema).show(false)
which should give you
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
Another solution, though lengthy, uses crosstab.
val dfp = spark.sql(""" with t1 as (
 select 'col1' c1, 'val1' c2 union all
 select 'col2' c1, 'val2' c2 union all
 select 'col3' c1, 'val3' c2 union all
 select 'col4' c1, 'val4' c2 union all
 select 'col5' c1, 'val5' c2
) select c1 COLUMN_NAME, c2 VALUE from t1
""")
dfp.show(50,false)
+-----------+-----+
|COLUMN_NAME|VALUE|
+-----------+-----+
|col1 |val1 |
|col2 |val2 |
|col3 |val3 |
|col4 |val4 |
|col5 |val5 |
+-----------+-----+
val dfp2=dfp.groupBy("column_name").agg( first($"value") as "value" ).stat.crosstab("value", "column_name")
dfp2.show(false)
+-----------------+----+----+----+----+----+
|value_column_name|col1|col2|col3|col4|col5|
+-----------------+----+----+----+----+----+
|val1 |1 |0 |0 |0 |0 |
|val3 |0 |0 |1 |0 |0 |
|val2 |0 |1 |0 |0 |0 |
|val5 |0 |0 |0 |0 |1 |
|val4 |0 |0 |0 |1 |0 |
+-----------------+----+----+----+----+----+
val needed_cols = dfp2.columns.drop(1)
// needed_cols: Array[String] = Array(col1, col2, col3, col4, col5)
val dfp3 = needed_cols.foldLeft(dfp2) { (acc,x) => acc.withColumn(x,expr(s"case when ${x}=1 then value_column_name else 0 end")) }
dfp3.show(false)
+-----------------+----+----+----+----+----+
|value_column_name|col1|col2|col3|col4|col5|
+-----------------+----+----+----+----+----+
|val1 |val1|0 |0 |0 |0 |
|val3 |0 |0 |val3|0 |0 |
|val2 |0 |val2|0 |0 |0 |
|val5 |0 |0 |0 |0 |val5|
|val4 |0 |0 |0 |val4|0 |
+-----------------+----+----+----+----+----+
dfp3.select( needed_cols.map( c => max(col(c)).as(c)) :_* ).show
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+