func(t *tailFileManager)ConfigListener() { for config := range t.configChan { for _, configEntry := range config { // 原来就有,什么都不干 if _, ok := t.tileTaskMap[configEntry.LogFilePath]; ok { continue } // 原来没有,现在有,新建任务 newTailFile, err := tail.TailFile(configEntry.LogFilePath, defaultTailConfig) if err != nil { logrus.Errorf("create new tail task error: %w", err) continue } logrus.Infof("create new tail task for \"%s\"", configEntry.LogFilePath) tailTask := newTailTask(configEntry.Topic, configEntry.LogFilePath, newTailFile) t.tileTaskMap[tailTask.logFilePath] = tailTask go tailTask.run() }
// 原来有,但现在没有,停掉 pathSet := map[string]struct{}{} for _, configEntry := range config { pathSet[configEntry.LogFilePath] = struct{}{} }
for path, task := range t.tileTaskMap { if _, ok := pathSet[path]; !ok { logrus.Infof("tail task (%s) should be stopped", task.logFilePath) task.cancel() delete(t.tileTaskMap, path) } }
// 更新配置 t.collectConfig = config } }
3 效果演示
3.1 初始配置
1 2
$ etcdctl put collect_conf_key "[{\"log_file_path\":\"./logs/web_log\",\"topic\":\"web_log\"}, {\"log_file_path\":\"./logs/shopping_log\", \"topic\":\"shopping_log\"}]" OK
$ etcdctl put collect_conf_key "[{\"log_file_path\":\"./logs/web_log\",\"topic\":\"web_log\"}]" OK
查看程序输出,
1 2 3 4 5 6
$ go run main.go initial config: [{./logs/web_log web_log} {./logs/shopping_log shopping_log}] 2021/10/02 23:50:03 Seeked ./logs/shopping_log - &{Offset:0 Whence:2} 2021/10/02 23:50:03 Seeked ./logs/web_log - &{Offset:0 Whence:2} INFO[0097] tail task (./logs/shopping_log) should be stopped INFO[0097] tail task (./logs/shopping_log) is stopping...
3.3 修改配置,增加要收集的日志文件
1 2
$ etcdctl put collect_conf_key "[{\"log_file_path\":\"./logs/web_log\",\"topic\":\"web_log\"}, {\"log_file_path\":\"./logs/shopping_log\", \"topic\":\"shopping_log\"}, {\"log_file_path\":\"./logs/live_log\", \"topic\":\"live_log\"}]" OK
查看程序输出,
1 2 3 4 5 6 7 8 9 10
$ go run main.go initial config: [{./logs/web_log web_log} {./logs/shopping_log shopping_log}] 2021/10/02 23:50:03 Seeked ./logs/shopping_log - &{Offset:0 Whence:2} 2021/10/02 23:50:03 Seeked ./logs/web_log - &{Offset:0 Whence:2} INFO[0097] tail task (./logs/shopping_log) should be stopped INFO[0097] tail task (./logs/shopping_log) is stopping... INFO[0189] create new tail task for"./logs/shopping_log" INFO[0189] create new tail task for"./logs/live_log" 2021/10/02 23:53:12 Seeked ./logs/shopping_log - &{Offset:0 Whence:2} 2021/10/02 23:53:12 Waiting for ./logs/live_log to appear...
$ sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "web_log" Hello, web_log!
向日志文件 ./logs/shopping_log 写入一行数据,
1
$ echo"Hello, shopping_log" >> logs/shopping_log
查看话题 shopping_log 的控制台消费者,
1 2 3 4 5
$ sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "shopping_log" [2021-10-02 11:02:48,117] WARN [Consumer clientId=consumer-console-consumer-27830-1, groupId=console-consumer-27830] Error while fetching metadata with correlation id 2 : {shopping_log=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) [2021-10-02 11:02:48,226] WARN [Consumer clientId=consumer-console-consumer-27830-1, groupId=console-consumer-27830] Error while fetching metadata with correlation id 7 : {shopping_log=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) [2021-10-02 11:02:48,230] WARN [Consumer clientId=consumer-console-consumer-27830-1, groupId=console-consumer-27830] The following subscribed topics are not assigned to any members: [shopping_log] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) Hello, shopping_log
consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil) if err != nil { logrus.Errorf("failed to create kafka consumer: %w", err) return }
2 获取指定topic下的所有分区列表
1 2 3 4 5 6 7
topicName := "web_log" partitionList, err := consumer.Partitions(topicName) if err != nil { logrus.Errorf("failed to get list of partition: %w", err) return } fmt.Println("partitionList:", partitionList)
var wg sync.WaitGroup for _, partition := range partitionList { partConsumer, err := consumer.ConsumePartition(topicName, partition, sarama.OffsetNewest) if err != nil { logrus.Errorf("failed to create consumer for partition %d: %w", partition, err) return }
funcmain() { size := uint(1e5) filter := bloom.NewWithEstimates(size, 0.01) var i uint for i = 0; i < size; i++ { filter.Add([]byte(fmt.Sprintf("%d", i))) }
count := 0 for i = size; i < size+10000; i++ { if filter.Test([]byte(fmt.Sprintf("%d", i))) { count++ } } fmt.Printf("误判次数:%d\n", count) } $ go run ./bloom.go 误判次数:90
$ go run ./singleflight.go 2021/09/30 10:57:01 get key from database 2021/09/30 10:57:01 data 2021/09/30 10:57:01 data 2021/09/30 10:57:01 data 2021/09/30 10:57:01 data 2021/09/30 10:57:01 data 2021/09/30 10:57:01 data 2021/09/30 10:57:01 data 2021/09/30 10:57:01 data 2021/09/30 10:57:01 data 2021/09/30 10:57:01 data
funcmain() { _, _, err := kafka.DefaultProducerClient.SendStringMessage("web_log", "this is a test log") if err != nil { logrus.Errorf("failed to send kafka message to server:%v", err) os.Exit(1) } fmt.Printf("send message to server successfully!\n") }
5 运行效果
在 Kafka 安装目录下官方提供了控制台消费者,执行脚本,
1
sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "web_log" --from-beginning
$ go run ini.go App Mode: development Data Path: /home/git/grafana Server Protocol: http Email Protocol: smtp Port Number: (int) 9999 Enforce Domain: (bool) true $ cat ./my.ini.local # possible values : production, development app_mode = production
[paths] # Path to where grafana can store temp files, sessions, and the sqlite3 db (if that is used) data = /home/git/grafana
[server] # Protocol (http or https) protocol = http # The http port to use http_port = 9999 # Redirect to correct domain if host header does not match domain # Prevents DNS rebinding attacks enforce_domain = true