[关闭]
@hadoopMan 2017-03-04T14:28:59.000000Z 字数 5988 阅读 1866

flume详解

flume

想学习spark,hadoop,kafka等大数据框架,请加群459898801,满了之后请加2群224209501。后续文章会陆续公开

本节内容:

针对实时数据抽取框架Flume,掌握如下几点内容:
1) Flume 功能、Agent概念及Agent三个组成部分和常见组件功能
2) 完成课程中的所有Demo案例
3) 理解Flume如何在企业项目中的使用及【实时抽取监控目录数据】案例编写与理解

4) 针对HDFS Sink中几点注意,进行测试,整理记录

1,flume的相关概念

1.1 flume的角色

flume只有一个角色agent,agent里都有三部分构成:source、channel和sink。就相当于source接收数据,通过channel传输数据,sink把数据写到下一端。这就完了,就这么简单。其中source有很多种可以选择,channel有很多种可以选择,sink也同样有多种可以选择,并且都支持自定义。同时,agent还支持选择器,就是一个source支持多个channel和多个sink,这样就完成了数据的分发。
flume.png-23.5kB
flume汇集.png-51kB
flume分散.png-55.6kB

1.2 flume的event

Event是flume数据传输的基本单元
flume以时间的形式将数据从源头传输到目的地
Event由可选的header和载有数据的一个byte array构成:
1,载有数据对flume是不透明的
2,header是容纳了key-value字符串对的无序集合,key在集合内是唯一的。
event.png-15.6kB

2,flume的相关配置

2.1,flume下载

flume-ng-1.5.0-cdh5.3.6.tar.gz

2.2,添加配置

造flume的配置文件中添加java路径:

  1. export JAVA_HOME=/opt/modules/jdk1.7.0_67

2.3,测试是否配置成功

2.3.1 编写一个agent

  1. # The configuration file needs to define the sources,
  2. # the channels and the sinks.
  3. ## define agent
  4. a1.sources = s1
  5. a1.channels = c1
  6. a1.sinks = k1
  7. ## define sources
  8. a1.sources.s1.channels = c1
  9. a1.sources.s1.type = netcat
  10. a1.sources.s1.bind = miaodonghua1.host
  11. a1.sources.s1.port = 5555
  12. ## define channels
  13. a1.channels.c1.type = memory
  14. a1.channels.c1.capacity = 1000
  15. a1.channels.c1.transactionCapacity = 100
  16. ## define sinks
  17. a1.sinks.k1.channel = c1
  18. a1.sinks.k1.type = logger

2.3.2 安装telnet

  1. sudo yum install telnet

2.3.3 启动flume

  1. bin/flume-ng agent \
  2. --conf conf \
  3. --name a1 \
  4. --conf-file conf/test.conf \
  5. -Dflume.root.logger=INFO,console
  1. 参数说明:
  2. -c或者--conf后跟配置目录
  3. -f或者--conf-file 后跟配置文件
  4. -n或者--name指定agent的名称

2.3.4 监控5555端口

  1. telnet miaodonghua1.host 5555

2.3.5 收发数据测试

telnet.png-5.4kB
receive.png-10.8kB

3,flume的agent常见配置

参考手册地址

4,flume收集hive的日志信息

4.1 在hive-log4j.properties中设置hive日志目录

  1. hive.log.threshold=ALL
  2. hive.root.logger=WARN,DRFA
  3. hive.log.dir=/opt/cdh2.3.6/hive-0.13.1-cdh5.3.6/logs/
  4. hive.log.file=hive.log

4.2 编写flume的agent

  1. # The configuration file needs to define the sources,
  2. # the channels and the sinks.
  3. ## define agent
  4. a2.sources = r2
  5. a2.channels = c2
  6. a2.sinks = k2
  7. ## define sources
  8. a2.sources.r2.type = exec
  9. a2.sources.r2.command = tail -f /opt/cdh2.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
  10. a2.sources.r2.shell = /bin/bash -c
  11. ## define channels
  12. a2.channels.c2.type = memory
  13. a2.channels.c2.capacity = 1000
  14. a2.channels.c2.transactionCapacity = 100
  15. ## define sinks
  16. a2.sinks.k2.type = hdfs
  17. a2.sinks.k2.hdfs.path = hdfs://miaodonghua1.host:8020/user/hadoop/flume/hive-logs/
  18. a2.sinks.k2.hdfs.filePrefix=cmcc
  19. a2.sinks.k2.hdfs.minBlockReplicas=1
  20. a2.sinks.k2.hdfs.fileType = DataStream
  21. a2.sinks.k2.hdfs.writeFormat = Text
  22. a2.sinks.k2.hdfs.batchSize = 10
  23. a2.sinks.k2.hdfs.rollInterval=0
  24. a2.sinks.k2.hdfs.rollSize= 131072
  25. a2.sinks.k2.hdfs.rollCount=0
  26. tier1.sinks.sink1.hdfs.idleTimeout=6000
  27. ### bind the sources and sink to the channel
  28. a2.sources.r2.channels = c2
  29. a2.sinks.k2.channel = c2

参数详解:

  1. a2.sinks.k2.hdfs.filePrefix=cmcc:定义文件前缀
  2. a2.sinks.k2.hdfs.minBlockReplicas=1:副本数,默认是读取hdfs的副本数
  3. a2.sinks.k2.hdfs.fileType = DataStream
  4. a2.sinks.k2.hdfs.writeFormat = Text
  5. a2.sinks.k2.hdfs.batchSize = 10
  6. a2.sinks.k2.hdfs.rollInterval=0 :滚动创建文件的时间间隔,也就是一定时间间隔决定是否创建新的文件,0表时不依据时间来滚动创建文件,秒为单位。
  7. a2.sinks.k2.hdfs.rollSize= 131072:配置按文件大小来滚动创建新的文件,0表时不按文件大小来创建,单位是字节。
  8. a2.sinks.k2.hdfs.rollCount=0:hdfs有多少条events消息时新建文件,0不基于消息个数

4.3 拷贝flume依赖的hadoop的jar包

  1. cp share/hadoop/hdfs/hadoop-hdfs-2.5.0-cdh5.3.6.jar /opt/cdh2.3.6/flume-1.5.0-cdh5.3.6-bin/lib/
  2. cp share/hadoop/common/hadoop-common-2.5.0-cdh5.3.6.jar /opt/cdh2.3.6/flume-1.5.0-cdh5.3.6-bin/lib/
  3. cp share/hadoop/tools/lib/hadoop-auth-2.5.0-cdh5.3.6.jar /opt/cdh2.3.6/flume-1.5.0-cdh5.3.6-bin/lib/
  4. cp share/hadoop/tools/lib/commons-configuration-1.6.jar /opt/cdh2.3.6/flume-1.5.0-cdh5.3.6-bin/lib/

4.4 运行flume

  1. bin/flume-ng agent --conf conf --name a2 --conf-file conf/flume_tail.conf -Dflume.root.logger=INFO,console

4.5 运行hive

1,启动hive

  1. bin/hive

2,使用shell命令查看日志

  1. tail -f hive.log

tail-f-hivelog.png-60kB
3,flume在控制台的输出
flume的输出.png-75.3kB
4,在hdfs的存储
flume的输出.png-75.3kB

4,flume的在企业中的应用

4.1 spooling Directory Source

1,在使用exec来监听数据源虽然实时性较高,但是可靠性较差,当source程序运行异常或者Linux命令中断都会造成数据丢失,在恢复正常运行之前数据的完整性无法得到保障。
2,Spooling Directory Paths通过监听某个目录下的新增文件,并将文件的内容读取出来,实现日志信息的收集。实际生产中会结合log4j来使用。被传输结束的文件会修改后缀名,添加.completed后缀(可修改)。

4.2 监控某热日志文件的目录

·/app/logs/20151212
xx.log.tmp :应用正在向此文件写数据,设定该文件大小为某个值128MB时,会重新生成一个文件
yy.log : 表示一个完整的日志文件,flume可以抽取其中的数据
zz.log.completed:表是flume已经抽取完数据文件

4.3 下面代码

  1. # The configuration file needs to define the sources,
  2. # the channels and the sinks.
  3. ## define agent
  4. a3.sources = r3
  5. a3.channels = c3
  6. a3.sinks = k3
  7. ## define sources
  8. a3.sources.r3.type = spooldir
  9. a3.sources.r3.spoolDir = /opt/cdh5.3.6/flume-1.5.0-cdh5.3.6-bin/spoollogs
  10. a3.sources.r3.ignorePattern = ^(.)*\\.tmp$
  11. a3.sources.r3.fileSuffix = .delete
  12. ## define channels
  13. a3.channels.c3.type = memory
  14. a3.channels.c3.capacity = 1000
  15. a3.channels.c3.transactionCapacity = 100
  16. ## define sinks
  17. a3.sinks.k3.type = hdfs
  18. a3.sinks.k3.hdfs.path = hdfs://miaodonghua1.host/user/hadoop/flume/splogs/
  19. a3.sinks.k3.hdfs.fileType = DataStream
  20. a3.sinks.k3.hdfs.writeFormat = Text
  21. a3.sinks.k3.hdfs.batchSize = 10
  22. ### bind the sources and sink to the channel
  23. a3.sources.r3.channels = c3
  24. a3.sinks.k3.channel = c3

4.4 在flume主目录下执行下面命令:

  1. bin/flume-ng agent --conf conf --name a3 --conf-file conf/flume-app.conf -Dflume.root.logger=INFO,console

tmp未被识别.png-4kB
webapp导出flume.png-18.9kB

5,完成如下需求

5.1 完成如下需求:

1,监控目录
日志目录,抽取完整的日志文件,写的日志文件不抽取。
2,使用FileChannel
本地文件系统缓冲,比内存安全性高。
3,数据存储HDFS
存储对应hive表的目录或者hdfs目录。

5.2实现代码

  1. # The configuration file needs to define the sources,
  2. # the channels and the sinks.
  3. ## define agent
  4. a3.sources = r3
  5. a3.channels = c3
  6. a3.sinks = k3
  7. ## define sources
  8. a3.sources.r3.type = spooldir
  9. a3.sources.r3.spoolDir = /opt/cdh2.3.6/flume-1.5.0-cdh5.3.6-bin/spoollogs
  10. a3.sources.r3.ignorePattern = ^(.)*\\.tmp$
  11. a3.sources.r3.fileSuffix = .delete
  12. ## define channels
  13. a3.channels.c3.type = file
  14. a3.channels.c3.checkpointDir = /opt/cdh2.3.6/flume-1.5.0-cdh5.3.6-bin/flume/checkpoint
  15. a3.channels.c3.dataDirs = /opt/cdh2.3.6/flume-1.5.0-cdh5.3.6-bin/flume/data
  16. ## define sinks
  17. a3.sinks.k3.type = hdfs
  18. #a3.sinks.k3.hdfs.path = hdfs://miaodonghua1.host/user/hadoop/flume/splogs
  19. a3.sinks.k3.hdfs.fileType = DataStream
  20. a3.sinks.k3.hdfs.writeFormat = Text
  21. a3.sinks.k3.hdfs.batchSize = 10
  22. a3.sinks.k3.hdfs.useLocalTimeStamp=true
  23. a3.sinks.k3.hdfs.path = hdfs://miaodonghua1.host/user/hadoop/flume/splogs/%y-%m-%d/
  24. a3.sinks.k3.hdfs.filePrefix = events-
  25. a3.sinks.k3.hdfs.round = true
  26. a3.sinks.k3.hdfs.roundValue = 1
  27. a3.sinks.k3.hdfs.roundUnit = hour
  28. ### bind the sources and sink to the channel
  29. a3.sources.r3.channels = c3
  30. a3.sinks.k3.channel = c3

执行如下命令:

  1. bin/flume-ng agent --conf conf --name a3 --conf-file conf/flume-app.conf -Dflume.root.logger=INFO,console

成功后:
按天分区.png-8.2kB

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注