使用 Sarama 往 Kafka 发数据
1 下载、安装 Sarama
1
| go get github.com/Shopify/sarama
|
2 Kafka 发数据 Demo
2.1 配置
1 2 3 4
| config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.Return.Successes = true
|
2.2 构建消息
1 2 3
| msg := &sarama.ProducerMessage{} msg.Topic = "web_log" msg.Value = sarama.StringEncoder("this is a test log")
|
2.3 连接 Kafka
1 2 3 4 5 6
| client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { fmt.Println("producer closed, err:", err) return } defer client.Close()
|
2.4 发送消息
1 2 3 4 5 6
| pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send msg failed, err:", err) return } fmt.Printf("pid:%v offset:%v\n", pid, offset)
|
2.5 消费者
在 Kafka 安装目录下官方提供了控制台消费者,执行脚本,
1
| sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "web_log" --from-beginning
|

然后运行发送消息的程序,
1 2
| $ go run sarama.go pid:0 offset:0
|
可以在控制台窗口看到消费者接收到了消息,

3 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package main
import ( "fmt"
"github.com/Shopify/sarama" )
func main() { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.Return.Successes = true
msg := &sarama.ProducerMessage{} msg.Topic = "web_log" msg.Value = sarama.StringEncoder("this is a test log")
client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { fmt.Println("producer closed, err:", err) return } defer client.Close()
pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send msg failed, err:", err) return } fmt.Printf("pid:%v offset:%v\n", pid, offset) }
|