阿里云 rocketMq 延时消息

2023-03-10

初始化消费者和生产者

生产者需要设置 RocketMQ 的 AccessKey、SecretKey,以及 name server 地址(对应配置项 spring.cloud.stream.rocketmq.binder.* )。

首先编写一个配置类,把 RocketMQ 相关的配置集中写在这个配置类中。

`

/**
 * Holds the RocketMQ (Aliyun ONS) client settings shared by the producer and the consumer.
 *
 * <p>The access key, secret key and name-server address are injected from the
 * {@code spring.cloud.stream.rocketmq.binder.*} configuration properties. Topic, tag and
 * group id are fixed constants of this class.
 */
@Component
@Getter
@Setter
@Slf4j
public class RocketMqConfig {

    private static final String TOPIC = "delay";
    private static final String GROUP_ID = "GID_live_service_update_status";
    private static final String TAG = "mq_delay_tag";

    @Value("${spring.cloud.stream.rocketmq.binder.secret-key}")
    private String secretKey;

    @Value("${spring.cloud.stream.rocketmq.binder.access-key}")
    private String accessKey;

    @Value("${spring.cloud.stream.rocketmq.binder.name-server}")
    private String nameServe;

    // NOTE(review): never read in this class; getProperties() builds a new object on each
    // call. Left in place because the class-level accessor annotations may derive a setter
    // from it - confirm before removing.
    private Properties properties = null;

    /**
     * @return the topic that delayed messages are published to and consumed from
     */
    public String getTopic() {
        return TOPIC;
    }

    /**
     * @return the tag used when publishing and when subscribing
     */
    public String getTag() {
        return TAG;
    }

    /**
     * @return the group id placed into the client properties
     */
    public String getGroupId() {
        return GROUP_ID;
    }

    /**
     * Builds a new set of client properties containing the group id, the credentials and
     * the name-server address.
     *
     * @return a freshly created {@link Properties} instance; a new object on every call
     */
    public Properties getProperties() {
        // Credentials are intentionally not logged: secrets must never reach the log files.
        log.info("nameServer:{}", getNameServe());

        Properties clientProperties = new Properties();
        clientProperties.setProperty(PropertyKeyConst.GROUP_ID, GROUP_ID);
        clientProperties.setProperty(PropertyKeyConst.AccessKey, getAccessKey());
        clientProperties.setProperty(PropertyKeyConst.SecretKey, getSecretKey());
        clientProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, getNameServe());
        return clientProperties;
    }

}

生产者初始化

`

import com.aliyun.openservices.ons.api.ONSFactory;

import com.aliyun.openservices.ons.api.Producer;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**

@author

@Date 2021/11/3.

*/

@Component

@Slf4j

public class RocketMqProducerInit {

@Autowired

private RocketMqConfig mqConfig;

private static Producer producer;

@PostConstruct

public void init(){

log.info("启动RocketMq生产者!");

producer = ONSFactory.createProducer(mqConfig.getProperties());

// 在发送消息前,初始化调用start方法来启动Producer,只需调用一次即可,当项目关闭时,自动shutdown

producer.start();

}

/**

初始化生产者
@return

*/

public Producer getProducer(){

return producer;

}

}

`

`

编写生产者

`

import cn.hutool.core.date.DateUtil;

import com.aliyun.openservices.ons.api.Message;

import com.aliyun.openservices.ons.api.SendResult;

import com.aliyun.openservices.ons.api.exception.ONSClientException;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import static cn.hutool.core.convert.Convert.longToBytes;

@Component

@Slf4j

public class ProducerSendMessageHandler {

@Autowired

private RocketMqConfig config;

@Autowired
private RocketMqProducerInit producer; public void sendGetLiveStatus(long periodId) { Message message = new Message(config.getTopic(), config.getTag(), longToBytes(periodId));
message.setStartDeliverTime(System.currentTimeMillis() + 100000);
try {
SendResult send = this.producer.getProducer().send(message);
log.info("发送生产消息时间:"+DateUtil.now() + "发送延时消息成功! Topic is:" + config.getTopic() + "msgId is: " +
send.getMessageId());
} catch (ONSClientException e) {
e.printStackTrace();
log.error(e.getMessage());
}
}

}

`

监听者

`

import cn.hutool.core.date.DateUtil;

import com.aliyun.openservices.ons.api.*;

import com.dapeng.cloud.service.live.application.period.PeriodManager;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import static cn.hutool.core.convert.Convert.bytesToLong;

@Component

@Slf4j

public class LiveMessageStreamListener {

@Autowired
private RocketMqConfig mqConfig; private static Consumer consumer;
@Autowired
private PeriodManager periodManager; @PostConstruct
public void init(){
log.info("消费者启动!");
consumer = ONSFactory.createConsumer(mqConfig.getProperties());
//监听第一个topic,new对应的监听器
consumer.subscribe(mqConfig.getTopic(), mqConfig.getTag(), new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext context) { log.info("消费者接收到MQ消息时间: "+ DateUtil.now()+" -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
message.getTopic(), message.getTag(), message.getMsgID(), message.getKey(), bytesToLong(message.getBody()));
try {
//调用场次接口
long periodId = bytesToLong(message.getBody());
log.info("获取到 场次 id =" +periodId);
log.info("开始执行调用异常回调处理");
// 调用自己的业务代码进行操作
log.info("执行调用异常回调处理结果:"+b);
//消费成功,继续消费下一条消息
return Action.CommitMessage;
} catch (Exception e) {
log.error("消费MQ消息失败! msgId:" + message.getMsgID() + "----ExceptionMsg:" + e.getMessage());
//消费失败,告知服务器稍后再投递这条消息,继续消费其他消息
return Action.ReconsumeLater;
}
}
});
// 在发送消息前,必须调用start方法来启动consumer,只需调用一次即可,当项目关闭时,自动shutdown
consumer.start();
}
/**
* 初始化消费者
* @return consumer
*/
public Consumer getConsumer(){
return consumer;
}

}

`

阿里云 rocketMq 延时消息的相关教程结束。

《阿里云 rocketMq 延时消息.doc》

下载本文的Word格式文档,以方便收藏与打印。