ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、视频、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
~~~ package kafka import ( "github.com/Shopify/sarama" "sync" "project/library/dao/common" "project/library/dao/config" ) type KafkaMgr struct { Conn *sarama.Consumer Wg *sync.WaitGroup PdConn *sarama.SyncProducer } func InitConsumer(cfg *config.KafkaCfg) *KafkaMgr { conn, err := sarama.NewConsumer(cfg.Addrs, nil) if err != nil { common.SimplePanic("InitConsumer失败", err.Error()) } return &KafkaMgr{Conn: &conn} } func InitProducer(cfg *config.KafkaCfg) *KafkaMgr { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll //等待服务器所有副本都保存成功后的响应 config.Producer.Partitioner = sarama.NewRandomPartitioner //随机的分区类型 config.Producer.Return.Successes = true //是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用. conn, err := sarama.NewSyncProducer(cfg.Addrs, config) if err != nil { common.SimplePanic("InitProducer失败", err.Error()) } return &KafkaMgr{PdConn: &conn} } ~~~