1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| func (t *tailFileManager) ConfigListener() { for config := range t.configChan { for _, configEntry := range config { if _, ok := t.tileTaskMap[configEntry.LogFilePath]; ok { continue } newTailFile, err := tail.TailFile(configEntry.LogFilePath, defaultTailConfig) if err != nil { logrus.Errorf("create new tail task error: %w", err) continue } logrus.Infof("create new tail task for \"%s\"", configEntry.LogFilePath) tailTask := newTailTask(configEntry.Topic, configEntry.LogFilePath, newTailFile) t.tileTaskMap[tailTask.logFilePath] = tailTask go tailTask.run() }
pathSet := map[string]struct{}{} for _, configEntry := range config { pathSet[configEntry.LogFilePath] = struct{}{} }
for path, task := range t.tileTaskMap { if _, ok := pathSet[path]; !ok { logrus.Infof("tail task (%s) should be stopped", task.logFilePath) task.cancel() delete(t.tileTaskMap, path) } }
t.collectConfig = config } }
|