0%

根据配置文件的变化管理 tailTask

根据配置文件的变化管理 tailTask

1 watch etcd 中的新配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func WatchConfig() {
watchChan := client.Watch(context.Background(), conf.DefaultConfig.ETCD.CollectConfKey)
for wresp := range watchChan {
for _, event := range wresp.Events {
var config []common.CollectConfigEntry
// 如果 key 被删除了,直接把空配置放到 channel 中
if event.Type == clientv3.EventTypeDelete {
tailfile.Manager.PutConfigIntoChan(config)
continue
}
err := json.Unmarshal(event.Kv.Value, &config)
if err != nil {
logrus.Errorf("unmarshal config error: %w", err)
continue
}
tailfile.Manager.PutConfigIntoChan(config)
}
}
}
  • 调用 etcd client 的 Watch 函数,给定要监听的 key,返回一个 channel。对于指定的 key 关联数据的修改事件会被放入这个 channel 中。
  • 配置信息以 json 格式字符串的形式存储,所以需要 unmarshal 得到 config 对象。
  • 调用 tailfile 包 Manager 对象的 PutConfigIntoChan 方法,将 config 对象放入另一个 channel 中。在 tailfile 包内,会监听对这个 channel 的更改,以实现根据配置文件的变化动态管理 tailTask。

2 tailfile 监听配置变化

2.1 tailFileManager 类

1
2
3
4
5
6
7
8
9
type tailFileManager struct {
tileTaskMap map[string]*tailTask
collectConfig []common.CollectConfigEntry
configChan chan []common.CollectConfigEntry
}

func (t *tailFileManager) PutConfigIntoChan(config []common.CollectConfigEntry) {
t.configChan <- config
}
  • 管理器类 tailFileManager 是单例的,成员变量 tailTaskMap 储存全部 tailTask,key 是任务关联的日志文件路径;collectConfig 存放 logagent 的配置信息;configChan 存放从 etcd 拿到的最新配置,配置监听 goroutine 从这个 channel 中获取最新配置。

2.2 通知 tailTask goroutine 结束的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type tailTask struct {
topic string
logFilePath string
tailFile *tail.Tail
ctx context.Context
cancel context.CancelFunc
}

func newTailTask(topic, logFilePath string, tailFile *tail.Tail) *tailTask {
ctx, cancel := context.WithCancel(context.Background())
return &tailTask{
topic: topic,
logFilePath: logFilePath,
tailFile: tailFile,
ctx: ctx,
cancel: cancel,
}
}

func (tt *tailTask) run() {
for {
select {
case <-tt.ctx.Done():
logrus.Infof("tail task (%s) is stopping...", tt.logFilePath)
return
case line, ok := <-tt.tailFile.Lines:
if !ok {
logrus.Errorf("tail read file err, filename: %s", tt.tailFile.Filename)
time.Sleep(time.Second)
continue
}
kafka.DefaultProducerClient.SendToMsgChan(tt.topic, line.Text)
}
}
}
  • tailTask 对象的字段 ctx 为 context.Context 对象,另一字段 cancel 是与 ctx 绑定的 CancelFunc 函数对象,调用 cancel,将向 ctx.Done() 这个 channel 添加数据。
  • 在 run 方法中去监听 ctx.Done() 这个 channel,如果有变化,则结束这个 goroutine,否则尝试从日志文件末尾读取一行数据。

2.3 监听配置变化、动态管理 tailTask

2.3.1 三种情况

  • 原来就有,什么都不干
  • 原来没有,现在有,新建任务
  • 原来有,但现在没有,停掉

2.3.2 监听函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (t *tailFileManager) ConfigListener() {
for config := range t.configChan {
for _, configEntry := range config {
// 原来就有,什么都不干
if _, ok := t.tileTaskMap[configEntry.LogFilePath]; ok {
continue
}
// 原来没有,现在有,新建任务
newTailFile, err := tail.TailFile(configEntry.LogFilePath, defaultTailConfig)
if err != nil {
logrus.Errorf("create new tail task error: %w", err)
continue
}
logrus.Infof("create new tail task for \"%s\"", configEntry.LogFilePath)
tailTask := newTailTask(configEntry.Topic, configEntry.LogFilePath, newTailFile)
t.tileTaskMap[tailTask.logFilePath] = tailTask
go tailTask.run()
}

// 原来有,但现在没有,停掉
pathSet := map[string]struct{}{}
for _, configEntry := range config {
pathSet[configEntry.LogFilePath] = struct{}{}
}

for path, task := range t.tileTaskMap {
if _, ok := pathSet[path]; !ok {
logrus.Infof("tail task (%s) should be stopped", task.logFilePath)
task.cancel()
delete(t.tileTaskMap, path)
}
}

// 更新配置
t.collectConfig = config
}
}

3 效果演示

3.1 初始配置

1
2
$ etcdctl put collect_conf_key "[{\"log_file_path\":\"./logs/web_log\",\"topic\":\"web_log\"}, {\"log_file_path\":\"./logs/shopping_log\", \"topic\":\"shopping_log\"}]"             
OK

编译、运行程序,

1
2
3
4
$ go run main.go 
initial config: [{./logs/web_log web_log} {./logs/shopping_log shopping_log}]
2021/10/02 23:50:03 Seeked ./logs/shopping_log - &{Offset:0 Whence:2}
2021/10/02 23:50:03 Seeked ./logs/web_log - &{Offset:0 Whence:2}

3.2 修改配置,减少要收集的日志文件

1
2
$ etcdctl put collect_conf_key "[{\"log_file_path\":\"./logs/web_log\",\"topic\":\"web_log\"}]"                       
OK

查看程序输出,

1
2
3
4
5
6
$ go run main.go 
initial config: [{./logs/web_log web_log} {./logs/shopping_log shopping_log}]
2021/10/02 23:50:03 Seeked ./logs/shopping_log - &{Offset:0 Whence:2}
2021/10/02 23:50:03 Seeked ./logs/web_log - &{Offset:0 Whence:2}
INFO[0097] tail task (./logs/shopping_log) should be stopped
INFO[0097] tail task (./logs/shopping_log) is stopping...

3.3 修改配置,增加要收集的日志文件

1
2
$ etcdctl put collect_conf_key "[{\"log_file_path\":\"./logs/web_log\",\"topic\":\"web_log\"}, {\"log_file_path\":\"./logs/shopping_log\", \"topic\":\"shopping_log\"}, {\"log_file_path\":\"./logs/live_log\", \"topic\":\"live_log\"}]"              
OK

查看程序输出,

1
2
3
4
5
6
7
8
9
10
$ go run main.go 
initial config: [{./logs/web_log web_log} {./logs/shopping_log shopping_log}]
2021/10/02 23:50:03 Seeked ./logs/shopping_log - &{Offset:0 Whence:2}
2021/10/02 23:50:03 Seeked ./logs/web_log - &{Offset:0 Whence:2}
INFO[0097] tail task (./logs/shopping_log) should be stopped
INFO[0097] tail task (./logs/shopping_log) is stopping...
INFO[0189] create new tail task for "./logs/shopping_log"
INFO[0189] create new tail task for "./logs/live_log"
2021/10/02 23:53:12 Seeked ./logs/shopping_log - &{Offset:0 Whence:2}
2021/10/02 23:53:12 Waiting for ./logs/live_log to appear...