Spark, Scala, DataFrame: create feature vectors

A slightly more DataFrame-centric solution:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions.{lit, sum, when}

val df = sc.parallelize(Seq(
  (1, "cat1", 1), (1, "cat2", 3), (1, "cat9", 5), (2, "cat4", 6),
  (2, "cat9", 2), (2, "cat10", 1), (3, "cat1", 5), (3, "cat7", 16),
  (3, "cat8", 2))).toDF("userID", "category", "frequency")

// Create a sorted array of categories
val categories = df
  .select($"category")
  .distinct.map(_.getString(0))
  .collect
  .sorted

// Prepare the vector assembler
val assembler = new VectorAssembler()
  .setInputCols(categories)
  .setOutputCol("features")

// Aggregation expressions
val exprs = categories.map(
   c => sum(when($"category" === c, $"frequency").otherwise(lit(0))).alias(c))

val transformed = assembler.transform(
    df.groupBy($"userID").agg(exprs.head, exprs.tail: _*))
  .select($"userID", $"features")

and a UDAF alternative:

import org.apache.spark.sql.expressions.{
  MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.mllib.linalg.{Vectors, VectorUDT}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{
  StructType, ArrayType, DoubleType, IntegerType}
import scala.collection.mutable.WrappedArray

class VectorAggregate (n: Int) extends UserDefinedAggregateFunction {
    def inputSchema = new StructType()
      .add("i", IntegerType)
      .add("v", DoubleType)
    def bufferSchema = new StructType().add("buff", ArrayType(DoubleType))
    def dataType = new VectorUDT()
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, Array.fill(n)(0.0))
    }

    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) {
        val i = input.getInt(0)
        val v = input.getDouble(1)
        val buff = buffer.getAs[WrappedArray[Double]](0) 
        buff(i) += v
        buffer.update(0, buff)
      }
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      val buff1 = buffer1.getAs[WrappedArray[Double]](0) 
      val buff2 = buffer2.getAs[WrappedArray[Double]](0) 
      for ((x, i) <- buff2.zipWithIndex) {
        buff1(i) += x
      }
      buffer1.update(0, buff1)
    }

    def evaluate(buffer: Row) =  Vectors.dense(
      buffer.getAs[Seq[Double]](0).toArray)
}

with example usage:

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("category_idx")
  .fit(df)

val indexed = indexer.transform(df)
  .withColumn("category_idx", $"category_idx".cast("integer"))
  .withColumn("frequency", $"frequency".cast("double"))

val n = indexer.labels.size + 1

val transformed = indexed
  .groupBy($"userID")
  .agg(new VectorAggregate(n)($"category_idx", $"frequency").as("vec"))

transformed.show

// +------+--------------------+
// |userID|                 vec|
// +------+--------------------+
// |     1|[1.0,5.0,0.0,3.0,...|
// |     2|[0.0,2.0,0.0,0.0,...|
// |     3|[5.0,0.0,16.0,0.0...|
// +------+--------------------+

In this case the order of values is defined by indexer.labels:

indexer.labels
// Array[String] = Array(cat1, cat9, cat7, cat2, cat8, cat4, cat10)

In practice I would prefer the solution by Odomontois, so these are provided mostly for reference.


Given your input:

val df = Seq((1, "cat1", 1), (1, "cat2", 3), (1, "cat9", 5), 
             (2, "cat4", 6), (2, "cat9", 2), (2, "cat10", 1), 
             (3, "cat1", 5), (3, "cat7", 16), (3, "cat8", 2))
           .toDF("userID", "category", "frequency")
df.show
+------+--------+---------+
|userID|category|frequency|
+------+--------+---------+
|     1|    cat1|        1|
|     1|    cat2|        3|
|     1|    cat9|        5|
|     2|    cat4|        6|
|     2|    cat9|        2|
|     2|   cat10|        1|
|     3|    cat1|        5|
|     3|    cat7|       16|
|     3|    cat8|        2|
+------+--------+---------+

Just run:

val pivoted = df.groupBy("userID").pivot("category").avg("frequency")
val dfZeros = pivoted.na.fill(0)
dfZeros.show
+------+----+-----+----+----+----+----+----+
|userID|cat1|cat10|cat2|cat4|cat7|cat8|cat9|
+------+----+-----+----+----+----+----+----+
|     1| 1.0|  0.0| 3.0| 0.0| 0.0| 0.0| 5.0|
|     3| 5.0|  0.0| 0.0| 0.0|16.0| 2.0| 0.0|
|     2| 0.0|  1.0| 0.0| 6.0| 0.0| 0.0| 2.0|
+------+----+-----+----+----+----+----+----+

Finally, use VectorAssembler to create an org.apache.spark.ml.linalg.Vector.

NOTE: I have not checked the performance of this yet.
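The VectorAssembler step mentioned above is not spelled out, so here is a minimal sketch of how it might look. It assumes Spark 2.x and a spark-shell-style session; the names featureCols and withFeatures are made up for illustration, and every pivoted column except userID is assumed to be a feature.

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession

// In spark-shell this returns the existing session; standalone it starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("pivot-to-vector").getOrCreate()
import spark.implicits._

val df = Seq((1, "cat1", 1), (1, "cat2", 3), (1, "cat9", 5),
             (2, "cat4", 6), (2, "cat9", 2), (2, "cat10", 1),
             (3, "cat1", 5), (3, "cat7", 16), (3, "cat8", 2))
           .toDF("userID", "category", "frequency")

// Same pivot as above: one column per category, nulls replaced by 0
val dfZeros = df.groupBy("userID").pivot("category").avg("frequency").na.fill(0)

// Assumption: every column other than userID is a feature column
val featureCols = dfZeros.columns.filter(_ != "userID")

val assembler = new VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features")

// Keep only the key and the assembled vector
val withFeatures = assembler.transform(dfZeros).select("userID", "features")
withFeatures.show(false)
```

The vector positions follow the order of featureCols, which here is the order of the pivoted columns (cat1, cat10, cat2, ...).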

EDIT: Possibly more complex, but likely more efficient!

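import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

// Builds a sparse vector of the given size from (idx, val) structs sorted by idx
def toSparseVectorUdf(size: Int) = udf[Vector, Seq[Row]] {
  (data: Seq[Row]) => {
    val indices = data.map(_.getDouble(0).toInt).toArray  // StringIndexer emits doubles
    val values = data.map(_.getInt(1).toDouble).toArray
    Vectors.sparse(size, indices, values)
  }
}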

val indexer = new StringIndexer().setInputCol("category").setOutputCol("idx")
val indexerModel = indexer.fit(df)
val totalCategories = indexerModel.labels.size
val dataWithIndices = indexerModel.transform(df)
val data = dataWithIndices
  .groupBy("userId")
  .agg(sort_array(collect_list(struct($"idx", $"frequency".as("val")))).as("data"))
val dataWithFeatures = data
  .withColumn("features", toSparseVectorUdf(totalCategories)($"data"))
  .drop("data")
dataWithFeatures.show(false)
+------+--------------------------+
|userId|features                  |
+------+--------------------------+
|1     |(7,[0,1,3],[1.0,5.0,3.0]) |
|3     |(7,[0,2,4],[5.0,16.0,2.0])|
|2     |(7,[1,5,6],[2.0,6.0,1.0]) |
+------+--------------------------+

NOTE: StringIndexer orders categories by descending frequency, so the most frequent category ends up at index 0 in indexerModel.labels. Feel free to use your own mapping instead and pass that directly to toSparseVectorUdf.
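As a rough illustration of what "your own mapping" could look like, here is a plain-Scala sketch (no Spark needed) that indexes categories in lexicographic order and builds the index and value arrays per user. The names categoryIndex, toIndicesAndValues and perUser are hypothetical. The pairs are sorted by index because Vectors.sparse expects strictly increasing indices.

```scala
// Sample rows: (userID, category, frequency)
val rows = Seq((1, "cat1", 1), (1, "cat2", 3), (1, "cat9", 5),
               (2, "cat4", 6), (2, "cat9", 2), (2, "cat10", 1),
               (3, "cat1", 5), (3, "cat7", 16), (3, "cat8", 2))

// Hypothetical custom mapping: lexicographic order instead of frequency order
val categoryIndex: Map[String, Int] =
  rows.map(_._2).distinct.sorted.zipWithIndex.toMap

// Translate one user's rows into (indices, values), sorted by index
def toIndicesAndValues(userRows: Seq[(Int, String, Int)]): (Array[Int], Array[Double]) = {
  val pairs = userRows
    .map { case (_, cat, freq) => categoryIndex(cat) -> freq.toDouble }
    .sortBy(_._1)
  (pairs.map(_._1).toArray, pairs.map(_._2).toArray)
}

val perUser: Map[Int, (Array[Int], Array[Double])] =
  rows.groupBy(_._1).map { case (id, rs) => id -> toIndicesAndValues(rs) }

perUser.toSeq.sortBy(_._1).foreach { case (id, (idx, vals)) =>
  println(s"$id -> (${categoryIndex.size}, ${idx.mkString("[", ",", "]")}, ${vals.mkString("[", ",", "]")})")
}
```

With this mapping the vector size is categoryIndex.size, and the arrays for each user are in the shape that Vectors.sparse(size, indices, values) takes.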


Suppose:

val cs: SparkContext
val sc: SQLContext
val cats: DataFrame

Where userId and frequency are bigint columns, which correspond to scala.Long.

First, create an intermediate mapping RDD:

val catMaps = cats.rdd
  .groupBy(_.getAs[Long]("userId"))
  .map { case (id, rows) => id -> rows
    .map { row => row.getAs[String]("category") -> row.getAs[Long]("frequency") }
    .toMap
  }

Then collect all categories present in the data, in lexicographic order:

val catNames = cs.broadcast(catMaps.map(_._2.keySet).reduce(_ union _).toArray.sorted)

Or create the list manually:

val catNames = cs.broadcast((1 to 10).map(n => s"cat$n").toArray)

Finally, transform the maps to arrays, using 0 for categories a user does not have:

import sc.implicits._
val catArrays = catMaps
      .map { case (id, catMap) => id -> catNames.value.map(catMap.getOrElse(_, 0L)) }
      .toDF("userId", "feature")

Now catArrays.show() prints something like:

+------+--------------------+
|userId|             feature|
+------+--------------------+
|     2|[0, 1, 0, 6, 0, 0...|
|     1|[1, 0, 3, 0, 0, 0...|
|     3|[5, 0, 0, 0, 16, ...|
+------+--------------------+

This may not be the most elegant solution for DataFrames, as I am barely familiar with this area of Spark.

Note that you could create catNames manually to add zeros for the missing cat3, cat5, ...

Also note that catMaps is otherwise computed twice, so you might want to .persist() it.
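To make the effect of a hand-written catNames concrete, here is a small plain-Scala sketch that uses ordinary collections in place of the RDD and the broadcast variable. The literal catMaps below just mirrors the sample data and is an assumption for illustration only.

```scala
// Hand-written category list cat1..cat10, including ones absent from the data
val catNames: Array[String] = (1 to 10).map(n => s"cat$n").toArray

// Stand-in for the catMaps RDD: userId -> (category -> frequency)
val catMaps: Map[Long, Map[String, Long]] = Map(
  1L -> Map("cat1" -> 1L, "cat2" -> 3L, "cat9" -> 5L),
  2L -> Map("cat4" -> 6L, "cat9" -> 2L, "cat10" -> 1L),
  3L -> Map("cat1" -> 5L, "cat7" -> 16L, "cat8" -> 2L)
)

// Same transformation as above: missing categories become 0
val catArrays: Map[Long, Array[Long]] = catMaps.map { case (id, catMap) =>
  id -> catNames.map(catMap.getOrElse(_, 0L))
}

catArrays.toSeq.sortBy(_._1).foreach { case (id, arr) =>
  println(s"$id -> ${arr.mkString("[", ", ", "]")}")
}
// 1 -> [1, 3, 0, 0, 0, 0, 0, 0, 5, 0]
// 2 -> [0, 0, 0, 6, 0, 0, 0, 0, 2, 1]
// 3 -> [5, 0, 0, 0, 0, 0, 16, 2, 0, 0]
```

Every array now has ten slots in numeric category order, so cat3, cat5 and cat6 show up as explicit zeros even though no user has them.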