@Arslan6and6 2016-06-14

[Assignment 20] Yin Jie

Chapter 10: Flume, a Big Data Collaboration Framework

--- Real-time collection of monitored-directory data with Flume

Assignment description:

For the real-time data ingestion framework Flume, master the following:

1) Flume's functionality, the Agent concept, the three components of an Agent, and the functions of common components

2) Complete all the demo cases from the course

3) Understand how Flume is used in enterprise projects, and write and understand the "real-time collection of monitored-directory data" case

4) Test the points of attention for the HDFS Sink and record the results in a document.

1) Flume's functionality, the Agent concept, the three components of an Agent, and the functions of common components

Flume's functionality:
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large volumes of log data. Flume runs only on Linux.
It has a simple, flexible architecture based on streaming data flows, and it is robust and fault tolerant. Setup is simple: pointing the configuration file at the Java environment is enough. It uses a simple, extensible data model that suits online real-time analytical applications. Its simplicity shows in practice: define a source, a channel, and a sink, and a single command brings the pipeline up.
In a typical real-time stack, Flume and Kafka collect data in real time, Spark or Storm processes it in real time, and Impala queries it in real time.


Agent concept, the three components of an Agent, and common component functions
Flume NG has only one node role: the agent. An agent is composed of a source, a channel, and a sink.
The source collects data; it is where the data stream enters Flume, and it delivers the events it produces to the channel.
The channel connects sources and sinks; it behaves much like a queue.
The sink drains data from the channel and writes it to the target, which can be the next agent's source, or a store such as HDFS or HBase.

2) Complete all the demo cases from the course

Following the simple example on the official website:
Copy the flume-conf.properties.template file under conf to a new file named test-conf.properties (the file the run command below loads),
and paste the official example configuration into it (shell sketch and configuration below).
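The copy step, as a minimal shell sketch (the install path is taken from the startup logs further down):

    $ cd /opt/modules/flume-1.5.0-cdh5.3.6-bin
    $ cp conf/flume-conf.properties.template conf/test-conf.properties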

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    # Describe the sink
    a1.sinks.k1.type = logger
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

Per the official example, connect with telnet:

    $ telnet localhost 44444
    Trying 127.0.0.1...
    Connected to localhost.localdomain (127.0.0.1).
    Escape character is '^]'.
    Hello world! <ENTER>
    OK

This requires telnet to be installed first:

    # yum -y install telnet
    # yum -y install telnet telnet-server

Run the command as in the example:
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
Parameter breakdown:
agent: run an agent instance
--conf conf: use the conf directory for configuration
--conf-file example.conf: the agent configuration file to load
--name a1: the name of the agent to run, here a1
-Dflume.root.logger=INFO,console: print logger output to the console at INFO level

    $ bin/flume-ng agent --conf conf --conf-file conf/test-conf.properties --name a1 -Dflume.root.logger=INFO,console
    Info: Sourcing environment configuration script /opt/modules/flume-1.5.0-cdh5.3.6-bin/conf/flume-env.sh
    + exec /opt/modules/jdk1.7.0_67/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/opt/modules/flume-1.5.0-cdh5.3.6-bin/conf:/opt/modules/flume-1.5.0-cdh5.3.6-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file conf/test-conf.properties --name a1
    2016-05-31 21:03:38,803 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
    2016-05-31 21:03:38,812 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:conf/test-conf.properties
    2016-05-31 21:03:38,866 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)] Added sinks: k1 Agent: a1
    2016-05-31 21:03:38,866 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
    2016-05-31 21:03:38,866 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
    2016-05-31 21:03:38,886 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)] Post-validation flume configuration contains configuration for agents: [a1]
    2016-05-31 21:03:38,886 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)] Creating channels
    2016-05-31 21:03:38,903 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory
    2016-05-31 21:03:38,913 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)] Created channel c1
    2016-05-31 21:03:38,914 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type netcat
    2016-05-31 21:03:38,943 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger
    2016-05-31 21:03:38,947 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)] Channel c1 connected to [r1, k1]
    2016-05-31 21:03:38,955 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@347db2f9 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
    2016-05-31 21:03:38,970 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel c1
    2016-05-31 21:03:39,044 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
    2016-05-31 21:03:39,044 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: c1 started
    2016-05-31 21:03:39,046 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink k1
    2016-05-31 21:03:39,047 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source r1
    2016-05-31 21:03:39,048 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:150)] Source starting
    2016-05-31 21:03:39,109 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.5.130:9999]
For the actual test the source was bound to hadoop-senior.ibeifeng.com:9999 rather than localhost:44444, as the serverSocket line above shows:

    $ telnet hadoop-senior.ibeifeng.com 9999
    Trying 192.168.5.130...
    Connected to hadoop-senior.ibeifeng.com.

Case study 1: real-time collection of file data into HDFS with a memory channel

Preparation

Create a logs directory under the Hive installation directory, manually create hive.log in it, and write some arbitrary content.
In HDFS, create a directory named flume to receive the logs.
Flume acts as a client when accessing HDFS, so the HDFS client JARs must be placed in the lib directory of the Flume installation. The required JARs are listed below:
/opt/modules/hadoop-2.5.0-cdh5.3.6/share/hadoop/hdfs/hadoop-hdfs-2.5.0-cdh5.3.6.jar
/opt/modules/hadoop-2.5.0-cdh5.3.6/share/hadoop/common/hadoop-common-2.5.0-cdh5.3.6.jar
/opt/modules/hadoop-2.5.0-cdh5.3.6/share/hadoop/tools/lib/commons-configuration-1.6.jar
/opt/modules/hadoop-2.5.0-cdh5.3.6/share/hadoop/tools/lib/hadoop-auth-2.5.0-cdh5.3.6.jar

cp hadoop-hdfs-2.5.0-cdh5.3.6.jar /opt/modules/flume-1.5.0-cdh5.3.6-bin/lib/
cp hadoop-common-2.5.0-cdh5.3.6.jar /opt/modules/flume-1.5.0-cdh5.3.6-bin/lib/
cp commons-configuration-1.6.jar /opt/modules/flume-1.5.0-cdh5.3.6-bin/lib/
cp hadoop-auth-2.5.0-cdh5.3.6.jar /opt/modules/flume-1.5.0-cdh5.3.6-bin/lib/

Write a new agent configuration file, named flume2hdfs-conf.properties.
In it, the agent is renamed a2, the source r2, and the channel c2.
Source configuration:
Follow the exec source example from the official documentation.
Note: the Linux tail command prints the last lines of a file to the terminal; when the file is updated, tail refreshes automatically so you always see the latest content. The -f/-F option follows the file as it grows.
Configure the shell as /bin/bash to make sure the tail -F command is available.
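A sketch of what the source section plausibly contained, reconstructed from the Flume exec source documentation; the names a2/r2 come from the text above, and the log path is the one written to later:

    # exec source: follow the Hive log file as it grows
    a2.sources.r2.type = exec
    a2.sources.r2.command = tail -F /opt/modules/hive-0.13.1-cdh5.3.6/logs/hive.log
    # run the command through bash so tail -F resolves reliably
    a2.sources.r2.shell = /bin/bash -c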
Channel configuration: this example still buffers events in memory, so the channel settings do not change.
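Mirroring the c1 settings from the first demo under the new names (the sink name k2 is an assumption; the text never states it):

    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100
    # bind the source and sink to the channel
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2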
Sink configuration:
Following the official documentation, set the type, hdfs.path, and hdfs.fileType.
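A sketch of those three settings, assuming the sink is named k2 and the NameNode runs on hadoop-senior.ibeifeng.com at the default port 8020 (both assumptions), with the /flume directory created in the preparation step as the target:

    a2.sinks.k2.type = hdfs
    # assumed NameNode address and port
    a2.sinks.k2.hdfs.path = hdfs://hadoop-senior.ibeifeng.com:8020/flume
    # DataStream writes events as-is instead of as SequenceFiles
    a2.sinks.k2.hdfs.fileType = DataStream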

Write to HDFS once 10 events have accumulated in memory.
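This presumably maps to the sink's hdfs.batchSize property, the number of events flushed to HDFS per batch; a sketch:

    a2.sinks.k2.hdfs.batchSize = 10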

Add the writeFormat setting; with writeFormat = Text, the resulting files can be read with the hdfs dfs -text command.
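As a property line (Text, capitalized, is the value the Flume docs list):

    a2.sinks.k2.hdfs.writeFormat = Text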

Also: HDFS directory names can be customized with the escape sequences defined in the official documentation.
So, to keep HDFS files easier to manage, hdfs.path can create tiered directories by year, month, and day.
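Using the documented %Y/%m/%d escapes, the path setting likely became something like:

    a2.sinks.k2.hdfs.path = hdfs://hadoop-senior.ibeifeng.com:8020/flume/%Y%m%d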

Configure the HDFS file roll parameters.
Set a second directory level, splitting by hour.
Roll conditions:
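A sketch with placeholder values; the actual numbers were in the lost screenshots and are assumptions (a value of 0 disables that condition):

    # second directory level: split by hour
    a2.sinks.k2.hdfs.path = hdfs://hadoop-senior.ibeifeng.com:8020/flume/%Y%m%d/%H
    # roll a new file by elapsed seconds, file size in bytes, or event count
    a2.sinks.k2.hdfs.rollInterval = 30
    a2.sinks.k2.hdfs.rollSize = 10240
    a2.sinks.k2.hdfs.rollCount = 0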

Use the local timestamp.
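The %Y%m%d/%H escapes need a timestamp; since the exec source does not set a timestamp header on events, the sink takes it from the local clock:

    a2.sinks.k2.hdfs.useLocalTimeStamp = true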

Set the replica count.
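The HDFS sink's replica-related property is hdfs.minBlockReplicas; setting it to 1 keeps under-replicated blocks from triggering premature file rolls on a single-node cluster. The value used here is an assumption:

    a2.sinks.k2.hdfs.minBlockReplicas = 1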

Test run:

Create the receiving directory in HDFS.
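For example, from the Hadoop installation directory (a sketch; the target directory name comes from the preparation step):

    $ bin/hdfs dfs -mkdir -p /flume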

Run the bin/flume-ng agent command and leave it running:

    $ bin/flume-ng agent --conf conf --conf-file conf/flume2hdfs-conf.properties --name a2
    Info: Sourcing environment configuration script /opt/modules/flume-1.5.0-cdh5.3.6-bin/conf/flume-env.sh
    + exec /opt/modules/jdk1.7.0_67/bin/java -Xmx20m -cp '/opt/modules/flume-1.5.0-cdh5.3.6-bin/conf:/opt/modules/flume-1.5.0-cdh5.3.6-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file conf/flume2hdfs-conf.properties --name a2

    [beifeng@hadoop-senior logs]$ pwd
    /opt/modules/hive-0.13.1-cdh5.3.6/logs
    [beifeng@hadoop-senior logs]$ ll
    total 0
    [beifeng@hadoop-senior logs]$ echo "aoaoaoao" >> /opt/modules/hive-0.13.1-cdh5.3.6/logs/hive.log

HDFS has created date directories and files under the designated receiving directory according to local time; the in-progress files are named by timestamp with a .tmp suffix.
After a period in which no more data is written to hive.log, the file in HDFS is renamed to drop its .tmp suffix.
With repeated writes, the .tmp file produced by the previous write loses its .tmp suffix immediately; the file produced by the latest write keeps the .tmp suffix and, after a period of time, is automatically renamed without it.

    [beifeng@hadoop-senior logs]$ echo "king" >> /opt/modules/hive-0.13.1-cdh5.3.6/logs/hive.log
    [beifeng@hadoop-senior logs]$ echo "mini" >> /opt/modules/hive-0.13.1-cdh5.3.6/logs/hive.log


Case study 2: real-time collection of directory data into HDFS with a file channel

Configure the agent file.
Compared with the previous example, the following settings need to change:
1. Source parameters (see the sketch below)
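The monitored-directory case uses the spooling directory source. A sketch of the changed source section, assuming the names r3/c3 (the agent name a3 comes from the run command below; the directory and ignorePattern are inferred from the .tmp behavior observed later):

    a3.sources.r3.type = spooldir
    a3.sources.r3.spoolDir = /opt/modules/hive-0.13.1-cdh5.3.6/logs
    # exclude in-progress files such as hive.log.tmp from collection
    a3.sources.r3.ignorePattern = ^(.)*\.tmp$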
2. Channel parameters (see the sketch below)
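The memory channel is replaced by a file channel, which persists events on disk. A sketch with assumed directories; the "create the cache directories" step below refers to these paths:

    a3.channels.c3.type = file
    a3.channels.c3.checkpointDir = /opt/modules/flume-1.5.0-cdh5.3.6-bin/checkpoint
    a3.channels.c3.dataDirs = /opt/modules/flume-1.5.0-cdh5.3.6-bin/data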

    cp hive.log hive2.log
    cp hive.log hive.log.tmp
    pwd
    /opt/modules/hive-0.13.1-cdh5.3.6/logs

Create the cache (channel) directories as specified in the configuration file.
Run the listening command:

    bin/flume-ng agent --conf conf --conf-file conf/flume-dir.conf.properties --name a3

Inspect the source directory: the .tmp file was excluded from collection and, unlike the ingested files, was not renamed with the .COMPLETED suffix.
Inspect HDFS: a .tmp file has been generated and is waiting for the roll to complete.
