PySpark named accumulator code example
Example: PySpark named accumulator
# Named accumulators are not available in Python yet.
# As a workaround, you can broadcast an object whose method reports on each
# record it handles, and use that output to follow your progress.

# Get a SparkContext `sc` and read your data into an RDD (or DataFrame).
rows = sc.textFile("/path/to/data/sample_data.txt")

# Broadcast an instance of the Processor class.
# The broadcast is created through the same `sc` used everywhere else in this
# snippet, so no separate `spark` session object is required.
# NOTE: the Processor class must already be defined when this line runs.
processor = sc.broadcast(Processor())

# Apply the processor to every record of the RDD. The lambda looks up
# `processor.value` each time it is called instead of capturing the
# Processor instance itself.
mapped = rows.map(lambda x: processor.value.process(x))

# Other transformations you need to do.
# NOTE: map() is a lazy transformation; process() is only invoked once an
# action (for example mapped.count()) is executed, and that action has to run
# before the context is stopped below.
sc.stop()
class Processor:
    """Record handler that reports every record it sees.

    ``process`` shows three illustrative ways of surfacing progress
    (console output, a local file, an external HTTP endpoint); keep
    whichever of them you actually need.
    """

    # Upper bound, in seconds, for the progress-reporting HTTP call so that an
    # unresponsive endpoint cannot block a record forever. The value is an
    # arbitrary default; tune it for your endpoint.
    REQUEST_TIMEOUT_SECONDS = 10

    def process(self, content):
        """Report ``content`` and return the result of further processing.

        Args:
            content: The record to handle. It is concatenated with ``str``
                values below, so it is expected to be a string.

        Returns:
            Whatever ``self.other_processing(content)`` returns.
        """
        # Either print some output to the screen
        print(content)

        # or write to a file. The path is relative, so the file is created in
        # the working directory of whichever process runs this method. The
        # encoding is pinned so non-ASCII records do not depend on the host
        # locale.
        with open('results.txt', 'a', encoding='utf-8') as f:
            f.write(content + "\n")

        # or call an API to store and count the requests.
        # NOTE(review): `requests` is not imported in the visible source;
        # it must be importable at module level wherever this runs.
        requests.get(
            'https://some.external.api/' + content,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )

        # NOTE(review): `other_processing` is not defined in the visible
        # source; it is expected to be provided by this class or a subclass.
        return self.other_processing(content)