0%

Kafka客户端读取配置文件初始化

Kafka客户端读取配置文件初始化

1 项目结构

1
2
3
4
5
6
7
8
9
10
11
.
├── conf
│ ├── config.go
│ └── config.ini
├── consts
│ └── kafka.go
├── go.mod
├── go.sum
├── kafka
│ └── kafka_conn.go
├── main.go
  • conf 包存放配置信息
  • consts 包存放项目中用到的常量
  • kafka 包存放 kafka 客户端的一些常用方法

2 读取配置文件

2.1 配置文件内容

1
2
3
4
5
[kafka]
address = 127.0.0.1:9092

[collect]
logfile_path = ~/logs/web_log
  • 配置文件是 ./conf/config.ini,记录了 Kafka 服务器的地址列表,和日志文件的路径。

2.2 配置文件映射到结构体对象

2.2.1 定义配置结构体类型

1
2
3
4
5
6
7
8
9
10
11
12
type Kafka struct {
Address []string `ini:"address"`
}

type Collect struct {
LogFilePath string `ini:"logfile_path"`
}

type Config struct {
*Kafka `ini:"kafka"`
*Collect `ini:"collect"`
}

2.2.2 初始化默认配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
var (
DefaultConfig *Config = &Config{}
)

const (
configFilePath = "./conf/config.ini"
)

func init() {
InitConfig()
}

func InitConfig() error {
cfg, err := ini.Load(configFilePath)
if err != nil {
logrus.Errorf("failed to load config file:%v", err)
return err
}
err = cfg.MapTo(DefaultConfig)
if err != nil {
logrus.Errorf("failed to map config file to struct object:%v", err)
return err
}
return nil
}

3 Kafka客户端初始化、发送消息

3.1 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
type KafkaProducerClient struct {
client sarama.SyncProducer
}

var (
DefaultConfig *sarama.Config
DefaultProducerClient *KafkaProducerClient = &KafkaProducerClient{}
)

func init() {
DefaultConfig := sarama.NewConfig()
DefaultConfig.Producer.RequiredAcks = sarama.WaitForAll
DefaultConfig.Producer.Partitioner = sarama.NewRandomPartitioner
DefaultConfig.Producer.Return.Successes = true

DefaultProducerClient.Init(consts.KafkaServerAddress)
}

// 使用默认sarama配置初始化
func (kc *KafkaProducerClient) Init(serverAddress []string) (err error) {
kc.client, err = sarama.NewSyncProducer(serverAddress, DefaultConfig)
if err != nil {
logrus.Errorf("failed to init Kafka producer client:%v", err)
return fmt.Errorf("failed to init Kafka producer client:%v", err)
}
return nil
}

// 指定sarama配置初始化
func (kc *KafkaProducerClient) InitWithConfig(serverAddress []string, config *sarama.Config) (err error) {
kc.client, err = sarama.NewSyncProducer(serverAddress, config)
if err != nil {
logrus.Errorf("failed to init Kafka producer client:%v", err)
return fmt.Errorf("failed to init Kafka producer client:%v", err)
}
return nil
}

3.2 发送消息

1
2
3
4
func (kc *KafkaProducerClient) SendStringMessage(topic, value string) (pid int32, ofset int64, err error) {
message := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(value)}
return DefaultProducerClient.client.SendMessage(message)
}

4 main.go

1
2
3
4
5
6
7
8
func main() {
_, _, err := kafka.DefaultProducerClient.SendStringMessage("web_log", "this is a test log")
if err != nil {
logrus.Errorf("failed to send kafka message to server:%v", err)
os.Exit(1)
}
fmt.Printf("send message to server successfully!\n")
}

5 运行效果

在 Kafka 安装目录下官方提供了控制台消费者,执行脚本,

1
sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "web_log" --from-beginning

img

编译、运行程序,

1
2
$ go run .
send message to server successfully!

消费者侧,

img