PySpark reduceByKey? to add Key/Tuple
Instead of:
.map(lambda x: (x[0], [x[1], 1]))
We could do this:
.map(lambda x: ((x[0], x[1]), 1))
And in the last step, we could use reduceByKey and add. Note that add comes from the operator package.
Putting it together:
from operator import add
rdd = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')])
rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add).collect()
I'm much more familiar with Spark in Scala, so there may be better ways than Counter
to count the characters in the iterable produced by groupByKey
, but here's an option:
from collections import Counter
rdd = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')])
rdd.groupByKey().mapValues(Counter).collect()
[(48, Counter({'0': 2})),
(32, Counter({'6': 2})),
(49, Counter({'2': 2})),
(50, Counter({'0': 2})),
(51, Counter({'X': 1, 'T': 1})),
(53, Counter({'2': 1})),
(13, Counter({'A': 1, 'D': 1})),
(45, Counter({'A': 1, 'T': 1})),
(14, Counter({'T': 2})),
(54, Counter({'0': 1})),
(47, Counter({'2': 2}))]
If i understood you right, you can do it in one operation combineByKey:
from collections import Counter
x = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')])
result = x.combineByKey(lambda value: {value: 1},
... lambda x, value: value.get(x,0) + 1,
... lambda x, y: dict(Counter(x) + Counter(y)))
result.collect()
[(32, {'6': 2}), (48, {'0': 2}), (49, {'2': 2}), (53, {'2': 1}), (13, {'A': 1, 'D': 1}), (45, {'A': 1, 'T': 1}), (50, {'0': 2}), (54, {'0': 1}), (14, {'T': 2}), (51, {'X': 1, 'T': 1}), (47, {'2': 2})]