合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
~~~ package kafka import ( "github.com/Shopify/sarama" "sync" "project/library/dao/common" "project/library/dao/config" ) type KafkaMgr struct { Conn *sarama.Consumer Wg *sync.WaitGroup PdConn *sarama.SyncProducer } func InitConsumer(cfg *config.KafkaCfg) *KafkaMgr { conn, err := sarama.NewConsumer(cfg.Addrs, nil) if err != nil { common.SimplePanic("InitConsumer失败", err.Error()) } return &KafkaMgr{Conn: &conn} } func InitProducer(cfg *config.KafkaCfg) *KafkaMgr { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll //等待服务器所有副本都保存成功后的响应 config.Producer.Partitioner = sarama.NewRandomPartitioner //随机的分区类型 config.Producer.Return.Successes = true //是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用. conn, err := sarama.NewSyncProducer(cfg.Addrs, config) if err != nil { common.SimplePanic("InitProducer失败", err.Error()) } return &KafkaMgr{PdConn: &conn} } ~~~