1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| type KafkaProducerClient struct { client sarama.SyncProducer }
var ( DefaultConfig *sarama.Config DefaultProducerClient *KafkaProducerClient = &KafkaProducerClient{} MsgChan = make(chan *string, consts.MsgChanBufferSize) )
func (kc *KafkaProducerClient) Init(serverAddress []string) (err error) { kc.client, err = sarama.NewSyncProducer(serverAddress, DefaultConfig) if err != nil { logrus.Errorf("failed to init Kafka producer client:%v", err) return fmt.Errorf("failed to init Kafka producer client:%v", err) } go MsgListener() return nil }
func MsgListener() { for msg := range MsgChan { pid, offset, err := DefaultProducerClient.SendStringMessage("web_log", *msg) if err != nil { logrus.Errorf("kafka send msg err: %w", err) return } logrus.Info("kafka send successfully, msg: %s, pid: %v, offset: %v", *msg, pid, offset) } }
|