How to do opposite of explode in PySpark?
Thanks to @titipat for the RDD solution. I realized shortly after posting that there is also a DataFrame solution using collect_set (or collect_list):
from pyspark.sql import Row
from pyspark.sql.functions import collect_set
rdd = spark.sparkContext.parallelize([Row(user='Bob', word='hello'),
                                      Row(user='Bob', word='world'),
                                      Row(user='Mary', word='Have'),
                                      Row(user='Mary', word='a'),
                                      Row(user='Mary', word='nice'),
                                      Row(user='Mary', word='day')])
df = spark.createDataFrame(rdd)
group_user = df.groupBy('user').agg(collect_set('word').alias('words'))
print(group_user.collect())
>[Row(user='Mary', words=['Have', 'nice', 'day', 'a']), Row(user='Bob', words=['world', 'hello'])]
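Note that collect_set deduplicates values and makes no ordering guarantee, which is why the words come back in a different order above. If you need duplicates kept, a minimal sketch of the collect_list variant on the same df could look like this (the order inside each array is still not guaranteed):

from pyspark.sql.functions import collect_list

# keep duplicates instead of deduplicating; array order is not guaranteed
group_user_list = df.groupBy('user').agg(collect_list('word').alias('words'))
print(group_user_list.collect())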
Here is a solution using the RDD API:
from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([Row(user='Bob', word='hello'),
                                      Row(user='Bob', word='world'),
                                      Row(user='Mary', word='Have'),
                                      Row(user='Mary', word='a'),
                                      Row(user='Mary', word='nice'),
                                      Row(user='Mary', word='day')])
group_user = rdd.groupBy(lambda x: x.user)
group_agg = group_user.map(lambda x: Row(**{'user': x[0], 'word': [t.word for t in x[1]]}))
Output from group_agg.collect():
[Row(user='Bob', word=['hello', 'world']),
Row(user='Mary', word=['Have', 'a', 'nice', 'day'])]
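If you then want this RDD of Rows back as a DataFrame, one possible sketch (reusing the same spark session) is:

# an RDD of Row objects can be turned directly into a DataFrame
group_agg_df = spark.createDataFrame(group_agg)
group_agg_df.show(truncate=False)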
from pyspark.sql import functions as F
df.groupby("user").agg(F.collect_list("word"))