// Create a Kafka consumer against the broker at 127.0.0.1:9092; nil is passed
// as the sarama config.
consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
if err != nil {
	// Use %v, not %w: logrus builds the message with fmt.Sprintf, and %w is
	// only understood by fmt.Errorf. With %w the log line would contain
	// "%!w(...)" instead of the error text.
	logrus.Errorf("failed to create kafka consumer: %v", err)
	return
}
2 获取指定topic下的所有分区列表
1 2 3 4 5 6 7
// Look up the partition IDs of the "web_log" topic and print them.
topicName := "web_log"
partitionList, err := consumer.Partitions(topicName)
if err != nil {
	// Use %v, not %w: logrus builds the message with fmt.Sprintf, and %w is
	// only understood by fmt.Errorf. With %w the log line would contain
	// "%!w(...)" instead of the error text.
	logrus.Errorf("failed to get list of partition: %v", err)
	return
}
fmt.Println("partitionList:", partitionList)
var wg sync.WaitGroup for _, partition := range partitionList { partConsumer, err := consumer.ConsumePartition(topicName, partition, sarama.OffsetNewest) if err != nil { logrus.Errorf("failed to create consumer for partition %d: %w", partition, err) return }