简介
说明
本文用示例介绍springboot整合rabbitmq时如何处理死信队列/延迟队列。
rabbitmq消息简介
rabbitmq的消息默认不会超时。
什么是死信队列?什么是延迟队列?
死信队列:
dlx,全称为dead-letter-exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是dlx,绑定dlx的队列就称之为死信队列。
以下几种情况会导致消息变成死信:
- 消息被拒绝(basic.reject/basic.nack),并且设置requeue参数为false;
- 消息过期;
- 队列达到最大长度。
延迟队列:
延迟队列用来存放延迟消息。延迟消息:指当消息被发送以后,不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
相关网址
详解rabbitmq中死信队列和延迟队列的使用详解
实例代码
路由配置
package com.example.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.autowired; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; @configuration public class rabbitrouterconfig { public static final string exchange_topic_welcome = "exchange@topic.welcome"; public static final string exchange_fanout_unroute = "exchange@fanout.unroute"; public static final string exchange_topic_delay = "exchange@topic.delay"; public static final string routingkey_hellos = "hello.#"; public static final string routingkey_delay = "delay.#"; public static final string queue_hello = "queue@hello"; public static final string queue_hi = "queue@hi"; public static final string queue_unroute = "queue@unroute"; public static final string queue_delay = "queue@delay"; public static final integer ttl_queue_message = 5000; @autowired amqpadmin amqpadmin; @bean object initbindingtest() { amqpadmin.declareexchange(exchangebuilder.fanoutexchange(exchange_fanout_unroute).durable(true).autodelete().build()); amqpadmin.declareexchange(exchangebuilder.topicexchange(exchange_topic_delay).durable(true).autodelete().build()); amqpadmin.declareexchange(exchangebuilder.topicexchange(exchange_topic_welcome) .durable(true) .autodelete() .withargument("alternate-exchange", exchange_fanout_unroute) .build()); amqpadmin.declarequeue(queuebuilder.durable(queue_hi).build()); amqpadmin.declarequeue(queuebuilder.durable(queue_hello) .withargument("x-dead-letter-exchange", exchange_topic_delay) .withargument("x-dead-letter-routing-key", routingkey_delay) .withargument("x-message-ttl", ttl_queue_message) .build()); amqpadmin.declarequeue(queuebuilder.durable(queue_unroute).build()); amqpadmin.declarequeue(queuebuilder.durable(queue_delay).build()); amqpadmin.declarebinding(new binding(queue_hello, binding.destinationtype.queue, exchange_topic_welcome, routingkey_hellos, null)); amqpadmin.declarebinding(new binding(queue_unroute, binding.destinationtype.queue, exchange_fanout_unroute, "", null)); amqpadmin.declarebinding(new binding(queue_delay, binding.destinationtype.queue, exchange_topic_delay, routingkey_delay, null)); return new object(); } }
控制器
package com.example.controller; import com.example.config.rabbitrouterconfig; import com.example.mq.sender; import org.springframework.beans.factory.annotation.autowired; import org.springframework.web.bind.annotation.postmapping; import org.springframework.web.bind.annotation.restcontroller; import java.time.localdatetime; @restcontroller public class hellocontroller { @autowired private sender sender; @postmapping("/hi") public void hi() { sender.send(rabbitrouterconfig.queue_hi, "hi1 message:" + localdatetime.now()); } @postmapping("/hello1") public void hello1() { sender.send("hello.a", "hello1 message:" + localdatetime.now()); } @postmapping("/hello2") public void hello2() { sender.send(rabbitrouterconfig.exchange_topic_welcome, "hello.b", "hello2 message:" + localdatetime.now()); } @postmapping("/ae") public void aetest() { sender.send(rabbitrouterconfig.exchange_topic_welcome, "nonono", "ae message:" + localdatetime.now()); } }
发送器
package com.example.mq; import org.springframework.amqp.core.amqptemplate; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.component; import java.util.date; @component public class sender { @autowired private amqptemplate rabbittemplate; public void send(string routingkey, string message) { this.rabbittemplate.convertandsend(routingkey, message); } public void send(string exchange, string routingkey, string message) { this.rabbittemplate.convertandsend(exchange, routingkey, message); } }
接收器
package com.example.mq; import com.example.config.rabbitrouterconfig; import org.springframework.amqp.rabbit.annotation.rabbitlistener; import org.springframework.stereotype.component; @component public class receiver { @rabbitlistener(queues = rabbitrouterconfig.queue_hi) public void hi(string payload) { system.out.println ("receiver(hi) : " + payload); } // @rabbitlistener(queues = rabbitrouterconfig.queue_hello) // public void hello(string hello) throws interruptedexception { // system.out.println ("receiver(hello) : " + hello); // thread.sleep(5 * 1000); // system.out.println("(hello):sleep over"); // } // // @rabbitlistener(queues = rabbitrouterconfig.queue_unroute) // public void unroute(string hello) throws interruptedexception { // system.out.println ("receiver(unroute) : " + hello); // thread.sleep(5 * 1000); // system.out.println("(unroute):sleep over"); // } @rabbitlistener(queues = rabbitrouterconfig.queue_delay) public void delay(string hello) throws interruptedexception { system.out.println ("receiver(delay) : " + hello); thread.sleep(5 * 1000); system.out.println("(delay):sleep over"); } }
application.yml
server: # port: 9100 port: 9101 spring: application: # name: demo-rabbitmq-sender name: demo-rabbitmq-receiver rabbitmq: host: localhost port: 5672 username: admin password: 123456 # virtualhost: / publisher-confirms: true publisher-returns: true # listener: # simple: # acknowledge-mode: manual # direct: # acknowledge-mode: manual
实例测试
分别启动发送者和接收者。
访问:http://localhost:9100/hello2
五秒钟后输出:
receiver(delay) : hello2 message:2020-11-27t09:30:51.548
(delay):sleep over
以上就是springboot整合rabbitmq处理死信队列和延迟队列的详细内容,更多关于springboot rabbitmq死信队列 延迟队列的资料请关注其它相关文章!