MQ系列8:数据存储,消息队列的高可用保障

2023-01-04,,,,

MQ系列1:消息中间件执行原理

MQ系列2:消息中间件的技术选型

MQ系列3:RocketMQ 架构分析

MQ系列4:NameServer 原理解析

MQ系列5:RocketMQ消息的发送模式

MQ系列6:消息的消费

MQ系列7:消息通信,追求极致性能

1 介绍

在之前的章节中,我们介绍了消息的发送 和 消息通信 的原理。但是这边有一个比较核心的关键点,那就是如果已经把消息传递给Broker。在Broker在被消费之前,如何保证消息的稳定性,避免消息丢失和数据。

这时候就需要数据持久化数据来进行保障了。

根据之前我们 MQ系列2:消息中间件的技术选型 章节做的分析,RabbitMQ支持 1W+ 级别的吞吐,

Kafka 和 Rocket 支持 10W+ 级别的吞吐,想要实现这么大的吞吐,必须具备一个很强悍的存储功能。下面我们来看看。

2 Broker 存储架构

RocketMQ采用文件存储机制(类似Kafka),即直接在磁盘上使用文件来保存消息,而不是采用Redis或者MySQL之类的持久化工具。

它会把消息存储所属相关的文件存储在ROCKETMQ_HOME下,包含三个部分:

2.1 CommitLog 消息元数据

存储消息的元数据,所有消息都会顺序存入到CommitLog文件中。CommitLog由多个文件组成,每个文件固定大小1G。它有如下特征:

单个文件默认大小为1G
文件名称长度20,保存偏移量,偏移量不够20位的补0。
如第1个文件没有偏移量,则为:00000000000000000000
第2个文件起始偏移量为1073741824(1G=1073740842),则文件名为 00000000001073741824。
第一个1G文件文件写满之后之后转入第2个文件,如此反复,因为是顺序的,所以写入效率较高。

2.2 ConsumeQueue 消息逻辑队列

ConsumeQueue是指存储消息在CommitLog上的索引,一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog。它有如下特征:

ComsumeQueue的结构组成共 20 个字节,包含 8 字节的 commitlog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode
ConsumeQueue 里只存偏移量信息,内容精悍。加载到内存中,操作效率非常高。
一致性保障,CommitLog 里存储了 ConsumeQueues、Message Key、Tag 等所有信息,在 ConsumeQueue 丢失或者故障时候,数据可快速回复。
因为每个Topic下可能有多个Queueu,所以存储结构为:HOME/store/consumequeue/{topic}/{queueId}/{fileName}。

2.3 IndexFile 索引文件

IndexFile 是一种可选索引文件,提供了一种可以通过 key 或时间区间来查询消息的方法,并且这种查找消息的方法不影响发送与消费消息的主流程。它的特征如下:

算法原理:IndexFile 索引文件的底层实现 为 hash 索引,可以对照 Java 的HashMap比较,通过计算 Key 的 hashcode, 取余获得 hash 槽,并通过拉链法解决哈希冲突。
大小限制:IndexFile 以创建时间戳命名,单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引。

通过上面的三个部件说明可以了解到,RocketMQ 消息存储结构主要是由 CommitLog,ConsumeQueue,IndexFile 三部分组成的。当我们发送消息的时候,会执行如下过程:

消息格式化成 CommitLog的字段结构,并按照顺序写入到CommitLog 文件中。
Broker会按照 ConsumeQueue 的字段结构的要求创建一条索引记录。
按需创建IndexFile索引文件。

3 存储的执行过程

通过上面我们已经了解到了,Kafka 和 Rocket 均支持 10W+ 级别的吞吐,那么上述的存储结构是如何保持这样的高超性能的呢?

之前的章节我们已经了解到,Broker 启动时同步启动 NettyRemotingServer 进行端口监听,等坐等客户端的连接。
当客户端发送请求时,NettyRemotingServer WorkerGroup 处理可读事件,执行 processRequestCommand 处理来源消息数据。
接收到消息之后就需要存储下来了,DefaultMessageStore对数据进行校验,校验如下,校验完成之后发送存储指令。
Broker 无响应时拒绝消息写入
Broker 角色 为 SLAVE 时也拒绝写入
判断是否支持写入,不支持写入时也拒绝
topic length 小于等于 256 字符,否则拒绝消息写入
消息 length 小于等于 65536 字符,否则拒绝消息写入
PageCache 繁忙时报错误消息,无法写入
DefaultMessageStore 调用 CommitLog.putMessage 存入消息
获取可以写入的 CommitLog 进行写入
CommitLog(每个CommitLog默认1G大小) 对应 MappedFile(程序视角),当有多个 MappedFiled 时,组成 MappedFileQueue。
MappedFile 持有物理 CommitLog 的 fileChannel (Java NIO 文件读写的通道),但并没有通过 fileChannel 直接访问物理 CommitLog 文件,而是映射到一个 MappedByteBuffer,并把序列化后的消息写入这个 ByteBuffer 中,已达到提升执行效率的目的。
最后写入 MappedFile 相对应的 CommitLog 文件中。

4 总结

理解好RabbitMQ 中 Broker 存储的组成要素 CommitLog,ConsumeQueue,IndexFile。
当 Broker 收到消息存储请求时,通过调用 CommitLog 对应的 MappedFile,把消息写入MappedFile的MeppedByteBuffer(内存映射)。

MQ系列8:数据存储,消息队列的高可用保障的相关教程结束。

《MQ系列8:数据存储,消息队列的高可用保障.doc》

下载本文的Word格式文档,以方便收藏与打印。