MinMax Normalization in Scala
I guess what you want is something like this:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{min, max, lit}
val df = sc.parallelize(Seq(
(1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")
val (vMin, vMax) = df.agg(min($"v"), max($"v")).first match {
case Row(x: Double, y: Double) => (x, y)
}
val scaledRange = lit(2) // Range of the scaled variable
val scaledMin = lit(-1) // Min value of the scaled variable
val vNormalized = ($"v" - vMin) / (vMax - vMin) // v normalized to the [0, 1] range
val vScaled = scaledRange * vNormalized + scaledMin
df.withColumn("vScaled", vScaled).show
// +---+-----+--------------------+
// | k| v| vScaled|
// +---+-----+--------------------+
// | 1| 0.5| -0.3093093093093092|
// | 2| 10.2| 0.27327327327327344|
// | 3| 5.7|0.003003003003003...|
// | 4|-11.0| -1.0|
// | 5| 22.3| 1.0|
// +---+-----+--------------------+
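To sanity-check the numbers above without a Spark session, the same formula can be sketched on a plain Scala collection. This is only an illustration: `rescale` is a hypothetical helper name, not part of any Spark API, and it assumes the column is not constant.

```scala
// Plain-Scala sketch of the min-max formula used above; no Spark required.
// `rescale` is a hypothetical helper, not a Spark function.
def rescale(xs: Seq[Double], lo: Double, hi: Double): Seq[Double] = {
  val (vMin, vMax) = (xs.min, xs.max)
  // Assumes vMax > vMin; a constant input would divide by zero here.
  xs.map(v => (v - vMin) / (vMax - vMin) * (hi - lo) + lo)
}

// Same values as the "v" column, scaled to the [-1, 1] range.
val scaled = rescale(Seq(0.5, 10.2, 5.7, -11.0, 22.3), -1.0, 1.0)
scaled.foreach(println)
```

The minimum (-11.0) maps to -1.0 and the maximum (22.3) maps to 1.0, matching the `vScaled` column in the table.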
There is another solution, built from the code by Matt, Lyle and zero323. Thanks!
import org.apache.spark.ml.feature.{MinMaxScaler, VectorAssembler}
val df = sc.parallelize(Seq(
(1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")
val assembler = new VectorAssembler().setInputCols(Array("v")).setOutputCol("vVec")
val df2= assembler.transform(df)
val scaler = new MinMaxScaler().setInputCol("vVec").setOutputCol("vScaled").setMax(1).setMin(-1)
scaler.fit(df2).transform(df2).show
Result:
+---+-----+-------+--------------------+
| k| v| vVec| vScaled|
+---+-----+-------+--------------------+
| 1| 0.5| [0.5]|[-0.3093093093093...|
| 2| 10.2| [10.2]|[0.27327327327327...|
| 3| 5.7| [5.7]|[0.00300300300300...|
| 4|-11.0|[-11.0]| [-1.0]|
| 5| 22.3| [22.3]| [1.0]|
+---+-----+-------+--------------------+
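By the way, the other solutions fail on my side with the error below. The expected and actual types print identically, which usually means the vector column was built with org.apache.spark.mllib.linalg.Vectors while the Spark 2.x MinMaxScaler expects org.apache.spark.ml.linalg.Vectors: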
java.lang.IllegalArgumentException: requirement failed: Column vVec must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:43)
at org.apache.spark.ml.feature.MinMaxScalerParams$class.validateAndTransformSchema(MinMaxScaler.scala:67)
at org.apache.spark.ml.feature.MinMaxScaler.validateAndTransformSchema(MinMaxScaler.scala:93)
at org.apache.spark.ml.feature.MinMaxScaler.transformSchema(MinMaxScaler.scala:129)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
at org.apache.spark.ml.feature.MinMaxScaler.fit(MinMaxScaler.scala:119)
... 50 elided
Thanks a lot anyway!
Here's another suggestion, since you are already using Spark.
Why not use MinMaxScaler from the ml package?
Let's try this with the same example from zero323.
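// On Spark 2.0+, import org.apache.spark.ml.linalg.Vectors instead:
// ml.feature.MinMaxScaler rejects mllib vectors there.
import org.apache.spark.mllib.linalg.Vectors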
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.sql.functions.udf
val df = sc.parallelize(Seq(
(1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3)
)).toDF("k", "v")
// MinMaxScaler works on vector columns, so wrap each Double in a one-element dense vector
val vectorizeCol = udf((v: Double) => Vectors.dense(Array(v)))
val df2 = df.withColumn("vVec", vectorizeCol(df("v")))
val scaler = new MinMaxScaler()
.setInputCol("vVec")
.setOutputCol("vScaled")
.setMax(1)
.setMin(-1)
scaler.fit(df2).transform(df2).show
+---+-----+-------+--------------------+
| k| v| vVec| vScaled|
+---+-----+-------+--------------------+
| 1| 0.5| [0.5]|[-0.3093093093093...|
| 2| 10.2| [10.2]|[0.27327327327327...|
| 3| 5.7| [5.7]|[0.00300300300300...|
| 4|-11.0|[-11.0]| [-1.0]|
| 5| 22.3| [22.3]| [1.0]|
+---+-----+-------+--------------------+
You can also take advantage of this approach to scale multiple columns at once:
val df = sc.parallelize(Seq(
(1.0, -1.0, 2.0),
(2.0, 0.0, 0.0),
(0.0, 1.0, -1.0)
)).toDF("a", "b", "c")
import org.apache.spark.ml.feature.VectorAssembler
val assembler = new VectorAssembler()
.setInputCols(Array("a", "b", "c"))
.setOutputCol("features")
val df2 = assembler.transform(df)
// Reusing the scaler instance above with the same min(-1) and max(1)
scaler.setInputCol("features").setOutputCol("scaledFeatures").fit(df2).transform(df2).show
+---+----+----+--------------+--------------------+
| a| b| c| features| scaledFeatures|
+---+----+----+--------------+--------------------+
|1.0|-1.0| 2.0|[1.0,-1.0,2.0]| [0.0,-1.0,1.0]|
|2.0| 0.0| 0.0| [2.0,0.0,0.0]|[1.0,0.0,-0.33333...|
|0.0| 1.0|-1.0|[0.0,1.0,-1.0]| [-1.0,1.0,-1.0]|
+---+----+----+--------------+--------------------+
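The table shows that each column is scaled independently, using its own minimum and maximum. A plain-Scala sketch of that column-wise behaviour, without Spark, could look like the following. `scaleColumns` is a hypothetical helper name, and mapping a constant column to the midpoint of the target range is an assumption made here to avoid dividing by zero, not something taken from the answers above.

```scala
// Plain-Scala sketch of column-wise min-max scaling; no Spark required.
// `scaleColumns` is a hypothetical helper, not a Spark API.
def scaleColumns(rows: Seq[Seq[Double]], lo: Double, hi: Double): Seq[Seq[Double]] = {
  // Work column by column: each column gets its own min and max.
  val scaledCols = rows.transpose.map { col =>
    val (cMin, cMax) = (col.min, col.max)
    col.map { v =>
      // Assumption: a constant column maps to the midpoint of [lo, hi].
      if (cMax == cMin) 0.5 * (hi + lo)
      else (v - cMin) / (cMax - cMin) * (hi - lo) + lo
    }
  }
  // Transpose back so the result is row-oriented again.
  scaledCols.transpose
}

// Same rows as the "a", "b", "c" columns, scaled to the [-1, 1] range.
val rows = Seq(
  Seq(1.0, -1.0, 2.0),
  Seq(2.0, 0.0, 0.0),
  Seq(0.0, 1.0, -1.0)
)
val scaledRows = scaleColumns(rows, -1.0, 1.0)
scaledRows.foreach(r => println(r.mkString("[", ",", "]")))
```

The first and last rows come out as [0.0,-1.0,1.0] and [-1.0,1.0,-1.0], in line with the `scaledFeatures` column.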