rabbitmq本身没有实现延时队列,但是可以通过死信队列机制,自己实现延时队列;
原理:当队列中的消息超时成为死信后,会把消息死信重新发送到配置好的交换机中,然后分发到真实的消费队列;
步骤:
1、创建带有时限的队列 dealLineQueue;
2、创建死信Faout交换机dealLineExchange;
3、创建消费队列realQueue,并和dealLineExchange绑定
4、配置dealLineQueue 的过期时间,消息过期后的死信交换机,重发的routing-key;
以下以springboot为例子贴出代码
项目结构:
基本值-DealConstant
package com.eyjian.rabbitmq.dealline; public interface DealConstant { String DEAL_LINE_QUEUE = "dealLineQueue"; String DEAL_LINE_EXCHANGE = "dealLineExchange"; String REAL_QUEUE= "realQueue"; }
消费者Lister
package com.eyjian.rabbitmq.dealline; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; /**
* 死信队里模拟延时队列
* @Author: yeyongjian
* @Date: 2019-05-18 14:12
*/
@Component
public class Lister {
@RabbitListener(queues = DealConstant.REAL_QUEUE)
public void handle(Message message){
byte[] body = message.getBody();
String msg = new String(body);
System.out.println(msg); }
}
配置类RabbitmqConfig
package com.eyjian.rabbitmq.dealline; import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import java.io.IOException;
import java.util.HashMap;
import java.util.Map; @Configuration
public class RabbitmqConfig {
@Autowired
private RabbitTemplate rabbitTemplate; //启动初始化删除绑定用的
// @PostConstruct
public void delete() throws IOException {
Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
channel.queueUnbind(DealConstant.REAL_QUEUE,DealConstant.DEAL_LINE_EXCHANGE,"");
}
@Bean
public Queue initDealLineQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", DealConstant.DEAL_LINE_EXCHANGE);
args.put("x-dead-letter-routing-key", DealConstant.DEAL_LINE_QUEUE);//超时转发的队列
args.put("x-message-ttl", 5000);//延时时间
Queue queue = new Queue(DealConstant.DEAL_LINE_QUEUE,true,false,false,args);
return queue;
}
@Bean
FanoutExchange dealLineExchange() {
return new FanoutExchange(DealConstant.DEAL_LINE_EXCHANGE);
}
@Bean
Binding bindingiVewUgcTopicExchange(Queue initRealQueue, FanoutExchange dealLineExchange) {
return BindingBuilder.bind(initRealQueue).to(dealLineExchange);
}
@Bean
public Queue initRealQueue() {
return new Queue(DealConstant.REAL_QUEUE);
} }
application.properties文件
spring.rabbitmq.addresses=localhost
spring.rabbitmq.host=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
项目启动后,rabbitmq控制台信息如下:
test类发送消息
package com.eyjian.rabbitmq; import com.eyjian.rabbitmq.dealline.DealConstant;
import com.rabbitmq.client.Channel;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner; import java.io.IOException; @RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqLearnApplicationTests { @Autowired
RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() throws IOException {
rabbitTemplate.convertAndSend(DealConstant.DEAL_LINE_QUEUE,"hell word");
} }
5秒后控制台打印消息
源码地址:https://github.com/hd-eujian/rabbitmq-learn.git