Spark - Random Number Generation
According to this post, the best solution is to put the new scala.util.Random neither inside the map nor completely outside it (i.e. in the driver code), but in an intermediate mapPartitionsWithIndex:
import scala.util.Random
val myAppSeed = 91234
val newRDD = myRDD.mapPartitionsWithIndex { (indx, iter) =>
  // One generator per partition, seeded from the partition index
  val rand = new Random(indx + myAppSeed)
  iter.map(x => (x, Array.fill(10)(rand.nextDouble())))
}
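To see what the per-partition seeding buys you without starting a cluster, here is a minimal Spark-free sketch. The nested Seq is only a stand-in for an RDD's partitions, and PartitionSeedSketch and attachRandoms are hypothetical names used for illustration, not part of any Spark API.

```scala
import scala.util.Random

object PartitionSeedSketch {
  val myAppSeed = 91234

  // Hypothetical stand-in for mapPartitionsWithIndex:
  // each inner Seq plays the role of one partition.
  def attachRandoms(partitions: Seq[Seq[String]]): Seq[Seq[(String, Double)]] =
    partitions.zipWithIndex.map { case (part, indx) =>
      val rand = new Random(indx + myAppSeed) // one generator per partition
      part.map(x => (x, rand.nextDouble()))
    }

  def main(args: Array[String]): Unit = {
    val data = Seq(Seq("a", "b"), Seq("c", "d"))
    val first = attachRandoms(data)
    val second = attachRandoms(data)
    // Same seeds, same values: a rerun reproduces the numbers
    println(first == second)
    // Different partition indexes give different seeds, hence different values
    println(first(0).map(_._2) != first(1).map(_._2))
  }
}
```

Because the seed depends only on the partition index and myAppSeed, a rerun over the same partitioning should give the same numbers, while different partitions do not share a sequence.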
Using the Spark Dataset API, perhaps for use in an accumulator:
df.withColumn("_n", substring(rand(),3,4).cast("bigint"))
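For anyone wondering what that expression actually extracts, here is a rough plain-Scala approximation. It assumes Spark renders the double as a string much as Double.toString does, and that a failed cast comes back as null (modelled here as None). RandDigitsSketch and digitsOf are hypothetical names for illustration only.

```scala
import scala.util.Try

object RandDigitsSketch {
  // Approximates substring(rand(), 3, 4).cast("bigint").
  // Spark's substring is 1-based, so position 3 is the first digit after "0.";
  // slice(2, 6) takes the same four characters with 0-based indexing.
  def digitsOf(d: Double): Option[Long] =
    Try(d.toString.slice(2, 6).toLong).toOption

  def main(args: Array[String]): Unit = {
    println(digitsOf(0.8635073400704648)) // Some(8635)
    println(digitsOf(1.234e-4))           // None, since "1.234E-4" yields "234E"
  }
}
```

If those assumptions hold, values below 0.001 (rendered in scientific notation) would not produce a usable integer, so it may be worth checking for nulls in the resulting column.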
Just use the SQL function rand:
import org.apache.spark.sql.functions._
//df: org.apache.spark.sql.DataFrame = [key: int]
df.select($"key", rand() as "rand").show
+---+-------------------+
|key|               rand|
+---+-------------------+
|  1| 0.8635073400704648|
|  2| 0.6870153659986652|
|  3|0.18998048357873532|
+---+-------------------+
df.select($"key", rand() as "rand").show
+---+------------------+
|key|              rand|
+---+------------------+
|  1|0.3422484248879837|
|  2|0.2301384925817671|
|  3|0.6959421970071372|
+---+------------------+
The reason why the same sequence is repeated is that the random generator is created and initialized with a seed before the data is partitioned. Each partition then starts from the same random seed. Maybe not the most efficient way to do it, but the following should work:
val myClass = new MyClass()
val M = 3
for (m <- 1 to M) {
  val newDF = sqlContext.createDataFrame(myDF
    .map {
      // Generator declared inside the map block rather than in the driver code
      val rand = scala.util.Random
      row => RowFactory
        .create(row.getString(0),
          myClass.myMethod(row.getString(2), rand.nextDouble()))
    }, myDF.schema)
}
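The repeated-sequence effect described above can be reproduced without Spark. The sketch below copies a seeded generator through Java serialization, which is roughly what happens when a closure capturing a driver-side generator is shipped to each task. SharedSeedSketch and copyOf are hypothetical names, and the seed 42 is arbitrary.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.util.Random

object SharedSeedSketch {
  // Round-trips a generator through Java serialization, a rough stand-in
  // for a driver-side object being sent to a task inside a closure.
  def copyOf(rand: Random): Random = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(rand)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[Random] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val driverRand = new Random(42) // created once, before any "partitioning"
    val partitionA = copyOf(driverRand)
    val partitionB = copyOf(driverRand)
    val a = Seq.fill(3)(partitionA.nextDouble())
    val b = Seq.fill(3)(partitionB.nextDouble())
    // Both copies start from the same internal state, so the sequences match
    println(a == b)
  }
}
```

Each copy carries the same internal state, so every "partition" draws the same numbers, which is why the generator needs to be created or seeded per partition instead.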