Kafka客户端读取配置文件初始化 1 项目结构 1 2 3 4 5 6 7 8 9 10 11 . ├── conf │ ├── config.go │ └── config.ini ├── consts │ └── kafka.go ├── go.mod ├── go.sum ├── kafka │ └── kafka_conn.go ├── main.go
conf 包存放配置信息
consts 包存放项目中用到的常量
kafka 包存放 kafka 客户端的一些常用方法
2 读取配置文件 2.1 配置文件内容 1 2 3 4 5 [kafka] address = 127.0.0.1:9092 [collect] logfile_path = ~/logs/web_log
配置文件是 ./conf/config.ini ,记录了 Kafka 服务器的地址列表, 和日志文件的路径。
2.2 配置文件映射到结构体对象 2.2.1 定义配置结构体类型 1 2 3 4 5 6 7 8 9 10 11 12 type Kafka struct { Address []string `ini:"address"` } type Collect struct { LogFilePath string `ini:"logfile_path"` } type Config struct { *Kafka `ini:"kafka"` *Collect `ini:"collect"` }
2.2.2 初始化默认配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 var ( DefaultConfig *Config = &Config{} ) const ( configFilePath = "./conf/config.ini" ) func init () { InitConfig() } func InitConfig () error { cfg, err := ini.Load(configFilePath) if err != nil { logrus.Errorf("failed to load config file:%v" , err) return err } err = cfg.MapTo(DefaultConfig) if err != nil { logrus.Errorf("failed to map config file to struct object:%v" , err) return err } return nil }
3 Kafka客户端初始化、发送消息 3.1 初始化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 type KafkaProducerClient struct { client sarama.SyncProducer } var ( DefaultConfig *sarama.Config DefaultProducerClient *KafkaProducerClient = &KafkaProducerClient{} ) func init () { DefaultConfig := sarama.NewConfig() DefaultConfig.Producer.RequiredAcks = sarama.WaitForAll DefaultConfig.Producer.Partitioner = sarama.NewRandomPartitioner DefaultConfig.Producer.Return.Successes = true DefaultProducerClient.Init(consts.KafkaServerAddress) } func (kc *KafkaProducerClient) Init (serverAddress []string ) (err error) { kc.client, err = sarama.NewSyncProducer(serverAddress, DefaultConfig) if err != nil { logrus.Errorf("failed to init Kafka producer client:%v" , err) return fmt.Errorf("failed to init Kafka producer client:%v" , err) } return nil } func (kc *KafkaProducerClient) InitWithConfig (serverAddress []string , config *sarama.Config) (err error) { kc.client, err = sarama.NewSyncProducer(serverAddress, config) if err != nil { logrus.Errorf("failed to init Kafka producer client:%v" , err) return fmt.Errorf("failed to init Kafka producer client:%v" , err) } return nil }
3.2 发送消息 1 2 3 4 func (kc *KafkaProducerClient) SendStringMessage (topic, value string ) (pid int32 , ofset int64 , err error) { message := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(value)} return DefaultProducerClient.client.SendMessage(message) }
4 main.go 1 2 3 4 5 6 7 8 func main () { _, _, err := kafka.DefaultProducerClient.SendStringMessage("web_log" , "this is a test log" ) if err != nil { logrus.Errorf("failed to send kafka message to server:%v" , err) os.Exit(1 ) } fmt.Printf("send message to server successfully!\n" ) }
5 运行效果 在 Kafka 安装目录下官方提供了控制台消费者,执行脚本,
1 sh $KAFKA /bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "web_log" --from-beginning
编译、运行程序,
1 2 $ go run . send message to server successfully!
消费者侧,