tailfile: Starting Multiple tailTasks

1 tailTask structure

1.1 Struct

var (
	tailTasks []*tailTask // all collection tasks created by Init
)

type tailTask struct {
	topic       string     // Kafka topic
	logFilePath string     // path of the log file
	tailFile    *tail.Tail // tail.Tail object following the file
}

1.2 Constructor

func newTailTask(topic, logFilePath string, tailFile *tail.Tail) *tailTask {
	return &tailTask{
		topic:       topic,
		logFilePath: logFilePath,
		tailFile:    tailFile,
	}
}

1.3 The run method

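func (tt *tailTask) run() {
	for {
		line, ok := <-tt.tailFile.Lines
		if !ok {
			// Lines has been closed and will never yield again, so stop
			// instead of sleeping and retrying forever.
			logrus.Errorf("tail read file err, filename: %s", tt.tailFile.Filename)
			return
		}
		// Forward the line to the Kafka producer under this task's topic.
		kafka.DefaultProducerClient.SendToMsgChan(tt.topic, line.Text)
	}
}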
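
A receive from a closed channel returns immediately with ok == false, so a loop that reads tt.tailFile.Lines has to stop once the channel is closed rather than retry. The following is a minimal stdlib-only sketch of that receive loop. The line type and the drain function are hypothetical stand-ins for *tail.Line and run, and the send callback stands in for SendToMsgChan; none of them are part of the project's code.

```go
package main

import "fmt"

// line is a hypothetical stand-in for *tail.Line; only Text is used here.
type line struct {
	Text string
}

// drain forwards every received line to send until the lines channel is
// closed, and returns the number of lines it forwarded.
func drain(lines <-chan *line, send func(text string)) int {
	n := 0
	for {
		l, ok := <-lines
		if !ok {
			// A closed channel never yields values again, so stop here.
			return n
		}
		send(l.Text)
		n++
	}
}

func main() {
	lines := make(chan *line, 2)
	lines <- &line{Text: "first"}
	lines <- &line{Text: "second"}
	close(lines)

	n := drain(lines, func(text string) { fmt.Println(text) })
	fmt.Println("forwarded", n, "lines")
}
```

Buffered values are still delivered after close; ok only becomes false once the buffer is empty.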

2 Initialization

2.1 Log collection config in etcd

$ etcdctl get collect_conf_key                                         
collect_conf_key
[{"log_file_path":"./logs/web_log","topic":"web_log"}, {"log_file_path":"./logs/shopping_log", "topic":"shopping_log"}]
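
The value stored under collect_conf_key is a JSON array with one object per log file. As a rough sketch of how such a value could be decoded into the entries that Init receives, the code below defines a CollectConfigEntry type whose json tags are assumed from the keys shown above. The real common.CollectConfigEntry is not shown in this post, and parseCollectConfig is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CollectConfigEntry mirrors one element of the JSON array stored in etcd.
// The json tags are an assumption based on the keys in the etcdctl output.
type CollectConfigEntry struct {
	LogFilePath string `json:"log_file_path"`
	Topic       string `json:"topic"`
}

// parseCollectConfig decodes the raw etcd value into a slice of entries.
func parseCollectConfig(raw []byte) ([]CollectConfigEntry, error) {
	var entries []CollectConfigEntry
	if err := json.Unmarshal(raw, &entries); err != nil {
		return nil, fmt.Errorf("unmarshal collect config: %w", err)
	}
	return entries, nil
}

func main() {
	raw := []byte(`[{"log_file_path":"./logs/web_log","topic":"web_log"}, {"log_file_path":"./logs/shopping_log", "topic":"shopping_log"}]`)
	entries, err := parseCollectConfig(raw)
	if err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	for _, e := range entries {
		fmt.Printf("path=%s topic=%s\n", e.LogFilePath, e.Topic)
	}
}
```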

2.2 tailfile initialization

func Init(collectConfig []common.CollectConfigEntry) (err error) {
	config := tail.Config{
		ReOpen:    true,                                 // reopen the file if it is recreated, e.g. after log rotation
		Follow:    true,                                 // keep following the file for new lines
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // start at the end of the file (Whence 2 = io.SeekEnd)
		MustExist: false,                                // do not fail if the file does not exist yet
		Poll:      true,                                 // poll for file changes instead of using inotify
	}
	for _, configEntry := range collectConfig {
		tailFile, err := tail.TailFile(configEntry.LogFilePath, config)
		if err != nil {
			// %w is only meaningful to fmt.Errorf; use %v when logging.
			logrus.Errorf("tail file failed, err: %v", err)
			return err
		}
		newTask := newTailTask(configEntry.Topic, configEntry.LogFilePath, tailFile)
		tailTasks = append(tailTasks, newTask)
	}
	return nil
}
  • For each config entry, create a collection task initialized with its Kafka topic and log file path, and append it to the task list.

3 Starting the tasks

func StartCollect() {
	for _, task := range tailTasks {
		go task.run()
	}
}
  • Start one goroutine per tailTask in the task list, so that multiple log files are collected concurrently and each line is sent to the corresponding Kafka topic.
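
The one-goroutine-per-task pattern can be tried without Kafka or the tail library. The sketch below is stdlib-only and uses hypothetical stand-ins: task replaces tailTask, a plain string channel replaces tailFile.Lines, and the out channel replaces the producer's message channel. Unlike StartCollect above, it also waits for all goroutines with a sync.WaitGroup so that the program can exit cleanly.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical stand-in for tailTask: a topic plus a source of lines.
type task struct {
	topic string
	lines chan string
}

// message is what a task hands to the (hypothetical) producer channel.
type message struct {
	topic string
	text  string
}

// startCollect starts one goroutine per task and returns a channel that is
// closed after every task's lines channel has been closed and drained.
func startCollect(tasks []*task) <-chan message {
	out := make(chan message)
	var wg sync.WaitGroup
	for _, tk := range tasks {
		wg.Add(1)
		go func(tk *task) {
			defer wg.Done()
			for text := range tk.lines {
				out <- message{topic: tk.topic, text: text}
			}
		}(tk)
	}
	// Close out only after all collectors have finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	web := &task{topic: "web_log", lines: make(chan string, 1)}
	shop := &task{topic: "shopping_log", lines: make(chan string, 1)}
	web.lines <- "Hello, web_log!"
	shop.lines <- "Hello, shopping_log"
	close(web.lines)
	close(shop.lines)

	// The order of messages across topics is not deterministic.
	for m := range startCollect([]*task{web, shop}) {
		fmt.Printf("%s: %s\n", m.topic, m.text)
	}
}
```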

Start a console consumer for the web_log topic:

$ sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "web_log"

In another terminal window, start a console consumer for the shopping_log topic:

$ sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "shopping_log" 

Build and run the program:

$ go run main.go 
[{./logs/web_log web_log} {./logs/shopping_log shopping_log}]
2021/10/02 10:54:46 Seeked ./logs/web_log - &{Offset:0 Whence:2}
2021/10/02 10:54:46 Seeked ./logs/shopping_log - &{Offset:0 Whence:2}

Append a line to the log file ./logs/web_log:

$ echo "Hello, web_log!" >> logs/web_log

Check the console consumer for the web_log topic:

$ sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "web_log"    
Hello, web_log!

Append a line to the log file ./logs/shopping_log:

$ echo "Hello, shopping_log" >> logs/shopping_log

Check the console consumer for the shopping_log topic:

$ sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "shopping_log"                                          
[2021-10-02 11:02:48,117] WARN [Consumer clientId=consumer-console-consumer-27830-1, groupId=console-consumer-27830] Error while fetching metadata with correlation id 2 : {shopping_log=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2021-10-02 11:02:48,226] WARN [Consumer clientId=consumer-console-consumer-27830-1, groupId=console-consumer-27830] Error while fetching metadata with correlation id 7 : {shopping_log=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2021-10-02 11:02:48,230] WARN [Consumer clientId=consumer-console-consumer-27830-1, groupId=console-consumer-27830] The following subscribed topics are not assigned to any members: [shopping_log] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
Hello, shopping_log