RabbitMQ: selectively retrieving messages from a queue

RabbitMQ is well suited to this situation. There are several ways to do what you want, and the documentation is worth reading for a fuller picture. I would suggest using a topic or a direct exchange; topic is the more flexible of the two. It goes like this:

The producer connects to the RabbitMQ broker and creates an exchange with a specific name.

The producer publishes to the exchange. Each message is published with a routing key.

The consumer connects to the RabbitMQ broker.

The consumer creates a queue.

The consumer binds the queue to the exchange, the same exchange defined in the producer. The binding also specifies the routing keys of the messages this particular consumer requires.
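
To make the flow above concrete, here is a minimal broker-free sketch of what a direct exchange does with bindings and routing keys. It is a toy model, not the RabbitMQ client: ToyDirectExchange, bind and publish are hypothetical names invented for this illustration, and the queues are plain in-memory deques. In the real Java client the corresponding calls are exchangeDeclare, queueDeclare, queueBind and basicPublish.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy, in-memory stand-in for a direct exchange. Hypothetical illustration
// only: this is not the RabbitMQ client API and does not talk to a broker.
final class ToyDirectExchange {
    // binding key -> queues bound with that key
    private final Map<String, List<Deque<String>>> bindings = new HashMap<>();

    // Consumer side: bind a queue to this exchange under one binding key.
    void bind(Deque<String> queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Producer side: copy the message into every queue whose binding key
    // equals the routing key; returns how many queues received it.
    int publish(String routingKey, String body) {
        List<Deque<String>> targets =
                bindings.getOrDefault(routingKey, Collections.emptyList());
        for (Deque<String> queue : targets) {
            queue.addLast(body);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        ToyDirectExchange logs = new ToyDirectExchange();
        Deque<String> emailerQueue = new ArrayDeque<>();
        logs.bind(emailerQueue, "log.error");

        logs.publish("log.info", "service started"); // no matching binding, dropped
        logs.publish("log.error", "disk full");      // routed to emailerQueue

        System.out.println(emailerQueue);
    }
}
```

The point of the model is that the selection happens at publish time: a queue only ever holds messages whose routing key matched one of its bindings, so the consumer never has to filter.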

Let's say you were publishing log messages. The routing keys might be something like "log.info", "log.warn" and "log.error", and each message published by the producer carries the relevant key. You might then have one consumer that sends an email for every error message and another that writes all the log messages to a file. The emailer binds its queue to the exchange with the routing key "log.error". That way, although the exchange receives every message, the emailer's queue only ever contains the error messages. The file logger declares its own separate queue, binds it to the same exchange, and uses different routing keys: either three separate bindings, one for each of the three keys, or, on a topic exchange, the single wildcard binding "log.*", which matches every routing key made of "log." followed by exactly one word.
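
How a wildcard binding picks out routing keys can be sketched in a few lines. The matcher below is a simplified, broker-free model of topic-exchange matching, assuming the usual rules that * stands for exactly one dot-separated word and # for zero or more words. TopicMatcher and matches are illustrative names, not part of the RabbitMQ client, and the broker's own implementation is certainly different.

```java
// Simplified model of topic-exchange pattern matching. Hypothetical helper
// for illustration; the real matching is done inside the RabbitMQ broker.
final class TopicMatcher {
    private TopicMatcher() {}

    // True if routingKey matches the binding pattern:
    // '*' matches exactly one word, '#' matches zero or more words.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // Try letting '#' absorb 0, 1, 2, ... of the remaining words.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] keys = {"log.info", "log.warn", "log.error"};
        for (String key : keys) {
            System.out.println(key
                    + " emailer=" + matches("log.error", key)
                    + " fileLogger=" + matches("log.*", key));
        }
    }
}
```

With these rules the emailer's "log.error" binding accepts only the error key, while the file logger's "log.*" binding accepts all three. A deeper key such as "log.error.db" would need "log.#" instead, since * covers one word only.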

This is a simple example that shows how you can achieve what you want to do.

See the RabbitMQ tutorials for code examples, specifically tutorial number 5 (Topics).


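Making the most of RabbitMQ's exchanges and routing is the recommended approach. If you do want to select messages according to their content, the following code is a viable solution.

Retrieve messages from the queue, inspect each one, and ack only the messages you are interested in. Messages that are never acked go back to the queue when the channel is closed.

Pull one message (the second argument, false, turns off auto-ack):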

GetResponse resp = channel.basicGet(QUEUE_NAME, false);

Ack one message (the second argument, false, acks only this delivery tag):

channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);

Example

import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;

public class ReceiveLogs {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // try-with-resources closes the channel and the connection on exit
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // Durable, non-exclusive, non-auto-delete queue; created if missing
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // Pull one message with autoAck = false, ack it manually, then exit
            GetResponse resp = channel.basicGet(QUEUE_NAME, false);
            if (resp != null) { // null means the queue was empty
                String message = new String(resp.getBody(), StandardCharsets.UTF_8);
                System.out.println(" [x] Received '" + message + "'");
                // Check the message content here and ack only if you want it
                channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
}

Dependency (Gradle):

implementation group: 'com.rabbitmq', name: 'amqp-client', version: '5.8.0'
