Spark: RDD to List
If you really want to create two Lists - meaning, you want all the distributed data to be collected into the driver application (risking slowness or an OutOfMemoryError) - you can use collect and then apply simple map operations on the result:
val list: List[(String, String)] = rdd.collect().toList // all the data now lives on the driver
val col1: List[String] = list.map(_._1)
val col2: List[String] = list.map(_._2)
Alternatively - if you want to "split" your RDD into two RDDs - it's pretty similar without collecting the data:
rdd.cache() // cache so rdd isn't recomputed once for each of the two maps below
val rdd1: RDD[String] = rdd.map(_._1)
val rdd2: RDD[String] = rdd.map(_._2)
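One possible follow-up (not in the original answer): once both derived RDDs have actually been materialized by an action, the cache can be released with unpersist:

rdd.unpersist() // free the cached partitions once rdd1 and rdd2 no longer need them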
A third alternative is to first map into these two RDDs and then collect each one of them, but it's not much different from the first option and suffers from the same risks and limitations.
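For completeness, a minimal sketch of that third option, assuming the same rdd: RDD[(String, String)] as above:

rdd.cache() // as before, avoid computing rdd twice
val col1: List[String] = rdd.map(_._1).collect().toList // pulls all of column 1 into the driver
val col2: List[String] = rdd.map(_._2).collect().toList // pulls all of column 2 into the driver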
As an alternative to Tzach Zohar's answer, you can use unzip on the lists:
scala> val myRDD = sc.parallelize(Seq(("a", "b"), ("c", "d")))
myRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val (l1, l2) = myRDD.collect.toList.unzip
l1: List[String] = List(a, c)
l2: List[String] = List(b, d)
Or keys and values on the RDDs:
scala> val (rdd1, rdd2) = (myRDD.keys, myRDD.values)
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at keys at <console>:33
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at values at <console>:33
scala> rdd1.foreach{println}
a
c
scala> rdd2.foreach{println}
d
b
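A note on the output above: foreach runs on the executors, so the print order is not deterministic (hence d appearing before b here). If you need the values in RDD order, one option is to collect first and print from the driver:

rdd2.collect().foreach(println) // prints b, then d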