Spark, Scala, DataFrame: create feature vectors
A little bit more DataFrame
centric solution:
import org.apache.spark.ml.feature.VectorAssembler
val df = sc.parallelize(Seq(
(1, "cat1", 1), (1, "cat2", 3), (1, "cat9", 5), (2, "cat4", 6),
(2, "cat9", 2), (2, "cat10", 1), (3, "cat1", 5), (3, "cat7", 16),
(3, "cat8", 2))).toDF("userID", "category", "frequency")
// Create a sorted array of categories
val categories = df
.select($"category")
.distinct.map(_.getString(0))
.collect
.sorted
// Prepare vector assemble
val assembler = new VectorAssembler()
.setInputCols(categories)
.setOutputCol("features")
// Aggregation expressions
val exprs = categories.map(
c => sum(when($"category" === c, $"frequency").otherwise(lit(0))).alias(c))
val transformed = assembler.transform(
df.groupBy($"userID").agg(exprs.head, exprs.tail: _*))
.select($"userID", $"features")
and an UDAF alternative:
import org.apache.spark.sql.expressions.{
MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.types.{
StructType, ArrayType, DoubleType, IntegerType}
import scala.collection.mutable.WrappedArray
class VectorAggregate (n: Int) extends UserDefinedAggregateFunction {
def inputSchema = new StructType()
.add("i", IntegerType)
.add("v", DoubleType)
def bufferSchema = new StructType().add("buff", ArrayType(DoubleType))
def dataType = new VectorUDT()
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer.update(0, Array.fill(n)(0.0))
}
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0)) {
val i = input.getInt(0)
val v = input.getDouble(1)
val buff = buffer.getAs[WrappedArray[Double]](0)
buff(i) += v
buffer.update(0, buff)
}
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
val buff1 = buffer1.getAs[WrappedArray[Double]](0)
val buff2 = buffer2.getAs[WrappedArray[Double]](0)
for ((x, i) <- buff2.zipWithIndex) {
buff1(i) += x
}
buffer1.update(0, buff1)
}
def evaluate(buffer: Row) = Vectors.dense(
buffer.getAs[Seq[Double]](0).toArray)
}
with example usage:
import org.apache.spark.ml.feature.StringIndexer
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("category_idx")
.fit(df)
val indexed = indexer.transform(df)
.withColumn("category_idx", $"category_idx".cast("integer"))
.withColumn("frequency", $"frequency".cast("double"))
val n = indexer.labels.size + 1
val transformed = indexed
.groupBy($"userID")
.agg(new VectorAggregate(n)($"category_idx", $"frequency").as("vec"))
transformed.show
// +------+--------------------+
// |userID| vec|
// +------+--------------------+
// | 1|[1.0,5.0,0.0,3.0,...|
// | 2|[0.0,2.0,0.0,0.0,...|
// | 3|[5.0,0.0,16.0,0.0...|
// +------+--------------------+
In this case order of values is defined by indexer.labels
:
indexer.labels
// Array[String] = Array(cat1, cat9, cat7, cat2, cat8, cat4, cat10)
In practice I would prefer solution by Odomontois so these are provided mostly for reference.
Given your input:
val df = Seq((1, "cat1", 1), (1, "cat2", 3), (1, "cat9", 5),
(2, "cat4", 6), (2, "cat9", 2), (2, "cat10", 1),
(3, "cat1", 5), (3, "cat7", 16), (3, "cat8", 2))
.toDF("userID", "category", "frequency")
df.show
+------+--------+---------+
|userID|category|frequency|
+------+--------+---------+
| 1| cat1| 1|
| 1| cat2| 3|
| 1| cat9| 5|
| 2| cat4| 6|
| 2| cat9| 2|
| 2| cat10| 1|
| 3| cat1| 5|
| 3| cat7| 16|
| 3| cat8| 2|
+------+--------+---------+
Just run:
val pivoted = df.groupBy("userID").pivot("category").avg("frequency")
val dfZeros = pivoted.na.fill(0)
dzZeros.show
+------+----+-----+----+----+----+----+----+
|userID|cat1|cat10|cat2|cat4|cat7|cat8|cat9|
+------+----+-----+----+----+----+----+----+
| 1| 1.0| 0.0| 3.0| 0.0| 0.0| 0.0| 5.0|
| 3| 5.0| 0.0| 0.0| 0.0|16.0| 2.0| 0.0|
| 2| 0.0| 1.0| 0.0| 6.0| 0.0| 0.0| 2.0|
+------+----+-----+----+----+----+----+----+
Finally, use VectorAssembler to create a org.apache.spark.ml.linalg.Vector
NOTE: I have not checked performances on this yet...
EDIT: Possibly more complex, but likely more efficient!
def toSparseVectorUdf(size: Int) = udf[Vector, Seq[Row]] {
(data: Seq[Row]) => {
val indices = data.map(_.getDouble(0).toInt).toArray
val values = data.map(_.getInt(1).toDouble).toArray
Vectors.sparse(size, indices, values)
}
}
val indexer = new StringIndexer().setInputCol("category").setOutputCol("idx")
val indexerModel = indexer.fit(df)
val totalCategories = indexerModel.labels.size
val dataWithIndices = indexerModel.transform(df)
val data = dataWithIndices.groupBy("userId").agg(sort_array(collect_list(struct($"idx", $"frequency".as("val")))).as("data"))
val dataWithFeatures = data.withColumn("features", toSparseVectorUdf(totalCategories)($"data")).drop("data")
dataWithFeatures.show(false)
+------+--------------------------+
|userId|features |
+------+--------------------------+
|1 |(7,[0,1,3],[1.0,5.0,3.0]) |
|3 |(7,[0,2,4],[5.0,16.0,2.0])|
|2 |(7,[1,5,6],[2.0,6.0,1.0]) |
+------+--------------------------+
NOTE: StringIndexer will sort categories by frequency => most frequent category will be at index=0 in indexerModel.labels
. Feel free to use your own mapping if you'd like and pass that directly to toSparseVectorUdf
.
Suppose:
val cs: SparkContext
val sc: SQLContext
val cats: DataFrame
Where userId
and frequency
are bigint
columns which corresponds to scala.Long
We are creating intermediate mapping RDD
:
val catMaps = cats.rdd
.groupBy(_.getAs[Long]("userId"))
.map { case (id, rows) => id -> rows
.map { row => row.getAs[String]("category") -> row.getAs[Long]("frequency") }
.toMap
}
Then collecting all presented categories in the lexicographic order
val catNames = cs.broadcast(catMaps.map(_._2.keySet).reduce(_ union _).toArray.sorted)
Or creating it manually
val catNames = cs.broadcast(1 to 10 map {n => s"cat$n"} toArray)
Finally we're transforming maps to arrays with 0-values for non-existing values
import sc.implicits._
val catArrays = catMaps
.map { case (id, catMap) => id -> catNames.value.map(catMap.getOrElse(_, 0L)) }
.toDF("userId", "feature")
now catArrays.show()
prints something like
+------+--------------------+
|userId| feature|
+------+--------------------+
| 2|[0, 1, 0, 6, 0, 0...|
| 1|[1, 0, 3, 0, 0, 0...|
| 3|[5, 0, 0, 0, 16, ...|
+------+--------------------+
This could be not the most elegant solution for dataframes, as I barely familiar with this area of spark.
Note, that you could create your catNames
manually to add zeros for missing cat3
, cat5
, ...
Also note that otherwise catMaps
RDD is operated twice, you might want to .persist()
it