How can I use Spark Streaming to read a stream and find the most frequent IPs over a time window?
I solved the problem with this function:
def getFrequentIps(stream, time_window, min_packets, number_items=20):
    """Print, per time window, the most frequent values extracted from *stream*.

    Each input line is expanded with ``format_stream`` (defined elsewhere in
    this file; presumably it yields IP pairs — verify). The resulting values
    are counted over a window of ``time_window`` seconds. The window also
    slides by ``time_window`` seconds, so consecutive windows do not overlap.
    Values seen at least ``min_packets`` times are kept and sorted by
    descending count. The first ``number_items`` of each window are printed.

    Args:
        stream: Input DStream of raw lines.
        time_window: Window length and slide interval, in seconds.
        min_packets: Minimum count (inclusive) for a value to be reported;
            anything accepted by ``int()``.
        number_items: How many entries to print per window (default 20).

    Returns:
        The filtered, count-sorted DStream of ``(value, count)`` pairs, so
        callers can attach further output operations to it.

    Raises:
        ValueError: If ``min_packets`` cannot be converted to ``int``.
    """
    # Convert once on the driver. A bad threshold then fails here, at setup,
    # instead of inside every task. The converted value is also what gets
    # shipped in the filter closure.
    threshold = int(min_packets)

    frequent_ips = (
        stream.flatMap(format_stream)
        # Count occurrences of each value over a non-overlapping window
        # (the third argument, 4, is the number of partitions).
        # NOTE(review): Spark requires a checkpoint directory for this
        # windowed count; confirm the caller sets ssc.checkpoint(...).
        .countByValueAndWindow(time_window, time_window, 4)
        # Keep only (value, count) pairs that reach the threshold.
        .filter(lambda pair: pair[1] >= threshold)
        # Sort each windowed RDD by count, most frequent first.
        .transform(lambda rdd: rdd.sortBy(lambda pair: pair[1], ascending=False))
    )

    print("Every %s seconds the top-%s channels with at least %s packets will be shown: "
          % (time_window, number_items, threshold))
    frequent_ips.pprint(number_items)
    return frequent_ips