SpringBoot整合RabbitMQ-消息可靠性投递

2023-07-29,,

本系列是学习SpringBoot整合RabbitMQ的练手,包含服务安装,RabbitMQ整合SpringBoot2.x,消息可靠性投递实现等三篇博客。

  学习路径:https://www.imooc.com/learn/1042 RabbitMQ消息中间件极速入门与实战

  项目源码:https://github.com/ZbLeaning/Boot-RabbitMQ


设计一个消息可靠性投递方案,服务结构如下:

组成:

  Sender+Confirm Listener :组成消息的生产者

  MQ Broker:消息中间件(即具体的MQ服务),负责接收生产者投递的消息并路由给消费者

  BIZ DB:业务数据数据库

  MSG DB:消息日志记录数据库(0:发送中、1:发送成功、2:发送失败)

思路:

  以最常见的创建订单业务来举例,假设订单创建成功后需要去发短信通知用户

  1、先完成订单业务数据的存储,并记录这条操作日志(发送中)

  2、生产者将消息异步投递到MQ Broker

  3、Broker成功接收消息后给Confirm Listener发送确认应答

  4、监听收到消息确认成功后,对消息日志表操作,修改之前的日志状态(发送成功)

  5、在Broker返回应答的过程中,可能发生网络异常导致生产者未收到应答消息,因此需要一个定时任务去捞取状态是发送中并已经超时的消息集合

  6、将捞取到的日志对应的消息,进行重发

  7、定时任务判断设置的消息最大重投次数,大于最大重投次数就判断消息发送失败,更新日志记录状态(发送失败)


项目搭建

  Druid数据源配置文件

//druid.properties
## Supplementary connection-pool settings, applied to all data sources configured above
# Initial size, minimum idle, maximum active
druid.initialSize=5
druid.minIdle=10
druid.maxActive=300
# Timeout when waiting to obtain a connection
# NOTE(review): no @Value binding for this key is visible in DruidDataSourceSettings - confirm it is actually applied
druid.maxWait=60000
# Interval between checks for idle connections that need to be closed, in milliseconds
druid.timeBetweenEvictionRunsMillis=60000
# Minimum time a connection lives in the pool, in milliseconds
druid.minEvictableIdleTimeMillis=300000
druid.validationQuery=SELECT 1 FROM DUAL
druid.testWhileIdle=true
druid.testOnBorrow=false
druid.testOnReturn=false
# Enable PSCache and set the PSCache size for each connection
druid.poolPreparedStatements=true
druid.maxPoolPreparedStatementPerConnectionSize=20
# Filters for monitoring statistics; without them the monitoring page cannot collect SQL statistics; 'wall' is the firewall filter
druid.filters=stat,wall,log4j
# Enable mergeSql through connectionProperties; slow-SQL logging
druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
# Merge the monitoring data of multiple DruidDataSource instances
# NOTE(review): no @Value binding for this key is visible in DruidDataSourceSettings - confirm it is actually applied
druid.useGlobalDataSourceStat=true

  添加相应的数据源配置类、定时任务配置类、常量类

package com.imooc.mq.config.database;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.stereotype.Component; /**
* @Title: DruidDataSourceSettings
* @Description: Druid数据源读取配置
* @date 2019/1/2214:31
*/
@Component
@ConfigurationProperties(prefix = "spring.datasource")
@PropertySource("classpath:druid.properties")
public class DruidDataSourceSettings {

    // ---- connection settings, bound from the "spring.datasource" prefix ----
    private String driverClassName;
    private String url;
    private String username;
    private String password;

    // ---- pool settings, injected from druid.properties placeholders ----
    @Value("${druid.initialSize}")
    private int initialSize;

    @Value("${druid.minIdle}")
    private int minIdle;

    @Value("${druid.maxActive}")
    private int maxActive;

    @Value("${druid.timeBetweenEvictionRunsMillis}")
    private long timeBetweenEvictionRunsMillis;

    @Value("${druid.minEvictableIdleTimeMillis}")
    private long minEvictableIdleTimeMillis;

    @Value("${druid.validationQuery}")
    private String validationQuery;

    @Value("${druid.testWhileIdle}")
    private boolean testWhileIdle;

    @Value("${druid.testOnBorrow}")
    private boolean testOnBorrow;

    @Value("${druid.testOnReturn}")
    private boolean testOnReturn;

    @Value("${druid.poolPreparedStatements}")
    private boolean poolPreparedStatements;

    @Value("${druid.maxPoolPreparedStatementPerConnectionSize}")
    private int maxPoolPreparedStatementPerConnectionSize;

    @Value("${druid.filters}")
    private String filters;

    @Value("${druid.connectionProperties}")
    private String connectionProperties;

    /**
     * Registers the placeholder configurer as a static bean.
     *
     * @return a new {@link PropertySourcesPlaceholderConfigurer}
     */
    @Bean
    public static PropertySourcesPlaceholderConfigurer properdtyConfigure() {
        return new PropertySourcesPlaceholderConfigurer();
    }

    // ---- accessors: connection settings ----

    public String getDriverClassName() {
        return this.driverClassName;
    }

    public void setDriverClassName(String driverClassName) {
        this.driverClassName = driverClassName;
    }

    public String getUrl() {
        return this.url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUsername() {
        return this.username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return this.password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    // ---- accessors: pool sizing ----

    public int getInitialSize() {
        return this.initialSize;
    }

    public void setInitialSize(int initialSize) {
        this.initialSize = initialSize;
    }

    public int getMinIdle() {
        return this.minIdle;
    }

    public void setMinIdle(int minIdle) {
        this.minIdle = minIdle;
    }

    public int getMaxActive() {
        return this.maxActive;
    }

    public void setMaxActive(int maxActive) {
        this.maxActive = maxActive;
    }

    // ---- accessors: eviction and validation ----

    public long getTimeBetweenEvictionRunsMillis() {
        return this.timeBetweenEvictionRunsMillis;
    }

    public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) {
        this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
    }

    public long getMinEvictableIdleTimeMillis() {
        return this.minEvictableIdleTimeMillis;
    }

    public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) {
        this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
    }

    public String getValidationQuery() {
        return this.validationQuery;
    }

    public void setValidationQuery(String validationQuery) {
        this.validationQuery = validationQuery;
    }

    public boolean isTestWhileIdle() {
        return this.testWhileIdle;
    }

    public void setTestWhileIdle(boolean testWhileIdle) {
        this.testWhileIdle = testWhileIdle;
    }

    public boolean isTestOnBorrow() {
        return this.testOnBorrow;
    }

    public void setTestOnBorrow(boolean testOnBorrow) {
        this.testOnBorrow = testOnBorrow;
    }

    public boolean isTestOnReturn() {
        return this.testOnReturn;
    }

    public void setTestOnReturn(boolean testOnReturn) {
        this.testOnReturn = testOnReturn;
    }

    // ---- accessors: prepared-statement cache ----

    public boolean isPoolPreparedStatements() {
        return this.poolPreparedStatements;
    }

    public void setPoolPreparedStatements(boolean poolPreparedStatements) {
        this.poolPreparedStatements = poolPreparedStatements;
    }

    public int getMaxPoolPreparedStatementPerConnectionSize() {
        return this.maxPoolPreparedStatementPerConnectionSize;
    }

    public void setMaxPoolPreparedStatementPerConnectionSize(
            int maxPoolPreparedStatementPerConnectionSize) {
        this.maxPoolPreparedStatementPerConnectionSize = maxPoolPreparedStatementPerConnectionSize;
    }

    // ---- accessors: filters and extra connection properties ----

    public String getFilters() {
        return this.filters;
    }

    public void setFilters(String filters) {
        this.filters = filters;
    }

    public String getConnectionProperties() {
        return this.connectionProperties;
    }

    public void setConnectionProperties(String connectionProperties) {
        this.connectionProperties = connectionProperties;
    }
}
package com.imooc.mq.config.database;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement; import javax.sql.DataSource;
import java.sql.SQLException; import com.alibaba.druid.pool.DruidDataSource;
/**
 * @Title: DruidDataSourceConfig
 * @Description: Builds the Druid data source and the transaction manager from
 * the values held by DruidDataSourceSettings.
 *
 * EnableTransactionManagement switches on transaction support.
 * @date 2019/1/22 14:35
 */
@Configuration
@EnableTransactionManagement
public class DruidDataSourceConfig {

    private static Logger logger = LoggerFactory.getLogger(DruidDataSourceConfig.class);

    /** Driver class name, assigned each time {@link #dataSource()} runs. */
    public static String DRIVER_CLASSNAME;

    // Injected data source settings
    @Autowired
    private DruidDataSourceSettings druidSettings;

    /**
     * Registers the placeholder configurer as a static bean.
     *
     * @return a new {@link PropertySourcesPlaceholderConfigurer}
     */
    @Bean
    public static PropertySourcesPlaceholderConfigurer propertyConfigure() {
        return new PropertySourcesPlaceholderConfigurer();
    }

    /**
     * Creates the DataSource.
     *
     * @return a {@link DruidDataSource} populated from the injected settings
     * @throws SQLException declared because {@code DruidDataSource#setFilters} may throw it
     */
    @Bean
    public DataSource dataSource() throws SQLException {
        DruidDataSource pool = new DruidDataSource();
        applyConnectionSettings(pool);
        applyPoolSettings(pool);
        logger.info(" druid datasource config : {} ", pool);
        return pool;
    }

    /** Copies driver, URL and credentials onto the pool and records the driver class name. */
    private void applyConnectionSettings(DruidDataSource pool) {
        String driver = druidSettings.getDriverClassName();
        pool.setDriverClassName(driver);
        DRIVER_CLASSNAME = driver;
        pool.setUrl(druidSettings.getUrl());
        pool.setUsername(druidSettings.getUsername());
        pool.setPassword(druidSettings.getPassword());
    }

    /** Copies sizing, eviction, validation, statement-cache and filter settings onto the pool. */
    private void applyPoolSettings(DruidDataSource pool) throws SQLException {
        pool.setInitialSize(druidSettings.getInitialSize());
        pool.setMinIdle(druidSettings.getMinIdle());
        pool.setMaxActive(druidSettings.getMaxActive());
        pool.setTimeBetweenEvictionRunsMillis(druidSettings.getTimeBetweenEvictionRunsMillis());
        pool.setMinEvictableIdleTimeMillis(druidSettings.getMinEvictableIdleTimeMillis());
        pool.setValidationQuery(druidSettings.getValidationQuery());
        pool.setTestWhileIdle(druidSettings.isTestWhileIdle());
        pool.setTestOnBorrow(druidSettings.isTestOnBorrow());
        pool.setTestOnReturn(druidSettings.isTestOnReturn());
        pool.setPoolPreparedStatements(druidSettings.isPoolPreparedStatements());
        pool.setMaxPoolPreparedStatementPerConnectionSize(
                druidSettings.getMaxPoolPreparedStatementPerConnectionSize());
        pool.setFilters(druidSettings.getFilters());
        pool.setConnectionProperties(druidSettings.getConnectionProperties());
    }

    /**
     * Enables transactions by exposing a transaction manager over {@link #dataSource()}.
     *
     * @return the transaction manager
     * @throws Exception if the data source cannot be created
     */
    @Bean
    public PlatformTransactionManager transactionManager() throws Exception {
        DataSourceTransactionManager manager = new DataSourceTransactionManager();
        manager.setDataSource(dataSource());
        return manager;
    }
}
package com.imooc.mq.config.database;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver; import javax.sql.DataSource; /**
* @Title: MybatisDataSourceConfig
* @Description: 整合mybatis和Druid
* @date 2019/1/2214:39
*/
@Configuration
public class MybatisDataSourceConfig {

    @Autowired
    private DataSource dataSource;

    /**
     * Builds the MyBatis session factory on top of the injected data source.
     *
     * @return the session factory, with caching enabled on its configuration
     * @throws RuntimeException wrapping any failure while resolving mapper files or
     *         building the factory
     */
    @Bean(name = "sqlSessionFactory")
    public SqlSessionFactory sqlSessionFactoryBean() {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);

        // Mapper XML files are looked up under the "mapping" classpath directory
        ResourcePatternResolver mapperLocator = new PathMatchingResourcePatternResolver();
        SqlSessionFactory factory;
        try {
            factoryBean.setMapperLocations(mapperLocator.getResources("classpath:mapping/*.xml"));
            factory = factoryBean.getObject();
            factory.getConfiguration().setCacheEnabled(true);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return factory;
    }

    /**
     * Wraps the session factory in a {@link SqlSessionTemplate}.
     *
     * @param sqlSessionFactory the factory to wrap
     * @return a new template
     */
    @Bean
    public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory);
        return template;
    }
}
package com.imooc.mq.config.database;

import org.mybatis.spring.mapper.MapperScannerConfigurer;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @Title: MybatisMapperScanerConfig
 * @Description: Registers the MyBatis mapper scanner.
 * {@code @AutoConfigureAfter(MybatisDataSourceConfig.class)} is meant to load the data source
 * configuration before this class.
 * NOTE(review): Spring Boot documents that annotation for auto-configuration classes; confirm it
 * has the intended effect on a regular {@code @Configuration} class.
 * @date 2019/1/22 14:43
 */
@Configuration
@AutoConfigureAfter(MybatisDataSourceConfig.class)
public class MybatisMapperScanerConfig {

    /**
     * Creates the scanner that registers mapper interfaces under {@code com.imooc.mq.mapper}
     * against the {@code sqlSessionFactory} bean.
     *
     * <p>The method is {@code static} because {@link MapperScannerConfigurer} is a bean-factory
     * post-processor: such beans are created very early, and a non-static factory method would
     * force this configuration class to be instantiated before annotation processing is ready.
     *
     * @return the configured scanner
     */
    @Bean
    public static MapperScannerConfigurer mapperScannerConfigurer() {
        MapperScannerConfigurer scanner = new MapperScannerConfigurer();
        // Referenced by bean name so the factory itself is not resolved this early
        scanner.setSqlSessionFactoryBeanName("sqlSessionFactory");
        scanner.setBasePackage("com.imooc.mq.mapper");
        return scanner;
    }
}
package com.imooc.mq.config.task;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar; import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
 * @Title: TaskSchedulerConfig
 * @Description: Scheduled-task configuration.
 * @date 2019/1/22 14:46
 */
@Configuration
@EnableScheduling // turns on scheduled tasks
public class TaskSchedulerConfig implements SchedulingConfigurer {

    /**
     * Hands the pool from {@link #taskScheduler()} to the task registrar.
     *
     * @param taskRegistrar registrar supplied by Spring
     */
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        Executor schedulerPool = taskScheduler();
        taskRegistrar.setScheduler(schedulerPool);
    }

    /**
     * Thread pool for scheduled tasks; its {@code shutdown} method is registered as the
     * bean's destroy method.
     *
     * @return a scheduled thread pool with 100 core threads
     */
    @Bean(destroyMethod = "shutdown")
    public Executor taskScheduler() {
        final int poolSize = 100;
        return Executors.newScheduledThreadPool(poolSize);
    }
}
package com.imooc.mq.constant;

/**
 * @Title: Constans
 * @Description: Constants for the message-log status codes and the confirm timeout.
 * @date 2019/1/22 14:50
 */
public class Constans {

    /** Status code: sending. */
    public static final String ORDER_SENDING = "0";

    /** Status code: sent successfully. */
    public static final String ORDER_SEND_SUCCESS = "1";

    /** Status code: sending failed. */
    public static final String ORDER_SEND_FAILURE = "2";

    /** Timeout, expressed in minutes. */
    public static final int ORDER_TIMEOUT = 1;
}

相应的mapper接口和mapper.xml文件配置

package com.imooc.mq.mapper;

import com.imooc.mq.entity.BrokerMessageLog;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository; import java.util.Date;
import java.util.List; /**
* @Title: BrokerMessageLogMapper
* @Description: 消息记录接口
* @date 2019/1/2214:45
*/
@Repository
public interface BrokerMessageLogMapper {

    /**
     * Inserts a message-log row, writing only the fields of {@code record} that are non-null.
     *
     * @param record the message log to store
     * @return the insert count reported by MyBatis
     */
    int insertSelective(BrokerMessageLog record);

    /**
     * Fetches the messages whose status is 0 (sending) and that have already timed out.
     *
     * @return the matching message logs
     */
    List<BrokerMessageLog> query4StatusAndTimeoutMessage();

    /**
     * Records a re-send: the try count of the message is increased by one.
     *
     * @param messageId  id of the message being re-sent
     * @param updateTime new update time of the row
     */
    void update4ReSend(@Param("messageId") String messageId,
                       @Param("updateTime") Date updateTime);

    /**
     * Stores the final delivery result of a message, success or failure.
     *
     * @param messageId  id of the message
     * @param status     new status code
     * @param updateTime new update time of the row
     */
    void changeBrokerMessageLogStatus(@Param("messageId") String messageId,
                                      @Param("status") String status,
                                      @Param("updateTime") Date updateTime);
}
------------------------------------------------------------------
package com.imooc.mq.mapper; import com.imooc.mq.entity.Order;
import org.springframework.stereotype.Repository; /**
* @Title: OrderMapper
* @Description: 订单接口
* @date 2019/1/2214:45
*/
@Repository
public interface OrderMapper {

    // ---- create ----

    /** Inserts every column of the order. */
    int insert(Order record);

    /** Inserts only the non-null fields of the order. */
    int insertSelective(Order record);

    // ---- read ----

    /** Loads one order by primary key. */
    Order selectByPrimaryKey(Integer id);

    // ---- update ----

    /** Updates only the non-null fields of the order with the given primary key. */
    int updateByPrimaryKeySelective(Order record);

    /** Updates every non-key column of the order with the given primary key. */
    int updateByPrimaryKey(Order record);

    // ---- delete ----

    /** Deletes one order by primary key. */
    int deleteByPrimaryKey(Integer id);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.imooc.mq.mapper.BrokerMessageLogMapper" >
<!--
  Statements for the broker_message_log table.
  Besides the primary-key CRUD statements, three statements work on the status and try count:
    query4StatusAndTimeoutMessage: selects rows with status '0' whose next_retry is not later than sysdate()
    update4ReSend: adds 1 to try_count and sets update_time for one message_id
    changeBrokerMessageLogStatus: sets status and update_time for one message_id
-->
<!-- Column to property mapping for BrokerMessageLog; message_id is the id column -->
<resultMap id="BaseResultMap" type="com.imooc.mq.entity.BrokerMessageLog" >
<id column="message_id" property="messageId" jdbcType="VARCHAR" />
<result column="message" property="message" jdbcType="VARCHAR" />
<result column="try_count" property="tryCount" jdbcType="INTEGER" />
<result column="status" property="status" jdbcType="VARCHAR" />
<result column="next_retry" property="nextRetry" jdbcType="TIMESTAMP" />
<result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
<result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
</resultMap>
<!-- Reusable column list, included by selectByPrimaryKey -->
<sql id="Base_Column_List" >
message_id, message, try_count, status, next_retry, create_time, update_time
</sql> <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
select
<include refid="Base_Column_List" />
from broker_message_log
where message_id = #{messageId,jdbcType=VARCHAR}
</select>
<delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
delete from broker_message_log
where message_id = #{messageId,jdbcType=VARCHAR}
</delete>
<!-- Inserts all seven columns -->
<insert id="insert" parameterType="com.imooc.mq.entity.BrokerMessageLog" >
insert into broker_message_log (message_id, message, try_count,
status, next_retry, create_time,
update_time)
values (#{messageId,jdbcType=VARCHAR}, #{message,jdbcType=VARCHAR}, #{tryCount,jdbcType=INTEGER},
#{status,jdbcType=VARCHAR}, #{nextRetry,jdbcType=TIMESTAMP}, #{createTime,jdbcType=TIMESTAMP},
#{updateTime,jdbcType=TIMESTAMP})
</insert>
<!-- Inserts only the columns whose property is non-null; omitted columns are left to the table definition -->
<insert id="insertSelective" parameterType="com.imooc.mq.entity.BrokerMessageLog" >
insert into broker_message_log
<trim prefix="(" suffix=")" suffixOverrides="," >
<if test="messageId != null" >
message_id,
</if>
<if test="message != null" >
message,
</if>
<if test="tryCount != null" >
try_count,
</if>
<if test="status != null" >
status,
</if>
<if test="nextRetry != null" >
next_retry,
</if>
<if test="createTime != null" >
create_time,
</if>
<if test="updateTime != null" >
update_time,
</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides="," >
<if test="messageId != null" >
#{messageId,jdbcType=VARCHAR},
</if>
<if test="message != null" >
#{message,jdbcType=VARCHAR},
</if>
<if test="tryCount != null" >
#{tryCount,jdbcType=INTEGER},
</if>
<if test="status != null" >
#{status,jdbcType=VARCHAR},
</if>
<if test="nextRetry != null" >
#{nextRetry,jdbcType=TIMESTAMP},
</if>
<if test="createTime != null" >
#{createTime,jdbcType=TIMESTAMP},
</if>
<if test="updateTime != null" >
#{updateTime,jdbcType=TIMESTAMP},
</if>
</trim>
</insert>
<!-- Updates only the columns whose property is non-null, for one message_id -->
<update id="updateByPrimaryKeySelective" parameterType="com.imooc.mq.entity.BrokerMessageLog" >
update broker_message_log
<set >
<if test="message != null" >
message = #{message,jdbcType=VARCHAR},
</if>
<if test="tryCount != null" >
try_count = #{tryCount,jdbcType=INTEGER},
</if>
<if test="status != null" >
status = #{status,jdbcType=VARCHAR},
</if>
<if test="nextRetry != null" >
next_retry = #{nextRetry,jdbcType=TIMESTAMP},
</if>
<if test="createTime != null" >
create_time = #{createTime,jdbcType=TIMESTAMP},
</if>
<if test="updateTime != null" >
update_time = #{updateTime,jdbcType=TIMESTAMP},
</if>
</set>
where message_id = #{messageId,jdbcType=VARCHAR}
</update>
<!-- Updates every non-key column for one message_id -->
<update id="updateByPrimaryKey" parameterType="com.imooc.mq.entity.BrokerMessageLog" >
update broker_message_log
set message = #{message,jdbcType=VARCHAR},
try_count = #{tryCount,jdbcType=INTEGER},
status = #{status,jdbcType=VARCHAR},
next_retry = #{nextRetry,jdbcType=TIMESTAMP},
create_time = #{createTime,jdbcType=TIMESTAMP},
update_time = #{updateTime,jdbcType=TIMESTAMP}
where message_id = #{messageId,jdbcType=VARCHAR}
</update> <select id="query4StatusAndTimeoutMessage" resultMap="BaseResultMap">
<![CDATA[
select message_id, message, try_count, status, next_retry, create_time, update_time
from broker_message_log bml
where status = '0'
and next_retry <= sysdate()
]]>
</select> <update id="update4ReSend" >
update broker_message_log bml
set bml.try_count = bml.try_count + 1,
bml.update_time = #{updateTime, jdbcType=TIMESTAMP}
where bml.message_id = #{messageId,jdbcType=VARCHAR}
</update> <update id="changeBrokerMessageLogStatus" >
update broker_message_log bml
set bml.status = #{status,jdbcType=VARCHAR},
bml.update_time = #{updateTime, jdbcType=TIMESTAMP}
where bml.message_id = #{messageId,jdbcType=VARCHAR}
</update> </mapper>
-------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.imooc.mq.mapper.OrderMapper" >
<!--
  Statements for the t_order table (columns id, name, message_id).
  NOTE(review): the two Example where-clause fragments and the updateByExample statements have no
  matching method in the OrderMapper interface shown with this file; confirm whether they are still needed.
  NOTE(review): id is mapped with jdbcType=INTEGER while the Order entity shown with this file
  declares id as String; confirm the intended column type.
-->
<!-- Column to property mapping for Order; id is the id column -->
<resultMap id="BaseResultMap" type="com.imooc.mq.entity.Order" >
<id column="id" property="id" jdbcType="INTEGER" />
<result column="name" property="name" jdbcType="VARCHAR" />
<result column="message_id" property="messageId" jdbcType="VARCHAR" />
</resultMap>
<!-- Builds a where clause from oredCriteria; the condition text is inserted with ${} substitution, values with #{} parameters -->
<sql id="Example_Where_Clause" >
<where >
<foreach collection="oredCriteria" item="criteria" separator="or" >
<if test="criteria.valid" >
<trim prefix="(" suffix=")" prefixOverrides="and" >
<foreach collection="criteria.criteria" item="criterion" >
<choose >
<when test="criterion.noValue" >
and ${criterion.condition}
</when>
<when test="criterion.singleValue" >
and ${criterion.condition} #{criterion.value}
</when>
<when test="criterion.betweenValue" >
and ${criterion.condition} #{criterion.value} and #{criterion.secondValue}
</when>
<when test="criterion.listValue" >
and ${criterion.condition}
<foreach collection="criterion.value" item="listItem" open="(" close=")" separator="," >
#{listItem}
</foreach>
</when>
</choose>
</foreach>
</trim>
</if>
</foreach>
</where>
</sql>
<!-- Same where clause as above, reading the criteria from example.oredCriteria; included by the updateByExample statements -->
<sql id="Update_By_Example_Where_Clause" >
<where >
<foreach collection="example.oredCriteria" item="criteria" separator="or" >
<if test="criteria.valid" >
<trim prefix="(" suffix=")" prefixOverrides="and" >
<foreach collection="criteria.criteria" item="criterion" >
<choose >
<when test="criterion.noValue" >
and ${criterion.condition}
</when>
<when test="criterion.singleValue" >
and ${criterion.condition} #{criterion.value}
</when>
<when test="criterion.betweenValue" >
and ${criterion.condition} #{criterion.value} and #{criterion.secondValue}
</when>
<when test="criterion.listValue" >
and ${criterion.condition}
<foreach collection="criterion.value" item="listItem" open="(" close=")" separator="," >
#{listItem}
</foreach>
</when>
</choose>
</foreach>
</trim>
</if>
</foreach>
</where>
</sql>
<!-- Reusable column list, included by selectByPrimaryKey -->
<sql id="Base_Column_List" >
id, name, message_id
</sql> <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
select
<include refid="Base_Column_List" />
from t_order
where id = #{id,jdbcType=INTEGER}
</select>
<delete id="deleteByPrimaryKey" parameterType="java.lang.Integer" >
delete from t_order
where id = #{id,jdbcType=INTEGER}
</delete> <insert id="insert" parameterType="com.imooc.mq.entity.Order" >
insert into t_order (id, name, message_id
)
values (#{id,jdbcType=INTEGER}, #{name,jdbcType=VARCHAR}, #{messageId,jdbcType=VARCHAR}
)
</insert>
<!-- Inserts only the columns whose property is non-null -->
<insert id="insertSelective" parameterType="com.imooc.mq.entity.Order" >
insert into t_order
<trim prefix="(" suffix=")" suffixOverrides="," >
<if test="id != null" >
id,
</if>
<if test="name != null" >
name,
</if>
<if test="messageId != null" >
message_id,
</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides="," >
<if test="id != null" >
#{id,jdbcType=INTEGER},
</if>
<if test="name != null" >
#{name,jdbcType=VARCHAR},
</if>
<if test="messageId != null" >
#{messageId,jdbcType=VARCHAR},
</if>
</trim>
</insert> <update id="updateByExampleSelective" parameterType="map" >
update t_order
<set >
<if test="record.id != null" >
id = #{record.id,jdbcType=INTEGER},
</if>
<if test="record.name != null" >
name = #{record.name,jdbcType=VARCHAR},
</if>
<if test="record.messageId != null" >
message_id = #{record.messageId,jdbcType=VARCHAR},
</if>
</set>
<if test="_parameter != null" >
<include refid="Update_By_Example_Where_Clause" />
</if>
</update>
<!-- Sets all three columns from record on the rows matched by the example where clause -->
<update id="updateByExample" parameterType="map" >
update t_order
set id = #{record.id,jdbcType=INTEGER},
name = #{record.name,jdbcType=VARCHAR},
message_id = #{record.messageId,jdbcType=VARCHAR}
<if test="_parameter != null" >
<include refid="Update_By_Example_Where_Clause" />
</if>
</update>
<!-- Updates only the non-null name and message_id values for one id -->
<update id="updateByPrimaryKeySelective" parameterType="com.imooc.mq.entity.Order" >
update t_order
<set >
<if test="name != null" >
name = #{name,jdbcType=VARCHAR},
</if>
<if test="messageId != null" >
message_id = #{messageId,jdbcType=VARCHAR},
</if>
</set>
where id = #{id,jdbcType=INTEGER}
</update>
<!-- Updates name and message_id for one id -->
<update id="updateByPrimaryKey" parameterType="com.imooc.mq.entity.Order" >
update t_order
set name = #{name,jdbcType=VARCHAR},
message_id = #{messageId,jdbcType=VARCHAR}
where id = #{id,jdbcType=INTEGER}
</update>
</mapper>
package com.imooc.mq.entity;

import java.util.Date;

/**
 * @Title: BrokerMessageLog
 * @Description: Message log record; a plain mutable bean with one accessor pair per field.
 * @date 2019/1/22 14:29
 */
public class BrokerMessageLog {

    private String messageId;
    private String message;
    private Integer tryCount;
    private String status;
    private Date nextRetry;
    private Date createTime;
    private Date updateTime;

    /** Creates an empty record; every field starts as {@code null}. */
    public BrokerMessageLog() {
    }

    /** Creates a record with every field supplied by the caller. */
    public BrokerMessageLog(String messageId, String message, Integer tryCount, String status,
                            Date nextRetry, Date createTime, Date updateTime) {
        this.messageId = messageId;
        this.message = message;
        this.tryCount = tryCount;
        this.status = status;
        this.nextRetry = nextRetry;
        this.createTime = createTime;
        this.updateTime = updateTime;
    }

    public String getMessageId() {
        return this.messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getMessage() {
        return this.message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Integer getTryCount() {
        return this.tryCount;
    }

    public void setTryCount(Integer tryCount) {
        this.tryCount = tryCount;
    }

    public String getStatus() {
        return this.status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public Date getNextRetry() {
        return this.nextRetry;
    }

    public void setNextRetry(Date nextRetry) {
        this.nextRetry = nextRetry;
    }

    public Date getCreateTime() {
        return this.createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getUpdateTime() {
        return this.updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }
}
--------------------------------------------------------------
package com.imooc.mq.entity; import java.io.Serializable; /**
* @Title: Order
* @Description: 订单
* @date 2019/1/2210:18
*/
public class Order implements Serializable {

    private String id;

    private String name;

    // Unique identifier of the message sent for this order
    private String messageId;

    /** Creates an empty order; every field starts as {@code null}. */
    public Order() {
    }

    /** Creates an order with every field supplied by the caller. */
    public Order(String id, String name, String messageId) {
        this.id = id;
        this.name = name;
        this.messageId = messageId;
    }

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMessageId() {
        return this.messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }
}

现在开始按照设计思路写实现代码:

  1、首先我们把最核心的生产者写好,生产者由基本的消息投递和确认监听两部分组成

package com.imooc.mq.producer;

import com.imooc.mq.constant.Constans;
import com.imooc.mq.entity.Order;
import com.imooc.mq.mapper.BrokerMessageLogMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import java.util.Date; /**
* @Title: RabbitOrderSender
* @Description: 消息发送
* @date 2019/1/2214:52
*/
@Component
public class RabbitOrderSender {

    private static Logger logger = LoggerFactory.getLogger(RabbitOrderSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;

    /**
     * Invoked with the confirm result after the Broker has answered.
     *
     * <p>On ack the message log is marked as sent successfully. On nack the cause is only
     * logged; the row keeps its current status.
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            logger.info("correlationData:{}", correlationData);
            // Without correlation data there is no message id to update; dereferencing it
            // would throw a NullPointerException on the confirm thread.
            if (correlationData == null || correlationData.getId() == null) {
                logger.error("confirm received without a correlation id, ack={}, cause={}", ack, cause);
                return;
            }
            String messageId = correlationData.getId();
            if (ack) {
                // Confirmed: mark the message log as sent successfully
                brokerMessageLogMapper.changeBrokerMessageLogStatus(
                        messageId, Constans.ORDER_SEND_SUCCESS, new Date());
            } else {
                // Not confirmed: choose retry or compensation depending on the failure cause
                logger.error("异常处理{}", cause);
            }
        }
    };

    /**
     * Sends the order as a custom object message to exchange {@code order-exchange1} with
     * routing key {@code order.ABC}, using the order's message id as correlation id.
     *
     * @param order the order to send; its message id becomes the correlation id
     * @throws Exception if sending fails
     */
    public void sendOrder(Order order) throws Exception {
        // The confirm callback fires after the message was handed to the Broker and tells
        // whether it reached the Broker, i.e. only whether it arrived at the Exchange.
        // NOTE(review): re-registering on every send relies on the template accepting the
        // same callback instance again - confirm for the Spring AMQP version in use.
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // Unique message id
        CorrelationData correlationData = new CorrelationData(order.getMessageId());
        rabbitTemplate.convertAndSend("order-exchange1", "order.ABC", order, correlationData);
    }
}

  2、将定时任务逻辑写好

package com.imooc.mq.task;

import com.imooc.mq.constant.Constans;
import com.imooc.mq.entity.BrokerMessageLog;
import com.imooc.mq.entity.Order;
import com.imooc.mq.mapper.BrokerMessageLogMapper;
import com.imooc.mq.producer.RabbitOrderSender;
import com.imooc.mq.utils.FastJsonConvertUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import java.util.Date;
import java.util.List; /**
* @Title: RetryMessageTasker
* @Description: 定时任务
* @date 2019/1/2215:45
*/
@Component
public class RetryMessageTasker {

    private static Logger logger = LoggerFactory.getLogger(RetryMessageTasker.class);

    /** A message that has already been tried this many times is marked as failed. */
    private static final int MAX_TRY_COUNT = 3;

    @Autowired
    private RabbitOrderSender rabbitOrderSender;

    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;

    /**
     * Scheduled task: re-sends messages that are still in status 0 and have timed out, and
     * marks those that reached {@link #MAX_TRY_COUNT} tries as failed.
     *
     * <p>Each message is handled on its own, so a failure on one row is logged and the
     * remaining rows are still processed.
     */
    @Scheduled(initialDelay = 5000, fixedDelay = 10000)
    public void reSend() {
        logger.info("-----------定时任务开始-----------");
        // Messages with status 0 that have already timed out
        List<BrokerMessageLog> list = brokerMessageLogMapper.query4StatusAndTimeoutMessage();
        for (BrokerMessageLog messageLog : list) {
            try {
                reSendOne(messageLog);
            } catch (Exception e) {
                // Pass the exception to the logger so the stack trace lands in the log
                logger.error("-----------异常处理-----------", e);
            }
        }
    }

    /** Marks one timed-out message as failed, or bumps its try count and sends it again. */
    private void reSendOne(BrokerMessageLog messageLog) throws Exception {
        // tryCount is a nullable Integer; treat a missing value as "never tried" instead of
        // failing on unboxing
        Integer storedTryCount = messageLog.getTryCount();
        int tryCount = storedTryCount == null ? 0 : storedTryCount;

        if (tryCount >= MAX_TRY_COUNT) {
            // Tried often enough: record the message as failed
            brokerMessageLogMapper.changeBrokerMessageLogStatus(
                    messageLog.getMessageId(), Constans.ORDER_SEND_FAILURE, new Date());
            return;
        }

        // Re-delivery: increase the try count first, then send
        brokerMessageLogMapper.update4ReSend(messageLog.getMessageId(), new Date());
        Order reSendOrder = FastJsonConvertUtil.convertJSONToObject(messageLog.getMessage(), Order.class);
        if (reSendOrder == null) {
            logger.error("stored message could not be converted to an Order, messageId={}",
                    messageLog.getMessageId());
            return;
        }
        rabbitOrderSender.sendOrder(reSendOrder);
    }
}

  3、写好消费者的逻辑,直接用上一篇中的消费者代码,修改对应的exchange、queue、路由key就好

package com.imooc.mq.consumer;

import com.imooc.mq.entity.Order;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component; import java.util.Map; /**
* @Title: OrderReceiver
* @Description: 消费
* @date 2019/1/2211:03
*/
@Component
public class OrderReceiver {

    /**
     * Handles one order message and acknowledges it manually.
     *
     * <p>{@code @RabbitListener} listens for messages and can configure exchange, queue and
     * routing key; it creates the queue and exchange and binds them.
     * {@code @RabbitHandler} marks this method as the one the consumer calls when a message
     * arrives. {@code @Payload} is the message body, {@code @Headers} the message headers.
     *
     * @param order   the received order
     * @param headers message headers; the delivery tag is read from them
     * @param channel channel used to send the acknowledgement
     * @throws Exception if the acknowledgement fails
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order-queue1", declare = "true"),
            exchange = @Exchange(name = "order-exchange1", declare = "true", type = "topic"),
            key = "order.ABC"
    ))
    @RabbitHandler
    public void onOrderMessage(@Payload Order order,
                               @Headers Map<String, Object> headers,
                               Channel channel) throws Exception {
        // Consumer-side work
        System.out.println("------收到消息,开始消费------");
        String orderLine = "订单ID:" + order.getId();
        System.out.println(orderLine);

        // Manual ACK of this single delivery
        Object rawTag = headers.get(AmqpHeaders.DELIVERY_TAG);
        Long deliveryTag = (Long) rawTag;
        channel.basicAck(deliveryTag, false);
    }
}

  4、业务逻辑

package com.imooc.mq.service;

import com.imooc.mq.constant.Constans;
import com.imooc.mq.entity.BrokerMessageLog;
import com.imooc.mq.entity.Order;
import com.imooc.mq.mapper.BrokerMessageLogMapper;
import com.imooc.mq.mapper.OrderMapper;
import com.imooc.mq.producer.RabbitOrderSender;
import com.imooc.mq.utils.DateUtils;
import com.imooc.mq.utils.FastJsonConvertUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import java.util.Date; /**
* @Title: OrderService
* @Description: 业务实现
* @date 2019/1/2215:41
*/
@Service
public class OrderService {

    private static Logger logger = LoggerFactory.getLogger(OrderService.class);

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;

    @Autowired
    private RabbitOrderSender rabbitOrderSender;

    /**
     * Stores the order, writes a message log in status "sending" and then sends the order
     * message. Any exception is logged and not rethrown.
     *
     * <p>NOTE(review): no transaction boundary is visible around the two inserts - confirm
     * whether they are expected to succeed or fail together.
     *
     * @param order the order to create; its message id is used as the message-log key
     */
    public void createOrder(Order order) {
        try {
            // The current time stands in for the order creation time (simplified for the demo)
            Date orderTime = new Date();
            // Insert the business data
            orderMapper.insert(order);

            // Insert the message log
            BrokerMessageLog brokerMessageLog = new BrokerMessageLog();
            // Unique message id
            brokerMessageLog.setMessageId(order.getMessageId());
            // The whole order is stored as JSON
            brokerMessageLog.setMessage(FastJsonConvertUtil.convertObjectToJSON(order));
            // Start the try count at 0 so the retry task never reads a missing value
            brokerMessageLog.setTryCount(0);
            // Status 0 means sending
            brokerMessageLog.setStatus(Constans.ORDER_SENDING);
            // The message counts as timed out once the confirm window has passed
            brokerMessageLog.setNextRetry(DateUtils.addMinutes(orderTime, Constans.ORDER_TIMEOUT));
            brokerMessageLog.setCreateTime(orderTime);
            brokerMessageLog.setUpdateTime(orderTime);
            brokerMessageLogMapper.insertSelective(brokerMessageLog);

            // Send the message
            rabbitOrderSender.sendOrder(order);
        } catch (Exception e) {
            // The exception goes in as the trailing argument without a placeholder; with a
            // "{}" in the message the placeholder was printed literally.
            logger.error("订单业务异常", e);
        }
    }
}

  5、测试

 /**
  * Tests order creation: builds a sample order with a unique message id and passes it to
  * {@code orderService.createOrder}.
  *
  * @throws Exception any failure is propagated so the test run reports it
  */
 @Test
 public void createOrder() throws Exception {
     Order order = new Order();
     order.setId("201901228");
     order.setName("测试订单");
     order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());
     // No try/catch here: swallowing the exception would let the test pass on failure
     orderService.createOrder(order);
 }

  先启动消费者服务、再启动生产者服务让定时任务跑起来,最后启动测试方法。消息投递成功并收到确认应答后,日志记录状态被修改为1。测试消息重投的话需要制造一些异常情况,比如修改消费者端声明的exchange名称,使生产者找不到该交换机,收不到成功的确认应答,就会触发重试投递。

SpringBoot整合RabbitMQ-消息可靠性投递的相关教程结束。

《SpringBoot整合RabbitMQ-消息可靠性投递.doc》

下载本文的Word格式文档,以方便收藏与打印。