
ELK Log Analysis Platform (Part 2)

Message Queues and Database Caching

---Author: ZhangSiming (张思明)

---Mail: 1151004164@cnu.edu.cn

---QQ: 1030728296

If you have a dream, chase it without holding back;
because only hard work can change your fate.


1. Elasticsearch in Depth

1.1 Understanding Elasticsearch by Comparison with MySQL

Elasticsearch | Meaning | MySQL
------------- | ------- | -----
Index | A collection of documents; the index name must be lowercase | Database
Type | An Index can define one or more Types, which logically group Documents | Table
Document | Each record in an Index is a Document; many Documents make up an Index | Row
Field | A field, the smallest unit of storage in Elasticsearch | Column
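
To make the analogy concrete, a minimal illustration against the cluster built in section 1.2 (the index name, type name, and field values here are placeholders for illustration only):

    # Create one Document of Type "logs" in Index "weblog"; every key in the JSON body is a Field
    curl -X PUT "192.168.17.139:9200/weblog/logs/1?pretty" -H 'Content-Type: application/json' -d '
    {
      "client_ip": "127.0.0.1",
      "status": 200
    }'

    # Fetch the same Document back by its id
    curl -X GET "192.168.17.139:9200/weblog/logs/1?pretty"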

1.2 Deploying an Elasticsearch Cluster

IP | Role
-- | ----
192.168.17.139 | Elasticsearch-Node1 (reuse the node deployed earlier)
192.168.17.140 | Elasticsearch-Node2 (new deployment)
192.168.17.141 | Elasticsearch-Node3 (new deployment)
  1. # Deploy two more Elasticsearch nodes (same procedure as before)
  2. # Modify the configuration file on Elasticsearch-Node1
  3. [root@ZhangSiming elasticsearch]# pwd
  4. /usr/local/elasticsearch
  5. [root@ZhangSiming elasticsearch]# vim config/elasticsearch.yml
  6. [root@ZhangSiming elasticsearch]# sed -n '17p;23p;33p;37p;56p;69p;73p' config/elasticsearch.yml
  7. cluster.name: es-cluster
  8. node.name: node-1
  9. path.data: /usr/local/elasticsearch/data
  10. path.logs: /usr/local/elasticsearch/logs
  11. network.host: 192.168.17.139
  12. discovery.zen.ping.unicast.hosts: ["192.168.17.139", "192.168.17.140","192.168.17.141"]
  13. discovery.zen.minimum_master_nodes: 1
  14. # Recommended value is (number of node.master: true nodes / 2) + 1, which is 2 for this three-node cluster, to avoid split-brain; an unsuitable value can keep the cluster from forming properly
  15. # Elasticsearch cluster configuration
  16. [root@ZhangSiming elasticsearch]# scp config/elasticsearch.yml root@192.168.17.140:/usr/local/elasticsearch/config/elasticsearch.yml
  17. root@192.168.17.140's password:
  18. elasticsearch.yml 100% 2947 2.7MB/s 00:00
  19. [root@ZhangSiming elasticsearch]# scp config/elasticsearch.yml root@192.168.17.141:/usr/local/elasticsearch/config/elasticsearch.yml
  20. root@192.168.17.141's password:
  21. elasticsearch.yml 100% 2947 2.5MB/s 00:00
  22. # After scp-ing the file to the other two nodes, change network.host in each copy to the node's own IP and node.name to node-2 and node-3
  23. # Start all three Elasticsearch nodes
  24. [root@ZhangSiming elasticsearch]# su -s /bin/bash elk bin/start.sh
  25. [root@ZhangSiming elasticsearch]# curl -X GET "192.168.17.139:9200/_cat/health?v"
  26. epoch timestamp cluster status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
  27. 1552197192 13:53:12 es-cluster green 3 3 0 0 0 0 0 0 - 100.0%
  28. # Cluster health check succeeded
  1. [root@ZhangSiming elasticsearch]# cat logs/es-cluster.log
  2. # Check the cluster log
  3. [2019-03-10T13:41:04,380][INFO ][o.e.d.z.ZenDiscovery ] [node-1] failed to send join request to master [{node-2}{L_zSuq3kS22qVUEqdiYKBQ}{nO795s-tTnmFL_N8fQnAGQ}{192.168.17.140}{192.168.17.140:9300}], reason [RemoteTransportException[[node-2][192.168.17.140:9300][internal:discovery/zen/join]]; nested: NotMasterException[Node [{node-2}{L_zSuq3kS22qVUEqdiYKBQ}{nO795s-tTnmFL_N8fQnAGQ}{192.168.17.140}{192.168.17.140:9300}] not master for join request]; ], tried [3] times

This happens because Node-2's data directory contains part of Node-1's data; the node was clearly not deployed from scratch but simply scp'd over, data directory and all! The fix is to clear the data directory of the node that cannot join and restart it.
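
A minimal sketch of that fix, following the paths used in this deployment (stop the node first so no files are held open):

    # On the node that fails to join: stop Elasticsearch, wipe its copied cluster data, then restart
    pkill -u elk -f org.elasticsearch.bootstrap.Elasticsearch
    rm -rf /usr/local/elasticsearch/data/*
    su -s /bin/bash elk /usr/local/elasticsearch/bin/start.sh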

1.3 Operating the Elasticsearch Database

Command format:

  1. curl -X<verb> '<protocol>://<host>:<port>/<path>?<query_string>' -d '<body>'
Parameter | Description
--------- | -----------
verb | HTTP method, e.g. GET, POST, PUT, HEAD, DELETE
host | Hostname of any node in the ES cluster
port | ES HTTP service port, 9200 by default
path | Index path
query_string | Optional query parameters, e.g. ?pretty pretty-prints the returned JSON
-d | Carries a JSON request body (even for GET requests)
body | The JSON request body you write yourself
  1. # List all indices in the database
  2. [root@ZhangSiming elasticsearch]# curl -X GET "192.168.17.139:9200/_cat/indices?v"
  3. health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
  4. # Create an index
  5. [root@ZhangSiming elasticsearch]# curl -X PUT "192.168.17.139:9200/zhangsiming"
  6. {"acknowledged":true,"shards_acknowledged":true,"index":"zhangsiming"}
  7. # View the database indices again
  8. [root@ZhangSiming elasticsearch]# curl -X GET "192.168.17.139:9200/_cat/indices?v"
  9. health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
  10. green open zhangsiming htsNjF_qRJ2bSIZrsRLqeA 5 1 0 0 2.2kb 1.1kb

Official documentation for Elasticsearch API operations:

https://www.elastic.co/guide/en/elasticsearch/reference/current/_index_and_query_a_document.html
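
As a quick follow-up on the index created above, two hedged query examples (they assume at least one document has already been indexed, for instance as in section 1.1):

    # Return every document in the zhangsiming index, pretty-printed
    curl -X GET "192.168.17.139:9200/zhangsiming/_search?pretty"

    # The same search narrowed with a simple query string (the field name is illustrative)
    curl -X GET "192.168.17.139:9200/zhangsiming/_search?q=status:200&pretty"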

1.4 Managing Elasticsearch Graphically with the Head Plugin

  1. # Download Node.js, which the Head plugin requires
  2. [root@ZhangSiming elasticsearch]# wget https://npm.taobao.org/mirrors/node/latest-v4.x/node-v4.4.7-linux-x64.tar.gz
  3. --2019-03-10 14:18:29-- https://npm.taobao.org/mirrors/node/latest-v4.x/node-v4.4.7-linux-x64.tar.gz
  4. Resolving npm.taobao.org (npm.taobao.org)... 114.55.80.225
  5. Connecting to npm.taobao.org (npm.taobao.org)|114.55.80.225|:443... connected.
  6. HTTP request sent, awaiting response... 302 Found
  7. Location: http://cdn.npm.taobao.org/dist/node/latest-v4.x/node-v4.4.7-linux-x64.tar.gz [following]
  8. --2019-03-10 14:18:29-- http://cdn.npm.taobao.org/dist/node/latest-v4.x/node-v4.4.7-linux-x64.tar.gz
  9. Resolving cdn.npm.taobao.org (cdn.npm.taobao.org)... 61.240.131.233, 61.240.131.230, 61.240.131.227, ...
  10. Connecting to cdn.npm.taobao.org (cdn.npm.taobao.org)|61.240.131.233|:80... connected.
  11. HTTP request sent, awaiting response... 200 OK
  12. Length: 12189839 (12M) [application/octet-stream]
  13. Saving to: node-v4.4.7-linux-x64.tar.gz
  14. 100%[======================================>] 12,189,839 1.26MB/s in 13s
  15. 2019-03-10 14:18:42 (926 KB/s) - node-v4.4.7-linux-x64.tar.gz saved [12189839/12189839]
  16. [root@ZhangSiming elasticsearch]# tar xf node-v4.4.7-linux-x64.tar.gz -C /usr/local/
  17. [root@ZhangSiming elasticsearch]# mv /usr/local/node-v4.4.7-linux-x64/ /usr/local/node-v4.4
  18. [root@ZhangSiming elasticsearch]# echo -e 'NODE_HOME=/usr/local/node-v4.4\nPATH=$NODE_HOME/bin:$PATH\nexport NODE_HOME PATH' >> /etc/profile
  19. [root@ZhangSiming elasticsearch]# tail -3 /etc/profile
  20. NODE_HOME=/usr/local/node-v4.4
  21. PATH=$NODE_HOME/bin:$PATH
  22. export NODE_HOME PATH
  23. [root@ZhangSiming elasticsearch]# . /etc/profile
  24. [root@ZhangSiming elasticsearch]# yum install -y git &>/dev/null
  25. # Clone the Head plugin source code
  26. [root@ZhangSiming elasticsearch]# git clone git://github.com/mobz/elasticsearch-head.git
  27. Cloning into 'elasticsearch-head'...
  28. remote: Enumerating objects: 32, done.
  29. remote: Counting objects: 100% (32/32), done.
  30. remote: Compressing objects: 100% (26/26), done.
  31. remote: Total 4260 (delta 8), reused 21 (delta 6), pack-reused 4228
  32. Receiving objects: 100% (4260/4260), 2.21 MiB | 44.00 KiB/s, done.
  33. Resolving deltas: 100% (2337/2337), done.
  34. [root@ZhangSiming elasticsearch]# cd elasticsearch-head/
  35. [root@ZhangSiming elasticsearch-head]# npm install
  36. # Install the plugin's npm dependencies
  37. [root@ZhangSiming elasticsearch-head]# ls
  38. Dockerfile npm-debug.log
  39. Dockerfile-alpine package.json
  40. elasticsearch-head.sublime-project plugin-descriptor.properties
  41. Gruntfile.js proxy
  42. grunt_fileSets.js README.textile
  43. index.html _site
  44. LICENCE src
  45. node_modules test
  46. [root@ZhangSiming elasticsearch-head]# vim Gruntfile.js
  47. # Edit the configuration file inside the source tree
  48. [root@ZhangSiming elasticsearch-head]# sed -n '90,97p' Gruntfile.js
  49. connect: {
  50. server: {
  51. options: {
  52. port: 9100,
  53. base: '.',
  54. keepalive: true,
  55. hostname: '*'
  56. }
  57. # Allow the plugin to access the Elasticsearch API: since Elasticsearch 5.0+, cross-origin (CORS) access must be explicitly enabled before the API can be reached
  58. [root@ZhangSiming elasticsearch-head]# echo -e 'http.cors.enabled: true\nhttp.cors.allow-origin: "*"' >> /usr/local/elasticsearch/config/elasticsearch.yml
  59. [root@ZhangSiming elasticsearch-head]# ps -elf | grep elasticsearch
  60. 0 S elk 2213 1 0 80 0 - 568185 futex_ 13:52 pts/2 00:00:19 /usr/local/jdk//bin/java -Xms100M -Xmx100M -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch -Xss1m -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Djna.nosys=true -XX:-OmitStackTraceInFastThrow -Dio.netty.noUnsafe=true -Dio.netty.noKeySetOptimization=true -Dio.netty.recycler.maxCapacityPerThread=0 -Dlog4j.shutdownHookEnabled=false -Dlog4j2.disable.jmx=true -Djava.io.tmpdir=/tmp/elasticsearch.wymbh4OP -XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -Xloggc:logs/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=32 -XX:GCLogFileSize=64m -Des.path.home=/usr/local/elasticsearch -Des.path.conf=/usr/local/elasticsearch/config -cp /usr/local/elasticsearch/lib/* org.elasticsearch.bootstrap.Elasticsearch -d
  61. 0 S root 2377 1709 0 80 0 - 28176 pipe_w 14:26 pts/2 00:00:00 grep --color=auto elasticsearch
  62. [root@ZhangSiming elasticsearch-head]# kill -9 2213
  63. [root@ZhangSiming elasticsearch-head]# su -s /bin/bash elk /usr/local/elasticsearch/bin/start.sh
  64. # Start the Head web plugin
  65. [root@ZhangSiming elasticsearch-head]# npm run start
  66. > elasticsearch-head@0.0.0 start /usr/local/elasticsearch/elasticsearch-head
  67. > grunt server
  68. >> Local Npm module "grunt-contrib-jasmine" not found. Is it installed?
  69. Running "connect:server" (connect) task
  70. Waiting forever...
  71. Started connect web server on http://localhost:9100

[Screenshots: the elasticsearch-head web UI on port 9100 showing the three-node es-cluster]

2. Logstash in Depth (II)

2.1 Logstash Conditional Operators

Operator | Meaning
-------- | -------
== | equal to
!= | not equal to
> | greater than
< | less than
<= | less than or equal to
>= | greater than or equal to
=~ | matches regex
!~ | does not match regex
in | contains
not in | does not contain

Operator | Meaning
-------- | -------
and | logical and
or | logical or
nand | not and
xor | exclusive or

Operator | Meaning
-------- | -------
! | negation
() | compound expression
!() | negated compound expression
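
A short sketch of how these operators combine in a conditional (the field names are illustrative; a full working example appears in section 3.1):

    output {
      # Route events by type and tag: "in" tests membership, "=~" tests a regular expression
      if [type] == "nginx" and "test" in [tags] {
        stdout { codec => rubydebug }
      }
      else if [message] =~ /HTTP\/1\.1" 5[0-9][0-9]/ {
        # Matches access-log lines with a 5xx status code
        stdout { codec => rubydebug }
      }
    }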

2.2 Logstash Input Plugins: Stdin, File, Tcp, Beats

2.2.1 Stdin Example

Reads data from standard input.

  1. [root@ZhangSiming logstash]# cat config/logstash.conf
  2. input {
  3. stdin{
  4. }
  5. }
  6. filter {
  7. }
  8. output {
  9. stdout {
  10. codec => rubydebug
  11. }
  12. }
  1. [root@ZhangSiming logstash]# bin/logstash -f config/logstash.conf -t
  2. Sending Logstash's logs to /usr/local/logstash/logs which is now configured via log4j2.properties
  3. [2019-03-10T15:57:06,540][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/local/logstash/modules/fb_apache/configuration"}
  4. [2019-03-10T15:57:06,607][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/local/logstash/modules/netflow/configuration"}
  5. [2019-03-10T15:57:08,036][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
  6. Configuration OK
  7. # Configuration file check passed
  8. [2019-03-10T15:57:13,493][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
  9. # Start Logstash in the foreground for testing
  10. [root@ZhangSiming logstash]# bin/logstash -f config/logstash.conf
  11. 192.168.17.1 - - [10/Mar/2019:12:39:13 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36"
  12. {
  13. "@version" => "1",
  14. "@timestamp" => 2019-03-10T07:59:36.345Z,
  15. "message" => "[root@ZhangSiming logstash]# bin/logstash -f config/logstash.conf 192.168.17.1 - - [10/Mar/2019:12:39:13 +0800] \"GET / HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36\"",
  16. "host" => "ZhangSiming"
  17. }
  18. # With an empty filter block nothing is filtered, so whatever is typed on stdin in the foreground comes straight out in the test output

2.2.2 File Example

Reads the contents of a file as the input data.

  1. [root@ZhangSiming logstash]# cat config/logstash.conf
  2. input {
  3. file {
  4. path => "/usr/local/nginx/logs/access.log"
  5. tags => "123"
  6. type => "syslog"
  7. # path is the file to read, tags attaches tags, type sets the event type
  8. }
  9. }
  10. filter {
  11. }
  12. output {
  13. stdout {
  14. codec => rubydebug
  15. }
  16. }
  1. # Make one Nginx request
  2. [root@ZhangSiming ~]# > /usr/local/nginx/logs/access.log
  3. [root@ZhangSiming ~]# curl -I localhost
  4. HTTP/1.1 200 OK
  5. Server: nginx/1.10.2
  6. Date: Sun, 10 Mar 2019 08:06:01 GMT
  7. Content-Type: text/html
  8. Content-Length: 612
  9. Last-Modified: Thu, 07 Mar 2019 11:02:45 GMT
  10. Connection: keep-alive
  11. ETag: "5c80fa55-264"
  12. Accept-Ranges: bytes
  13. [root@ZhangSiming ~]# cat /usr/local/nginx/logs/access.log
  14. 127.0.0.1 - - [10/Mar/2019:16:06:01 +0800] "HEAD / HTTP/1.1" 200 0 "-" "curl/7.29.0"
  15. # Logstash output
  16. {
  17. "host" => "ZhangSiming",
  18. "@timestamp" => 2019-03-10T08:06:02.555Z,
  19. "message" => "127.0.0.1 - - [10/Mar/2019:16:06:01 +0800] \"HEAD / HTTP/1.1\" 200 0 \"-\" \"curl/7.29.0\"",
  20. "tags" => [
  21. [0] "123"
  22. ],
  23. "path" => "/usr/local/nginx/logs/access.log",
  24. "type" => "syslog",
  25. "@version" => "1"
  26. }

2.2.3 TCP Example

Receives logs by listening on a TCP port.

  1. [root@ZhangSiming logstash]# cat config/logstash.conf
  2. input {
  3. tcp {
  4. port => 12345
  5. type => "nc"
  6. }
  7. }
  8. filter {
  9. }
  10. output {
  11. stdout {
  12. codec => rubydebug
  13. }
  14. }
  1. # Make sure the network debugging tool netcat (nc) is installed
  2. [root@ZhangSiming ~]# which nc
  3. /usr/bin/nc
  4. [root@ZhangSiming ~]# echo "zhangsiming" | nc 192.168.17.145 12345
  5. # nc connects to 192.168.17.145 on port 12345 and sends the string "zhangsiming"
  6. # Logstash output
  7. {
  8. "type" => "nc",
  9. "@timestamp" => 2019-03-10T08:12:47.172Z,
  10. "message" => "zhangsiming",
  11. "@version" => "1",
  12. "port" => 39808,
  13. "host" => "ZhangSiming"
  14. }

2.2.4 Beats Example

To be completed later!
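
Until then, a minimal sketch of what a beats input usually looks like (5044 is the conventional Filebeat port; a Filebeat agent on another host would be configured to ship to it):

    input {
      beats {
        # Filebeat/Metricbeat agents connect to this port
        port => 5044
      }
    }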

For more plugins under the Input module, see the official documentation:

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html

2.3 Logstash Input/Output Codec Plugin

Codec plugins handle encoding and decoding of data on input and output.
One of their uses is codec => json { ... }, which decodes JSON-formatted data into structured fields that are much easier to read.
Note that a codec is only a transcoder: it must be referenced inside an input or output plugin, otherwise nothing is coming in, so what would it transcode?

  1. [root@ZhangSiming logstash]# cat config/logstash.conf
  2. input {
  3. stdin {
  4. codec => json {
  5. charset => ["UTF-8"]
  6. # Decode the JSON input using the UTF-8 character set
  7. }
  8. }
  9. }
  10. filter {
  11. }
  12. output {
  13. stdout {
  14. codec => rubydebug
  15. }
  16. }
  1. # Generate some JSON data
  2. [root@ZhangSiming ~]# python
  3. Python 2.7.5 (default, Apr 11 2018, 07:36:10)
  4. [GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
  5. Type "help", "copyright", "credits" or "license" for more information.
  6. >>> import json
  7. >>> data = [{'a':1,'b':2,'c':3,'d':4,'e':5}]
  8. >>> json = json.dumps(data)
  9. >>> print json
  10. [{"a": 1, "c": 3, "b": 2, "e": 5, "d": 4}]
  11. # Logstash output (the JSON line above is pasted into stdin)
  12. [{"a": 1, "c": 3, "b": 2, "e": 5, "d": 4}]
  13. {
  14. "@version" => "1",
  15. "host" => "ZhangSiming",
  16. "d" => 4,
  17. "@timestamp" => 2019-03-10T08:24:07.253Z,
  18. "c" => 3,
  19. "b" => 2,
  20. "e" => 5,
  21. "a" => 1
  22. }

2.4 Logstash Filter Plugins: json and kv

json plugin: under the conditions you set, parses a JSON-formatted field into structured data;
kv plugin: splits the input data into key=value pairs on the specified delimiter characters.

  1. # json plugin example
  2. [root@ZhangSiming logstash]# cat config/logstash.conf
  3. input {
  4. stdin {
  5. }
  6. }
  7. filter {
  8. json {
  9. source => "message"
  10. target => "content"
  11. # Parse the JSON in the message field into the content field
  12. }
  13. }
  14. output {
  15. stdout {
  16. codec => rubydebug
  17. }
  18. }
  19. # kv plugin example
  20. [root@ZhangSiming logstash]# cat config/logstash.conf
  21. input {
  22. stdin {
  23. }
  24. }
  25. filter {
  26. kv {
  27. field_split => "&?"
  28. }
  29. }
  30. output {
  31. stdout {
  32. codec => rubydebug
  33. }
  34. }
  1. # Generate JSON with Python
  2. [root@ZhangSiming ~]# python
  3. Python 2.7.5 (default, Apr 11 2018, 07:36:10)
  4. [GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
  5. Type "help", "copyright", "credits" or "license" for more information.
  6. >>> import json
  7. >>> data = [{'a':1,'b':2,'c':3,'d':4,'e':5}]
  8. >>> json = json.dumps(data)
  9. >>> print json
  10. [{"a": 1, "c": 3, "b": 2, "e": 5, "d": 4}]
  11. # Logstash output
  12. [{"a": 1, "c": 3, "b": 2, "e": 5, "d": 4}]
  13. {
  14. "@version" => "1",
  15. "@timestamp" => 2019-03-10T08:52:11.661Z,
  16. "host" => "ZhangSiming",
  17. "message" => "[{\"a\": 1, \"c\": 3, \"b\": 2, \"e\": 5, \"d\": 4}]",
  18. "content" => [
  19. [0] {
  20. "a" => 1,
  21. "c" => 3,
  22. "b" => 2,
  23. "e" => 5,
  24. "d" => 4
  25. }
  26. ]
  27. }
  1. name=zhangsiming class=169 skill=k8s
  2. {
  3. "@timestamp" => 2019-03-10T08:57:21.945Z,
  4. "@version" => "1",
  5. "message" => "name=zhangsiming class=169 skill=k8s",
  6. # The raw input always lands in the message field by default
  7. "host" => "ZhangSiming",
  8. "name" => "zhangsiming class=169 skill=k8s"
  9. }
  10. name=zhangsiming&class=169&skill=k8s
  11. {
  12. "class" => "169",
  13. "skill" => "k8s",
  14. "@timestamp" => 2019-03-10T08:57:36.279Z,
  15. "@version" => "1",
  16. "name" => "zhangsiming",
  17. "host" => "ZhangSiming",
  18. "message" => "name=zhangsiming&class=169&skill=k8s"
  19. }

2.5 Logstash Filter Plugin: grok

2.5.1 Capturing Data with Custom grok Regular Expressions

We used this plugin earlier; it is quite powerful and can capture data with custom regular expressions, producing new fields.

  1. # Capture custom fields from the Nginx access.log with a regular expression
  2. [root@ZhangSiming logstash]# cat /usr/local/nginx/logs/access.log
  3. 127.0.0.1 - - [10/Mar/2019:16:06:01 +0800] "HEAD / HTTP/1.1" 200 0 "-" "curl/7.29.0"
  4. # Write the filter
  5. [root@ZhangSiming logstash]# cat config/logstash.conf
  6. input {
  7. stdin {
  8. }
  9. }
  10. filter {
  11. grok {
  12. match => {
  13. "message" => '(?<ip>[0-9.]+) .*\[(?<time>[0-9a-zA-Z/:]+) .*HTTP/1.1" (?<mark>[0-9]+) (?<size>[0-9]+) .*'
  14. # Express the whole log line as a regex; wherever a custom field is needed, use a named capture (?<field_name>pattern) as a placeholder
  15. # Note the backslash before the "[" in .*\[(?<time>[0-9a-zA-Z/:]+): it is required, otherwise Logstash will not start because the bare "[" conflicts with the regex syntax that follows
  16. }
  17. }
  18. }
  19. output {
  20. stdout {
  21. codec => rubydebug
  22. }
  23. }
  1. 127.0.0.1 - - [10/Mar/2019:16:06:01 +0800] "HEAD / HTTP/1.1" 200 0 "-" "curl/7.29.0"
  2. {
  3. "message" => "127.0.0.1 - - [10/Mar/2019:16:06:01 +0800] \"HEAD / HTTP/1.1\" 200 0 \"-\" \"curl/7.29.0\"",
  4. "host" => "ZhangSiming",
  5. "time" => "10/Mar/2019:16:06:01",
  6. "@timestamp" => 2019-03-10T09:13:51.559Z,
  7. "@version" => "1",
  8. "ip" => "127.0.0.1",
  9. "size" => "0",
  10. "mark" => "200"
  11. }

2.5.2 Capturing Data with grok's Built-in Patterns

To make data capture easier, a set of built-in default patterns ships with grok.
The default built-in grok patterns are listed on the official page: https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

  1. [root@ZhangSiming logstash]# vim config/logstash.conf
  2. [root@ZhangSiming logstash]# cat config/logstash.conf
  3. input {
  4. stdin {
  5. }
  6. }
  7. filter {
  8. grok {
  9. match => {
  10. "message" => "%{IP:client} .*\"%{WORD:method} %{URIPATHPARAM:request} HTTP/1.1\" %{NUMBER:num} %{NUMBER:size}.*"
  11. # The advantage of the built-in patterns is that we no longer have to write the field regexes ourselves; we simply reference the ones the project already provides
  12. # As before, remember to escape special characters where appropriate
  13. }
  14. }
  15. }
  16. output {
  17. stdout {
  18. codec => rubydebug
  19. }
  20. }
  1. # Logstash output
  2. 127.0.0.1 - - [10/Mar/2019:17:23:53 +0800] "GET /index.html HTTP/1.1" 200 612 "-" "curl/7.29.0"
  3. {
  4. "client" => "127.0.0.1",
  5. "message" => "127.0.0.1 - - [10/Mar/2019:17:23:53 +0800] \"GET /index.html HTTP/1.1\" 200 612 \"-\" \"curl/7.29.0\"",
  6. "host" => "ZhangSiming",
  7. "num" => "200",
  8. "request" => "/index.html",
  9. "size" => "612",
  10. "method" => "GET",
  11. "@timestamp" => 2019-03-10T09:45:16.108Z,
  12. "@version" => "1"
  13. }

2.5.3 Defining Custom Patterns in a File and Importing Them into grok

You can also put custom pattern definitions in a file and pull that file in with grok's patterns_dir option.

  1. [root@ZhangSiming logstash]# vim patterns
  2. [root@ZhangSiming logstash]# cat patterns
  3. STRING .*
  4. # Pattern name followed by its regex
  5. [root@ZhangSiming logstash]# vim config/logstash.conf
  6. [root@ZhangSiming logstash]# cat config/logstash.conf
  7. input {
  8. stdin {
  9. }
  10. }
  11. filter {
  12. grok {
  13. patterns_dir => "/usr/local/logstash/patterns"
  14. match => {
  15. "message" => "%{IP:client} .*\"%{WORD:method} %{URIPATHPARAM:request} HTTP/1.1\" %{NUMBER:num} %{NUMBER:size}%{STRING:nouse}"
  16. }
  17. }
  18. }
  19. output {
  20. stdout {
  21. codec => rubydebug
  22. }
  23. }
  1. 127.0.0.1 - - [10/Mar/2019:17:23:53 +0800] "GET /index.html HTTP/1.1" 200 612 "-" "curl/7.29.0"
  2. {
  3. "host" => "ZhangSiming",
  4. "request" => "/index.html",
  5. "@version" => "1",
  6. "size" => "612",
  7. "message" => "127.0.0.1 - - [10/Mar/2019:17:23:53 +0800] \"GET /index.html HTTP/1.1\" 200 612 \"-\" \"curl/7.29.0\"",
  8. "@timestamp" => 2019-03-10T09:55:31.483Z,
  9. "client" => "127.0.0.1",
  10. "nouse" => " \"-\" \"curl/7.29.0\"",
  11. # Successfully extracted
  12. "num" => "200",
  13. "method" => "GET"
  14. }

2.5.4 Multi-Pattern Data Capture with grok

Sometimes we need to capture data from logs in more than one format, so we configure grok with multiple match patterns.

  1. [root@ZhangSiming logstash]# vim patterns
  2. [root@ZhangSiming logstash]# cat patterns
  3. STRING .*
  4. NAME .*
  5. [root@ZhangSiming logstash]# vim config/logstash.conf
  6. [root@ZhangSiming logstash]# cat config/logstash.conf
  7. input {
  8. stdin {
  9. }
  10. }
  11. filter {
  12. grok {
  13. patterns_dir => "/usr/local/logstash/patterns"
  14. match => [
  15. "message",'%{IP:client} .*\"%{WORD:method} %{URIPATHPARAM:request} HTTP/1.1\"%{NUMBER:num} %{NUMBER:size}.*"%{STRING:nouse}"',
  16. "message",'%{IP:client} .*\"%{WORD:method} %{URIPATHPARAM:request} HTTP/1.1\" %{NUMBER:num} %{NUMBER:size}.*&%{NAME:name}&'
  17. # A line in the first format is matched by the first message pattern; a line in the second format is matched by the second
  18. # Note that message is followed by "," here instead of =>
  19. # Note the trailing "," at the end of the first pattern line
  20. # Note that the {} after match => has become []
  21. ]
  22. }
  23. }
  24. output {
  25. stdout {
  26. codec => rubydebug
  27. }
  28. }
  1. # Logstash matches each log line against the appropriate message pattern and extracts the corresponding fields
  2. 127.0.0.1 - - [10/Mar/2019:17:23:53 +0800] "GET /index.html HTTP/1.1" 200 612"welcome to yunjisuan"
  3. {
  4. "num" => "200",
  5. "client" => "127.0.0.1",
  6. "message" => "127.0.0.1 - - [10/Mar/2019:17:23:53 +0800] \"GET /index.html HTTP/1.1\" 200 612\"welcome to yunjisuan\"",
  7. "size" => "612",
  8. "method" => "GET",
  9. "nouse" => "welcome to yunjisuan",
  10. "host" => "ZhangSiming",
  11. "request" => "/index.html",
  12. "@timestamp" => 2019-03-10T10:27:15.418Z,
  13. "@version" => "1"
  14. }
  15. 127.0.0.1 - - [10/Mar/2019:17:23:53 +0800] "GET /index.html HTTP/1.1" 200 612&zhangsiming&
  16. {
  17. "num" => "200",
  18. "client" => "127.0.0.1",
  19. "message" => "127.0.0.1 - - [10/Mar/2019:17:23:53 +0800] \"GET /index.html HTTP/1.1\" 200 612&zhangsiming&",
  20. "size" => "612",
  21. "method" => "GET",
  22. "name" => "zhangsiming",
  23. "host" => "ZhangSiming",
  24. "request" => "/index.html",
  25. "@timestamp" => 2019-03-10T10:27:50.860Z,
  26. "@version" => "1"
  27. }

2.6 Logstash Filter Plugin: geoip

The geoip plugin analyzes the geographic origin of an IP address, and Kibana's map feature can then display it visually.

  1. # Download the GeoLite2 City database used by geoip and move it under /usr/local/logstash
  2. [root@ZhangSiming ~]# wget http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz
  3. --2019-03-10 18:39:19-- http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz
  4. Resolving geolite.maxmind.com (geolite.maxmind.com)... 104.17.201.89, 104.17.200.89, 2606:4700::6811:c959, ...
  5. Connecting to geolite.maxmind.com (geolite.maxmind.com)|104.17.201.89|:80... connected.
  6. HTTP request sent, awaiting response... 302 Found
  7. Location: http://120.52.51.14/geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz [following]
  8. --2019-03-10 18:39:20-- http://120.52.51.14/geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz
  9. Connecting to 120.52.51.14:80... connected.
  10. HTTP request sent, awaiting response... 200 OK
  11. Length: 28513410 (27M) [application/gzip]
  12. Saving to: GeoLite2-City.tar.gz
  13. 100%[======================================>] 28,513,410 4.79MB/s in 5.6s
  14. 2019-03-10 18:39:26 (4.81 MB/s) - GeoLite2-City.tar.gz saved [28513410/28513410]
  15. [root@ZhangSiming ~]# tar xf GeoLite2-City.tar.gz
  16. [root@ZhangSiming ~]# ls
  17. aaa GeoLite2-City_20190305 logstash-6.2.3.tar.gz
  18. anaconda-ks.cfg GeoLite2-City.tar.gz nginx-1.10.2.tar.gz
  19. common_install.sh jdk-8u60-linux-x64.tar.gz
  20. [root@ZhangSiming ~]# cp GeoLite2-City_20190305/GeoLite2-City.mmdb /usr/local/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/
  21. # This is the default location for the geoip database; it can go anywhere under the Logstash directory, as long as the database path in the config file points to it
  22. # Edit the Logstash configuration file
  23. [root@ZhangSiming logstash]# vim config/logstash.conf
  24. [root@ZhangSiming logstash]# cat config/logstash.conf
  25. input {
  26. stdin {
  27. }
  28. }
  29. filter {
  30. grok {
  31. patterns_dir => "/usr/local/logstash/patterns"
  32. match => [
  33. "message",'%{IP:client} .*\"%{WORD:method} %{URIPATHPARAM:request} HTTP/1.1\" %{NUMBER:num} %{NUMBER:size}.*"%{STRING:nouse}"',
  34. "message",'%{IP:client} .*\"%{WORD:method} %{URIPATHPARAM:request} HTTP/1.1\" %{NUMBER:num} %{NUMBER:size}.*&%{NAME:name}&'
  35. ]
  36. }
  37. geoip {
  38. source => "client"
  39. database => "/usr/local/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"
  40. # This path must match where the database was copied
  41. }
  42. }
  43. output {
  44. stdout {
  45. codec => rubydebug
  46. }
  47. }
  1. 119.147.146.189 - - [10/Mar/2019:17:23:53 +0800] "GET /index.html HTTP/1.1" 200 612 &zhangsiming&
  2. {
  3. "message" => "119.147.146.189 - - [10/Mar/2019:17:23:53 +0800] \"GET /index.html HTTP/1.1\" 200 612 &zhangsiming&",
  4. "method" => "GET",
  5. "num" => "200",
  6. "name" => "zhangsiming",
  7. "client" => "119.147.146.189",
  8. "request" => "/index.html",
  9. "@timestamp" => 2019-03-10T11:27:05.871Z,
  10. "host" => "ZhangSiming",
  11. "@version" => "1",
  12. "size" => "612",
  13. "geoip" => {
  14. "ip" => "119.147.146.189",
  15. "latitude" => 23.1167,
  16. "country_name" => "China",
  17. "continent_code" => "AS",
  18. "timezone" => "Asia/Shanghai",
  19. "country_code2" => "CN",
  20. "region_name" => "Guangdong",
  21. "country_code3" => "CN",
  22. "region_code" => "GD",
  23. "location" => {
  24. "lat" => 23.1167,
  25. "lon" => 113.25
  26. },
  27. "longitude" => 113.25
  28. }
  29. }
  30. # Only IPs present in the geoip database (i.e. public IPs) can be located

2.7 Logstash Output Plugin: elasticsearch

This plugin writes the output into Elasticsearch so that Kibana can read it, and also lets you specify the index name and other options. In an ELK Stack platform the data filtered by Logstash is always written into the Elasticsearch database and then viewed in Kibana, so there is not much more to elaborate on here.
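
For reference, a minimal output block (hosts and index naming follow the cluster built in section 1.2; a fuller, conditional version appears in section 3.1):

    output {
      elasticsearch {
        hosts => ["http://192.168.17.139:9200"]
        # %{+YYYY.MM.dd} puts each day's events into its own index
        index => "logstash-nginx-%{+YYYY.MM.dd}"
      }
    }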

3. Kibana in Depth

3.1 Modifying the Logstash Configuration File

  1. [root@ZhangSiming logstash]# vim config/logstash.conf
  2. [root@ZhangSiming logstash]# cat config/logstash.conf
  3. input {
  4. file {
  5. path => ["/usr/local/nginx/logs/access.log"]
  6. type => "system"
  7. tags => ["nginx","test"]
  8. start_position => "beginning"
  9. }
  10. file {
  11. path => ["/var/log/messages"]
  12. type => "system"
  13. tags => ["syslog","test"]
  14. start_position => "beginning"
  15. }
  16. # Data comes from two sources, both with type "system", each tagged separately and read from the beginning of the file
  17. }
  18. filter {
  19. }
  20. output {
  21. if [type] == "system" {
  22. if [tags][0] == "nginx" {
  23. elasticsearch {
  24. hosts => ["http://192.168.17.139:9200","http://192.168.17.140:9200","http://192.168.17.141:9200"]
  25. index => "logstash-zhangsiming-nginx-%{+YYYY.MM.dd}"
  26. }
  27. stdout { codec => rubydebug }
  28. }
  29. else if [tags][0] == "syslog" {
  30. elasticsearch {
  31. hosts => ["http://192.168.17.139:9200","http://192.168.17.140:9200","http://192.168.17.141:9200"]
  32. index => "logstash-zhangsiming-syslog-%{+YYYY.MM.dd}"
  33. }
  34. stdout { codec => rubydebug }
  35. }
  36. }
  37. }
  38. # Use if statements so that different data is written to different indices

3.2 Kibana Configuration File

  1. [root@ZhangSiming kibana]# sed -n '7p;21p' config/kibana.yml
  2. server.host: "127.0.0.1"
  3. elasticsearch.url: "http://192.168.17.139:9200"
  4. # Kibana is bound to localhost so Nginx can front it and restrict access; connecting to any one Elasticsearch node is enough to reach the whole cluster
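
Since server.host is 127.0.0.1, Kibana itself is only reachable locally; a minimal, hedged Nginx snippet for fronting it (the server_name and password file are assumptions for illustration):

    server {
        listen 80;
        server_name kibana.example.com;                        # illustrative hostname
        location / {
            auth_basic "Kibana";                               # simple password protection
            auth_basic_user_file /usr/local/nginx/conf/htpasswd;
            proxy_pass http://127.0.0.1:5601;                  # Kibana's default port
            proxy_set_header Host $host;
        }
    }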

If there are no indices at all in Elasticsearch, Kibana has nothing to fetch; this is why, when first looking at the data, we have to create the index pattern interactively from the Kibana web UI. If it already exists, there is no need to create it again.
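
Before creating the index pattern, it is worth confirming that the indices written by Logstash actually exist (the names follow the configuration in section 3.1):

    # The two daily indices defined in section 3.1 should appear here once Logstash has shipped data
    curl -s -X GET "192.168.17.139:9200/_cat/indices?v" | grep logstash-zhangsiming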

3.3 Accessing the Kibana Web UI

Start the Elasticsearch cluster first, then Logstash (it starts slowly, so be patient), and finally Kibana.
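
Roughly, the startup sequence looks like this (the Kibana start command and path are assumptions carried over from the Part 1 deployment; adjust to your own layout):

    # 1. On each of the three nodes, start Elasticsearch as the elk user
    su -s /bin/bash elk /usr/local/elasticsearch/bin/start.sh

    # 2. On the Logstash host, start Logstash with the config from section 3.1 (slow to start)
    cd /usr/local/logstash && nohup bin/logstash -f config/logstash.conf &

    # 3. Finally start Kibana (path assumed)
    cd /usr/local/kibana && nohup bin/kibana &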

[Screenshots: the Kibana web UI, creating the index patterns and viewing the collected logs in Discover]
