LogAgent —— etcd+kafka+zookeeper+go实现实时读取日志发送到kafka,并实现热加载配置读取的日志路径

工具包目录结构:

.
├── conf
│   ├── logAgent.ini
│   └── logAgentConfig.go
├── etcd
│   └── etcd.go
├── kafka
│   └── kafka.go
├── main.go
└── taillog
    ├── taillog.go
    └── taillog_mgr.go

 

logAgent.ini

 1 [kafka]
 2 address=127.0.0.1:9092
 3 chan_max_size=100000
 4 
 5 [etcd]
 6 address=127.0.0.1:2379
 7 timeout=5
 8 collect_log_key=xxx
 9 
10 [taillog]
11 filename="./my.log"

logAgentConfig.go

 1 /**
 2  * @Author: Mr.Cheng
 3  * @Description:
 4  * @File: logAgentConfig
 5  * @Version: 1.0.0
 6  * @Date: 2021/12/9 下午8:44
 7  */
 8 
 9 package logAgentConfig
10 
11 type AppConf struct { // AppConf is the root config object; main fills it with ini.MapTo from ./conf/logAgent.ini
12     KafkaConf `ini:"kafka"` // embedded Kafka settings, tagged for the "kafka" section
13     EtcdConf  `ini:"etcd"`  // embedded etcd settings, tagged for the "etcd" section
14     // TaillogConf `ini:"taillog"`
15 }
16 
17 type KafkaConf struct { // KafkaConf holds the Kafka-related settings read from the ini file
18     Address string `ini:"address"`       // broker address; main passes it to kafka.Init as the only broker
19     Size    int    `ini:"chan_max_size"` // main passes it to kafka.Init, which uses it as the capacity of the log queue
20 }
21 
22 type EtcdConf struct { // EtcdConf holds the etcd-related settings read from the ini file
23     Address string `ini:"address"`         // etcd endpoint; main passes it to etcd.Init
24     Timeout int    `ini:"timeout"`         // dial timeout in seconds (etcd.Init multiplies it by time.Second)
25     Key     string `ini:"collect_log_key"` // etcd key that main reads and watches for the log-collection config
26 }
27 
28 // ----- unused ↓️----
29 
30 type TaillogConf struct { // TaillogConf is currently unused: its field in AppConf is commented out
31     FileName string `ini:"filename"` // value of the "filename" ini key
32 }

etcd.go

 1 /**
 2  * @Author: Mr.Cheng
 3  * @Description:
 4  * @File: etcd
 5  * @Version: 1.0.0
 6  * @Date: 2021/12/9 下午9:12
 7  */
 8 package etcd
 9 
10 import (
11     "context"
12     "encoding/json"
13     "fmt"
14     "go.etcd.io/etcd/clientv3"
15     "time"
16 )
17 
18 var (
19     client *clientv3.Client // package-level etcd client; assigned by Init, used by GetConf and WatchConf
20 )
21 
22 type LogEntry struct { // LogEntry is one log-collection item, decoded from the JSON value stored in etcd
23     Path  string `json:"path"`  // path of the log file
24     Topic string `json:"topic"` // Kafka topic the log lines are sent to
25 }
26 
27 func Init(address string, interval int) (err error) {
28     client, err = clientv3.New(clientv3.Config{
29         Endpoints:   []string{address},
30         DialTimeout: time.Duration(interval) * time.Second,
31     })
32     if err != nil {
33         fmt.Printf("connect to etcd failed, err:%v\n", err)
34         return err
35     }
36     return
37 }
38 
39 // 从Etcd中根据Key获取配置项
40 func GetConf(key string) (LogEntryConf []*LogEntry, err error) {
41     ctx, cancel := context.WithTimeout(context.Background(), time.Second)
42     resp, err := client.Get(ctx, key)
43     cancel()
44     if err != nil {
45         fmt.Printf("get from etcd failed, err:%v\n", err)
46         return nil, err
47     }
48     for _, ev := range resp.Kvs {
49         //fmt.Printf("%s:%s\n", ev.Key, ev.Value)
50         err = json.Unmarshal(ev.Value, &LogEntryConf)
51         if err != nil {
52             fmt.Printf("unmarshal etcd value failed, err:%v\n", err)
53             return nil, err
54         }
55     }
56     return LogEntryConf, nil
57 }
58 
59 // etcd watch
60 func WatchConf(key string, newConfChan chan<- []*LogEntry) {
61     ch := client.Watch(context.Background(), key)
62     for wresp := range ch {
63         for _, evt := range wresp.Events {
64             fmt.Printf("Type:%v key:%v value:%v\n", evt.Type, string(evt.Kv.Key), string(evt.Kv.Value))
65             var newConf []*LogEntry
66             // 如果是删除操作,json.Unmarshal会报错,需手动添加一个空的newConf
67             if evt.Type != clientv3.EventTypeDelete {
68                 err := json.Unmarshal(evt.Kv.Value, &newConf)
69                 if err != nil {
70                     fmt.Printf("unmarshal new conf failed, err:%v\n", err)
71                     continue
72                 }
73             }
74             newConfChan <- newConf
75         }
76     }
77 }

kafka.go

 1 /**
 2  * @Author: Mr.Cheng
 3  * @Description:往kafka写入日志
 4  * @File: kafka
 5  * @Version: 1.0.0
 6  * @Date: 2021/12/9 下午2:19
 7  */
 8 
 9 package kafka
10 
11 import (
12     "fmt"
13     "github.com/Shopify/sarama"
14     "time"
15 )
16 
17 type logData struct { // logData is one queued log line waiting to be sent to Kafka
18     Topic string // Kafka topic the line is published to
19     Data  string // text used as the message value
20 }
21 
22 var (
23     client      sarama.SyncProducer // package-level Kafka producer connection
24     logDataChan chan *logData // queue between SendToChan and sendToKafka; created by Init
25 )
26 
27 // 初始化连接
28 func Init(address []string, size int) (err error) {
29     config := sarama.NewConfig()
30     config.Producer.RequiredAcks = sarama.WaitForAll          // 发送模式(需leader和follow都确认)
31     config.Producer.Partitioner = sarama.NewRandomPartitioner // 选择分区的方式(轮询)
32     config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel中返回
33 
34     // 连接kafka
35     client, err = sarama.NewSyncProducer(address, config)
36     if err != nil {
37         fmt.Printf("client kafka failed, err:%v\n", err)
38         return err
39     }
40 
41     // 初始化logDataChan
42     logDataChan = make(chan *logData, size)
43 
44     // 从logDataChan中取数据发往kafaka
45     go sendToKafka()
46     return nil
47 }
48 
49 func SendToChan(Topic, Data string) {
50     data := &logData{
51         Topic: Topic,
52         Data:  Data,
53     }
54     select {
55     case logDataChan <- data:
56     default:
57         time.Sleep(time.Millisecond * 100)
58     }
59 }
60 
61 func sendToKafka() {
62     // 循环从通道logDataChan取值并发送给kafka
63     for {
64         select {
65         case data := <-logDataChan:
66             msg := &sarama.ProducerMessage{}
67             msg.Topic = data.Topic
68             msg.Value = sarama.StringEncoder(data.Data)
69             pid, offset, err := client.SendMessage(msg)
70             if err != nil {
71                 fmt.Printf("send msg failed, err:%v\n", err)
72             }
73             fmt.Printf("send msg success, pid:%v offect:%v\n", pid, offset)
74         default:
75             time.Sleep(time.Millisecond * 50)
76         }
77     }
78 }

taillog.go

 1 /**
 2  * @Author: Mr.Cheng
 3  * @Description:收集日志模块
 4  * @File: taillog
 5  * @Version: 1.0.0
 6  * @Date: 2021/12/8 下午9:54
 7  */
 8 
 9 package taillog
10 
11 import (
12     "context"
13     "day21/02.log_agent/kafka"
14     "fmt"
15     "github.com/hpcloud/tail"
16     "time"
17 )
18 
19 type TailTask struct { // TailTask is one running log-collection job: a tailed file plus the Kafka topic its lines go to
20     Path     string // path of the tailed log file
21     Topic    string // Kafka topic passed to kafka.SendToChan for every line
22     Instance *tail.Tail // tail handle returned by tail.TailFile; lines are read from Instance.Lines
23     // keep the context (and its cancel func) so the task can be stopped
24     ctx    context.Context
25     cancel context.CancelFunc
26 }
27 
28 func NewTailTask(Path, Topic string) (tailtask *TailTask, err error) {
29     config := tail.Config{
30         Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // 从文件那个地方开始读
31         ReOpen:    true,                                 // 重新打开
32         MustExist: false,                                // 文件不存在不报错
33         Poll:      true,
34         Follow:    true, // 是否跟随
35     }
36     ctx, cancel := context.WithCancel(context.Background())
37     tailObj, err := tail.TailFile(Path, config)
38     if err != nil {
39         fmt.Printf("tail file failed, err:%v\n", err)
40         return nil, err
41     }
42     tailtask = &TailTask{Path: Path, Topic: Topic, Instance: tailObj, ctx: ctx, cancel: cancel}
43     // 开启读取日志并发送给kafka
44     go tailtask.ReadFromTail()
45     return tailtask, nil
46 }
47 
48 func (tailtask *TailTask) ReadFromTail() {
49     for {
50         select {
51         case <-tailtask.ctx.Done():
52             return
53         case line, ok := <-tailtask.Instance.Lines:
54             if !ok {
55                 fmt.Printf("tail fail close reopen, filename:%s\n", tailtask.Path)
56                 time.Sleep(time.Second)
57                 continue
58             }
59             kafka.SendToChan(tailtask.Topic, line.Text)
60         default:
61             time.Sleep(time.Second)
62         }
63     }
64 }

taillog_mgr.go

  1 /**
  2  * @Author: Mr.Cheng
  3  * @Description:
  4  * @File: taillogMgr
  5  * @Version: 1.0.0
  6  * @Date: 2021/12/14 下午3:47
  7  */
  8 
  9 package taillog
 10 
 11 import (
 12     "day21/02.log_agent/etcd"
 13     "fmt"
 14     "time"
 15 )
 16 
 17 type TailMgr struct { // TailMgr tracks the running tail tasks and applies configuration updates to them
 18     logEntry    []*etcd.LogEntry // collection entries currently considered active
 19     tskMap      map[string]*TailTask // running tasks keyed by "<path>_<topic>"
 20     newConfChan chan []*etcd.LogEntry // unbuffered channel on which run receives new configurations
 21 }
 22 
 23 var tskMgr *TailMgr // package-level manager; assigned by Init and read by NewConfChan
 24 
 25 // 循环每一个日志收集项,创建tailObj,并发往kafka
 26 func Init(logEntryConf []*etcd.LogEntry) {
 27     tskMgr = &TailMgr{
 28         logEntry:    logEntryConf,
 29         tskMap:      make(map[string]*TailTask, 16),
 30         newConfChan: make(chan []*etcd.LogEntry),
 31     }
 32 
 33     for _, LogEntry := range logEntryConf {
 34         // fmt.Printf("Path:%v Topic:%v\n", LogEntry.Path, LogEntry.Topic)
 35         tailtask, err := NewTailTask(LogEntry.Path, LogEntry.Topic)
 36         if err != nil {
 37             continue
 38         }
 39         // 在tskMap中存储一下,以便发生配置变更时做增删改操作
 40         key := fmt.Sprintf("%s_%s", tailtask.Path, tailtask.Topic)
 41         tskMgr.tskMap[key] = tailtask
 42     }
 43 
 44     go tskMgr.run()
 45 }
 46 
 47 // 监听newConfChan是否有数据,有数据则表示etcd配置有变化,需做相应的处理
 48 func (t *TailMgr) run() {
 49     for {
 50         select {
 51         case newConf := <- t.newConfChan:
 52             fmt.Printf("配置发生变更,Conf:%v\n", newConf)
 53             // 找出新增项
 54             for _, logEntry := range newConf {
 55                 key := fmt.Sprintf("%s_%s", logEntry.Path, logEntry.Topic)
 56                 _, ok := t.tskMap[key]
 57                 if ok {
 58                     // 表示该配置项原先存在
 59                     continue
 60                 } else {
 61                     // 属于新增配置
 62                     fmt.Printf("新增项,path:%s topic:%s\n", logEntry.Path, logEntry.Topic)
 63                     tailtask, err := NewTailTask(logEntry.Path, logEntry.Topic)
 64                     if err != nil {
 65                         continue
 66                     }
 67                     // TailMgr的logEntry和tskMap增加对应项
 68                     t.logEntry = append(t.logEntry, logEntry)
 69                     t.tskMap[key] = tailtask
 70                     go tailtask.ReadFromTail()
 71                 }
 72             }
 73             // 找出删除项
 74             for index, c1 := range t.logEntry {
 75                 isDelete := true
 76                 for _, c2 := range newConf {
 77                     if c1.Path == c2.Path && c1.Topic == c2.Topic {
 78                         isDelete = false
 79                         break
 80                     }
 81                 }
 82                 if isDelete{
 83                     // 表示属于删除项,从tskMap拿出tailtask对象,执行对象的cancel函数,并将该对象从tskMap中删除
 84                     fmt.Printf("删除项,path:%s topic:%s\n", c1.Path, c1.Topic)
 85                     key := fmt.Sprintf("%s_%s", c1.Path, c1.Topic)
 86                     t.tskMap[key].cancel()
 87                     // TailMgr的logEntry和tskMap删除对应项
 88                     delete(t.tskMap, key)
 89                     t.logEntry = append(t.logEntry[:index], t.logEntry[index + 1:]...)
 90                 }
 91             }
 92         default:
 93             time.Sleep(time.Second)
 94         }
 95     }
 96 }
 97 
 98 // 向外暴露newConfChan
 99 func NewConfChan() chan<- []*etcd.LogEntry{
100     return tskMgr.newConfChan
101 }

main.go

 1 /**
 2  * @Author: Mr.Cheng
 3  * @Description:
 4  * @File: main
 5  * @Version: 1.0.0
 6  * @Date: 2021/12/9 下午8:43
 7  */
 8 
 9 package main
10 
11 import (
12     logAgentConfig "day21/02.log_agent/conf"
13     "day21/02.log_agent/etcd"
14     "day21/02.log_agent/kafka"
15     "day21/02.log_agent/taillog"
16     "fmt"
17     "gopkg.in/ini.v1"
18     "sync"
19 )
20 
21 var (
22     cfg = new(logAgentConfig.AppConf) // application config; filled from the ini file at the start of main
23     wg  sync.WaitGroup // main waits on it after starting the etcd watcher
24 )
25 
26 func main() {
27     // 加载配置文件
28     err := ini.MapTo(cfg, "./conf/logAgent.ini")
29     if err != nil {
30         fmt.Printf("load ini failed, err:%v\n", err)
31         return
32     }
33 
34     // 初始化kafka连接
35     err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.Size)
36     if err != nil {
37         return
38     }
39     fmt.Println("init kafka success")
40 
41     // 初始化etcd
42     err = etcd.Init(cfg.EtcdConf.Address, cfg.EtcdConf.Timeout)
43     if err != nil {
44         return
45     }
46     fmt.Println("init etcd success")
47 
48     // 从etcd中获取日志收集项的配置信息
49     logEntryConf, err := etcd.GetConf(cfg.EtcdConf.Key)
50     if err != nil {
51         return
52     }
53     fmt.Printf("get conf from etcd success, conf:%v\n", logEntryConf)
54 
55     // 收集日志发往kafka
56     // 循环每一个日志收集项,创建tailObj,并发往kafka
57     taillog.Init(logEntryConf)
58 
59     // 监视etcd中配置的变动,如有变动,给新的配置信息给taillog
60     wg.Add(1)
61     go etcd.WatchConf(cfg.EtcdConf.Key, taillog.NewConfChan())
62     wg.Wait()
63 }

 


  • 作者:合十
  • 发表时间:2021年12月18日 21:40
  • 更新时间:2024年4月20日 12:50
  • 所属分类:GO语言

Comments

该文章还未收到评论,点击下方评论框开始评论吧~