PySpark named accumulator code example

Example: PySpark named accumulator

# Named accumulators are not available in Python yet.
# As a workaround, you can broadcast an object whose methods
# report progress as a side effect.

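import requests  # third-party HTTP client, needed only for the API option below
from pyspark.sql import SparkSession


# Define the processor before broadcasting it. If the workers cannot
# unpickle the class, move it into a module shipped with sc.addPyFile().
class Processor:
    def process(self, content):
        # either print some output (it goes to the executor's stdout,
        # not necessarily to the driver's screen)
        print(content)

        # or append to a file on the worker's local file system
        with open('results.txt', 'a') as f:
            f.write(content + "\n")

        # or call an API to store and count the requests
        requests.get('https://some.external.api/' + content)

        return self.other_processing(content)

    def other_processing(self, content):
        # Placeholder for your own logic; the original leaves this undefined
        return content


# Get a Spark session and context, then read your data into an RDD (or DataFrame)
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
rows = sc.textFile("/path/to/data/sample_data.txt")
# Broadcast an instance of the Processor class defined above
processor = sc.broadcast(Processor())
# Apply this lambda to your RDD
mapped = rows.map(lambda x: processor.value.process(x))
# Other transformations you need to do
# map() is lazy: nothing runs until an action such as count() is called
mapped.count()
sc.stop()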
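
Unnamed accumulators do exist in PySpark, so a plain counter is often a simpler way to track progress than side effects inside a broadcast object. The sketch below is one possible approach and is not part of the original snippet: `count_processed` and `track` are hypothetical names, `sc` is assumed to be an existing SparkContext, and the uppercase step only stands in for real processing.

```python
def count_processed(sc, lines):
    """Map over `lines` and count how many records the tasks handled.

    `sc` is assumed to be a live pyspark SparkContext created elsewhere.
    """
    # Unnamed accumulator: tasks can only add to it, the driver reads it
    seen = sc.accumulator(0)

    def track(line):
        seen.add(1)          # runs on the executors
        return line.upper()  # stand-in for the real processing

    # collect() is an action, so it forces the lazy map() to run
    result = sc.parallelize(lines).map(track).collect()
    # The merged total is only meaningful on the driver, after the action
    return result, seen.value
```

With a running context, `count_processed(sc, ["a", "b", "c"])` would be expected to return the mapped lines together with a count of 3. Spark only guarantees exactly-once accumulator updates inside actions, so a counter updated in `map()` can overshoot if a task is retried; treat it as a progress indicator rather than an exact metric.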
