@Arslan6and6
2016-06-14T03:54:36.000000Z
Chapter 10: Flume, a Big Data Collaboration Framework
---Real-Time Extraction of Data from a Monitored Directory with Flume
Assignment description:
For Flume, the real-time data extraction framework, master the following:
1) Flume's functionality, the Agent concept, the three components of an Agent, and the functions of the common components
2) Complete all the Demo cases in the course
3) Understand how Flume is used in enterprise projects, and write and understand the "real-time extraction of data from a monitored directory" case
4) Test the points of attention for the HDFS Sink and write up the results.
Flume functionality:
Flume is a distributed, reliable, and highly available service for efficiently collecting, aggregating, and moving large volumes of log data. Flume runs only in a Linux environment.
It has a simple, flexible architecture based on streaming data flows (simply set the Java environment variable in the configuration file), and it is robust and fault tolerant. It uses a simple, extensible data model that supports online real-time analytic applications. Its simplicity shows in practice: write a source, a channel, and a sink in the configuration, and a single command gets it running.
Flume and Kafka collect data in real time, Spark and Storm process it in real time, and Impala queries it in real time.
The Agent concept, the three components of an Agent, and common component functions
Flume NG has only one node role: the agent. An agent is composed of a source, a channel, and a sink.
The Source collects data; it is where the data flow is produced, and it passes the events it generates into the Channel.
The Channel connects sources and sinks; it behaves somewhat like a queue.
The Sink drains events from the Channel and writes them to the target, which can be the next agent's Source, or a store such as HDFS or HBase.
Following the simple example on the official website
Copy the flume-conf.properties.template file under conf and rename the copy test-conf.properties (the name the agent command below expects).
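For example, from the Flume home directory:
$ cp conf/flume-conf.properties.template conf/test-conf.properties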
Then paste the example configuration from the official website into test-conf.properties:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Following the official example, connect with Telnet:
$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK
So Telnet needs to be installed first:
# yum -y install telnet
# yum -y install telnet telnet-server
Run the command as shown in the example:
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
Parameter explanation:
agent: run an agent instance
--conf conf: use conf as the configuration directory
--conf-file example.conf: use example.conf as the agent configuration file
--name a1: name the agent a1
-Dflume.root.logger=INFO,console: print logger output at INFO level to the console
$ bin/flume-ng agent --conf conf --conf-file conf/test-conf.properties --name a1 -Dflume.root.logger=INFO,console
Info: Sourcing environment configuration script /opt/modules/flume-1.5.0-cdh5.3.6-bin/conf/flume-env.sh
+ exec /opt/modules/jdk1.7.0_67/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/opt/modules/flume-1.5.0-cdh5.3.6-bin/conf:/opt/modules/flume-1.5.0-cdh5.3.6-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file conf/test-conf.properties --name a1
2016-05-31 21:03:38,803 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2016-05-31 21:03:38,812 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:conf/test-conf.properties
2016-05-31 21:03:38,866 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)] Added sinks: k1 Agent: a1
2016-05-31 21:03:38,866 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
2016-05-31 21:03:38,866 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
2016-05-31 21:03:38,886 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)] Post-validation flume configuration contains configuration for agents: [a1]
2016-05-31 21:03:38,886 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)] Creating channels
2016-05-31 21:03:38,903 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory
2016-05-31 21:03:38,913 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)] Created channel c1
2016-05-31 21:03:38,914 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type netcat
2016-05-31 21:03:38,943 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger
2016-05-31 21:03:38,947 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)] Channel c1 connected to [r1, k1]
2016-05-31 21:03:38,955 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@347db2f9 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2016-05-31 21:03:38,970 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel c1
2016-05-31 21:03:39,044 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2016-05-31 21:03:39,044 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: c1 started
2016-05-31 21:03:39,046 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink k1
2016-05-31 21:03:39,047 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source r1
2016-05-31 21:03:39,048 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:150)] Source starting
2016-05-31 21:03:39,109 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.5.130:9999]
Note that for this run the source's bind and port were evidently changed to the host name and 9999 (matching the serverSocket address in the log above), so Telnet connects to that address:
$ telnet hadoop-senior.ibeifeng.com 9999
Trying 192.168.5.130...
Connected to hadoop-senior.ibeifeng.com.
Create a logs directory under the Hive home directory, manually create a hive.log file in it, and write some arbitrary content into it.
Create a receiving directory in HDFS named flume.
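A sketch of these two steps with assumed paths (the local path matches the session shown later; the /flume path in HDFS is an assumption for illustration):
$ mkdir -p /opt/modules/hive-0.13.1-cdh5.3.6/logs
$ echo "hello flume" >> /opt/modules/hive-0.13.1-cdh5.3.6/logs/hive.log
$ bin/hdfs dfs -mkdir -p /flume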
Flume acts as a client of HDFS here, so the HDFS client JAR packages must be placed in the lib directory under the Flume installation. The required JARs are:
/opt/modules/hadoop-2.5.0-cdh5.3.6/share/hadoop/hdfs/hadoop-hdfs-2.5.0-cdh5.3.6.jar
/opt/modules/hadoop-2.5.0-cdh5.3.6/share/hadoop/common/hadoop-common-2.5.0-cdh5.3.6.jar
/opt/modules/hadoop-2.5.0-cdh5.3.6/share/hadoop/tools/lib/commons-configuration-1.6.jar
/opt/modules/hadoop-2.5.0-cdh5.3.6/share/hadoop/tools/lib/hadoop-auth-2.5.0-cdh5.3.6.jar
cp hadoop-hdfs-2.5.0-cdh5.3.6.jar /opt/modules/flume-1.5.0-cdh5.3.6-bin/lib/
cp hadoop-common-2.5.0-cdh5.3.6.jar /opt/modules/flume-1.5.0-cdh5.3.6-bin/lib/
cp commons-configuration-1.6.jar /opt/modules/flume-1.5.0-cdh5.3.6-bin/lib/
cp hadoop-auth-2.5.0-cdh5.3.6.jar /opt/modules/flume-1.5.0-cdh5.3.6-bin/lib/
Write a new agent configuration file named flume2hdfs-conf.properties.
In it, the agent is renamed a2, the source r2, and the channel c2 (a sketch of the full file follows the description below).
source configuration:
Follow the official exec source example.
Note: the Linux tail command prints the last few lines of a file to the terminal; if the file is appended to, tail refreshes automatically so you always see the latest content. The -f flag keeps monitoring the file as it grows (-F additionally survives the file being rotated or recreated).
Configure the shell as /bin/bash so that the tail -F command runs in a proper shell environment.
channel configuration: in this example the channel still buffers events in memory, so this part needs no change.
sink configuration:
Following the official documentation, set the type, hdfs.path, and hdfs.fileType.
Write to HDFS as soon as 10 events have accumulated in memory (the batch size).
Add the writeFormat setting; with writeFormat = Text the files can be read with the hdfs dfs -text command.
Also, the HDFS directory layout can be customized with the escape sequences defined on the official website.
So, to make the HDFS files easier to manage, hdfs.path can create hierarchical directories by year, month, and day.
Configure the HDFS file roll parameters: add a second directory level to split files by hour, set the roll conditions, use the local timestamp, and set the replica count.
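Below is a minimal sketch of flume2hdfs-conf.properties reconstructed from the description above. The NameNode address and port, the /flume path, the batch size, the roll thresholds, and the minBlockReplicas choice are illustrative assumptions, not the author's exact values:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# exec source: follow the Hive log with tail -F, run through bash
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/modules/hive-0.13.1-cdh5.3.6/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# memory channel, unchanged from the first example
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# HDFS sink: plain-text files under year/month/day and hour subdirectories
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop-senior.ibeifeng.com:8020/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.writeFormat = Text
a2.sinks.k2.hdfs.batchSize = 10
# roll conditions: roll by time or size; 0 disables rolling by event count
a2.sinks.k2.hdfs.rollInterval = 30
a2.sinks.k2.hdfs.rollSize = 10240
a2.sinks.k2.hdfs.rollCount = 0
# resolve the %Y%m%d/%H escapes against the agent host's clock
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# replica-related setting: keeps the sink from rolling early while blocks replicate
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
With writeFormat = Text the resulting files can be read back with, for example, bin/hdfs dfs -text /flume/20160531/21/FlumeData.* (the path and the default FlumeData file prefix here are assumed).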
Create the receiving directory in HDFS (as shown earlier).
Run the bin/flume-ng agent command and leave it running:
$ bin/flume-ng agent --conf conf --conf-file conf/flume2hdfs-conf.properties --name a2
Info: Sourcing environment configuration script /opt/modules/flume-1.5.0-cdh5.3.6-bin/conf/flume-env.sh
+ exec /opt/modules/jdk1.7.0_67/bin/java -Xmx20m -cp '/opt/modules/flume-1.5.0-cdh5.3.6-bin/conf:/opt/modules/flume-1.5.0-cdh5.3.6-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file conf/flume2hdfs-conf.properties --name a2
[beifeng@hadoop-senior logs]$ pwd
/opt/modules/hive-0.13.1-cdh5.3.6/logs
[beifeng@hadoop-senior logs]$ ll
total 0
[beifeng@hadoop-senior logs]$ echo "aoaoaoao" >> /opt/modules/hive-0.13.1-cdh5.3.6/logs/hive.log
HDFS now contains date directories under the specified receiving directory, generated from the local time, with a .tmp file named after the timestamp.
After a period with no further data written to hive.log, the file in HDFS is renamed to drop its .tmp suffix.
When data is written repeatedly, the .tmp file from the previous write loses its .tmp suffix right away; the file from the last write keeps the .tmp suffix and is automatically renamed without it after some time.
[beifeng@hadoop-senior logs]$ echo "king" >> /opt/modules/hive-0.13.1-cdh5.3.6/logs/hive.log
[beifeng@hadoop-senior logs]$ echo "mini" >> /opt/modules/hive-0.13.1-cdh5.3.6/logs/hive.log
Configure the agent file for the monitored-directory case.
Compared with the previous example, the changes are (a sketch follows this list):
1. the sources parameters
2. the channel buffering parameters
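Below is a minimal sketch of flume-dir.conf.properties assembled from this description and the steps that follow. The spooling directory, the ignore pattern for .tmp files, and the file-channel directories are illustrative assumptions:
# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# spooldir source: pick up files dropped into the Hive logs directory;
# skip in-progress .tmp files (finished files get the default .COMPLETED suffix)
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/modules/hive-0.13.1-cdh5.3.6/logs
a3.sources.r3.ignorePattern = ^.*\.tmp$
# file channel: buffer events on local disk instead of in memory
a3.channels.c3.type = file
a3.channels.c3.checkpointDir = /opt/modules/flume-1.5.0-cdh5.3.6-bin/spool/checkpoint
a3.channels.c3.dataDirs = /opt/modules/flume-1.5.0-cdh5.3.6-bin/spool/data
# HDFS sink, same pattern as in the previous example
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop-senior.ibeifeng.com:8020/flume/%Y%m%d/%H
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.writeFormat = Text
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
Copy some test files into the monitored directory; the .tmp copy should be ignored by the source: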
cp hive.log hive2.log
cp hive.log hive.log.tmp
pwd
/opt/modules/hive-0.13.1-cdh5.3.6/logs
Create the buffering directories required by the configuration file.
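For example, matching the directories assumed in the sketch above:
$ mkdir -p /opt/modules/flume-1.5.0-cdh5.3.6-bin/spool/checkpoint
$ mkdir -p /opt/modules/flume-1.5.0-cdh5.3.6-bin/spool/data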
Run the monitoring command and keep the agent running:
bin/flume-ng agent --conf conf --conf-file conf/flume-dir.conf.properties --name a3
Check the source directory: the .tmp file is excluded from extraction and has not been renamed with the .COMPLETED suffix.
Check HDFS: a .tmp file has been generated and is waiting for the roll to complete.