[关闭]
@lijianying10 2017-07-14T08:36:30.000000Z 字数 2432 阅读 938

向esm-gateway输入数据流程说明文档

1、背景

由于一些业务需要输出数据进入Influxdb直接使用ESM告警,因此需要一种手段直接绕过ESM-agent直接输入到Influxdb数据库。基于此,esm仅仅提供writers配置,用户获取到writers的配置后,自行向esm-gateway写入数据。

2、设计

2.1 流程

workflow

2.2 说明

  1. 使用Unix Socket技术,用户通过unix socket请求esm-agent请求writers的配置
  2. esm-agent返回writers配置给用户
  3. 用户通过writers配置esm-gateway写数据并注册schema

3、流程

3.1 Step1: 从esm获取writers配置

通过 UnixSock 获取 Writers 配置

3.1.1 curl

  1. curl --unix-socket /var/run/esm-agent/esm-agent.sock http:/writer

3.1.2 go

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net"
  7. "net/http"
  8. "os"
  9. )
  10. func main() {
  11. client := http.Client{
  12. Transport: &http.Transport{
  13. DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
  14. return net.Dial("unix", "/var/run/esm-agent/esm-agent.sock")
  15. },
  16. },
  17. }
  18. resp, err := client.Get("http://localhost/writer")
  19. if err != nil {
  20. fmt.Println("error: " + err.Error())
  21. os.Exit(1)
  22. }
  23. io.Copy(os.Stdout, resp.Body)
  24. }

3.1.3 python

依赖类库requests_unixsocket

推荐安装方式:

  1. pip install requests_unixsocket

以下为执行代码:

  1. import requests_unixsocket
  2. if __name__=="__main__":
  3. url = "http+unix://%2Fvar%2Frun%2Fesm-agent%2Fesm-agent.sock"
  4. res = requests_unixsocket.get(url+"/writer")
  5. print res.text

3.1.4响应格式:

  1. [
  2. {
  3. "database": "esm",
  4. "addr": "esm.gateway.wg.elenet.me:9100",
  5. "payload_size": 1024,
  6. "precision": "s"
  7. }
  8. ]

3.2 Step2: 输出到esm-gateway

3.2.1 go 输出到influxdb

官方demo:

  1. import (
  2. "time"
  3. "log"
  4. client "github.com/influxdata/influxdb/client/v2"
  5. )
  6. func writePoints(clnt client.Client) {
  7. sampleSize := 1000
  8. bp, err := client.NewBatchPoints(client.BatchPointsConfig{
  9. Database: "systemstats",
  10. Precision: "us",
  11. })
  12. if err != nil {
  13. log.Fatal(err)
  14. }
  15. rand.Seed(time.Now().UnixNano())
  16. for i := 0; i < sampleSize; i++ {
  17. regions := []string{"us-west1", "us-west2", "us-west3", "us-east1"}
  18. tags := map[string]string{
  19. "cpu": "cpu-total",
  20. "host": fmt.Sprintf("host%d", rand.Intn(1000)),
  21. "region": regions[rand.Intn(len(regions))],
  22. }
  23. idle := rand.Float64() * 100.0
  24. fields := map[string]interface{}{
  25. "idle": idle,
  26. "busy": 100.0 - idle,
  27. }
  28. pt, err := client.NewPoint(
  29. "cpu_usage",
  30. tags,
  31. fields,
  32. time.Now(),
  33. )
  34. if err != nil {
  35. log.Fatal(err)
  36. }
  37. bp.AddPoint(pt)
  38. }
  39. if err := clnt.Write(bp); err != nil {
  40. log.Fatal(err)
  41. }
  42. }

3.2.2 python 输出到influxdb

依赖类库influxdb

推荐安装方式:

  1. pip install influxdb

以下为插入的执行代码:

  1. from influxdb import InfluxDBClient
  2. json_body = [
  3. {
  4. "measurement": "cpu_load_short",
  5. "tags": {
  6. "host": "server01",
  7. "region": "us-west"
  8. },
  9. "time": "2009-11-10T23:00:00Z",
  10. "fields": {
  11. "value": 0.64
  12. }
  13. }
  14. ]
  15. client = InfluxDBClient('localhost', 8086, 'root', 'root', 'example')
  16. client.write_points(json_body)

3.3 Step3: 注册schema

输入的Matrix格式注意点

3.3.1 命名方式

全小写,用_分割单词

3.3.2 matrix name

name必须要明确!

3.3.3 tag

tag 必须包含 hosthostgroup 两个,其余可以有自定义tag,但是必须明确类型数量,且不能过多!

host为主机名

hostgroup为一个或多个appid,用,隔开!

ps: 注册schema时若有问题请联系 ping.liush !

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注