spring AMQP async nack code example

Example: spring AMQP async nack

// setAcknowledgeMode to MANUAL
@Bean
fun jsaFactory(
    connectionFactory: ConnectionFactory,
    configurer: SimpleRabbitListenerContainerFactoryConfigurer
): SimpleRabbitListenerContainerFactory {
    val factory = SimpleRabbitListenerContainerFactory()

    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL)
    factory.setMessageConverter(jsonMessageConverter())

    configurer.configure(factory, connectionFactory)

    return factory
}

// throw AmqpRejectAndDontRequeueException to nack
@RabbitListener(queues = ["salesforce.contact-courses.update"])
fun update(message: CourseRequestDto): Mono<Void> {
    message.email ?: return Mono.error<Void>(AmqpRejectAndDontRequeueException("Invalid message received"))

    return courseDtoBuilder.getInstance(message)
        .flatMap { contactRestClient.coursePatch(message.email, it) }
        .doOnError { throw AmqpRejectAndDontRequeueException(it.message) } // NACK
}

Tags:

Misc Example