【MQ】java 从零开始实现消息队列 mq-02-如何实现生产者调用消费者?

2022-11-27,,,,

前景回顾

上一节我们学习了如何实现基于 netty 客服端和服务端的启动。

【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】java 从零开始实现消息队列 mq-02-如何实现生产者调用消费者?

那么客户端如何调用服务端呢?

我们本节就来一起实现一下。

消费者实现

启动类的调整

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(workerGroup, bossGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf))
.addLast(new MqConsumerHandler(invokeService));
}
})
// 这个参数影响的是还没有被accept 取出的连接
.option(ChannelOption.SO_BACKLOG, 128)
// 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。
.childOption(ChannelOption.SO_KEEPALIVE, true);

这里我们通过指定分隔符解决 netty 粘包问题。

解决 netty 粘包问题

MqConsumerHandler 处理类

MqConsumerHandler 的实现如下,添加对应的业务处理逻辑。

package com.github.houbb.mq.consumer.handler;

/**
* @author binbin.hou
* @since 1.0.0
*/
public class MqConsumerHandler extends SimpleChannelInboundHandler { private static final Log log = LogFactory.getLog(MqConsumerHandler.class); /**
* 调用管理类
* @since 1.0.0
*/
private final IInvokeService invokeService; public MqConsumerHandler(IInvokeService invokeService) {
this.invokeService = invokeService;
} @Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes); RpcMessageDto rpcMessageDto = null;
try {
rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class);
} catch (Exception exception) {
log.error("RpcMessageDto json 格式转换异常 {}", new String(bytes));
return;
} if (rpcMessageDto.isRequest()) {
MqCommonResp commonResp = this.dispatch(rpcMessageDto, ctx); if(commonResp == null) {
log.debug("当前消息为 null,忽略处理。");
return;
} writeResponse(rpcMessageDto, commonResp, ctx);
} else {
final String traceId = rpcMessageDto.getTraceId(); // 丢弃掉 traceId 为空的信息
if(StringUtil.isBlank(traceId)) {
log.debug("[Server Response] response traceId 为空,直接丢弃", JSON.toJSON(rpcMessageDto));
return;
} // 添加消息
invokeService.addResponse(traceId, rpcMessageDto);
}
}
}

rpc 消息体定义

为了统一标准,我们的 rpc 消息体 RpcMessageDto 定义如下:

package com.github.houbb.mq.common.rpc;

/**
* @author binbin.hou
* @since 1.0.0
*/
public class RpcMessageDto implements Serializable { /**
* 请求时间
*/
private long requestTime; /**
* 请求标识
*/
private String traceId; /**
* 方法类型
*/
private String methodType; /**
* 是否为请求消息
*/
private boolean isRequest; private String respCode; private String respMsg; private String json; //getter&setter }

消息分发

对于接收到的消息体 RpcMessageDto,分发逻辑如下:

/**
* 消息的分发
*
* @param rpcMessageDto 入参
* @param ctx 上下文
* @return 结果
*/
private MqCommonResp dispatch(RpcMessageDto rpcMessageDto, ChannelHandlerContext ctx) {
final String methodType = rpcMessageDto.getMethodType();
final String json = rpcMessageDto.getJson();
String channelId = ChannelUtil.getChannelId(ctx);
log.debug("channelId: {} 接收到 method: {} 内容:{}", channelId,
methodType, json); // 消息发送
if(MethodType.P_SEND_MESSAGE.equals(methodType)) {
// 日志输出
log.info("收到服务端消息: {}", json);
// 如果是 broker,应该进行处理化等操作。
MqCommonResp resp = new MqCommonResp();
resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
return resp;
}
throw new UnsupportedOperationException("暂不支持的方法类型");
}

这里对于接收到的消息,只做一个简单的日志输出,后续将添加对应的业务逻辑处理。

结果回写

收到请求以后,我们需要返回对应的响应。

基于 channel 的回写实现如下:

/**
* 结果写回
*
* @param req 请求
* @param resp 响应
* @param ctx 上下文
*/
private void writeResponse(RpcMessageDto req,
Object resp,
ChannelHandlerContext ctx) {
final String id = ctx.channel().id().asLongText();
RpcMessageDto rpcMessageDto = new RpcMessageDto();
// 响应类消息
rpcMessageDto.setRequest(false);
rpcMessageDto.setTraceId(req.getTraceId());
rpcMessageDto.setMethodType(req.getMethodType());
rpcMessageDto.setRequestTime(System.currentTimeMillis());
String json = JSON.toJSONString(resp);
rpcMessageDto.setJson(json);
// 回写到 client 端
ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);
ctx.writeAndFlush(byteBuf);
log.debug("[Server] channel {} response {}", id, JSON.toJSON(rpcMessageDto));
}

调用管理类

为了方便管理异步返回的请求结果,我们统一定义了 IInvokeService 类,用于管理请求与响应。

接口

package com.github.houbb.mq.common.support.invoke;

import com.github.houbb.mq.common.rpc.RpcMessageDto;

/**
* 调用服务接口
* @author binbin.hou
* @since 1.0.0
*/
public interface IInvokeService { /**
* 添加请求信息
* @param seqId 序列号
* @param timeoutMills 超时时间
* @return this
* @since 1.0.0
*/
IInvokeService addRequest(final String seqId,
final long timeoutMills); /**
* 放入结果
* @param seqId 唯一标识
* @param rpcResponse 响应结果
* @return this
* @since 1.0.0
*/
IInvokeService addResponse(final String seqId, final RpcMessageDto rpcResponse); /**
* 获取标志信息对应的结果
* @param seqId 序列号
* @return 结果
* @since 1.0.0
*/
RpcMessageDto getResponse(final String seqId); }

实现

实现本身也不难。

package com.github.houbb.mq.common.support.invoke.impl;

/**
* 调用服务接口
* @author binbin.hou
* @since 1.0.0
*/
public class InvokeService implements IInvokeService { private static final Log logger = LogFactory.getLog(InvokeService.class); /**
* 请求序列号 map
* (1)这里后期如果要添加超时检测,可以添加对应的超时时间。
* 可以把这里调整为 map
*
* key: seqId 唯一标识一个请求
* value: 存入该请求最长的有效时间。用于定时删除和超时判断。
* @since 0.0.2
*/
private final ConcurrentHashMap<String, Long> requestMap; /**
* 响应结果
* @since 1.0.0
*/
private final ConcurrentHashMap<String, RpcMessageDto> responseMap; public InvokeService() {
requestMap = new ConcurrentHashMap<>();
responseMap = new ConcurrentHashMap<>(); final Runnable timeoutThread = new TimeoutCheckThread(requestMap, responseMap);
Executors.newScheduledThreadPool(1)
.scheduleAtFixedRate(timeoutThread,60, 60, TimeUnit.SECONDS);
} @Override
public IInvokeService addRequest(String seqId, long timeoutMills) {
logger.debug("[Invoke] start add request for seqId: {}, timeoutMills: {}", seqId,
timeoutMills); final long expireTime = System.currentTimeMillis()+timeoutMills;
requestMap.putIfAbsent(seqId, expireTime); return this;
} @Override
public IInvokeService addResponse(String seqId, RpcMessageDto rpcResponse) {
// 1. 判断是否有效
Long expireTime = this.requestMap.get(seqId);
// 如果为空,可能是这个结果已经超时了,被定时 job 移除之后,响应结果才过来。直接忽略
if(ObjectUtil.isNull(expireTime)) {
return this;
} //2. 判断是否超时
if(System.currentTimeMillis() > expireTime) {
logger.debug("[Invoke] seqId:{} 信息已超时,直接返回超时结果。", seqId);
rpcResponse = RpcMessageDto.timeout();
} // 这里放入之前,可以添加判断。
// 如果 seqId 必须处理请求集合中,才允许放入。或者直接忽略丢弃。
// 通知所有等待方
responseMap.putIfAbsent(seqId, rpcResponse);
logger.debug("[Invoke] 获取结果信息,seqId: {}, rpcResponse: {}", seqId, JSON.toJSON(rpcResponse));
logger.debug("[Invoke] seqId:{} 信息已经放入,通知所有等待方", seqId); // 移除对应的 requestMap
requestMap.remove(seqId);
logger.debug("[Invoke] seqId:{} remove from request map", seqId); // 同步锁
synchronized (this) {
this.notifyAll();
logger.debug("[Invoke] {} notifyAll()", seqId);
} return this;
} @Override
public RpcMessageDto getResponse(String seqId) {
try {
RpcMessageDto rpcResponse = this.responseMap.get(seqId);
if(ObjectUtil.isNotNull(rpcResponse)) {
logger.debug("[Invoke] seq {} 对应结果已经获取: {}", seqId, rpcResponse);
return rpcResponse;
} // 进入等待
while (rpcResponse == null) {
logger.debug("[Invoke] seq {} 对应结果为空,进入等待", seqId); // 同步等待锁
synchronized (this) {
this.wait();
} logger.debug("[Invoke] {} wait has notified!", seqId); rpcResponse = this.responseMap.get(seqId);
logger.debug("[Invoke] seq {} 对应结果已经获取: {}", seqId, rpcResponse);
} return rpcResponse;
} catch (InterruptedException e) {
logger.error("获取响应异常", e);
throw new MqException(MqCommonRespCode.RPC_GET_RESP_FAILED);
}
} }

这里 getResponse 获取不到会进入等待,直到 addResponse 唤醒。

但是这也有一个问题,如果一个请求的响应丢失了怎么办?

总不能一直等待吧。

TimeoutCheckThread 超时检测线程

超时检测线程就可以帮我们处理一些超时未返回的结果。

package com.github.houbb.mq.common.support.invoke.impl;

import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.mq.common.rpc.RpcMessageDto; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; /**
* 超时检测线程
* @author binbin.hou
* @since 0.0.2
*/
public class TimeoutCheckThread implements Runnable { /**
* 请求信息
* @since 0.0.2
*/
private final ConcurrentHashMap<String, Long> requestMap; /**
* 请求信息
* @since 0.0.2
*/
private final ConcurrentHashMap<String, RpcMessageDto> responseMap; /**
* 新建
* @param requestMap 请求 Map
* @param responseMap 结果 map
* @since 0.0.2
*/
public TimeoutCheckThread(ConcurrentHashMap<String, Long> requestMap,
ConcurrentHashMap<String, RpcMessageDto> responseMap) {
ArgUtil.notNull(requestMap, "requestMap");
this.requestMap = requestMap;
this.responseMap = responseMap;
} @Override
public void run() {
for(Map.Entry<String, Long> entry : requestMap.entrySet()) {
long expireTime = entry.getValue();
long currentTime = System.currentTimeMillis(); if(currentTime > expireTime) {
final String key = entry.getKey();
// 结果设置为超时,从请求 map 中移除
responseMap.putIfAbsent(key, RpcMessageDto.timeout());
requestMap.remove(key);
}
}
} }

处理逻辑就是定时检测,如果超时了,就默认设置结果为超时,并且从请求集合中移除。

消息生产者实现

启动核心类

public class MqProducer extends Thread implements IMqProducer {

    private static final Log log = LogFactory.getLog(MqProducer.class);

    /**
* 分组名称
*/
private final String groupName; /**
* 端口号
*/
private final int port; /**
* 中间人地址
*/
private String brokerAddress = ""; /**
* channel 信息
* @since 0.0.2
*/
private ChannelFuture channelFuture; /**
* 客户端处理 handler
* @since 0.0.2
*/
private ChannelHandler channelHandler; /**
* 调用管理服务
* @since 0.0.2
*/
private final IInvokeService invokeService = new InvokeService(); /**
* 获取响应超时时间
* @since 0.0.2
*/
private long respTimeoutMills = 5000; /**
* 可用标识
* @since 0.0.2
*/
private volatile boolean enableFlag = false; /**
* 粘包处理分隔符
* @since 1.0.0
*/
private String delimiter = DelimiterUtil.DELIMITER; //set 方法 @Override
public synchronized void run() {
// 启动服务端
log.info("MQ 生产者开始启动客户端 GROUP: {}, PORT: {}, brokerAddress: {}",
groupName, port, brokerAddress); EventLoopGroup workerGroup = new NioEventLoopGroup(); try {
// channel handler
this.initChannelHandler(); // 省略,同以前 // 标识为可用
enableFlag = true;
} catch (Exception e) {
log.error("MQ 生产者启动遇到异常", e);
throw new MqException(ProducerRespCode.RPC_INIT_FAILED);
}
} }

其中初始化 handler 的实现如下:

private void initChannelHandler() {
final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(delimiter); final MqProducerHandler mqProducerHandler = new MqProducerHandler();
mqProducerHandler.setInvokeService(invokeService); // handler 实际上会被多次调用,如果不是 @Shareable,应该每次都重新创建。
ChannelHandler handler = new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf))
.addLast(mqProducerHandler);
}
};
this.channelHandler = handler;
}

MqProducerHandler 生产者处理逻辑

和消费者处理逻辑类似。

这里最核心的就是添加响应结果:invokeService.addResponse(rpcMessageDto.getTraceId(), rpcMessageDto);

package com.github.houbb.mq.producer.handler;

/**
* @author binbin.hou
* @since 1.0.0
*/
public class MqProducerHandler extends SimpleChannelInboundHandler { private static final Log log = LogFactory.getLog(MqProducerHandler.class); /**
* 调用管理类
*/
private IInvokeService invokeService; public void setInvokeService(IInvokeService invokeService) {
this.invokeService = invokeService;
} @Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf)msg;
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes); String text = new String(bytes);
log.debug("[Client] channelId {} 接收到消息 {}", ChannelUtil.getChannelId(ctx), text); RpcMessageDto rpcMessageDto = null;
try {
rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class);
} catch (Exception exception) {
log.error("RpcMessageDto json 格式转换异常 {}", JSON.parse(bytes));
return;
} if(rpcMessageDto.isRequest()) {
// 请求类
final String methodType = rpcMessageDto.getMethodType();
final String json = rpcMessageDto.getJson();
} else {
// 丢弃掉 traceId 为空的信息
if(StringUtil.isBlank(rpcMessageDto.getTraceId())) {
log.debug("[Client] response traceId 为空,直接丢弃", JSON.toJSON(rpcMessageDto));
return;
} invokeService.addResponse(rpcMessageDto.getTraceId(), rpcMessageDto);
log.debug("[Client] response is :{}", JSON.toJSON(rpcMessageDto));
}
}
}

消息的发送

关心请求结果的:

public SendResult send(MqMessage mqMessage) {
String messageId = IdHelper.uuid32();
mqMessage.setTraceId(messageId);
mqMessage.setMethodType(MethodType.P_SEND_MESSAGE);
MqCommonResp resp = callServer(mqMessage, MqCommonResp.class);
if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
return SendResult.of(messageId, SendStatus.SUCCESS);
}
return SendResult.of(messageId, SendStatus.FAILED);
}

不关心请求结果的发送:

public SendResult sendOneWay(MqMessage mqMessage) {
String messageId = IdHelper.uuid32();
mqMessage.setTraceId(messageId);
mqMessage.setMethodType(MethodType.P_SEND_MESSAGE);
this.callServer(mqMessage, null);
return SendResult.of(messageId, SendStatus.SUCCESS);
}

其中 callServer 实现如下:

/**
* 调用服务端
* @param commonReq 通用请求
* @param respClass 类
* @param <T> 泛型
* @param <R> 结果
* @return 结果
* @since 1.0.0
*/
public <T extends MqCommonReq, R extends MqCommonResp> R callServer(T commonReq, Class<R> respClass) {
final String traceId = commonReq.getTraceId();
final long requestTime = System.currentTimeMillis();
RpcMessageDto rpcMessageDto = new RpcMessageDto();
rpcMessageDto.setTraceId(traceId);
rpcMessageDto.setRequestTime(requestTime);
rpcMessageDto.setJson(JSON.toJSONString(commonReq));
rpcMessageDto.setMethodType(commonReq.getMethodType());
rpcMessageDto.setRequest(true);
// 添加调用服务
invokeService.addRequest(traceId, respTimeoutMills); // 遍历 channel
// 关闭当前线程,以获取对应的信息
// 使用序列化的方式
ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);
//负载均衡获取 channel
Channel channel = channelFuture.channel();
channel.writeAndFlush(byteBuf);
String channelId = ChannelUtil.getChannelId(channel); log.debug("[Client] channelId {} 发送消息 {}", channelId, JSON.toJSON(rpcMessageDto));
if (respClass == null) {
log.debug("[Client] 当前消息为 one-way 消息,忽略响应");
return null;
} else {
//channelHandler 中获取对应的响应
RpcMessageDto messageDto = invokeService.getResponse(traceId);
if (MqCommonRespCode.TIMEOUT.getCode().equals(messageDto.getRespCode())) {
throw new MqException(MqCommonRespCode.TIMEOUT);
}
String respJson = messageDto.getJson();
return JSON.parseObject(respJson, respClass);
}
}

测试代码

启动消费者

MqConsumerPush mqConsumerPush = new MqConsumerPush();
mqConsumerPush.start();

启动日志如下:

[DEBUG] [2022-04-21 19:55:26.346] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2022-04-21 19:55:26.369] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者开始启动服务端 groupName: C_DEFAULT_GROUP_NAME, port: 9527, brokerAddress:
[INFO] [2022-04-21 19:55:27.845] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者启动完成,监听【9527】端口

启动生产者

MqProducer mqProducer = new MqProducer();
mqProducer.start(); //等待启动完成
while (!mqProducer.isEnableFlag()) {
System.out.println("等待初始化完成...");
DateUtil.sleep(100);
} String message = "HELLO MQ!";
MqMessage mqMessage = new MqMessage();
mqMessage.setTopic("TOPIC");
mqMessage.setTags(Arrays.asList("TAGA", "TAGB"));
mqMessage.setPayload(message.getBytes(StandardCharsets.UTF_8)); SendResult sendResult = mqProducer.send(mqMessage);
System.out.println(JSON.toJSON(sendResult));

生产者日志:

[INFO] [2022-04-21 19:56:39.609] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者启动客户端完成,监听端口:9527
...
[DEBUG] [2022-04-21 19:56:39.895] [main] [c.g.h.m.c.s.i.i.InvokeService.addRequest] - [Invoke] start add request for seqId: a70ea2c4325641d6a5b198323228dc24, timeoutMills: 5000
...
[DEBUG] [2022-04-21 19:56:40.282] [main] [c.g.h.m.c.s.i.i.InvokeService.getResponse] - [Invoke] seq a70ea2c4325641d6a5b198323228dc24 对应结果已经获取: com.github.houbb.mq.common.rpc.RpcMessageDto@a8f0b4
...
{"messageId":"a70ea2c4325641d6a5b198323228dc24","status":"SUCCESS"}

消费者日志:

[DEBUG] [2022-04-21 19:56:40.179] [nioEventLoopGroup-2-1] [c.g.h.m.c.h.MqConsumerHandler.dispatch] - channelId: 502b73fffec4485c-00003954-00000001-384d194f6233433e-c8246542 接收到 method: P_SEND_MESSAGE 内容:{"methodType":"P_SEND_MESSAGE","payload":"SEVMTE8gTVEh","tags":["TAGA","TAGB"],"topic":"TOPIC","traceId":"a70ea2c4325641d6a5b198323228dc24"}

[INFO] [2022-04-21 19:56:40.180] [nioEventLoopGroup-2-1] [c.g.h.m.c.h.MqConsumerHandler.dispatch] - 收到服务端消息: {"methodType":"P_SEND_MESSAGE","payload":"SEVMTE8gTVEh","tags":["TAGA","TAGB"],"topic":"TOPIC","traceId":"a70ea2c4325641d6a5b198323228dc24"}

[DEBUG] [2022-04-21 19:56:40.234] [nioEventLoopGroup-2-1] [c.g.h.m.c.h.MqConsumerHandler.writeResponse] - [Server] channel 502b73fffec4485c-00003954-00000001-384d194f6233433e-c8246542 response {"requestTime":1650542200182,"traceId":"a70ea2c4325641d6a5b198323228dc24","request":false,"methodType":"P_SEND_MESSAGE","json":"{\"respCode\":\"0000\",\"respMessage\":\"成功\"}"}

可以看到消费者成功的获取到了生产者的消息。

小结

到这里,我们就实现了一个消息生产者调用消费者的实现。

但是你可能会问,这不就是 rpc 吗?

没有解耦。

是的,为了解决耦合问题,我们将在下一节引入 broker 消息的中间人。

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq

拓展阅读

rpc-从零开始实现 rpc https://github.com/houbb/rpc

【MQ】java 从零开始实现消息队列 mq-02-如何实现生产者调用消费者?的相关教程结束。

《【MQ】java 从零开始实现消息队列 mq-02-如何实现生产者调用消费者?.doc》

下载本文的Word格式文档,以方便收藏与打印。