MinMax Normalization in scala

I guess what you want is something like this

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{min, max, lit}

val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")

val (vMin, vMax) = df.agg(min($"v"), max($"v")).first match {
  case Row(x: Double, y: Double) => (x, y)
}

val scaledRange = lit(2) // Range of the scaled variable
val scaledMin = lit(-1)  // Min value of the scaled variable
val vNormalized = ($"v" - vMin) / (vMax - vMin) // v normalized to (0, 1) range

val vScaled = scaledRange * vNormalized + scaledMin

df.withColumn("vScaled", vScaled).show

// +---+-----+--------------------+
// |  k|    v|             vScaled|
// +---+-----+--------------------+
// |  1|  0.5| -0.3093093093093092|
// |  2| 10.2| 0.27327327327327344|
// |  3|  5.7|0.003003003003003...|
// |  4|-11.0|                -1.0|
// |  5| 22.3|                 1.0|
// +---+-----+--------------------+

there is another solution. Take codes from Matt, Lyle and zero323, thanks!

import org.apache.spark.ml.feature.{MinMaxScaler, VectorAssembler}

val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")

val assembler = new VectorAssembler().setInputCols(Array("v")).setOutputCol("vVec")
val df2= assembler.transform(df)

val scaler = new MinMaxScaler().setInputCol("vVec").setOutputCol("vScaled").setMax(1).setMin(-1)

scaler.fit(df2).transform(df2).show

result:

+---+-----+-------+--------------------+
|  k|    v|   vVec|             vScaled|
+---+-----+-------+--------------------+
|  1|  0.5|  [0.5]|[-0.3093093093093...|
|  2| 10.2| [10.2]|[0.27327327327327...|
|  3|  5.7|  [5.7]|[0.00300300300300...|
|  4|-11.0|[-11.0]|              [-1.0]|
|  5| 22.3| [22.3]|               [1.0]|
+---+-----+-------+--------------------+

btw: the other solutions produce error on my side

java.lang.IllegalArgumentException: requirement failed: Column vVec must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:43)
  at org.apache.spark.ml.feature.MinMaxScalerParams$class.validateAndTransformSchema(MinMaxScaler.scala:67)
  at org.apache.spark.ml.feature.MinMaxScaler.validateAndTransformSchema(MinMaxScaler.scala:93)
  at org.apache.spark.ml.feature.MinMaxScaler.transformSchema(MinMaxScaler.scala:129)
  at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
  at org.apache.spark.ml.feature.MinMaxScaler.fit(MinMaxScaler.scala:119)
  ... 50 elided

THANKS A LOT whatever!


Here's another suggestion when you are already playing with Spark.

Why don't you use MinMaxScaler in ml package?

Let's try this with the same example from zero323.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.sql.functions.udf

val df = sc.parallelize(Seq(
  (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")

//val df.map(r => Vectors.dense(Array(r.getAs[Double]("v"))))

val vectorizeCol = udf( (v:Double) => Vectors.dense(Array(v)) )
val df2 = df.withColumn("vVec", vectorizeCol(df("v"))

val scaler = new MinMaxScaler()
    .setInputCol("vVec")
    .setOutputCol("vScaled")
    .setMax(1)
    .setMin(-1)

scaler.fit(df2).transform(df2).show
+---+-----+-------+--------------------+
|  k|    v|   vVec|             vScaled|
+---+-----+-------+--------------------+
|  1|  0.5|  [0.5]|[-0.3093093093093...|
|  2| 10.2| [10.2]|[0.27327327327327...|
|  3|  5.7|  [5.7]|[0.00300300300300...|
|  4|-11.0|[-11.0]|              [-1.0]|
|  5| 22.3| [22.3]|               [1.0]|
+---+-----+-------+--------------------+

Take advantage of scaling multiple columns at once.

val df = sc.parallelize(Seq(
    (1.0, -1.0, 2.0),
    (2.0, 0.0, 0.0),
    (0.0, 1.0, -1.0)
)).toDF("a", "b", "c")

import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler()
    .setInputCols(Array("a", "b", "c"))
    .setOutputCol("features")

val df2 = assembler.transform(df)

// Reusing the scaler instance above with the same min(-1) and max(1) 
scaler.setInputCol("features").setOutputCol("scaledFeatures").fit(df2).transform(df2).show
+---+----+----+--------------+--------------------+
|  a|   b|   c|      features|      scaledFeatures|
+---+----+----+--------------+--------------------+
|1.0|-1.0| 2.0|[1.0,-1.0,2.0]|      [0.0,-1.0,1.0]|
|2.0| 0.0| 0.0| [2.0,0.0,0.0]|[1.0,0.0,-0.33333...|
|0.0| 1.0|-1.0|[0.0,1.0,-1.0]|     [-1.0,1.0,-1.0]|
+---+----+----+--------------+--------------------+