[关闭]
@skyway 2016-07-25T01:46:48.000000Z 字数 12160 阅读 1594

open-falcon-agent

falcon go


监控数据

设计原理

配置文件

组织结构

心跳机制

与HBS、Transfer交互

（图：agent 与 HBS、Transfer 之间的交互示意图）

调用关系

（图：agent 各模块之间的调用关系）

代码解读

  // Agent start-up sequence (excerpt from main). The report/sync loops shown
  // further down never return, so the cron.* entry points presumably start them
  // on their own goroutines — TODO confirm.
  go cron.InitDataHistory()
  // Report this agent's status (hostname, IP, agent/plugin versions) to HBS.
  cron.ReportAgentStatus()
  // Sync the plugins assigned to this host from HBS.
  cron.SyncMinePlugins()
  // Sync builtin collection targets from HBS: listening ports, du paths,
  // processes and URLs.
  cron.SyncBuiltinMetrics()
  // Sync the list of IPs trusted to use the agent's debug backdoor
  // (i.e. allowed to run shell commands through the agent).
  cron.SyncTrustableIps()
  // Start periodic data collection.
  cron.Collect()
  // Start the dashboard HTTP server in the background.
  go http.Start()
  // Presumably the body of cron.ReportAgentStatus: start reporting only when the
  // heartbeat is enabled and an HBS address is configured.
  if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" {
  	// Report in the background, once every Heartbeat.Interval seconds.
  	go reportAgentStatus(time.Duration(g.Config().Heartbeat.Interval) * time.Second)
  }
  6. func reportAgentStatus(interval time.Duration) {
  7. for {
  8. // 获取hostname, 出错则错误赋值给hostname
  9. hostname, err := g.Hostname()
  10. if err != nil {
  11. hostname = fmt.Sprintf("error:%s", err.Error())
  12. }
  13. // 请求发送信息
  14. req := model.AgentReportRequest{
  15. Hostname: hostname,
  16. IP: g.IP(),
  17. AgentVersion: g.VERSION,
  18. // 通过shell指令获取plugin版本,能否go实现
  19. PluginVersion: g.GetCurrPluginVersion(),
  20. }
  21. var resp model.SimpleRpcResponse
  22. // 调用rpc接口
  23. err = g.HbsClient.Call("Agent.ReportStatus", req, &resp)
  24. if err != nil || resp.Code != 0 {
  25. log.Println("call Agent.ReportStatus fail:", err, "Request:", req, "Response:", resp)
  26. }
  27. time.Sleep(interval)
  28. }
  29. }
  1. func syncMinePlugins() {
  2. var (
  3. timestamp int64 = -1
  4. pluginDirs []string
  5. )
  6. duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second
  7. for {
  8. time.Sleep(duration)
  9. hostname, err := g.Hostname()
  10. if err != nil {
  11. continue
  12. }
  13. req := model.AgentHeartbeatRequest{
  14. Hostname: hostname,
  15. }
  16. var resp model.AgentPluginsResponse
  17. // 调用rpc接口,返回plugin
  18. err = g.HbsClient.Call("Agent.MinePlugins", req, &resp)
  19. if err != nil {
  20. log.Println("ERROR:", err)
  21. continue
  22. }
  23. // 保证时间顺序正确
  24. if resp.Timestamp <= timestamp {
  25. continue
  26. }
  27. pluginDirs = resp.Plugins
  28. // 存放时间保证最新
  29. timestamp = resp.Timestamp
  30. if g.Config().Debug {
  31. log.Println(&resp)
  32. }
  33. // 无插件则清空plugin
  34. if len(pluginDirs) == 0 {
  35. plugins.ClearAllPlugins()
  36. }
  37. desiredAll := make(map[string]*plugins.Plugin)
  38. // 读取所有plugin
  39. for _, p := range pluginDirs {
  40. underOneDir := plugins.ListPlugins(strings.Trim(p, "/"))
  41. for k, v := range underOneDir {
  42. desiredAll[k] = v
  43. }
  44. }
  45. // 停止不需要的插件,启动增加的插件
  46. plugins.DelNoUsePlugins(desiredAll)
  47. plugins.AddNewPlugins(desiredAll)
  48. }
  49. }
  1. func syncBuiltinMetrics() {
  2. var timestamp int64 = -1
  3. var checksum string = "nil"
  4. duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second
  5. for {
  6. time.Sleep(duration)
  7. // 监控端口、目录大小、进程
  8. var ports = []int64{}
  9. var paths = []string{}
  10. var procs = make(map[string]map[int]string)
  11. var urls = make(map[string]string)
  12. hostname, err := g.Hostname()
  13. if err != nil {
  14. continue
  15. }
  16. req := model.AgentHeartbeatRequest{
  17. Hostname: hostname,
  18. Checksum: checksum,
  19. }
  20. var resp model.BuiltinMetricResponse
  21. err = g.HbsClient.Call("Agent.BuiltinMetrics", req, &resp)
  22. if err != nil {
  23. log.Println("ERROR:", err)
  24. continue
  25. }
  26. if resp.Timestamp <= timestamp {
  27. continue
  28. }
  29. if resp.Checksum == checksum {
  30. continue
  31. }
  32. timestamp = resp.Timestamp
  33. checksum = resp.Checksum
  34. for _, metric := range resp.Metrics {
  35. if metric.Metric == g.URL_CHECK_HEALTH {
  36. arr := strings.Split(metric.Tags, ",")
  37. if len(arr) != 2 {
  38. continue
  39. }
  40. url := strings.Split(arr[0], "=")
  41. if len(url) != 2 {
  42. continue
  43. }
  44. stime := strings.Split(arr[1], "=")
  45. if len(stime) != 2 {
  46. continue
  47. }
  48. if _, err := strconv.ParseInt(stime[1], 10, 64); err == nil {
  49. urls[url[1]] = stime[1]
  50. } else {
  51. log.Println("metric ParseInt timeout failed:", err)
  52. }
  53. }
  54. // {metric: net.port.listen, tags: port=22}
  55. if metric.Metric == g.NET_PORT_LISTEN {
  56. arr := strings.Split(metric.Tags, "=")
  57. if len(arr) != 2 {
  58. continue
  59. }
  60. if port, err := strconv.ParseInt(arr[1], 10, 64); err == nil {
  61. ports = append(ports, port)
  62. } else {
  63. log.Println("metrics ParseInt failed:", err)
  64. }
  65. continue
  66. }
  67. // metric: du.bs tags: path=/home/works/logs
  68. // du -bs /home/works/logs
  69. if metric.Metric == g.DU_BS {
  70. arr := strings.Split(metric.Tags, "=")
  71. if len(arr) != 2 {
  72. continue
  73. }
  74. paths = append(paths, strings.TrimSpace(arr[1]))
  75. continue
  76. }
  77. //mereic: proc.num tags: name=crond
  78. //或者metric: proc.num tags: cmdline=cfg.json
  79. if metric.Metric == g.PROC_NUM {
  80. arr := strings.Split(metric.Tags, ",")
  81. tmpMap := make(map[int]string)
  82. for i := 0; i < len(arr); i++ {
  83. if strings.HasPrefix(arr[i], "name=") {
  84. tmpMap[1] = strings.TrimSpace(arr[i][5:])
  85. } else if strings.HasPrefix(arr[i], "cmdline=") {
  86. tmpMap[2] = strings.TrimSpace(arr[i][8:])
  87. }
  88. }
  89. procs[metric.Tags] = tmpMap
  90. }
  91. }
  92. g.SetReportUrls(urls)
  93. g.SetReportPorts(ports)
  94. g.SetReportProcs(procs)
  95. g.SetDuPaths(paths)
  96. }
  97. }
  1. func syncTrustableIps() {
  2. duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second
  3. for {
  4. time.Sleep(duration)
  5. var ips string
  6. err := g.HbsClient.Call("Agent.TrustableIps", model.NullRpcRequest{}, &ips)
  7. if err != nil {
  8. log.Println("ERROR: call Agent.TrustableIps fail", err)
  9. continue
  10. }
  11. // 设置到本地可信IP列表
  12. g.SetTrustableIps(ips)
  13. }
  14. }
  1. // 间隔internal时间执行fs中的函数
  2. type FuncsAndInterval struct {
  3. Fs []func() []*model.MetricValue
  4. Interval int
  5. }
  6. var Mappers []FuncsAndInterval
  7. // 根据调用指令类型和是否容易被挂起而分类(通过不同的goroutine去执行,避免相互之间的影响)
  8. func BuildMappers() {
  9. interval := g.Config().Transfer.Interval
  10. Mappers = []FuncsAndInterval{
  11. FuncsAndInterval{
  12. Fs: []func() []*model.MetricValue{
  13. AgentMetrics,
  14. CpuMetrics,
  15. NetMetrics,
  16. KernelMetrics,
  17. LoadAvgMetrics,
  18. MemMetrics,
  19. DiskIOMetrics,
  20. IOStatsMetrics,
  21. NetstatMetrics,
  22. ProcMetrics,
  23. UdpMetrics,
  24. },
  25. Interval: interval,
  26. },
  27. // 容易出问题
  28. FuncsAndInterval{
  29. Fs: []func() []*model.MetricValue{
  30. DeviceMetrics,
  31. },
  32. Interval: interval,
  33. },
  34. // 调用相同指令
  35. FuncsAndInterval{
  36. Fs: []func() []*model.MetricValue{
  37. PortMetrics,
  38. SocketStatSummaryMetrics,
  39. },
  40. Interval: interval,
  41. },
  42. FuncsAndInterval{
  43. Fs: []func() []*model.MetricValue{
  44. DuMetrics,
  45. },
  46. Interval: interval,
  47. },
  48. FuncsAndInterval{
  49. Fs: []func() []*model.MetricValue{
  50. UrlMetrics,
  51. },
  52. Interval: interval,
  53. },
  54. }
  55. }
  1. func Collect() {
  2. // 配置信息判断
  3. if !g.Config().Transfer.Enabled {
  4. return
  5. }
  6. if len(g.Config().Transfer.Addrs) == 0 {
  7. return
  8. }
  9. // 读取mapper中的FuncsAndInterval集,并通过不同的goroutine运行
  10. for _, v := range funcs.Mappers {
  11. go collect(int64(v.Interval), v.Fs)
  12. }
  13. }
  14. // 间隔采集信息
  15. func collect(sec int64, fns []func() []*model.MetricValue) {
  16. // 启动断续器,间隔执行
  17. t := time.NewTicker(time.Second * time.Duration(sec)).C
  18. for {
  19. <-t
  20. hostname, err := g.Hostname()
  21. if err != nil {
  22. continue
  23. }
  24. mvs := []*model.MetricValue{}
  25. // 读取忽略metric名单
  26. ignoreMetrics := g.Config().IgnoreMetrics
  27. // 从funcs的list中取出每个采集函数
  28. for _, fn := range fns {
  29. // 执行采集函数
  30. items := fn()
  31. if items == nil {
  32. continue
  33. }
  34. if len(items) == 0 {
  35. continue
  36. }
  37. // 读取采集数据,根据忽略的metric忽略部分采集数据
  38. for _, mv := range items {
  39. if b, ok := ignoreMetrics[mv.Metric]; ok && b {
  40. continue
  41. } else {
  42. mvs = append(mvs, mv)
  43. }
  44. }
  45. }
  46. // 获取上报时间
  47. now := time.Now().Unix()
  48. // 设置上报采集项的间隔、agent主机、上报时间
  49. for j := 0; j < len(mvs); j++ {
  50. mvs[j].Step = sec
  51. mvs[j].Endpoint = hostname
  52. mvs[j].Timestamp = now
  53. }
  54. // 调用transfer发送采集数据
  55. g.SendToTransfer(mvs)
  56. }
  57. }
  // MetricValue is a single data point reported to Transfer.
  type MetricValue struct {
  	// Endpoint is the reporting host; Metric is the metric name, e.g. cpu.idle
  	// or mem.memtotal.
  	Endpoint, Metric string
  	// Value is the collected sample.
  	Value interface{}
  	// Step is the reporting interval in seconds.
  	Step int64
  	// Type is "GAUGE" or "COUNTER"; Tags is a comma-separated tag list.
  	Type, Tags string
  	// Timestamp is the Unix time (seconds) of this report.
  	Timestamp int64
  }
  1. func NewMetricValue(metric string, val interface{}, dataType string, tags ...string) *model.MetricValue {
  2. mv := model.MetricValue{
  3. Metric: metric,
  4. Value: val,
  5. Type: dataType,
  6. }
  7. size := len(tags)
  8. if size > 0 {
  9. mv.Tags = strings.Join(tags, ",")
  10. }
  11. return &mv
  12. }
  13. // 原值类型
  14. func GaugeValue(metric string, val interface{}, tags ...string) *model.MetricValue {
  15. return NewMetricValue(metric, val, "GAUGE", tags...)
  16. }
  17. // 计数器类型
  18. func CounterValue(metric string, val interface{}, tags ...string) *model.MetricValue {
  19. return NewMetricValue(metric, val, "COUNTER", tags...)
  20. }
  1. // 简单封装rpc.Cilent
  2. type SingleConnRpcClient struct {
  3. sync.Mutex
  4. rpcClient *rpc.Client
  5. RpcServer string
  6. Timeout time.Duration
  7. }
  8. // 关闭rpc
  9. func (this *SingleConnRpcClient) close() {
  10. if this.rpcClient != nil {
  11. this.rpcClient.Close()
  12. this.rpcClient = nil
  13. }
  14. }
  15. // 保证rpc存在,为空则重新创建, 如果server宕机, 死循环????
  16. func (this *SingleConnRpcClient) insureConn() {
  17. if this.rpcClient != nil {
  18. return
  19. }
  20. var err error
  21. var retry int = 1
  22. for {
  23. if this.rpcClient != nil {
  24. return
  25. }
  26. // 根据timeout和server地址去连接rpc的server
  27. this.rpcClient, err = net.JsonRpcClient("tcp", this.RpcServer, this.Timeout)
  28. if err == nil {
  29. return
  30. }
  31. log.Printf("dial %s fail: %v", this.RpcServer, err)
  32. if retry > 6 {
  33. retry = 1
  34. }
  35. time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second)
  36. retry++
  37. }
  38. }
  39. // rpc client调用hbs函数
  40. func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {
  41. // 加锁保证一个agent只与server有一个连接,保证性能
  42. this.Lock()
  43. defer this.Unlock()
  44. // 保证rpc连接可用
  45. this.insureConn()
  46. timeout := time.Duration(50 * time.Second)
  47. done := make(chan error)
  48. go func() {
  49. err := this.rpcClient.Call(method, args, reply)
  50. done <- err
  51. }()
  52. // 超时控制
  53. select {
  54. case <-time.After(timeout):
  55. log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
  56. this.close()
  57. case err := <-done:
  58. if err != nil {
  59. this.close()
  60. return err
  61. }
  62. }
  63. return nil
  64. }
  1. // 定义transfer的rpcClient对应Map, transferClients读写锁
  2. var (
  3. TransferClientsLock *sync.RWMutex = new(sync.RWMutex)
  4. TransferClients map[string]*SingleConnRpcClient = map[string]*SingleConnRpcClient{}
  5. )
  6. // 发送数据到随机的transfer
  7. func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
  8. rand.Seed(time.Now().UnixNano())
  9. // 随机transferClient发送数据,直到发送成功
  10. for _, i := range rand.Perm(len(Config().Transfer.Addrs)) {
  11. addr := Config().Transfer.Addrs[i]
  12. if _, ok := TransferClients[addr]; !ok {
  13. initTransferClient(addr)
  14. }
  15. if updateMetrics(addr, metrics, resp) {
  16. break
  17. }
  18. }
  19. }
  20. // 初始化addr对应的transferClient
  21. func initTransferClient(addr string) {
  22. TransferClientsLock.Lock()
  23. defer TransferClientsLock.Unlock()
  24. TransferClients[addr] = &SingleConnRpcClient{
  25. RpcServer: addr,
  26. Timeout: time.Duration(Config().Transfer.Timeout) * time.Millisecond,
  27. }
  28. }
  29. // 调用rpc接口发送metric
  30. func updateMetrics(addr string, metrics []*model.MetricValue, resp *model.TransferResponse) bool {
  31. TransferClientsLock.RLock()
  32. defer TransferClientsLock.RUnlock()
  33. err := TransferClients[addr].Call("Transfer.Update", metrics, resp)
  34. if err != nil {
  35. log.Println("call Transfer.Update fail", addr, err)
  36. return false
  37. }
  38. return true
  39. }
  1. // 插件信息: 路径、修改时间、运行周期(来自plugin插件)
  2. type Plugin struct {
  3. FilePath string
  4. MTime int64
  5. Cycle int
  6. }
  7. // 插件map和调度器map
  8. var (
  9. Plugins = make(map[string]*Plugin)
  10. PluginsWithScheduler = make(map[string]*PluginScheduler)
  11. )
  12. // 删除不需要的plugin
  13. func DelNoUsePlugins(newPlugins map[string]*Plugin) {
  14. for currKey, currPlugin := range Plugins {
  15. newPlugin, ok := newPlugins[currKey]
  16. if !ok || currPlugin.MTime != newPlugin.MTime {
  17. deletePlugin(currKey)
  18. }
  19. }
  20. }
  21. // 添加同步时增加的plugin
  22. func AddNewPlugins(newPlugins map[string]*Plugin) {
  23. for fpath, newPlugin := range newPlugins {
  24. // 去除重复插件
  25. if _, ok := Plugins[fpath]; ok && newPlugin.MTime == Plugins[fpath].MTime {
  26. continue
  27. }
  28. // 为新添加的插件新建调度器
  29. Plugins[fpath] = newPlugin
  30. sch := NewPluginScheduler(newPlugin)
  31. PluginsWithScheduler[fpath] = sch
  32. // 启动plugin调度
  33. sch.Schedule()
  34. }
  35. }
  36. func ClearAllPlugins() {
  37. for k := range Plugins {
  38. deletePlugin(k)
  39. }
  40. }
  41. func deletePlugin(key string) {
  42. v, ok := PluginsWithScheduler[key]
  43. if ok {
  44. // 暂停调度plugin
  45. v.Stop()
  46. delete(PluginsWithScheduler, key)
  47. }
  48. delete(Plugins, key)
  49. }
  1. // 持续间隔执行plugin
  2. type PluginScheduler struct {
  3. Ticker *time.Ticker
  4. Plugin *Plugin
  5. Quit chan struct{}
  6. }
  7. // 根据plugin创建新的schedule
  8. func NewPluginScheduler(p *Plugin) *PluginScheduler {
  9. scheduler := PluginScheduler{Plugin: p}
  10. scheduler.Ticker = time.NewTicker(time.Duration(p.Cycle) * time.Second)
  11. scheduler.Quit = make(chan struct{})
  12. return &scheduler
  13. }
  14. // plugin调度,间隔执行PluginRun,除非收到quit消息
  15. func (this *PluginScheduler) Schedule() {
  16. go func() {
  17. for {
  18. select {
  19. case <-this.Ticker.C:
  20. PluginRun(this.Plugin)
  21. case <-this.Quit:
  22. this.Ticker.Stop()
  23. return
  24. }
  25. }
  26. }()
  27. }
  28. // 停止plugin调度
  29. func (this *PluginScheduler) Stop() {
  30. close(this.Quit)
  31. }
  32. // 执行插件,读取插件运行返回数据并上报transfer
  33. func PluginRun(plugin *Plugin) {
  34. timeout := plugin.Cycle*1000 - 500
  35. fpath := filepath.Join(g.Config().Plugin.Dir, plugin.FilePath)
  36. if !file.IsExist(fpath) {
  37. log.Println("no such plugin:", fpath)
  38. return
  39. }
  40. debug := g.Config().Debug
  41. if debug {
  42. log.Println(fpath, "running...")
  43. }
  44. cmd := exec.Command(fpath)
  45. var stdout bytes.Buffer
  46. cmd.Stdout = &stdout
  47. var stderr bytes.Buffer
  48. cmd.Stderr = &stderr
  49. cmd.Start()
  50. err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Millisecond)
  51. errStr := stderr.String()
  52. if errStr != "" {
  53. logFile := filepath.Join(g.Config().Plugin.LogDir, plugin.FilePath+".stderr.log")
  54. if _, err = file.WriteString(logFile, errStr); err != nil {
  55. log.Printf("[ERROR] write log to %s fail, error: %s\n", logFile, err)
  56. }
  57. }
  58. if isTimeout {
  59. // has be killed
  60. if err == nil && debug {
  61. log.Println("[INFO] timeout and kill process", fpath, "successfully")
  62. }
  63. if err != nil {
  64. log.Println("[ERROR] kill process", fpath, "occur error:", err)
  65. }
  66. return
  67. }
  68. if err != nil {
  69. log.Println("[ERROR] exec plugin", fpath, "fail. error:", err)
  70. return
  71. }
  72. // exec successfully
  73. data := stdout.Bytes()
  74. if len(data) == 0 {
  75. if debug {
  76. log.Println("[DEBUG] stdout of", fpath, "is blank")
  77. }
  78. return
  79. }
  80. var metrics []*model.MetricValue
  81. err = json.Unmarshal(data, &metrics)
  82. if err != nil {
  83. log.Printf("[ERROR] json.Unmarshal stdout of %s fail. error:%s stdout: \n%s\n", fpath, err, stdout.String())
  84. return
  85. }
  86. g.SendToTransfer(metrics)
  87. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注