0%

使用 Sarama 往 Kafka 发数据

使用 Sarama 往 Kafka 发数据

1 下载、安装 Sarama

1
go get github.com/Shopify/sarama 

2 Kafka 发数据 Demo

2.1 配置

1
2
3
4
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follower都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partiton
config.Producer.Return.Successes = true // 成功交付的信息将在 success channel 返回

2.2 构建消息

1
2
3
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log" // 主题
msg.Value = sarama.StringEncoder("this is a test log") // 消息内容

2.3 连接 Kafka

1
2
3
4
5
6
client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
defer client.Close()

2.4 发送消息

1
2
3
4
5
6
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)

2.5 消费者

在 Kafka 安装目录下官方提供了控制台消费者,执行脚本,

1
sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "web_log" --from-beginning

img

然后运行发送消息的程序,

1
2
$ go run sarama.go
pid:0 offset:0

可以在控制台窗口看到消费者接收到了消息,

img

3 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"fmt"

"github.com/Shopify/sarama"
)

func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follower都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partiton
config.Producer.Return.Successes = true // 成功交付的信息将在 success channel 返回

// 构建一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder("this is a test log")

// 连接Kafka
client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
defer client.Close()

// 发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}