Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

2022-07-14,,,,

如何实现顺序消息? 需要程序保证发送和消费的是同一个 queue

rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是順序消费消息的;有時候,我们需要顺序消费一批消息,比如电商系统 订单创建、支付、完成操作,需要順序执行;

rocketmqtemplate给我们提供了sendorderly方法(有多個重载),来实现发送顺序消息;包括以下:

syncsendorderly,发送同步顺序消息;

asyncsendorderly,发送异步顺序消息;

sendonewayorderly,发送单向顺序消息;

一般我们用syncsendorderly方法发送同步顺序消息。

参数一:topic 如果想添加tag,可以使用"topic:tag"的写法

参数二:消息内容

参数三:hashkey 使用此参数选择队列。 例如:orderid,productid…

因为broker会管理多个消息队列,这个hashkey参数,主要用来计算选择队列的,一般可以把订单id,产品id作为参数值;发送到一个队列,这样方便搞顺序队列;以及消费端接收的时候,默认是并发多线程去接收消息。

rocketmqmessagelistener有个属性consumemode,默认是consumemode.concurrently ,我们要改成consumemode.orderly,单线程顺序接收消息;

下面来介绍下 springboot+rockermq 整合实现 顺序消息的发送与消费

一、创建springboot项目添加rockermq依赖

<!--rocketmq依赖-->
<dependency>
    <groupid>org.apache.rocketmq</groupid>
    <artifactid>rocketmq-spring-boot-starter</artifactid>
    <version>2.2.1</version>
</dependency>

二、配置rocketmq

# 端口
server:
  port: 8083

# 配置 rocketmq
rocketmq:
  name-server: 127.0.0.1:9876
  #生产者
  producer:
    #生产者组名,规定在一个应用里面必须唯一
    group: group1
    #消息发送的超时时间 默认3000ms
    send-message-timeout: 3000
    #消息达到4096字节的时候,消息就会被压缩。默认 4096
    compress-message-body-threshold: 4096
    #最大的消息限制,默认为128k
    max-message-size: 4194304
    #同步消息发送失败重试次数
    retry-times-when-send-failed: 3
    #在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效
    retry-next-server: true
    #异步消息发送失败重试的次数
    retry-times-when-send-async-failed: 3

三、新建一个controller来做消息发送

package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.spring.core.rocketmqtemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;
/**
 * 模拟两个订单发送消息
 *
 * 顺序信息的三种方式:同步、异步、单向
 * syncsendorderly,发送同步顺序消息;
 * asyncsendorderly,发送异步顺序消息;
 * sendonewayorderly,发送单向顺序消息;
 * 一般我们用第一种发送同步顺序消息;
 * @author qzz
 */
@restcontroller
public class rocketmqordercontroller {
    @autowired
    private rocketmqtemplate rocketmqtemplate;
    /**
     * 发送同步顺序消息
     */
    @requestmapping("/testsyncordersend")
    public void testsyncsend(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        //参数三:hashkey 用来计算决定消息发送到哪个消息队列, 一般是订单id,产品id等
        rocketmqtemplate.syncsendorderly("test-topic-orderly","111111创建","111111");
        rocketmqtemplate.syncsendorderly("test-topic-orderly","111111支付","111111");
        rocketmqtemplate.syncsendorderly("test-topic-orderly","111111完成","111111");
        rocketmqtemplate.syncsendorderly("test-topic-orderly","222222创建","222222");
        rocketmqtemplate.syncsendorderly("test-topic-orderly","222222支付","222222");
        rocketmqtemplate.syncsendorderly("test-topic-orderly","222222完成","222222");
    }
}

四、创建消费端监听消息消费消息

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.consumemode;
import org.apache.rocketmq.spring.annotation.rocketmqmessagelistener;
import org.apache.rocketmq.spring.core.rocketmqlistener;
import org.springframework.stereotype.service;
/**
 * 消费顺序消息
 * 配置rocketmq监听
 *
 * consumemode.orderly:顺序消费
 * @author qzz
 */
@service
@rocketmqmessagelistener(consumergroup = "test",topic = "test-topic-orderly",consumemode = consumemode.orderly)
public class rocketmqcommonconsumerlistener implements rocketmqlistener<string> {
    @override
    public void onmessage(string s) {
        system.out.println("consumer 顺序消费,收到消息:"+s);
    }
}

五、启动服务测试顺序消息发送与消费

测试成功!

到此这篇关于springboot详细讲解rocketmq实现顺序消息的发送与消费流程的文章就介绍到这了,更多相关springboot顺序消息内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

《Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程.doc》

下载本文的Word格式文档,以方便收藏与打印。