0%

Sarama Kafka 消费者 Client

Sarama Kafka 消费者 Client

1 创建新的消费者

1
2
3
4
5
// Create a consumer that connects to the Kafka broker at 127.0.0.1:9092.
// A nil config is passed here; sarama is expected to fall back to its default
// configuration in that case (see the sarama NewConsumer/NewClient docs).
consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
if err != nil {
	// Use %v rather than %w: %w is only understood by fmt.Errorf. Logging
	// helpers format with fmt.Sprintf semantics, where %w renders as
	// "%!w(...)" instead of the error text.
	logrus.Errorf("failed to create kafka consumer: %v", err)
	return
}

2 获取指定topic下的所有分区列表

1
2
3
4
5
6
7
// Look up the IDs of all partitions that belong to the topic.
topicName := "web_log"
partitionList, err := consumer.Partitions(topicName)
if err != nil {
	// Use %v rather than %w: %w is only valid in fmt.Errorf; in a log
	// format string it would print "%!w(...)" instead of the error.
	logrus.Errorf("failed to get list of partition: %v", err)
	return
}
fmt.Println("partitionList:", partitionList)

3 针对每个分区创建一个对应的分区消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Start one partition consumer per partition and read each one in its own
// goroutine; wg tracks those goroutines.
var wg sync.WaitGroup
for _, partition := range partitionList {
	partConsumer, err := consumer.ConsumePartition(topicName, partition, sarama.OffsetNewest)
	if err != nil {
		// Use %v rather than %w: %w is only valid in fmt.Errorf; in a log
		// format string it would print "%!w(...)" instead of the error.
		logrus.Errorf("failed to create consumer for partition %d: %v", partition, err)
		return
	}

	// Deferred calls run when the enclosing function returns, not at the end
	// of each iteration, so every partition consumer created so far stays
	// open while its goroutine is reading from it.
	defer partConsumer.AsyncClose()

	// Consume messages from this partition asynchronously.
	wg.Add(1)
	go func() {
		// Deferred so the WaitGroup is released however the goroutine exits.
		defer wg.Done()
		for msg := range partConsumer.Messages() {
			logrus.Infof("Partition:%d Offset:%d Key:%s Value:%s",
				msg.Partition, msg.Offset, msg.Key, msg.Value)
		}
	}()
}
// Block until every reader goroutine has returned, i.e. until each
// Messages() range loop has ended.
wg.Wait()

4 效果演示

编译并运行程序:

1
2
$ go run kafka_consumer.go 
partitionList: [0]

启动 kafka-console-producer.sh:

1
2
$ kafka-console-producer.sh --broker-list localhost:9092 --topic web_log    
>

使用 kafka-console-producer.sh 向指定 topic 发送 1 条消息:

1
2
3
$ kafka-console-producer.sh --broker-list localhost:9092 --topic web_log               
>Hello, world!
>

查看程序输出:

1
2
3
$ go run kafka_consumer.go 
partitionList: [0]
INFO[0266] Partition:0 Offset:8 Key: Value:Hello, world!