Does Kafka support priority for topics or messages?

Kafka is by design a fast, scalable, distributed, partitioned, and replicated commit log service, so there is no built-in priority for topics or messages.

I faced the same problem you have, and the solution is quite simple. Create one topic per priority level, say:

  1. high_priority_queue

  2. medium_priority_queue

  3. low_priority_queue

Publish high priority messages to high_priority_queue and medium priority messages to medium_priority_queue.
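For the producing side, a minimal Java sketch (the broker address and String serialization are assumptions; the topic names are the ones listed above):

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  public class PriorityProducer {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");  // assumption
          props.put("key.serializer",
              "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer",
              "org.apache.kafka.common.serialization.StringSerializer");

          // Route each message to the topic that matches its priority.
          try (Producer<String, String> producer = new KafkaProducer<>(props)) {
              producer.send(new ProducerRecord<>("high_priority_queue", "k1", "urgent"));
              producer.send(new ProducerRecord<>("medium_priority_queue", "k2", "normal"));
              producer.send(new ProducerRecord<>("low_priority_queue", "k3", "batch"));
          }
      }
  }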

Now create a Kafka consumer and open a stream for each topic:

  // Scala, using the old high-level consumer API (requires ZooKeeper)
  import java.util.Properties
  import kafka.consumer.{Consumer, ConsumerConfig}

  val props = new Properties()
  props.put("group.id", groupId)
  props.put("zookeeper.connect", zookeeperConnect)
  val config = new ConsumerConfig(props)
  val connector = Consumer.create(config)

  // One stream per topic (the value is the number of streams per topic)
  val topicWithStreamCount = Map(
    "high_priority_queue" -> 1,
    "medium_priority_queue" -> 1,
    "low_priority_queue" -> 1
  )
  val streamsMap = connector.createMessageStreams(topicWithStreamCount)

You now have a stream for each topic. Read from high_priority_queue first; if it has no messages, fall back to medium_priority_queue, and if that is empty as well, read from low_priority_queue.
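For illustration, here is a minimal sketch of that fallback loop using the current Java consumer API (the old Scala high-level consumer shown above is deprecated). The broker address, group IDs, and String deserialization are assumptions:

  import java.time.Duration;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  public class PriorityFallbackConsumer {
      public static void main(String[] args) {
          List<String> topicsByPriority = List.of(
              "high_priority_queue", "medium_priority_queue", "low_priority_queue");

          // One consumer per topic so each topic can be polled independently.
          List<KafkaConsumer<String, String>> consumers = new ArrayList<>();
          for (String topic : topicsByPriority) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "localhost:9092");    // assumption
              props.put("group.id", "priority-consumer-" + topic); // one group per topic
              props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
              props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
              KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
              consumer.subscribe(List.of(topic));
              consumers.add(consumer);
          }

          while (true) {
              // Try topics from highest to lowest priority; after the first
              // non-empty poll, start over from the top so higher priorities
              // are always drained first.
              for (KafkaConsumer<String, String> consumer : consumers) {
                  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                  if (!records.isEmpty()) {
                      for (ConsumerRecord<String, String> record : records) {
                          System.out.printf("%s: %s%n", record.topic(), record.value());
                      }
                      break;
                  }
              }
          }
      }
  }

Note that an empty poll() does not strictly guarantee a topic is empty (a fetch may simply not have completed yet), so the priority ordering is best-effort.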

This trick works fine for me. Maybe it will be helpful for you!


There is a blog post from Confluent, Implementing Message Prioritization in Apache Kafka, which describes how you can implement message prioritization.

First, it is important to understand that the design of Kafka does not allow an out-of-the-box solution for prioritizing messages. The main reasons are:

  • Storage: Kafka is designed as an append-only commit log that contains immutable messages reflecting the timely occurrence of real-life events.
  • Consumers: Messages in a Kafka topic can be consumed by multiple consumers at the same time. Each consumer might have different priorities, which makes it impossible to sort the messages within the topic in advance.

The proposed solution is to use the Bucket Priority Pattern, which is available on GitHub and is best described by the diagrams in its README. Instead of using multiple topics for different priorities, you use a single topic with multiple partitions, customizing the partitioner of the producer and the assignment strategy of the consumer.

Based on a message's key, the producer will write the message into the correct priority bucket:

[Diagram from the Bucket Priority Pattern README: producer routing messages into priority buckets within the topic]

The consumer group, on the other hand, customizes its assignment strategy and prioritizes reading messages from the partitions with the highest priority:

[Diagram from the Bucket Priority Pattern README: consumer group assignment favoring the higher-priority partitions]

In your client code (producer and consumer) you will need to set and adjust the following client configurations.

// Producer
configs.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,
   BucketPriorityPartitioner.class.getName());
configs.setProperty(BucketPriorityConfig.BUCKETS_CONFIG, "Platinum, Gold");
configs.setProperty(BucketPriorityConfig.ALLOCATION_CONFIG, "70%, 30%");

// Consumer
configs.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
   BucketPriorityAssignor.class.getName());
configs.setProperty(BucketPriorityConfig.BUCKETS_CONFIG, "Platinum, Gold");
configs.setProperty(BucketPriorityConfig.ALLOCATION_CONFIG, "70%, 30%");
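With the partitioner configured, the bucket is derived from the record key. A hypothetical continuation of the snippet above (the "orders" topic, the key format, and the payloads are made up; the README defines the exact key convention, and configs is assumed to also hold bootstrap.servers and serializer settings):

// Hypothetical sketch: the key carries the bucket name ("Platinum"/"Gold"),
// which the BucketPriorityPartitioner uses to pick the partition range.
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
producer.send(new ProducerRecord<>("orders", "Platinum-12345", "priority order"));
producer.send(new ProducerRecord<>("orders", "Gold-67890", "regular order"));
producer.close();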


You can check out priority-kafka-client for priority consumption from topics.

The basic idea is as follows (copying parts of the README):

In this context, priority is a positive integer N, with priority levels ordered 0 < 1 < ... < N-1.

PriorityKafkaProducer (implements org.apache.kafka.clients.producer.Producer):

The implementation adds a priority-level argument: Future<RecordMetadata> send(int priority, ProducerRecord<K, V> record) produces the record on that priority level, while the plain Future<RecordMetadata> send(ProducerRecord<K, V> record) defaults to the lowest priority level 0. For every logical topic XYZ, priority level 0 <= i < N is backed by the Kafka topic XYZ-i.
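For example, producing to the logical topic XYZ with N = 3 levels might look roughly like this. The two send() variants are the ones quoted above; the constructor and the max-priority config key are assumptions, so check the project for the exact names:

// Sketch only: construction details and the "max.priority" config key are
// assumptions; the two send() signatures are quoted from the README.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("max.priority", "3");  // assumed key for N = 3 levels: 0, 1, 2
// ... plus the usual serializer settings ...

PriorityKafkaProducer<String, String> producer = new PriorityKafkaProducer<>(props);

// Produce on the highest level; physically lands on Kafka topic "XYZ-2".
producer.send(2, new ProducerRecord<>("XYZ", "key", "urgent"));

// No explicit priority: defaults to level 0, i.e. Kafka topic "XYZ-0".
producer.send(new ProducerRecord<>("XYZ", "key", "normal"));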

CapacityBurstPriorityKafkaConsumer (implements org.apache.kafka.clients.consumer.Consumer):

The implementation maintains a KafkaConsumer for every priority level 0 <= i < N. For every logical topic XYZ and logical group ID ABC, the priority-level-i consumer binds to the Kafka topic XYZ-i with group ID ABC-i. This works in tandem with PriorityKafkaProducer.

The max.poll.records property is split across the priority topic consumers based on maxPollRecordsDistributor, which defaults to ExpMaxPollRecordsDistributor. The rest of the KafkaConsumer configs are passed as-is to each of the priority topic consumers. Care has to be taken when defining max.partition.fetch.bytes, fetch.max.bytes and max.poll.interval.ms, as these values will be used as-is across all the priority topic consumers.

It works on the idea of distributing the max.poll.records property across the priority topic consumers as their reserved capacity. Records are fetched sequentially from all priority-level topic consumers, each configured with its distributed max.poll.records value. The distribution must reserve higher capacity, i.e. a higher processing rate, for higher priorities.
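To make the split concrete, assume ExpMaxPollRecordsDistributor gives priority level i a share proportional to 2^i (an assumption based on the name; the exact formula is in the project). With max.poll.records = 50 and N = 3, the reserved capacities work out roughly as follows:

public class ExpSplitDemo {
    public static void main(String[] args) {
        int maxPollRecords = 50;             // consumer-wide max.poll.records
        int levels = 3;                      // N priority levels
        int totalWeight = (1 << levels) - 1; // 2^2 + 2^1 + 2^0 = 7
        for (int p = levels - 1; p >= 0; p--) {
            double share = maxPollRecords * (double) (1 << p) / totalWeight;
            System.out.printf("priority %d reserves ~%.0f records per poll%n", p, share);
        }
        // prints roughly: priority 2 -> 29, priority 1 -> 14, priority 0 -> 7
    }
}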

Caution 1 - if we have skewed partitions in the priority-level topics (e.g. 10K records in a priority 2 partition, 100 records in a priority 1 partition, and 10 records in a priority 0 partition, assigned to different consumer threads), the implementation will not synchronize across those consumers to regulate capacity and hence will fail to honour priority. So the producers must ensure there are no skewed partitions (e.g. by using round-robin, which "may" imply there are no message-ordering assumptions, letting the consumer process records in parallel by separating fetching from processing).

Caution 2 - if we have empty partitions in the priority-level topics (e.g. no pending records in the assigned priority 2 and 1 partitions, but 10K records in a priority 0 partition assigned to the same consumer thread), then we want the priority 0 topic consumer to burst its capacity up to max.poll.records rather than restrict itself to its reserved capacity from maxPollRecordsDistributor; otherwise the overall capacity would be underutilized.

The implementation tries to address the cautions explained above. Every consumer object has an individual consumer per priority-level topic, each with reserved capacity based on maxPollRecordsDistributor. Each priority-level topic consumer will try to burst into the other priority-level topic consumers' capacity in the group, provided both of the following are true:

  • It is eligible to burst: in the last max.poll.history.window.size attempts of poll(), at least min.poll.window.maxout.threshold times it received a number of records equal to its assigned max.poll.records (as distributed by maxPollRecordsDistributor). This indicates that the partition has more incoming records to be processed.

  • No higher priority level is eligible to burst: there is no higher-priority-level topic consumer that is eligible to burst based on the above logic. Basically, give way to higher priorities.

If both are true, the priority-level topic consumer bursts into all the other priority-level topic consumers' capacities. The amount of burst per priority-level topic consumer is equal to the least unused capacity in the last max.poll.history.window.size attempts of poll().
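The first condition can be sketched roughly as follows; this is a paraphrase of the description above using the config names it mentions, not the library's actual code:

// Rough paraphrase of burst eligibility (first bullet above); the real
// logic lives inside CapacityBurstPriorityKafkaConsumer.
class BurstEligibility {
    // windowHistory[i] = records received on the i-th most recent poll();
    // the window length is max.poll.history.window.size.
    static boolean eligibleToBurst(int[] windowHistory,
                                   int assignedMaxPollRecords,
                                   int maxoutThreshold) {
        int maxedOutPolls = 0;
        for (int received : windowHistory) {
            if (received >= assignedMaxPollRecords) {
                maxedOutPolls++;  // this poll used the full reserved capacity
            }
        }
        // Eligible if capacity was maxed out at least
        // min.poll.window.maxout.threshold times within the window.
        return maxedOutPolls >= maxoutThreshold;
    }
}

The second condition (no higher priority level is itself eligible) is then checked before the burst is actually granted.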
