SpringBoot整合RabbitMQ处理死信队列和延迟队列

2022-07-14,,,,

简介

说明

本文用示例介绍springboot整合rabbitmq时如何处理死信队列/延迟队列。

rabbitmq消息简介

rabbitmq的消息默认不会超时。 

什么是死信队列?什么是延迟队列?

死信队列:

dlx,全称为dead-letter-exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是dlx,绑定dlx的队列就称之为死信队列。

以下几种情况会导致消息变成死信:

  • 消息被拒绝(basic.reject/basic.nack),并且设置requeue参数为false;
  • 消息过期;
  • 队列达到最大长度。

延迟队列:

延迟队列用来存放延迟消息。延迟消息:指当消息被发送以后,不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

相关网址

详解rabbitmq中死信队列和延迟队列的使用详解

实例代码

路由配置

package com.example.config;
 
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
 
@configuration
public class rabbitrouterconfig {
    public static final string exchange_topic_welcome   = "exchange@topic.welcome";
    public static final string exchange_fanout_unroute  = "exchange@fanout.unroute";
    public static final string exchange_topic_delay     = "exchange@topic.delay";
 
    public static final string routingkey_hellos        = "hello.#";
    public static final string routingkey_delay         = "delay.#";
 
    public static final string queue_hello              = "queue@hello";
    public static final string queue_hi                 = "queue@hi";
    public static final string queue_unroute            = "queue@unroute";
    public static final string queue_delay              = "queue@delay";
 
    public static final integer ttl_queue_message       = 5000;
 
    @autowired
    amqpadmin amqpadmin;
 
    @bean
    object initbindingtest() {
        amqpadmin.declareexchange(exchangebuilder.fanoutexchange(exchange_fanout_unroute).durable(true).autodelete().build());
        amqpadmin.declareexchange(exchangebuilder.topicexchange(exchange_topic_delay).durable(true).autodelete().build());
        amqpadmin.declareexchange(exchangebuilder.topicexchange(exchange_topic_welcome)
                .durable(true)
                .autodelete()
                .withargument("alternate-exchange", exchange_fanout_unroute)
 
                .build());
 
        amqpadmin.declarequeue(queuebuilder.durable(queue_hi).build());
        amqpadmin.declarequeue(queuebuilder.durable(queue_hello)
                .withargument("x-dead-letter-exchange", exchange_topic_delay)
                .withargument("x-dead-letter-routing-key", routingkey_delay)
                .withargument("x-message-ttl", ttl_queue_message)
                .build());
        amqpadmin.declarequeue(queuebuilder.durable(queue_unroute).build());
        amqpadmin.declarequeue(queuebuilder.durable(queue_delay).build());
 
        amqpadmin.declarebinding(new binding(queue_hello, binding.destinationtype.queue,
                exchange_topic_welcome, routingkey_hellos, null));
        amqpadmin.declarebinding(new binding(queue_unroute, binding.destinationtype.queue,
                exchange_fanout_unroute, "", null));
        amqpadmin.declarebinding(new binding(queue_delay, binding.destinationtype.queue,
                exchange_topic_delay, routingkey_delay, null));
 
        return new object();
    }
}

控制器

package com.example.controller;
 
import com.example.config.rabbitrouterconfig;
import com.example.mq.sender;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.postmapping;
import org.springframework.web.bind.annotation.restcontroller;
 
import java.time.localdatetime;
 
@restcontroller
public class hellocontroller {
    @autowired
    private sender sender;
 
    @postmapping("/hi")
    public void hi() {
        sender.send(rabbitrouterconfig.queue_hi, "hi1 message:" + localdatetime.now());
    }
 
    @postmapping("/hello1")
    public void hello1() {
        sender.send("hello.a", "hello1 message:" + localdatetime.now());
    }
 
    @postmapping("/hello2")
    public void hello2() {
        sender.send(rabbitrouterconfig.exchange_topic_welcome, "hello.b", "hello2 message:" + localdatetime.now());
    }
 
    @postmapping("/ae")
    public void aetest() {
        sender.send(rabbitrouterconfig.exchange_topic_welcome, "nonono", "ae message:" + localdatetime.now());
    }
}

发送器

package com.example.mq;
 
import org.springframework.amqp.core.amqptemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;
 
import java.util.date;
 
@component
public class sender {
    @autowired
    private amqptemplate rabbittemplate;
 
    public void send(string routingkey, string message) {
        this.rabbittemplate.convertandsend(routingkey, message);
    }
 
    public void send(string exchange, string routingkey, string message) {
        this.rabbittemplate.convertandsend(exchange, routingkey, message);
    }
}

接收器

package com.example.mq;
 
import com.example.config.rabbitrouterconfig;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
 
@component
public class receiver {
    @rabbitlistener(queues = rabbitrouterconfig.queue_hi)
    public void hi(string payload) {
        system.out.println ("receiver(hi) : "  + payload);
    }
 
    // @rabbitlistener(queues = rabbitrouterconfig.queue_hello)
    // public void hello(string hello) throws interruptedexception {
    //     system.out.println ("receiver(hello) : "  + hello);
    //     thread.sleep(5 * 1000);
    //     system.out.println("(hello):sleep over");
    // }
    //
    // @rabbitlistener(queues = rabbitrouterconfig.queue_unroute)
    // public void unroute(string hello) throws interruptedexception {
    //     system.out.println ("receiver(unroute) : "  + hello);
    //     thread.sleep(5 * 1000);
    //     system.out.println("(unroute):sleep over");
    // }
 
    @rabbitlistener(queues = rabbitrouterconfig.queue_delay)
    public void delay(string hello) throws interruptedexception {
        system.out.println ("receiver(delay) : "  + hello);
        thread.sleep(5 * 1000);
        system.out.println("(delay):sleep over");
    }
}

application.yml

server:
#  port: 9100
  port: 9101
spring:
  application:
#    name: demo-rabbitmq-sender
    name: demo-rabbitmq-receiver
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
#    virtualhost: /
    publisher-confirms: true
    publisher-returns: true
#    listener:
#      simple:
#        acknowledge-mode: manual
#      direct:
#        acknowledge-mode: manual

实例测试

分别启动发送者和接收者。

访问:http://localhost:9100/hello2

五秒钟后输出:

receiver(delay) : hello2 message:2020-11-27t09:30:51.548
(delay):sleep over

以上就是springboot整合rabbitmq处理死信队列和延迟队列的详细内容,更多关于springboot rabbitmq死信队列 延迟队列的资料请关注其它相关文章!

《SpringBoot整合RabbitMQ处理死信队列和延迟队列.doc》

下载本文的Word格式文档,以方便收藏与打印。