0%

根据配置文件的变化管理 tailTask

1 watch etcd 中的新配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func WatchConfig() {
watchChan := client.Watch(context.Background(), conf.DefaultConfig.ETCD.CollectConfKey)
for wresp := range watchChan {
for _, event := range wresp.Events {
var config []common.CollectConfigEntry
// 如果 key 被删除了,直接把空配置放到 channel 中
if event.Type == clientv3.EventTypeDelete {
tailfile.Manager.PutConfigIntoChan(config)
continue
}
err := json.Unmarshal(event.Kv.Value, &config)
if err != nil {
logrus.Errorf("unmarshal config error: %w", err)
continue
}
tailfile.Manager.PutConfigIntoChan(config)
}
}
}
  • 调用 etcd client 的 Watch 函数,给定要监听的 key,返回一个 channel。对于指定的 key 关联数据的修改事件会被放入这个 channel 中。
  • 配置信息以 json 格式字符串的形式存储,所以需要 unmarshal 得到 config 对象。
  • 调用 tailfile 包 Manager 对象的 PutConfigIntoChan 方法,将 config 对象放入另一个 channel 中。在 tailfile 包内,会监听对这个 channel 的更改,以实现根据配置文件的变化动态管理 tailTask。

2 tailfile 监听配置变化

2.1 tailFileManager 类

1
2
3
4
5
6
7
8
9
type tailFileManager struct {
tileTaskMap map[string]*tailTask
collectConfig []common.CollectConfigEntry
configChan chan []common.CollectConfigEntry
}

func (t *tailFileManager) PutConfigIntoChan(config []common.CollectConfigEntry) {
t.configChan <- config
}
  • 管理器类 tailFileManager 是单例的,成员变量 tailTaskMap 储存全部 tailTask,key 是任务关联的日志文件路径;collectConfig 存放 logagent 的配置信息;configChan 存放从 etcd 拿到的最新配置,配置监听 goroutine 从这个 channel 中获取最新配置。

2.2 通知 tailTask goroutine 结束的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type tailTask struct {
topic string
logFilePath string
tailFile *tail.Tail
ctx context.Context
cancel context.CancelFunc
}

func newTailTask(topic, logFilePath string, tailFile *tail.Tail) *tailTask {
ctx, cancel := context.WithCancel(context.Background())
return &tailTask{
topic: topic,
logFilePath: logFilePath,
tailFile: tailFile,
ctx: ctx,
cancel: cancel,
}
}

func (tt *tailTask) run() {
for {
select {
case <-tt.ctx.Done():
logrus.Infof("tail task (%s) is stopping...", tt.logFilePath)
return
case line, ok := <-tt.tailFile.Lines:
if !ok {
logrus.Errorf("tail read file err, filename: %s", tt.tailFile.Filename)
time.Sleep(time.Second)
continue
}
kafka.DefaultProducerClient.SendToMsgChan(tt.topic, line.Text)
}
}
}
  • tailTask 对象的字段 ctx 为 context.Context 对象,另一字段 cancel 是与 ctx 绑定的 CancelFunc 函数对象,调用 cancel,将向 ctx.Done() 这个 channel 添加数据。
  • 在 run 方法中去监听 ctx.Done() 这个 channel,如果有变化,则结束这个 goroutine,否则尝试从日志文件末尾读取一行数据。

2.3 监听配置变化、动态管理 tailTask

2.3.1 三种情况

  • 原来就有,什么都不干
  • 原来没有,现在有,新建任务
  • 原来有,但现在没有,停掉

2.3.2 监听函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (t *tailFileManager) ConfigListener() {
for config := range t.configChan {
for _, configEntry := range config {
// 原来就有,什么都不干
if _, ok := t.tileTaskMap[configEntry.LogFilePath]; ok {
continue
}
// 原来没有,现在有,新建任务
newTailFile, err := tail.TailFile(configEntry.LogFilePath, defaultTailConfig)
if err != nil {
logrus.Errorf("create new tail task error: %w", err)
continue
}
logrus.Infof("create new tail task for \"%s\"", configEntry.LogFilePath)
tailTask := newTailTask(configEntry.Topic, configEntry.LogFilePath, newTailFile)
t.tileTaskMap[tailTask.logFilePath] = tailTask
go tailTask.run()
}

// 原来有,但现在没有,停掉
pathSet := map[string]struct{}{}
for _, configEntry := range config {
pathSet[configEntry.LogFilePath] = struct{}{}
}

for path, task := range t.tileTaskMap {
if _, ok := pathSet[path]; !ok {
logrus.Infof("tail task (%s) should be stopped", task.logFilePath)
task.cancel()
delete(t.tileTaskMap, path)
}
}

// 更新配置
t.collectConfig = config
}
}

3 效果演示

3.1 初始配置

1
2
$ etcdctl put collect_conf_key "[{\"log_file_path\":\"./logs/web_log\",\"topic\":\"web_log\"}, {\"log_file_path\":\"./logs/shopping_log\", \"topic\":\"shopping_log\"}]"             
OK

编译、运行程序,

1
2
3
4
$ go run main.go 
initial config: [{./logs/web_log web_log} {./logs/shopping_log shopping_log}]
2021/10/02 23:50:03 Seeked ./logs/shopping_log - &{Offset:0 Whence:2}
2021/10/02 23:50:03 Seeked ./logs/web_log - &{Offset:0 Whence:2}

3.2 修改配置,减少要收集的日志文件

1
2
$ etcdctl put collect_conf_key "[{\"log_file_path\":\"./logs/web_log\",\"topic\":\"web_log\"}]"                       
OK

查看程序输出,

1
2
3
4
5
6
$ go run main.go 
initial config: [{./logs/web_log web_log} {./logs/shopping_log shopping_log}]
2021/10/02 23:50:03 Seeked ./logs/shopping_log - &{Offset:0 Whence:2}
2021/10/02 23:50:03 Seeked ./logs/web_log - &{Offset:0 Whence:2}
INFO[0097] tail task (./logs/shopping_log) should be stopped
INFO[0097] tail task (./logs/shopping_log) is stopping...

3.3 修改配置,增加要收集的日志文件

1
2
$ etcdctl put collect_conf_key "[{\"log_file_path\":\"./logs/web_log\",\"topic\":\"web_log\"}, {\"log_file_path\":\"./logs/shopping_log\", \"topic\":\"shopping_log\"}, {\"log_file_path\":\"./logs/live_log\", \"topic\":\"live_log\"}]"              
OK

查看程序输出,

1
2
3
4
5
6
7
8
9
10
$ go run main.go 
initial config: [{./logs/web_log web_log} {./logs/shopping_log shopping_log}]
2021/10/02 23:50:03 Seeked ./logs/shopping_log - &{Offset:0 Whence:2}
2021/10/02 23:50:03 Seeked ./logs/web_log - &{Offset:0 Whence:2}
INFO[0097] tail task (./logs/shopping_log) should be stopped
INFO[0097] tail task (./logs/shopping_log) is stopping...
INFO[0189] create new tail task for "./logs/shopping_log"
INFO[0189] create new tail task for "./logs/live_log"
2021/10/02 23:53:12 Seeked ./logs/shopping_log - &{Offset:0 Whence:2}
2021/10/02 23:53:12 Waiting for ./logs/live_log to appear...

tailfile 启动多个 tailTask

1 tailTask structure

1.1 结构体

1
2
3
4
5
6
7
8
9
var (
tailTasks []*tailTask
)

type tailTask struct {
topic string // kafka 话题
logFilePath string // 日志文件路径
tailFile *tail.Tail // tail.Tail 对象
}

1.2 构造函数

1
2
3
4
5
6
7
func newTailTask(topic, logFilePath string, tailFile *tail.Tail) *tailTask {
return &tailTask{
topic: topic,
logFilePath: logFilePath,
tailFile: tailFile,
}
}

1.3 run 方法

1
2
3
4
5
6
7
8
9
10
11
func (tt *tailTask) run() {
for {
line, ok := <-tt.tailFile.Lines
if !ok {
logrus.Errorf("tail read file err, filename: %s", tt.tailFile.Filename)
time.Sleep(time.Second)
continue
}
kafka.DefaultProducerClient.SendToMsgChan(tt.topic, line.Text)
}
}

2 初始化

2.1 etcd 日志配置项

1
2
3
$ etcdctl get collect_conf_key                                         
collect_conf_key
[{"log_file_path":"./logs/web_log","topic":"web_log"}, {"log_file_path":"./logs/shopping_log", "topic":"shopping_log"}]

2.2 tailefile 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func Init(collectConfig []common.CollectConfigEntry) (err error) {
config := tail.Config{
ReOpen: true, // 重新打开,如果修改操作不是在日志文件末尾添加
Follow: true, // 持续寻找新行
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // 初始光标位置
MustExist: false, // 是否允许文件不存在
Poll: true, // 使用 Poll 监听文件的变化,而不是 inotify
}
for _, configEntry := range collectConfig {
tailFile, err := tail.TailFile(configEntry.LogFilePath, config)
if err != nil {
logrus.Errorf("tail file failed, err:%w", err)
return err
}
newTask := newTailTask(configEntry.Topic, configEntry.LogFilePath, tailFile)
tailTasks = append(tailTasks, newTask)
}
return nil
}
  • 为每个配置项创建一个收集任务,用 Kafka话题、日志文件路径初始化,加入到任务列表中

3 启动任务

1
2
3
4
5
func StartCollect() {
for _, task := range tailTasks {
go task.run()
}
}
  • 为任务列表中的每个 tailTask 开启一个 goroutine,并发收集多个日志文件的内容,发送到对应的 kafka 话题

启动话题 web_log 的控制台消费者,

1
$ sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "web_log"

另外开一个终端窗口,启动话题 shopping_log 的控制台消费者,

1
$ sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "shopping_log" 

编译、运行程序,

1
2
3
4
$ go run main.go 
[{./logs/web_log web_log} {./logs/shopping_log shopping_log}]
2021/10/02 10:54:46 Seeked ./logs/web_log - &{Offset:0 Whence:2}
2021/10/02 10:54:46 Seeked ./logs/shopping_log - &{Offset:0 Whence:2}

向日志文件 ./logs/web_log 写入一行数据,

1
echo "Hello, web_log!" >> logs/web_log

查看话题 web_log 的控制台消费者,

1
2
$ sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "web_log"    
Hello, web_log!

向日志文件 ./logs/shopping_log 写入一行数据,

1
$ echo "Hello, shopping_log" >> logs/shopping_log

查看话题 shopping_log 的控制台消费者,

1
2
3
4
5
$ sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "shopping_log"                                          
[2021-10-02 11:02:48,117] WARN [Consumer clientId=consumer-console-consumer-27830-1, groupId=console-consumer-27830] Error while fetching metadata with correlation id 2 : {shopping_log=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2021-10-02 11:02:48,226] WARN [Consumer clientId=consumer-console-consumer-27830-1, groupId=console-consumer-27830] Error while fetching metadata with correlation id 7 : {shopping_log=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2021-10-02 11:02:48,230] WARN [Consumer clientId=consumer-console-consumer-27830-1, groupId=console-consumer-27830] The following subscribed topics are not assigned to any members: [shopping_log] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
Hello, shopping_log

从etcd加载配置项

1 目录结构

1
2
3
4
$ tree etcd/
etcd/
├── etcd_biz.go
└── etcd_conn.go
  • etcd_conn.go 存放 etcd client 初始化、获取/设置/删除、关闭连接相关的代码
  • etcd_biz.go 存放与业务逻辑相关的代码

2 etcd_conn

2.1 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var (
client *clientv3.Client
)

func Init(endpoints []string) (err error) {
client, err = clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: time.Second * 5,
})
if err != nil {
logrus.Errorf("init etcd client error: %w", err)
return err
}
return nil
}

2.2 获取值

1
2
3
4
5
6
7
8
9
10
11
12
13
func GetValue(key string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
resp, err := client.Get(ctx, key)
defer cancel()
if err != nil {
logrus.Errorf("get config from etcd error: %w", err)
return "", err
}
if len(resp.Kvs) == 0 {
return "", fmt.Errorf("no such entry")
}
return string(resp.Kvs[0].Value), nil
}
  • etcd v3 使用gRPC实现远程过程调用,为了确保不产生goroutine泄露(不被使用的goroutine一直占用资源),调用etcd v3 client API时,要传入context.WithTimeout参数,用于通知子goroutine结束,释放资源

2.3 关闭连接

1
2
3
4
5
6
7
8
func Close() error {
err := client.Close()
if err != nil {
logrus.Errorf("close etcd client error: %w", err)
return err
}
return nil
}

3 配置项结构

1
2
3
4
5
6
7
8
var (
CollectConfig []CollectConfigEntry
)

type CollectConfigEntry struct {
LogFilePath string `json:"log_file_path"`
Topic string `json:"topic"`
}
  • 配置信息 json 格式序列化之后以键值对的 value 字符串形式存储在 etcd 中

4 拉取配置信息

1
2
3
4
5
6
7
8
9
10
11
12
13
func GetCollectConfig() (config []CollectConfigEntry, err error) {
configStr, err := GetValue(conf.DefaultConfig.ETCD.CollectConfKey)
if err != nil {
logrus.Errorf("get collect config error: %w", err)
return nil, err
}
err = json.Unmarshal([]byte(configStr), &config)
if err != nil {
logrus.Errorf("unmarshal config str error: %w", err)
return nil, err
}
return config, nil
}
  • 全局的 []CollectConfigEntry 结构体数组对象 CollectConfig 用于存储配置信息,从 etcd 获取到配置值是 json 格式的字符串,需要调用 json.Unmarshal 反序列化后放入 CollectConfig 对象中

Sarama Kafka 消费者 Client

1 创建新的消费者

1
2
3
4
5
consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
if err != nil {
logrus.Errorf("failed to create kafka consumer: %w", err)
return
}

2 获取指定topic下的所有分区列表

1
2
3
4
5
6
7
topicName := "web_log"
partitionList, err := consumer.Partitions(topicName)
if err != nil {
logrus.Errorf("failed to get list of partition: %w", err)
return
}
fmt.Println("partitionList:", partitionList)

3 针对每个分区创建一个对应的分区消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var wg sync.WaitGroup
for _, partition := range partitionList {
partConsumer, err := consumer.ConsumePartition(topicName, partition, sarama.OffsetNewest)
if err != nil {
logrus.Errorf("failed to create consumer for partition %d: %w", partition, err)
return
}

defer partConsumer.AsyncClose()
// 异步从每个分区消费信息
wg.Add(1)
go func() {
for msg := range partConsumer.Messages() {
logrus.Infof("Partition:%d Offset:%d Key:%s Value:%s",
msg.Partition, msg.Offset, msg.Key, msg.Value)
}
wg.Done()
}()
}
wg.Wait()

4 效果演示

编译、运行程序,

1
2
$ go run kafka_consumer.go 
partitionList: [0]

启动 kafka_console_producer

1
2
$ kafka-console-producer.sh --broker-list localhost:9092 --topic web_log    
>

使用 kafka_console_producer 向指定topic发送1条消息,

1
2
3
$ kafka-console-producer.sh --broker-list localhost:9092 --topic web_log               
>Hello, world!
>

查看程序输出,

1
2
3
$ go run kafka_consumer.go 
partitionList: [0]
INFO[0266] Partition:0 Offset:8 Key: Value:Hello, world!

go语言操作etcd

etcd的安装、集群部署请参考:v3.5 docs | etcd

1 安装 client

1
go get go.etcd.io/etcd/client/v3

2 连接集群

1
2
3
4
5
6
7
8
9
10
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"}, // 集群各节点ip地址:端口号
DialTimeout: 5 * time.Second, // 连接超时时间
})
if err != nil {
logrus.Errorf("connect to etcd cluster error: %w", err)
return
}
logrus.Info("connect to etcd success")
defer cli.Close()

3 put 和 get 操作

3.1 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "greeting", "Hello, world!")
cancel()
if err != nil {
logrus.Errorf("put to etcd error: %w", err)
return
}
// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "greeting")
cancel()
if err != nil {
logrus.Errorf("get from etcd error: %w", err)
return
}
for _, pair := range resp.Kvs {
fmt.Printf("%s:%s\n", pair.Key, pair.Value)
}
  • etcd v3 使用gRPC实现远程过程调用,为了确保不产生goroutine泄露(不被使用的goroutine一直占用资源),调用etcd v3 client API时,要传入context.WithTimeout参数,用于通知子goroutine结束,释放资源。

3.2 效果演示

1
2
3
$ go run etcd.go 
INFO[0000] connect to etcd success
greeting:Hello, world!

4 watch 操作

4.1 代码

1
2
3
4
5
6
7
// watch
watchChan := cli.Watch(context.Background(), "greeting")
for wresp := range watchChan {
for _, event := range wresp.Events {
logrus.Infof("Type: %s Key:%s Value:%s\n", event.Type, event.Kv.Key, event.Kv.Value)
}
}
  • 调用Client对象的watch方法,返回1个channel,etcd client对给定的key进行监听,将修改事件记录在channel中
  • 修改事件对象包含:修改类型(更新、删除)、Key、Value 信息

4.2 效果演示

编译、运行程序,

1
2
$ go run etcd.go 
INFO[0000] connect to etcd success

更新操作,

1
$ etcdctl --endpoints=http://localhost:2379 put greeting "Hello, etcd!"                          OK                        

查看程序输出,

1
2
3
$ go run etcd.go 
INFO[0000] connect to etcd success
INFO[0477] Type: PUT Key:greeting Value:Hello, etcd!

删除操作,

1
$ etcdctl --endpoints=http://localhost:2379 del greeting                                         1                       

查看程序输出,

1
2
3
4
$ go run etcd.go 
INFO[0000] connect to etcd success
INFO[0477] Type: PUT Key:greeting Value:Hello, etcd!
INFO[0631] Type: DELETE Key:greeting Value:

收集日志并发送到Kafka

1 初始化 tail

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
var DefaultTailTool *tail.Tail

func init() {
err := Init(conf.DefaultConfig.Collect.LogFilePath)
if err != nil {
logrus.Errorf("init tail err: %w", err)
os.Exit(1)
}
}

func Init(fileName string) (err error) {
config := tail.Config{
ReOpen: true, // 重新打开,如果修改操作不是在日志文件末尾添加
Follow: true, // 持续寻找新行
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // 初始光标位置
MustExist: false, // 是否允许文件不存在
Poll: true, // 使用 Poll 监听文件的变化,而不是 inotify
}
DefaultTailTool, err = tail.TailFile(fileName, config)
if err != nil {
logrus.Errorf("tail file failed, err:%w", err)
return err
}
return nil
}

2 收集日志

1
2
3
4
5
6
7
8
9
10
11
func StartCollect() {
for {
line, ok := <-DefaultTailTool.Lines
if !ok {
logrus.Errorf("tail read file err, filename: %s", DefaultTailTool.Filename)
time.Sleep(time.Second)
continue
}
kafka.MsgChan <- &line.Text
}
}
  • 将从日志文件末尾读出的数据存入channel中,Kafka生产者会从同一个channel读取日志消息,并发送给消息队列
  • 这个共享channel是有缓冲的,只有当缓冲区满tail往里放数据,或者缓冲区空Kafka生产者向外拿数据时才会发生阻塞
  • channel的主要作用是解耦流量控制

3 Kafka生产者发送日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type KafkaProducerClient struct {
client sarama.SyncProducer
}

var (
DefaultConfig *sarama.Config
DefaultProducerClient *KafkaProducerClient = &KafkaProducerClient{}
// Message Channel
MsgChan = make(chan *string, consts.MsgChanBufferSize)
)

func (kc *KafkaProducerClient) Init(serverAddress []string) (err error) {
kc.client, err = sarama.NewSyncProducer(serverAddress, DefaultConfig)
if err != nil {
logrus.Errorf("failed to init Kafka producer client:%v", err)
return fmt.Errorf("failed to init Kafka producer client:%v", err)
}
go MsgListener()
return nil
}

func MsgListener() {
for msg := range MsgChan {
pid, offset, err := DefaultProducerClient.SendStringMessage("web_log", *msg)
if err != nil {
logrus.Errorf("kafka send msg err: %w", err)
return
}
logrus.Info("kafka send successfully, msg: %s, pid: %v, offset: %v", *msg, pid, offset)
}
}
  • 在初始化函数中使用go关键字启动1个goroutine,用于执行函数MsgListener()中的代码段
  • MsgListener()函数不断从管道MsgChan读取日志消息,然后发送给Kafka集群

4 效果演示

启动Kafka控制台消费者

1
sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "web_log" --from-beginning

编译、运行程序,

1
2
$ go run ./main.go 
2021/10/01 00:46:26 Seeked ./logs/web_log - &{Offset:0 Whence:2}

向日志文件末尾添加数据,

1
echo "Hello, world!" >> ./logs/web_log

控制台消费者收到消息并展示,

img

缓存穿透、缓存雪崩、缓存击穿 解决方案分析

1 缓存穿透

  • 正常的读取流程是,先查缓存,如果在缓存中的查不到或者数据过期,那么就到数据库中查询。如果在数据库能查到,则放入缓存,下次就能在缓存中以较快的速度拿到数据。
  • 如果用来查询的key在数据库中根本不存在,则每次查询都会直接打到数据库,因为在上面的读取流程中不对查询为空的结果进行缓存。如果当前 qps 很高,会对数据库造成压力,甚至压垮数据库。

1.1 空值法

1.1.1 基本思路

  • 当在数据库中查不到时,将查询为空的结果进行缓存,并设置过期时间,这样下次就可以直接根据在缓存中拿到的数据,判断是否存在。

1.1.2 缺点

  • 假如每次查询的key都不一样,并且在数据库中均不存在对应的记录,这样空值法就失效了,因为空值法只会对当前key查询为空的结果进行缓存。

1.2 布隆过滤器

1.2.1 原理

  • 布隆过滤器的主要功能就是用来判断某个元素(key)是否在集合中

1.2.1.1 算法

  • 首先需要k个hash函数,每个函数可以把key散列成为1个整数
  • 初始化时,需要一个长度为n比特的数组,每个比特位初始化为0
  • 某个key加入集合时,用k个hash函数计算出k个散列值,并把数组中对应的比特位置为1
  • 判断某个key是否在集合时,用k个hash函数计算出k个散列值,并查询数组中对应的比特位,如果所有的比特位都是1,认为在集合中

1.2.1.2 缺点

  • 算法判断key在集合中时,有一定的概率key其实不在集合中(误判率可以调),但是如果算法判断key不在集合中,则一定不在
  • 无法删除,因为存在hash冲突,不同的key可能经hash运算后映射到相同的整数值

1.2.2 使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main


import (
"fmt"

"github.com/bits-and-blooms/bloom/v3"
)


func main() {
size := uint(1e5)
filter := bloom.NewWithEstimates(size, 0.01)
var i uint
for i = 0; i < size; i++ {
filter.Add([]byte(fmt.Sprintf("%d", i)))
}


count := 0
for i = size; i < size+10000; i++ {
if filter.Test([]byte(fmt.Sprintf("%d", i))) {
count++
}
}
fmt.Printf("误判次数:%d\n", count)
}
$ go run ./bloom.go
误判次数:90

查询10000个不在集合中的数,误判次数为90,因此误判率约为1%,和设置值接近。

1.2.3 使用布隆过滤器解决缓存穿透问题

  • 读取时先查布隆过滤器,不存在的key可以100%地判断出来,如果出现误判,查询会被直接打到数据库,但是因为误判率通常设置得很低,所以对服务造成影响微乎其微。

2 缓存雪崩

  • 缓存雪崩,是指在某一个时间段内,缓存集中过期,集中失效(可能是物理原因)

2.1 大量缓存失效

  • 将缓存失效时间分散开,比如在原有的失效时间基础上增加一个随机值,对于热点数据,过期时间尽量设置得长一些,冷门的数据可以相对设置过期时间短一些。

2.2 缓存服务器宕机

  • 使用缓存集群,避免单机宕机造成的缓存雪崩。

3 缓存击穿

缓存在某个时间点过期的时候,恰好在这个时间点对这个Key有大量的并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

3.1 分布式锁

  • 如果key不在缓存中,就加锁去查数据库,然后放入缓存中。这样多个并发的查询请求,只有一个会被打到数据库,其余的请求从缓存中就可以拿到数据。

3.2 singleflight

  • 当大量请求到来时,对于相同的key,函数对象只会被调用一次。也就是说只有一次请求会被打到数据库,其余请求从singleflight内部维护的map中获取查询结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package main

import (
"errors"
"log"
"sync"

"golang.org/x/sync/singleflight"
)

func main() {
var wg sync.WaitGroup
wg.Add(10)

// 模拟10个并发
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
data, err := GetData("key")
if err != nil {
log.Println(err)
return
}
log.Println(data)
}()
}

wg.Wait()
}


var g singleflight.Group


func GetData(key string) (string, error) {
// 先查缓存
data, err := GetDataFromCache(key)
if err == errNotExist {
// 缓存中找不到,查数据库
value, err, _ := g.Do(key, func() (interface{}, error) {
return GetDataFromDB(key)
})
if err != nil {
log.Println(err)
return "", err
}
data = value.(string)
} else if err != nil {
return "", err
}
return data, nil
}

var errNotExist = errors.New("not exists")

func GetDataFromCache(key string) (string, error) {
// 模拟在缓存中查不到的情况
return "", errNotExist
}

func GetDataFromDB(key string) (string, error) {
log.Printf("get %s from database", key)
return "data", nil
}
1
2
3
4
5
6
7
8
9
10
11
12
$ go run ./singleflight.go 
2021/09/30 10:57:01 get key from database
2021/09/30 10:57:01 data
2021/09/30 10:57:01 data
2021/09/30 10:57:01 data
2021/09/30 10:57:01 data
2021/09/30 10:57:01 data
2021/09/30 10:57:01 data
2021/09/30 10:57:01 data
2021/09/30 10:57:01 data
2021/09/30 10:57:01 data
2021/09/30 10:57:01 data
  • 上面的代码模拟10个并发的请求,查询相同的key对应的的value,只有一次是去访问数据库,其余都是从singleflight内部维护的map中获取的。

3.3 多级cache

localcache + 分布式缓存,本地缓存失效后优先读分布式缓存,减少回源。

Kafka客户端读取配置文件初始化

1 项目结构

1
2
3
4
5
6
7
8
9
10
11
.
├── conf
│ ├── config.go
│ └── config.ini
├── consts
│ └── kafka.go
├── go.mod
├── go.sum
├── kafka
│ └── kafka_conn.go
├── main.go
  • conf 包存放配置信息
  • consts 包存放项目中用到的常量
  • kafka 包存放 kafka 客户端的一些常用方法

2 读取配置文件

2.1 配置文件内容

1
2
3
4
5
[kafka]
address = 127.0.0.1:9092

[collect]
logfile_path = ~/logs/web_log
  • 配置文件是 ./conf/config.ini,记录了 Kafka 服务器的地址列表,和日志文件的路径。

2.2 配置文件映射到结构体对象

2.2.1 定义配置结构体类型

1
2
3
4
5
6
7
8
9
10
11
12
type Kafka struct {
Address []string `ini:"address"`
}

type Collect struct {
LogFilePath string `ini:"logfile_path"`
}

type Config struct {
*Kafka `ini:"kafka"`
*Collect `ini:"collect"`
}

2.2.2 初始化默认配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
var (
DefaultConfig *Config = &Config{}
)

const (
configFilePath = "./conf/config.ini"
)

func init() {
InitConfig()
}

func InitConfig() error {
cfg, err := ini.Load(configFilePath)
if err != nil {
logrus.Errorf("failed to load config file:%v", err)
return err
}
err = cfg.MapTo(DefaultConfig)
if err != nil {
logrus.Errorf("failed to map config file to struct object:%v", err)
return err
}
return nil
}

3 Kafka客户端初始化、发送消息

3.1 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
type KafkaProducerClient struct {
client sarama.SyncProducer
}

var (
DefaultConfig *sarama.Config
DefaultProducerClient *KafkaProducerClient = &KafkaProducerClient{}
)

func init() {
DefaultConfig := sarama.NewConfig()
DefaultConfig.Producer.RequiredAcks = sarama.WaitForAll
DefaultConfig.Producer.Partitioner = sarama.NewRandomPartitioner
DefaultConfig.Producer.Return.Successes = true

DefaultProducerClient.Init(consts.KafkaServerAddress)
}

// 使用默认sarama配置初始化
func (kc *KafkaProducerClient) Init(serverAddress []string) (err error) {
kc.client, err = sarama.NewSyncProducer(serverAddress, DefaultConfig)
if err != nil {
logrus.Errorf("failed to init Kafka producer client:%v", err)
return fmt.Errorf("failed to init Kafka producer client:%v", err)
}
return nil
}

// 指定sarama配置初始化
func (kc *KafkaProducerClient) InitWithConfig(serverAddress []string, config *sarama.Config) (err error) {
kc.client, err = sarama.NewSyncProducer(serverAddress, config)
if err != nil {
logrus.Errorf("failed to init Kafka producer client:%v", err)
return fmt.Errorf("failed to init Kafka producer client:%v", err)
}
return nil
}

3.2 发送消息

1
2
3
4
func (kc *KafkaProducerClient) SendStringMessage(topic, value string) (pid int32, ofset int64, err error) {
message := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(value)}
return DefaultProducerClient.client.SendMessage(message)
}

4 main.go

1
2
3
4
5
6
7
8
func main() {
_, _, err := kafka.DefaultProducerClient.SendStringMessage("web_log", "this is a test log")
if err != nil {
logrus.Errorf("failed to send kafka message to server:%v", err)
os.Exit(1)
}
fmt.Printf("send message to server successfully!\n")
}

5 运行效果

在 Kafka 安装目录下官方提供了控制台消费者,执行脚本,

1
sh $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "web_log" --from-beginning

img

编译、运行程序,

1
2
$ go run .
send message to server successfully!

消费者侧,

img

使用 go-ini 解析配置文件

本文只介绍实际项目开发中最常用到的功能,详细说明请参考《go-ini使用文档》

1 下载、安装 go-ini

1
go get gopkg.in/ini.v1

2 go-ini 解析配置文件 Demo

2.1 配置文件格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# possible values : production, development
app_mode = development

[paths]
# Path to where grafana can store temp files, sessions, and the sqlite3 db (if that is used)
data = /home/git/grafana

[server]
# Protocol (http or https)
protocol = http

# The http port to use
http_port = 9999

# Redirect to correct domain if host header does not match domain
# Prevents DNS rebinding attacks
enforce_domain = true
  • 和大多数配置文件一样,以键值对的形式表示配置信息
  • 当文本中出现一行形如 “[example_section_name]”,表示新建一个分区,之后的配置信息会和这个分区关联,除非另起一个新分区。
  • 文件最开头的部分,如果没有显式地指明分区,则这些配置信息属于默认分区。

2.2 从数据源加载

1
cfg, err := ini.Load("my.ini")

加载指定路径配置文件的数据,并解析。

2.3 获取、写入配置值

2.3.1 读取指定分区下的某个配置项

1
2
3
// 典型读取操作,默认分区可以使用空字符串表示
fmt.Println("App Mode:", cfg.Section("").Key("app_mode").String())
fmt.Println("Data Path:", cfg.Section("paths").Key("data").String())

2.3.2 候选值限制

1
2
3
// 候选值限制的操作
fmt.Println("Server Protocol:", cfg.Section("server").Key("protocol").In("http", []string{"http", "https"}))
fmt.Println("Email Protocol:", cfg.Section("server").Key("protocol").In("smtp", []string{"imap", "smtp"}))

如果读取的值不在候选值列表中,则用默认值替代。

2.2.3 类型转换

1
2
fmt.Printf("Port Number: (%[1]T) %[1]d\n", cfg.Section("server").Key("http_port").MustInt(9999))
fmt.Printf("Enforce Domain: (%[1]T) %[1]v\n", cfg.Section("server").Key("enforce_domain").MustBool(false))

2.2.4 修改配置值

1
cfg.Section("").Key("app_mode").SetValue("production")

2.2.5 保存配置文件

1
cfg.SaveTo("my.ini.local")

调用 SaveTo 方法将配置文件保存到指定路径。

2.4 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"os"

"gopkg.in/ini.v1"
)

func main() {
cfg, err := ini.Load("my.ini")
if err != nil {
fmt.Printf("Fail to read file:%v\n", err)
os.Exit(1)
}

// 典型读取操作,默认分区可以使用空字符串表示
fmt.Println("App Mode:", cfg.Section("").Key("app_mode").String())
fmt.Println("Data Path:", cfg.Section("paths").Key("data").String())

// 候选值限制的操作
fmt.Println("Server Protocol:", cfg.Section("server").Key("protocol").In("http", []string{"http", "https"}))
fmt.Println("Email Protocol:", cfg.Section("server").Key("protocol").In("smtp", []string{"imap", "smtp"}))

// 类型转换
fmt.Printf("Port Number: (%[1]T) %[1]d\n", cfg.Section("server").Key("http_port").MustInt(9999))
fmt.Printf("Enforce Domain: (%[1]T) %[1]v\n", cfg.Section("server").Key("enforce_domain").MustBool(false))

// 修改某个值然后进行保存
cfg.Section("").Key("app_mode").SetValue("production")
cfg.SaveTo("my.ini.local")
}

2.5 执行效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ go run ini.go
App Mode: development
Data Path: /home/git/grafana
Server Protocol: http
Email Protocol: smtp
Port Number: (int) 9999
Enforce Domain: (bool) true
$ cat ./my.ini.local
# possible values : production, development
app_mode = production

[paths]
# Path to where grafana can store temp files, sessions, and the sqlite3 db (if that is used)
data = /home/git/grafana

[server]
# Protocol (http or https)
protocol = http
# The http port to use
http_port = 9999
# Redirect to correct domain if host header does not match domain
# Prevents DNS rebinding attacks
enforce_domain = true

tail 包读取日志文件

1 下载、安装 tail

1
go get github.com/hpcloud/tail

2 tail 读取日志文件 Demo

2.1 配置

2.1.1 初始化 config 对象

1
2
3
4
5
6
7
config := tail.Config{
ReOpen: true, // 重新打开,如果修改操作不是在日志文件末尾添加
Follow: true, // 持续寻找新行
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // 初始光标位置
MustExist: false, // 是否允许文件不存在
Poll: true, // 使用 Poll 监听文件的变化,而不是 inotify
}

2.1.2 SeekInfo 结构

1
2
3
4
type SeekInfo struct {        
Offset int64 // 偏移量
Whence int // 搜索起始位置
}
Whence 含义
0 文件开始处
1 当前位置
2 文件末尾

2.2 创建 Tail 对象

1
2
3
4
5
tails, err := tail.TailFile(fileName, config)
if err != nil {
fmt.Println("tail file failed, err:", err)
return
}

日志文件的路径配置信息初始化 Tail 对象。

2.3 读日志

1
2
3
4
5
6
7
8
9
for {
msg, ok := <-tails.Lines
if !ok {
fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
time.Sleep(time.Second)
continue
}
fmt.Println("msg:", msg.Text)
}

Tail 对象内部维护了一个 channel,变量名为 Lines,其中储存着 Line 类型的对象,

1
2
3
4
5
type Line struct {        
Text string // 文本
Time time.Time // 时间
Err error // Error from tail
}

2.4 运行效果

2.4.1 运行程序

1
2
$ go run tail.go
2021/09/23 20:04:59 Waiting for ./log_file to appear...

2.4.2 创建日志文件

打开另一个命令行窗口,执行以下命令,

1
touch log_file

看到程序输出,

1
2021/09/23 20:06:46 Seeked ./log_file - &{Offset:0 Whence:2}

2.4.3 向日志文件末尾写入一行

1
echo "this is a test" >> log_file

看到程序输出,

1
msg: this is a test

2.5 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"fmt"
"time"

"github.com/hpcloud/tail"
)

func main() {
fileName := "./log_file"

config := tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: 0, Whence: 2},
MustExist: false,
Poll: true,
}

tails, err := tail.TailFile(fileName, config)
if err != nil {
fmt.Println("tail file failed, err:", err)
return
}

for {
msg, ok := <-tails.Lines
if !ok {
fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
time.Sleep(time.Second)
continue
}
fmt.Println("msg:", msg.Text)
}
}