Redis 数据类型 Stream

2023-07-11,,

Redis 数据类型 Stream

Redis 常用命令,思维导图 >>>

Redis Stream 是 Redis 5.0 版本新增加的数据结构。

Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

Stream有以下特点:

消息ID的序列化生成
消息遍历
消息的阻塞和非阻塞读取
消息的分组消费
未完成消息的处理
消息队列监控

Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

上图解析:

Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。

通过思维导图做一个理解

应用场景

对于插入到队列中的数据,应用于不同的业务模块

举个栗子:笔者所在的企业是一个广告代理商公司,每天 9:00 到 12:00 媒体会生成客户的广告投放数据呢,会陆续的生成。虽然生成的时间点不确定,但是有一点是确定的,只要生成了意味着我可以得到广告主的【消费组1】小时报表,【消费组2】日报表,【消费组3】昨日消费流水。当这个数据检测生成的时候,会吧这个广告主的数据添加到队列中,上文中提到的 3个消费组,会根据不同的需求进行各自业务数据的生成

实战演练

1、XADD,生产消息

其中语法格式为:

XADD key ID field string [field string ...]
"1610517042092-0"
127.0.0.1:6379> xadd message1 * name zhagnsan msg 123 name zhagnsan msg 456
"1610517125092-0"
127.0.0.1:6379> xadd message1 * name lisi msg 789
"1610517140757-0"

需要提供key,消息ID方案,消息内容,其中消息内容为key-value型数据。 ID,最常使用*,表示由Redis生成消息ID

Redis使用毫秒时间戳和序号生成了消息ID。此时,消息队列中就有这么一些消息可用了。

此返回的id格式是:毫米数-序号

如果你用事务进行一次性提交的话,可以看到这里的序号变成  0、1、2、3

2、XREAD,消费消息

语法格式为:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
127.0.0.1:6379> xread streams message1 0
1) 1) "message1"
2) 1) 1) "1610517125092-0"
2) 1) "name"
2) "zhagnsan"
3) "msg"
4) "123"
5) "name"
6) "zhagnsan"
7) "msg"
8) "456"
2) 1) "1610517140757-0"
2) 1) "name"
2) "lisi"
3) "msg"
4) "789"

[COUNT count],用于限定获取的消息数量
[BLOCK milliseconds],用于设置XREAD为阻塞模式,默认为非阻塞模式
ID,用于设置由哪个消息ID开始读取。使用0表示从第一条消息开始。(本例中就是使用0)此处需要注意,消息队列ID是单调递增的,所以通过设置起点,可以向后读取。在阻塞模式中,可以使用$,表示最新的消息ID。(在非阻塞模式下$无意义)

一个典型的阻塞模式用法为:

127.0.0.1:6379> XREAD block 1000 streams message1 $
(nil)
(1.07s)

我们使用Block模式,配合$作为ID,表示读取最新的消息,若没有消息,命令阻塞!等待过程中,其他客户端向队列追加消息,则会立即读取到。

因此,典型的队列就是 XADD 配合 XREAD Block 完成。XADD负责生成消息,XREAD负责消费消息。

3、消费者组模式,consumer group

当多个消费者(consumer)同时消费一个消息队列时,可以重复的消费相同的消息,就是消息队列中有10条消息,三个消费者都可以消费到这10条消息。

但有时,我们需要多个消费者配合协作来消费同一个消息队列,就是消息队列中有10条消息,

三个消费者分别消费其中的某些消息,比如消费者A消费消息1、2、5、8,消费者B消费消息4、9、10,而消费者C消费消息3、6、7。

也就是三个消费者配合完成消息的消费,可以在消费能力不足,也就是消息处理程序效率不高时,使用该模式。该模式就是消费者组模式。如下图所示:

# 生产者生产消息
127.0.0.1:6379> multi
OK
127.0.0.1:6379> xadd mq1 * meessage 1
QUEUED
127.0.0.1:6379> xadd mq1 * meessage 2
QUEUED
127.0.0.1:6379> xadd mq1 * meessage 3
QUEUED
127.0.0.1:6379> xadd mq1 * meessage 4
QUEUED
127.0.0.1:6379> xadd mq1 * meessage 5
QUEUED
127.0.0.1:6379> exec
1) "1610522197116-0"
2) "1610522197116-1"
3) "1610522197116-2"
4) "1610522197116-3"
5) "1610522197116-4" # 创建消费组 group1、group2
127.0.0.1:6379> xgroup create mq1 group1 0
OK
127.0.0.1:6379> xgroup create mq1 group2 0
OK # 消费组开始消费
# --- 消费组 group1 中的消费者 cusA 消费一条数据
127.0.0.1:6379> xreadgroup group group1 cusA count 1 streams mq1 >
1) 1) "mq1"
2) 1) 1) "1610522197116-0"
2) 1) "meessage"
2) "1" # --- 获取消息列表
127.0.0.1:6379> XRANGE mq1 - +
1) 1) "1610522197116-0"
2) 1) "meessage"
2) "1"
2) 1) "1610522197116-1"
2) 1) "meessage"
2) "2"
3) 1) "1610522197116-2"
2) 1) "meessage"
2) "3"
4) 1) "1610522197116-3"
2) 1) "meessage"
2) "4"
5) 1) "1610522197116-4"
2) 1) "meessage"
2) "5" # --- 消费组 group1 中的消费者 cusA 消费一条数据
2) "5"
127.0.0.1:6379> xreadgroup group group1 cusA count 1 streams mq1 >
1) 1) "mq1"
2) 1) 1) "1610522197116-1"
2) 1) "meessage"
2) "2" # --- 消费组 group2 中的消费者cusA 消费一条数据
127.0.0.1:6379> xreadgroup group group2 cusA count 1 streams mq1 >
1) 1) "mq1"
2) 1) 1) "1610522197116-0"
2) 1) "meessage"
2) "1" # --- 消费组 group1 中的消费者 cusB 消费一条数据
127.0.0.1:6379> xreadgroup group group1 cusB count 1 streams mq1 >
1) 1) "mq1"
2) 1) 1) "1610522197116-2"
2) 1) "meessage"
2) "3"

特点:

1、创建了新的消费组之后,该消费组所处游标都是从0开始的

2、每个消费组消费的时候,各个消费组之间互不干扰

为完成的消费数据丢失问题

若某个消费者,消费了某条消息,但是并没有处理成功时(例如消费者进程宕机),这条消息可能会丢失,因为组内其他消费者不能再次消费到该消息了

为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。命令 XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息

# 获取 group1 的未被消费的信息
127.0.0.1:6379> XPENDING mq1 group1
1) (integer) 3 # 3个已经读取但是没处理
2) "1610522197116-0" # 未被消费起始id
3) "1610522197116-2" # 未被消费结束id
4) 1) 1) "cusA" # greoup1组中的消费者A有2个已读未处理
2) "2"
2) 1) "cusB" # greoup1组中的消费者B有1个已读未处理
2) "1" # 利用 start end count 获取消息的详细信息
127.0.0.1:6379> XPENDING mq1 group1 - + 10
1) 1) "1610522197116-0" #消息ID
2) "cusA" #消费者
3) (integer) 1488851 #从第一次读取到现在过了1488851ms
4) (integer) 1 # 消息被读取了1次
2) 1) "1610522197116-1"
2) "cusA"
3) (integer) 1234483
4) (integer) 1
3) 1) "1610522197116-2"
2) "cusB"
3) (integer) 1082148
4) (integer) 1 # 获取某个消费者的 pending 列表
127.0.0.1:6379> XPENDING mq1 group1 - + 10 cusA
1) 1) "1610522197116-0"
2) "cusA"
3) (integer) 2889119
4) (integer) 1
2) 1) "1610522197116-1"
2) "cusA"
3) (integer) 2634751
4) (integer) 1

每个Pending的消息有4个属性:

    消息ID
    所属消费者
    IDLE,已读取时长
    delivery counter,消息被读取次数

如何标识消息处理完毕?XACK

# 标识总消息列表的第二条消息处理完毕
127.0.0.1:6379> XACK mqq group1 1610522197116-1 #通知消息处理结束,用消息ID标识
(integer) 0 # 然后我查一下 group1 的 penging 信息
127.0.0.1:6379> XPENDING mq1 group1
1) (integer) 3
2) "1610522197116-0"
3) "1610522197116-2"
4) 1) 1) "cusA"
2) "2"
2) 1) "cusB"
2) "1" *可以看到,1610522197116-1 这条信息已经没有了*

如何做消息转移?

# group1 中的 cusA 这条信息(1610522197116-1)有 3176717ms没有被处理
127.0.0.1:6379> XPENDING mq1 group1 - + 10
1) 1) "1610522197116-0"
2) "cusA"
3) (integer) 3431085
4) (integer) 1
2) 1) "1610522197116-1"
2) "cusA"
3) (integer) 3176717
4) (integer) 1
3) 1) "1610522197116-2"
2) "cusB"
3) (integer) 3024382
4) (integer) 1 # 接下来我把消息 1610522197116-1 转移给 group1 中的 cusB
# 转移超过 35 秒的消息 1610522197116-1 到 group1 中的 cusB
127.0.0.1:6379> XCLAIM mq1 group1 cusB 35000 1610522197116-1
1) 1) "1610522197116-1"
2) 1) "meessage"
2) "2" # 查看消费者 group1 看到 1610522197116-1 已经跑到消费者 cusB 中了
127.0.0.1:6379> XPENDING mq1 group1 - + 10
1) 1) "1610522197116-0"
2) "cusA"
3) (integer) 3653453
4) (integer) 1
2) 1) "1610522197116-1"
2) "cusB"
3) (integer) 9656
4) (integer) 2
3) 1) "1610522197116-2"
2) "cusB"
3) (integer) 3246750
4) (integer) 1

坏消息问题,Dead Letter,死信问题

如果某个消息,不能被消费者处理,也就是不能被XACK,这是要长时间处于Pending列表中,当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息),由于有了判定条件,我们将坏消息处理掉即可,删除即可。删除一个消息,使用XDEL语法,演示如下:

# 先查看 mq1 中的所有数据
127.0.0.1:6379> xrange mq1 - +
1) 1) "1610522197116-0"
2) 1) "meessage"
2) "1"
2) 1) "1610522197116-1"
2) 1) "meessage"
2) "2"
3) 1) "1610522197116-2"
2) 1) "meessage"
2) "3"
4) 1) "1610522197116-3"
2) 1) "meessage"
2) "4"
5) 1) "1610522197116-4"
2) 1) "meessage"
2) "5" # 然后删除 1610522197116-3 这条消息
127.0.0.1:6379> xdel mq1 1610522197116-3
(integer) 1 # 最后查看,发现没有了
127.0.0.1:6379> xrange mq1 - +
1) 1) "1610522197116-0"
2) 1) "meessage"
2) "1"
2) 1) "1610522197116-1"
2) 1) "meessage"
2) "2"
3) 1) "1610522197116-2"
2) 1) "meessage"
2) "3"
4) 1) "1610522197116-4"
2) 1) "meessage"
2) "5"

信息监控,XINFO

查看队列信息

127.0.0.1:6379>  Xinfo stream mq1
1) "length"
2) (integer) 4
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "groups"
8) (integer) 2
9) "last-generated-id"
10) "1610522197116-4"
11) "first-entry"
12) 1) "1610522197116-0"
2) 1) "meessage"
2) "1"
13) "last-entry"
14) 1) "1610522197116-4"
2) 1) "meessage"
2) "5"

查看消费组信息

127.0.0.1:6379> Xinfo GROUPS mq1
1) 1) "name"
2) "group1"
3) "consumers"
4) (integer) 2
5) "pending"
6) (integer) 3
7) "last-delivered-id"
8) "1610522197116-2"
2) 1) "name"
2) "group2"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 1
7) "last-delivered-id"
8) "1610522197116-0" # 这里看到了之前创建的两个消费组

查看消费组成员信息

127.0.0.1:6379> XINFO CONSUMERS mq1 group1
1) 1) "name"
2) "cusA"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 1392667
2) 1) "name"
2) "cusB"
3) "pending"
4) (integer) 2
5) "idle"
6) (integer) 637989 # 消费者 cusB 里面有两个没有被消费

常用命令

去文档自己看吧,今天这个笔记写了好久,累的不行,我去喝口水

Redis 数据类型 Stream的相关教程结束。

《Redis 数据类型 Stream.doc》

下载本文的Word格式文档,以方便收藏与打印。