How to stop a Python Kafka consumer from within the program?
We can first look up the offset of the last message in the topic partition, then break out of the consuming loop once we have reached that offset.
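from kafka import KafkaConsumer, TopicPartition

client = "localhost:9092"
# the broker address must be passed as bootstrap_servers;
# positional arguments are interpreted as topic names
consumer = KafkaConsumer(bootstrap_servers=client)
topic = 'test'
tp = TopicPartition(topic, 0)
# assign partition 0 of the topic manually
consumer.assign([tp])
# obtain the end offset, i.e. the offset the next produced message will get
consumer.seek_to_end(tp)
lastOffset = consumer.position(tp)
consumer.seek_to_beginning(tp)
for message in consumer:
    print("Offset:", message.offset)
    print("Value:", message.value)
    # the last existing message has offset lastOffset - 1
    # (an empty partition never yields it, so the loop would keep waiting)
    if message.offset == lastOffset - 1:
        break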
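A variation on the same idea, as a rough sketch rather than a definitive implementation: compare the consumer's position with the end offset instead of waiting for one exact offset, so an empty partition returns immediately. It assumes a kafka-python version where KafkaConsumer provides end_offsets(), position() and poll(). The function name drain_partition and the poll_timeout_ms parameter are made up for illustration.

```python
def drain_partition(consumer, tp, poll_timeout_ms=1000):
    """Read partition tp from the current position up to the end offset
    that existed when this function was called, then return the records.

    consumer is assumed to behave like kafka-python's KafkaConsumer and to
    have tp assigned already.
    """
    # end_offsets() maps each partition to the offset of the *next* message
    end_offset = consumer.end_offsets([tp])[tp]
    records = []
    # position() is the offset of the next record to fetch; once it reaches
    # end_offset, everything that existed at call time has been read
    while consumer.position(tp) < end_offset:
        batch = consumer.poll(timeout_ms=poll_timeout_ms)
        for record in batch.get(tp, []):
            if record.offset >= end_offset:
                break  # produced after we started; ignore it
            records.append(record)
    return records


# Possible usage against a real broker (not executed here):
#   from kafka import KafkaConsumer, TopicPartition
#   consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
#   tp = TopicPartition("test", 0)
#   consumer.assign([tp])
#   consumer.seek_to_beginning(tp)
#   for record in drain_partition(consumer, tp):
#       print(record.offset, record.value)
#   consumer.close()
```

Messages produced after the call are dropped by this sketch even if a poll happens to fetch them, so it only reports what was in the partition when it started.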
Use the iter_timeout parameter of SimpleConsumer to set the waiting time. If it is set to 10, as in the following piece of code, iteration stops when no new message arrives within 10 seconds. The default value is None, which means that the consumer blocks indefinitely while no new messages arrive.
self.consumer = SimpleConsumer(self.client, "test-group", "test",
                               iter_timeout=10)
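For readers on a newer kafka-python, where KafkaConsumer is the main consumer class, a similar effect can probably be had with the consumer_timeout_ms setting, which ends iteration after that many milliseconds without a message. The sketch below is only an illustration under that assumption. The helper name consume_until_idle and the handle callback are hypothetical.

```python
def consume_until_idle(consumer, handle):
    """Pass every message to handle() until the consumer stops iterating.

    With kafka-python's KafkaConsumer, iteration ends on its own when
    consumer_timeout_ms elapses without a new message.
    """
    count = 0
    for message in consumer:
        handle(message)
        count += 1
    return count


# Possible usage against a real broker (not executed here):
#   from kafka import KafkaConsumer
#   consumer = KafkaConsumer("test",
#                            group_id="test-group",
#                            bootstrap_servers="localhost:9092",
#                            consumer_timeout_ms=10000)  # 10 seconds
#   consume_until_idle(consumer, lambda m: print(m.offset, m.value))
#   consumer.close()
```

The same caveat applies as with iter_timeout: as long as messages keep arriving faster than the timeout, the loop does not end.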
Update
The above is not a good method. When messages arrive at a high rate, it is hard to choose an iter_timeout small enough to guarantee that the loop stops. So I am now using the get_message() function, which tries to consume a single message and then returns. It returns None when there are no new messages.
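A minimal sketch of such a loop, assuming only what is described above: the consumer has a get_message() method that returns one message or None. The helper name drain_messages, the handle callback and the max_messages cap are hypothetical additions for illustration. The cap is there so the loop can stop even when messages never run out.

```python
def drain_messages(consumer, handle, max_messages=None):
    """Call consumer.get_message() repeatedly and pass each message to
    handle(). Stop when get_message() returns None or when max_messages
    messages have been handled.
    """
    handled = 0
    while max_messages is None or handled < max_messages:
        message = consumer.get_message()
        if message is None:
            break  # nothing new to consume right now
        handle(message)
        handled += 1
    return handled


# Possible usage with the consumer created earlier (not executed here):
#   drain_messages(self.consumer, lambda m: print(m), max_messages=1000)
```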