0%

收集日志并发送到Kafka

收集日志并发送到Kafka

1 初始化 tail

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
var DefaultTailTool *tail.Tail

func init() {
err := Init(conf.DefaultConfig.Collect.LogFilePath)
if err != nil {
logrus.Errorf("init tail err: %w", err)
os.Exit(1)
}
}

func Init(fileName string) (err error) {
config := tail.Config{
ReOpen: true, // 重新打开,如果修改操作不是在日志文件末尾添加
Follow: true, // 持续寻找新行
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // 初始光标位置
MustExist: false, // 是否允许文件不存在
Poll: true, // 使用 Poll 监听文件的变化,而不是 inotify
}
DefaultTailTool, err = tail.TailFile(fileName, config)
if err != nil {
logrus.Errorf("tail file failed, err:%w", err)
return err
}
return nil
}

2 收集日志

1
2
3
4
5
6
7
8
9
10
11
func StartCollect() {
for {
line, ok := <-DefaultTailTool.Lines
if !ok {
logrus.Errorf("tail read file err, filename: %s", DefaultTailTool.Filename)
time.Sleep(time.Second)
continue
}
kafka.MsgChan <- &line.Text
}
}
  • 将从日志文件末尾读出的数据存入channel中,Kafka生产者会从同一个channel读取日志消息,并发送给消息队列
  • 这个共享channel是有缓冲的,只有当缓冲区满tail往里放数据,或者缓冲区空Kafka生产者向外拿数据时才会发生阻塞
  • channel的主要作用是解耦流量控制

3 Kafka生产者发送日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type KafkaProducerClient struct {
client sarama.SyncProducer
}

var (
DefaultConfig *sarama.Config
DefaultProducerClient *KafkaProducerClient = &KafkaProducerClient{}
// Message Channel
MsgChan = make(chan *string, consts.MsgChanBufferSize)
)

func (kc *KafkaProducerClient) Init(serverAddress []string) (err error) {
kc.client, err = sarama.NewSyncProducer(serverAddress, DefaultConfig)
if err != nil {
logrus.Errorf("failed to init Kafka producer client:%v", err)
return fmt.Errorf("failed to init Kafka producer client:%v", err)
}
go MsgListener()
return nil
}

func MsgListener() {
for msg := range MsgChan {
pid, offset, err := DefaultProducerClient.SendStringMessage("web_log", *msg)
if err != nil {
logrus.Errorf("kafka send msg err: %w", err)
return
}
logrus.Info("kafka send successfully, msg: %s, pid: %v, offset: %v", *msg, pid, offset)
}
}
  • 在初始化函数中使用go关键字启动1个goroutine,用于执行函数MsgListener()中的代码段
  • MsgListener()函数不断从管道MsgChan读取日志消息,然后发送给Kafka集群

4 效果演示

启动Kafka控制台消费者

1
sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "web_log" --from-beginning

编译、运行程序,

1
2
$ go run ./main.go 
2021/10/01 00:46:26 Seeked ./logs/web_log - &{Offset:0 Whence:2}

向日志文件末尾添加数据,

1
echo "Hello, world!" >> ./logs/web_log

控制台消费者收到消息并展示,

img