企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
* 消费者通过订阅消息并消费消息 * Offset的管理是基于消费组(group.id)的级别 * <mark>每个Partition只能由同一消费组内的一个Consumer来消费</mark> * <mark>每个Consumer可以消费多个分区</mark> * 消费过的数据仍会保留在Kafka中 * 消费者不能超过分区数量 * 消费模式 * 队列:所有消费者在一个消费组内 * 发布/订阅:所有消费者被分配到不同的消费组 * 提交方式 * 自动提交:自动提交的优点是方便,但是可能会重复处理消息。 * 手动提交:又分为同步提交commitSync和异步提交commitAsync。 ![](https://img.kancloud.cn/33/7d/337dc9a8888daa6e50dc4f1ac1210d2a_1171x851.png) Consumer 负责订阅(消费)主题并处理消息。Consumer 负责维护到 Broker 的 TCP 连接以便获取数据,在一个 Partition 中每一个记录的 Offset 是该记录的 唯一标识,即每一个 Offset 唯一标识当前 Partition 中的一条记录,同时 Offset 也可以标识 Consumer 在 Partition 中的位置(Position)。对 Consumer 来讲,这个 位置有两种含义:Current Offset 和 Committed Offset。 <br/> **1. Current Offset** <mark>保存在 Consumer 客户端中</mark>,它表示 Consumer 希望收到的下 一条消息的序号。它仅仅在 `poll()`方法中使用,例如,Consumer 第一次调用 `poll()` 方法后收到了 20 条消息(offset:0-19),那么 Current Offset 就被设置为 20。这样 Consumer 下一次调用 poll()方法时,Kafka 就知道应该从序号为 20 的消息开始读 取。这样就能够保证每次 Consumer poll 消息时,都能够收到不重复的消息。 <br/> **2. Committed Offset** <mark>保存在 Broker 上</mark>,它表示 Consumer 已经确认消费过的 消息的序号。主要通过 `commitSync` 和 `commitAsync` API 来操作。 例如,如果 Committed Offset 为 0,Consumer 通过 `poll()`方法收到 20 条消息 后,此时 Current Offset 就是 20,经过一系列的逻辑处理后,并没有调用 `consumer.commitAsync()`或 `consumer.commitSync()`来提交 Committed Offset,那么 此时 Committed Offset 依旧是 0,下一次 Consumer 重启后调用 poll()继续从 0 开 始消费。 <br/> 又如,如果一个 Consumer 消费了 5 条消息(`poll` 并且成功 `commitSync`)之 后宕机了,重新启动之后它仍然能够从第 6 条消息开始消费,因为 Committed Offset 已经被 Kafka 记录为 5。 <br/> 可以将多个 Consumer 设置为同一个 Consumer Group,组内的所有 Consumer 协调在一起来消费订阅主题的所有分区。但是<mark>每个分区只能由同一个消费组内的 一个 Consumer 来消费</mark>。很明显,<mark>Consumer Group 的作用是为了实现多个 Consumer 并行消费一个 Topic</mark>。