企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持知识库和私有化部署方案 广告
>[info] Stream Redis Stream 是 Redis 5.0 版本新增加的数据结构。 Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。 简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。 而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。 Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容: ![](https://img.kancloud.cn/a6/a9/a6a93e5913bbcad82e3c3b4ef6044da6_769x514.png) 每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。 上图解析: * **Consumer Group**:消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。 * **last_delivered_id**:游标,每个消费组会有个游标last_delivered_id,任意一个消费者读取了消息都会使游标last_delivered_id 往前移动。 * **pending_ids**:消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。 ***** ### **消息队列相关命令:** #### **添加消息到末尾:xadd key id field value [field value ...]** * **key**:队列名称,如果不存在就创建 * **ID**:消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。 * **field value**: 记录。 ``` # 添加 name age sex 三个参数 xadd stream * name wqs age 24 sex 1 ``` ![](https://img.kancloud.cn/1f/0e/1f0e89a2bd287ecd2619b95833e03700_384x60.png) ***** #### **删除消息:xdel key ID [ID ...]** * **key**:队列名称 * **ID**:消息 ID ``` xdel stream 1630057674409-0 ``` ![](https://img.kancloud.cn/f6/e0/f6e0458288d8fbf211798e9f9251f5e9_314x74.png) ***** #### **对流进行修剪,限制长度:xtrim key maxlen [~] count** 使用 xtrim 对流进行修剪,限制长度, 语法格式:**(未发现任何改变)** * **key**:队列名称 * **maxlen**:长度 * **count**:数量 ``` xtrim stream maxlen 2 ``` ![](https://img.kancloud.cn/25/d3/25d3d397220c2e12445e9cd8ef39b63d_299x154.png) ***** #### **消息队列长度:xlen key** ``` xlen stream ``` ![](https://img.kancloud.cn/e5/72/e572f29c4b5d9ec2b3da23406bb7e5b3_318x252.png) ***** #### **获取消息列表,会自动过滤已经删除的消息:range key start end [COUNT count]** * **key**:队列名 * **start**:开始值,- 表示最小值 * **end**:结束值,+ 表示最大值 * **count**:数量 ``` xrange stream - + # 只获取1个 xrange stream - + count 1 ``` ![](https://img.kancloud.cn/43/84/4384e5832f3804d7b943c3dc8e8a7ef9_275x188.png) ***** #### **反向获取消息列表,ID 从大到小:xrevrange key end start [COUNT count]** * **key**:队列名 * **end**:结束值,+ 表示最大值 * **start**:开始值,- 表示最小值 * **count**:数量 ``` xrevrange stream + - ``` ![](https://img.kancloud.cn/d6/3f/d63f330a20baf26954c81252fd8acd3f_290x183.png) ***** #### **以阻塞或非阻塞方式获取消息列表:xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]** * **count**:数量 * **milliseconds**:可选,阻塞毫秒数,没有设置就是非阻塞模式 * **key**:队列名 * **id**:消息 ID