https://blog.csdn.net/qq_45076180/article/details/111561984 --- - Broker: Kafka 集群中的每个服务器称为一个 Broker,负责存储和处理消息 - 分区副本(Replication):* Kafka 通过副本(Replica)机制提供高可用性,每个 Partition 会有多个副本(默认 3 个)。 - 分为 **Leader 副本** 和 **Follower 副本**: * **Leader 副本**:负责读写操作。 * **Follower 副本**:被动同步 Leader 的数据,在 Leader 挂掉后,Kafka 自动选举新的 Leader #### **ISR(In-Sync Replica)** * **ISR**(同步副本集合):所有与 Leader 副本保持同步的 Follower 副本。 * 只有 ISR 副本才有资格成为新的 Leader > # 消息丢失 - 重试机制: retries 避免因临时故障丢失数据, enable.idempotence=true + 开启生产者幂等性 (`enable.idempotence=true`) - Kafka **默认不会每次写入都立即刷盘(落盘)**,而是**先将数据写入操作系统的页面缓存(Page Cache)**,然后**再批量刷入磁盘**, 可以通过修改配置调整入磁盘速度 - 生产者发送消息的时候有个确认机制: - acks=0 : 生产者**不等待** Kafka 确认,直接发送下一条消息 - acks=1 : 只要**Leader 副本**收到消息就返回 ACK,不管 Follower 是否同步 - acks=-1/acks=all : 生产者等待 **Leader 和ISR(同步副本)** 确认消息写入后才继续 - 配合 `min.insync.replicas` 指定了**最少需要多少个 ISR 副本**确认消息写入 - 关闭Unclean 选举: ``` Broker 1(Leader) ---> 最新数据(消息 1、2、3、4、5) Broker 2(Follower, ISR) ---> 最新数据(消息 1、2、3、4、5) Broker 3(Follower, 非ISR) ---> 旧数据(消息 1、2、3) **正常情况**: * Broker 1 作为 Leader,Broker 2 作为 ISR 复制数据。 * Broker 3 由于同步较慢,被剔除出 ISR(非 ISR) **如果 Broker 1 和 Broker 2 宕机:** * **Unclean 选举 = `true`** * Kafka 允许 Broker 3 成为新的 Leader,但它**缺少消息 4 和 5**,因此数据丢失。 * **Unclean 选举 = `false`** * Kafka 不会选出新的 Leader,该分区**不可用**,直到 Broker 1 或 Broker 2 重新上线 ``` - 数据回滚 | Broker | 旧数据(宕机前) | 重新上线后数据 | | --- | --- | --- | | Broker 1 | **1, 2, 3, 4, 5** | **1, 2, 3, 6, 7**(删除 4、5,改为同步新 Leader) | | Broker 2 | **1, 2, 3, 4, 5** | **1, 2, 3, 6, 7**(删除 4、5,改为同步新 Leader) | | Broker 3(新 Leader) | **1, 2, 3** | **1, 2, 3, 6, 7** | - 消费者设置 自动提交偏移量: 如果消费失败, 数据就丢失了(解决方案: 设置手动提交) > # Rebalance 频繁 - 在 Kafka 中,**Rebalance(再平衡)** 是指 **Kafka Consumer Group(消费者组)重新分配分区(Partition)的过程**。 - 消费者加入或离开 Consumer Group ``` session.timeout.ms=45000 # 默认 10s,适当加大避免误判 heartbeat.interval.ms=15000 # 默认 3s,适当加大减少心跳频率 ``` > # Kafka 如何在多个分区中保证消息顺序和消息处理效率? - 需要顺序消费的, 通过设置key放入同一个分区里 ### **分区分配机制** Kafka 通过**Partitioner**来决定消息发送到哪个分区。默认情况下,Kafka 使用以下规则: 1. **如果有 Key**: * 对 Key 进行哈希(默认使用`murmur2`算法),然后对分区数取模,得到目标分区。 * **公式**:`partition = hash(key) % numPartitions` * **结果**:相同 Key 的消息会被分配到同一个分区。 2. **如果没有 Key**: * 使用**轮询策略**(Round Robin)将消息均匀分配到各个分区 --- 消息丢失acks=all + min.insync.replicas ≥ 2 消息重复幂等性 (enable.idempotence=true) + 去重(Redis/数据库唯一索引) 数据积压增加消费者 + 批量消费 + 多线程处理 分区不均衡自定义分区策略 + Key 分区 消息顺序单个 Partition 保持顺序 + StickyAssignor Rebalance 频繁增加 session.timeout.ms + StickyAssignor 事务问题Kafka 事务 API + 幂等性 性能优化批量发送/消费 + Zero Copy + 压缩