How to convert a pyspark.rdd.PipelinedRDD to a DataFrame without using the collect() method in PySpark?
You want to do two things here: (1) flatten your data, and (2) put it into a DataFrame.
One way to do it is as follows:
First, let us flatten the dictionary:
rdd2 = Rdd1.flatMapValues(lambda x: list(x.items()))
When collecting the data, you get something like this:
[(10, (3, 3.616726727464709)), (10, (4, 2.9996439803387602)), ...
Then we can format the data and turn it into a dataframe:
rdd2.map(lambda x: (x[0], x[1][0], x[1][1]))\
    .toDF(("CId", "IID", "Score"))\
    .show()
which gives you this:
+---+---+-------------------+
|CId|IID|              Score|
+---+---+-------------------+
| 10|  3|  3.616726727464709|
| 10|  4| 2.9996439803387602|
| 10|  5| 1.6767412921625855|
|  1|  3|  2.016527311459324|
|  1|  4|-1.5271512313750577|
|  1|  5| 1.9665475696370045|
|  2|  3|  6.230272144805092|
|  2|  4|  4.033642544526678|
|  2|  5| 3.1517805604906313|
|  3|  3|-0.3924680103722977|
|  3|  4| 2.9757316477407443|
|  3|  5|-1.5689126834176417|
+---+---+-------------------+
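If you want to check the two steps above on a small sample before running them on the cluster, they can be traced in plain Python. This is only a rough local sketch: `flat_map_values` is a hypothetical helper that mimics what `flatMapValues` does per record, it is not part of PySpark, and the sample values are made up.

```python
# Plain-Python emulation of RDD.flatMapValues, for illustration only.
# `flat_map_values` is a hypothetical helper, not part of PySpark.
def flat_map_values(pairs, f):
    """Apply f to each value and emit one (key, item) pair per item returned."""
    return [(key, item) for key, value in pairs for item in f(value)]

# Two made-up records in the same (CId, {IID: Score}) shape as the question.
sample = [(10, {3: 3.6, 4: 3.0}), (1, {3: 2.0})]

# Step 1: flatten each dictionary into (IID, Score) pairs, keeping the key.
flattened = flat_map_values(sample, lambda d: list(d.items()))
print(flattened)
# [(10, (3, 3.6)), (10, (4, 3.0)), (1, (3, 2.0))]

# Step 2: unpack the nested pair into a flat (CId, IID, Score) row.
rows = [(cid, iid, score) for cid, (iid, score) in flattened]
print(rows)
# [(10, 3, 3.6), (10, 4, 3.0), (1, 3, 2.0)]
```

Each element of `rows` has the same shape as a row that the `map` step hands to `toDF`.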
There is an even easier and more elegant solution that avoids the Python lambda expressions used in @oli's answer: it relies on the Spark DataFrame function explode,
which fits your requirement well. It should be faster too, because there is no need to run Python lambdas twice. See below:
from pyspark.sql.functions import explode
# dummy data
data = [(10, {3: 3.616726727464709, 4: 2.9996439803387602, 5: 1.6767412921625855}),
        (1, {3: 2.016527311459324, 4: -1.5271512313750577, 5: 1.9665475696370045}),
        (2, {3: 6.230272144805092, 4: 4.033642544526678, 5: 3.1517805604906313}),
        (3, {3: -0.3924680103722977, 4: 2.9757316477407443, 5: -1.5689126834176417})]
# create your rdd (assumes `sc` is an existing SparkContext, as in the pyspark shell)
rdd = sc.parallelize(data)
# convert to spark data frame
df = rdd.toDF(["CId", "Values"])
# use explode
df.select("CId", explode("Values").alias("IID", "Score")).show()
+---+---+-------------------+
|CId|IID|              Score|
+---+---+-------------------+
| 10|  3|  3.616726727464709|
| 10|  4| 2.9996439803387602|
| 10|  5| 1.6767412921625855|
|  1|  3|  2.016527311459324|
|  1|  4|-1.5271512313750577|
|  1|  5| 1.9665475696370045|
|  2|  3|  6.230272144805092|
|  2|  4|  4.033642544526678|
|  2|  5| 3.1517805604906313|
|  3|  3|-0.3924680103722977|
|  3|  4| 2.9757316477407443|
|  3|  5|-1.5689126834176417|
+---+---+-------------------+
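To make the row-level effect of explode on a map column easier to picture, here is a small plain-Python sketch. It is only an illustration of the semantics, not how Spark executes it: `explode_map` is a hypothetical helper and the sample values are made up. One thing it shows is that explode emits no row at all when the map is empty or null, so such CIds disappear from the result.

```python
# Plain-Python illustration of what explode does to a map column.
# `explode_map` is a hypothetical helper, not a PySpark function.
def explode_map(rows):
    """Yield one (CId, IID, Score) tuple per entry of each row's dict."""
    for cid, values in rows:
        # A None or empty dict produces no output rows, like explode.
        for iid, score in (values or {}).items():
            yield (cid, iid, score)

# Made-up sample, including an empty map (CId 1) and a null map (CId 2).
sample = [(10, {3: 3.6, 4: 3.0}), (1, {}), (2, None), (3, {5: -1.5})]

exploded = list(explode_map(sample))
print(exploded)
# [(10, 3, 3.6), (10, 4, 3.0), (3, 5, -1.5)]
```

If you need to keep rows whose map is empty or null, pyspark.sql.functions also has explode_outer, which returns such rows with nulls in the exploded columns.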