https://blog.csdn.net/qq_45076180/article/details/111561984
---
- Broker: Kafka 集群中的每个服务器称为一个 Broker,负责存储和处理消息
- 分区副本(Replication):* Kafka 通过副本(Replica)机制提供高可用性,每个 Partition 会有多个副本(默认 3 个)。
- 分为 **Leader 副本** 和 **Follower 副本**:
* **Leader 副本**:负责读写操作。
* **Follower 副本**:被动同步 Leader 的数据,在 Leader 挂掉后,Kafka 自动选举新的 Leader
#### **ISR(In-Sync Replica)**
* **ISR**(同步副本集合):所有与 Leader 副本保持同步的 Follower 副本。
* 只有 ISR 副本才有资格成为新的 Leader
> # 消息丢失
- 重试机制: retries 避免因临时故障丢失数据, enable.idempotence=true + 开启生产者幂等性 (`enable.idempotence=true`)
- Kafka **默认不会每次写入都立即刷盘(落盘)**,而是**先将数据写入操作系统的页面缓存(Page Cache)**,然后**再批量刷入磁盘**, 可以通过修改配置调整入磁盘速度
- 生产者发送消息的时候有个确认机制:
- acks=0 : 生产者**不等待** Kafka 确认,直接发送下一条消息
- acks=1 : 只要**Leader 副本**收到消息就返回 ACK,不管 Follower 是否同步
- acks=-1/acks=all : 生产者等待 **Leader 和ISR(同步副本)** 确认消息写入后才继续
- 配合 `min.insync.replicas` 指定了**最少需要多少个 ISR 副本**确认消息写入
- 关闭Unclean 选举:
```
Broker 1(Leader) ---> 最新数据(消息 1、2、3、4、5)
Broker 2(Follower, ISR) ---> 最新数据(消息 1、2、3、4、5)
Broker 3(Follower, 非ISR) ---> 旧数据(消息 1、2、3)
**正常情况**:
* Broker 1 作为 Leader,Broker 2 作为 ISR 复制数据。
* Broker 3 由于同步较慢,被剔除出 ISR(非 ISR)
**如果 Broker 1 和 Broker 2 宕机:**
* **Unclean 选举 = `true`**
* Kafka 允许 Broker 3 成为新的 Leader,但它**缺少消息 4 和 5**,因此数据丢失。
* **Unclean 选举 = `false`**
* Kafka 不会选出新的 Leader,该分区**不可用**,直到 Broker 1 或 Broker 2 重新上线
```
- 数据回滚
| Broker | 旧数据(宕机前) | 重新上线后数据 |
| --- | --- | --- |
| Broker 1 | **1, 2, 3, 4, 5** | **1, 2, 3, 6, 7**(删除 4、5,改为同步新 Leader) |
| Broker 2 | **1, 2, 3, 4, 5** | **1, 2, 3, 6, 7**(删除 4、5,改为同步新 Leader) |
| Broker 3(新 Leader) | **1, 2, 3** | **1, 2, 3, 6, 7** |
- 消费者设置 自动提交偏移量: 如果消费失败, 数据就丢失了(解决方案: 设置手动提交)
> # Rebalance 频繁
- 在 Kafka 中,**Rebalance(再平衡)** 是指 **Kafka Consumer Group(消费者组)重新分配分区(Partition)的过程**。
- 消费者加入或离开 Consumer Group
```
session.timeout.ms=45000 # 默认 10s,适当加大避免误判
heartbeat.interval.ms=15000 # 默认 3s,适当加大减少心跳频率
```
> # Kafka 如何在多个分区中保证消息顺序和消息处理效率?
- 需要顺序消费的, 通过设置key放入同一个分区里
### **分区分配机制**
Kafka 通过**Partitioner**来决定消息发送到哪个分区。默认情况下,Kafka 使用以下规则:
1. **如果有 Key**:
* 对 Key 进行哈希(默认使用`murmur2`算法),然后对分区数取模,得到目标分区。
* **公式**:`partition = hash(key) % numPartitions`
* **结果**:相同 Key 的消息会被分配到同一个分区。
2. **如果没有 Key**:
* 使用**轮询策略**(Round Robin)将消息均匀分配到各个分区
---
消息丢失acks=all + min.insync.replicas ≥ 2
消息重复幂等性 (enable.idempotence=true) + 去重(Redis/数据库唯一索引)
数据积压增加消费者 + 批量消费 + 多线程处理
分区不均衡自定义分区策略 + Key 分区
消息顺序单个 Partition 保持顺序 + StickyAssignor
Rebalance 频繁增加 session.timeout.ms + StickyAssignor
事务问题Kafka 事务 API + 幂等性
性能优化批量发送/消费 + Zero Copy + 压缩
- 目录
- 第一例 gRPC使用例子
- 第二例 基于go-micro做服务注册和服务发现
- 第三例 留言板项目源码
- 第四例 聊天室
- 第五例 工具库
- dao
- common
- common.go
- config
- config.go
- gorm
- grom.go
- sqlx
- sqlx.go
- kafka
- kafka.go
- log
- log.go
- log2.go
- redis
- redis.go
- zookeeper
- zookeeper.go
- init
- main.go
- 工具库
- cache
- cfg.go
- redis
- 示例
- database
- cfg.go
- gorm.go
- sql.go
- 示例
- mq
- cfg.go
- kafka_consumer.go
- kafka_producter.go
- 示例
- time
- time.go
- 第六例 原生sql操作
- 第七例 sqlx操作
- 第八例 Redis数据库(gomodule/redigo)
- 第九例 Redis消息队列
- 第十例 Redis集群连接
- 十一例 Zookeeper操作
- 十二例 Kafka操作
- 十三例 NSQ操作
- 十四例 二分查找
- 十五例 交换排序 - 冒泡排序
- 十六例 插入排序 - 直接插入排序
- 十七例 插入排序 - 希尔排序
- 十八例 交换排序 - 快速排序
- 十九例 算法求解应用
- 二十例 pprof性能分析
- 二一例 CPU信息采集
- 二二例 Heap信息采集
- 二三例 Http信息采集
- 二四例 单元测试(功能测试)
- 二五例 基准测试(压力测试/性能测试)
- 二六例 gdb调试
- 二七例 json序列化和反序列化
- 二八例 protobuf序列化和反序列化
- 二九例 包管理工具 go vendor
- 三十例 包管理工具 go mod
- 三一例 zip压缩
- 三二例 交叉编译
- 三三例 线上环境部署
- 三四例 业务:实现固定周期维护
- 三五例 聊天室(精简版)
- 三六例 并发安全字典
- 三七例 导出Excel表格
- 三八例 导出CSV表格
- 三九例 聊天室(高并发)
- 四十例 JWT (Json Web Token)
- 四一例 雪花算法生成 Id
- 四二例 对称加密 AES
- 四三例 非对称加密 RSA
- 四四例 签名算法 SHA1
- 四五例 数据库操作 gorm
- gorm V2
- 四六例 数据库操作 gorm 集合
- 数据库连接和创建表
- 查询 - 分页
- 查询所有数据
- 查询单条数据
- 插入一条或多条数据
- 更新一条或多条数据
- 更新一条或多条数据(有零值)
- 四七例 RSA(MD5WithRSA 算法)签名和验签方式
- 四八例 线上部署脚本
- 四九例 Elasticsearch
- 五十例 对象池
- 五一例 中间库(github.com/wong-winnie/library)
- 五二例 二维码(生成和解析)
- 五三例 回调用例
- 五四例 文件服务器(MINIO)
- 五五例 chm文档转json
- 提取内容页Json
- 将目录索引和内容页混合生成Json
- 目录层级小案例
- 五六例 部署 gogs 代码管理工具
- 五七例 通过命令行操作SVN
- 五八例 根据数据库表生产模型
- 五九例 Trie树
- 六十例 二进制排序
- 六一例 递归+迭代实现无限级分类
- 六二例 Arrow 数据结构
- 简单介绍
- Go 用Arrow数据格式与其它语言交互
- 六三例 LMDB 内存映射型数据库
- 获取指定Key位置
- 六四例 切片数据按字段分类
- 六五例 Xorm 批量插入数据
- 六六例 FlatBuffers 序列化和反序列化
- FlatBuffers 步骤1
- FlatBuffers 步骤2
- 六七例 数据同步
- 增量同步v1
- 全量同步v1
- 定时器
- 六八例 Http请求
- 六九例 Gin + 数据库操作
- 七十例 ClickHouse 列式数据库
- 七一例 用图表展示数据库数据
- 七二例 go:linkname
- 七三例 四舍五入、保留3小数位
- 七四例 判断两个时间戳是否同一天
- 七五例 Gin Http请求
- 七六例 过滤器
- 七七例 Excel 导入导出
- 七八例 小程序向公众号推消息
- 七九列 解析二进制数据
- 例子一
- 例子二
- 八十例 路由转发
- 八一例 协程池(安全执行任务,捕获异常)
- 八二例 切片 slice
- 八三例 集合 map
- 八四例 Redis 六种数据类型
- 八五例 Zstd压缩
- 八六例 提高接口并发量
- 八七例 协程 goroutine 和 通道 channel
- 八七例 Mysql 事务和索引等
- 编写中
- 数据交互
- mysql 索引和事务
- 发请求
- defer
- 其它
- linux
- OAuth2.0 和 JWT
- 其它2
- 其他
- Web3.0 智能合约
- 多人贪吃蛇
- V1
- 客户端
- 服务端
- V2
- 同步方式
- 游戏框架
- deepseek
- k8s
- TRPC
- Kafka
- 加密
- mm
- 技术扩展阅读