[关闭]
@rickyChen 2017-12-27T08:06:37.000000Z 字数 3602 阅读 4941

Hangout with ClickHouse

Hangout ClickHouse


当我们部门发现了ClickHouse这么一个优秀数据存储仓库后,经过了一段时间的摸索测试,就把线上大部分与数据分析相关的业务迁移到了ClickHouse上。这篇文章将会介绍我们如何通过Kafka接入Nginx日志到ClickHouse中。当然,其他的应用日志也可以参照以下逻辑将数据接入ClickHouse。

我们最初使用Python脚本清洗日志写入ClickHouse,但是这样开发和维护都有一定的成本。后来我们使用Hangout作为我们的数据清洗工具,Hangout是一个通用的日志分析工具,功能类同Logstash,可以把不同种类的日志处理后写入其他的地方,比如Kafka、Elasticsearch、ClickHouse。

Prerequisites

我们假设Nginx日志已经推送到了Kafka。

Hangout已经提供了大量的插件支持我们的日志处理,下面是为了完成一个完整的配置需要另外下载的插件:

下面是我们安装Hangout以及Hangout-output-clickhouse插件的具体步骤:

  1. mkdir hangout
  2. cd hangout
  3. wget https://github.com/childe/hangout/releases/download/0.3.0/hangout-dist-0.3.0-release-bin.zip
  4. unzip hangout-dist-0.3.0-release-bin.zip
  5. cd modules
  6. wget https://github.com/RickyHuo/hangout-output-clickhouse/releases/download/0.0.2/hangout-output-plugins-clickhouse-0.0.2-jar-with-dependencies.jar

Configuration Example: Nginx Logs

Log Sample

001.cms.msina..sinanode.com`[27/Dec/2017:16:01:03 +0800]`-`"GET /n/front/w636h3606893220.jpg/w720q75apl.webp HTTP/1.1"`"SinaNews/201706071542.1 CFNetwork/758.1.6 Darwin/15.0.0"`200`[127.0.0.1]`-`"-"`0.021`10640`-`127.0.0.1`l.sinaimg.cn`-

Hangout配置包括三个部分:inputs、filters和outputs

Input

如下所示,是一个从Kafka读取数据流的配置

  1. inputs:
  2. - Kafka:
  3. codec: plain
  4. encoding: UTF8 # defaut UTF8
  5. topic:
  6. comos-proxy: 10
  7. consumer_settings:
  8. group.id: hangout_bip_cms
  9. zookeeper.connect: localhost:2181
  10. auto.commit.interval.ms: "60000"
  11. socket.receive.buffer.bytes: "1048576"
  12. fetch.message.max.bytes: "1048576"

Filters

在Filters部分,这里有一系列转化的步骤,包括正则解析、时间转换、类型转换等

  1. filters:
  2. - Grok:
  3. match:
  4. - '%{NOTSPACE:_hostname}`\[%{HTTPDATE:timestamp}\]`%{NOTSPACE:upstream}`"%{NOTSPACE:_method}\s%{NOTSPACE:_uri}\s%{NOTSPACE:httpversion}"`%{QS:_ua}`%{NUMBER:_http_code}`\[%{IP:_remote_addr}\]`%{NOTSPACE:unknow1}`%{QS:_reference}`%{NUMBER:_request_time}`%{NUMBER:_data_size}`%{NOTSPACE:unknow3}`%{IP:_http_x_forwarded_for}`%{NOTSPACE:_domain}`%{DATA:unknow4}$'
  5. remove_fields: ['message']
  6. - Date:
  7. src: timestamp
  8. formats:
  9. - 'dd/MMM/yyyy:HH:mm:ss Z'
  10. remove_fields: ['timestamp']
  11. target: utc_date
  12. - Convert:
  13. fields:
  14. _request_time:
  15. to: float
  16. - Add:
  17. fields:
  18. date: "${(utc_date)?substring(0, 10)}"
  19. datetime: "${(utc_date)?substring(0, 10) + ' ' + (utc_date)?substring(11, 19)}"
  20. hour: "${(utc_date)?substring(11, 13)}"
  21. - Convert:
  22. fields:
  23. hour:
  24. to: integer
  25. minute:
  26. to: integer
  27. _data_size:
  28. to: integer

Outputs

最后我们将处理好的结构化数据写入ClickHouse

  1. outputs:
  2. - com.sina.bip.hangout.outputs.Clickhouse:
  3. host: localhost:8123
  4. database: cms
  5. table: cms_msg_all
  6. fields: ['date', 'datetime','hour', '_hostname', '_domain', '_data_size', '_uri', '_request_time', '_ua', '_http_code', '_remote_addr', '_method', '_reference', '_url']
  7. replace_include_fields: ['_uri', '_url']
  8. bulk_size: 300

ClickHouse Schema

当然, ClickHouse存储这些数据的前提是我们已经建立好了这些数据表。具体建表操作如下:

  1. CREATE TABLE cms.cms_msg
  2. (
  3. date Date,
  4. datetime DateTime,
  5. hour Int8,
  6. _uri String,
  7. _url String,
  8. _request_time Float32,
  9. _http_code String,
  10. _hostname String,
  11. _domain String,
  12. _http_x_forwarded_for String,
  13. _remote_addr String,
  14. _reference String,
  15. _data_size Int32,
  16. _method String,
  17. _rs String,
  18. _rs_time Float32,
  19. _ua String
  20. ) ENGINE = MergeTree(date, (hour, date), 8192)
  21. CREATE TABLE cms.cms_msg_all
  22. (
  23. date Date,
  24. datetime DateTime,
  25. hour Int8,
  26. _uri String,
  27. _url String,
  28. _request_time Float32,
  29. _http_code String,
  30. _hostname String,
  31. _domain String,
  32. _http_x_forwarded_for String,
  33. _remote_addr String,
  34. _reference String,
  35. _data_size Int32,
  36. _method String,
  37. _ua String
  38. ) ENGINE = Distributed(bip_ck_cluster, 'cms', 'cms_msg', rand())

Conclusion

在这篇文章中,我们介绍了如何使用Hangout将Nginx日志文件写入ClickHouse中。Hangout从Kafka中读取原始日志,将其转换成为结构化的数据,因此能被我们的Hangout-output-clickhouse插件读取写入ClickHouse中。整个流程还有很多可以自定义和提升的地方,Hangout使用请参照Hangout README,Hangout-output-clickhouse的更多功能请参照README。此外,我们在ClickHouse数据的基础上使用了SuperSet和Grafana作为我们的数据展示和监控工具。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注