@hadoopMan
2017-03-04T14:28:59Z
flume
For Flume, the real-time data ingestion framework, master the following:
1) Flume's purpose, the Agent concept, the Agent's three components, and the common component types
2) All of the demo cases in this course
3) How Flume is used in enterprise projects, including writing and understanding the "real-time ingestion of a monitored directory" case
Flume has a single role, the agent, and every agent consists of three parts: a source, a channel, and a sink. The source receives data, the channel buffers and transports it, and the sink writes it to the next hop. That is the whole model. There are many source types to choose from, many channel types, and many sink types, and all three support custom implementations. An agent also supports selectors: one source can feed multiple channels and multiple sinks, which enables fan-out of the data.
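The source → channel → sink flow can be sketched as a toy pipeline. This is a conceptual illustration only, not Flume's actual Java implementation; all class and function names here are made up:

```python
from collections import deque

class MemoryChannel:
    """Toy stand-in for Flume's memory channel: a bounded in-memory queue."""
    def __init__(self, capacity=1000):
        self.queue = deque()
        self.capacity = capacity

    def put(self, event):
        if len(self.queue) >= self.capacity:
            raise RuntimeError("channel full")
        self.queue.append(event)

    def take(self):
        return self.queue.popleft() if self.queue else None

def netcat_source(lines, channel):
    """Toy source: push each received line into the channel as an event."""
    for line in lines:
        channel.put(line.encode())

def logger_sink(channel, out):
    """Toy sink: drain the channel and 'log' each event."""
    while (event := channel.take()) is not None:
        out.append(event.decode())

channel = MemoryChannel(capacity=1000)
received = []
netcat_source(["hello", "flume"], channel)
logger_sink(channel, received)
print(received)  # → ['hello', 'flume']
```

The channel decouples the two ends: the source only knows how to put, the sink only knows how to take, which is what makes Flume's components freely combinable.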
An Event is the basic unit of data transfer in Flume.
Flume moves data from source to destination in the form of events.
An event consists of an optional header and a byte-array body carrying the data:
1. The body payload is opaque to Flume.
2. The header is an unordered collection of key-value string pairs; keys are unique within the collection.
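As a sketch, an event can be modeled as a byte-array body plus an optional header map whose keys are unique (illustrative only; Flume's real API is the Java `Event` interface):

```python
# Illustrative model of a Flume event: opaque byte body + unique-key headers.
class Event:
    def __init__(self, body, headers=None):
        # The body is opaque to Flume: carried as raw bytes, never interpreted.
        self.body = body
        # Headers map string keys to string values; a dict enforces key uniqueness.
        self.headers = dict(headers or {})

e = Event(b"2017-03-04 14:28:59 INFO started", {"host": "miaodonghua1.host"})
print(e.headers["host"])  # → miaodonghua1.host
```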
flume-ng-1.5.0-cdh5.3.6.tar.gz
Add the Java path in Flume's configuration file (conf/flume-env.sh):
export JAVA_HOME=/opt/modules/jdk1.7.0_67
# The configuration file needs to define the sources,
# the channels and the sinks.
## define agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1
## define sources
a1.sources.s1.channels = c1
a1.sources.s1.type = netcat
a1.sources.s1.bind = miaodonghua1.host
a1.sources.s1.port = 5555
## define channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
## define sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
sudo yum install telnet
bin/flume-ng agent \
--conf conf \
--name a1 \
--conf-file conf/test.conf \
-Dflume.root.logger=INFO,console
Parameter notes:
-c or --conf: the configuration directory
-f or --conf-file: the agent configuration file
-n or --name: the name of the agent to run
telnet miaodonghua1.host 5555
In Hive's conf/hive-log4j.properties, the log location is configured as follows (this is the file the exec source will tail):
hive.log.threshold=ALL
hive.root.logger=WARN,DRFA
hive.log.dir=/opt/cdh2.3.6/hive-0.13.1-cdh5.3.6/logs/
hive.log.file=hive.log
# The configuration file needs to define the sources,
# the channels and the sinks.
## define agent
a2.sources = r2
a2.channels = c2
a2.sinks = k2
## define sources
a2.sources.r2.type = exec
a2.sources.r2.command = tail -f /opt/cdh2.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
## define channels
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
## define sinks
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://miaodonghua1.host:8020/user/hadoop/flume/hive-logs/
a2.sinks.k2.hdfs.filePrefix=cmcc
a2.sinks.k2.hdfs.minBlockReplicas=1
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.writeFormat = Text
a2.sinks.k2.hdfs.batchSize = 10
a2.sinks.k2.hdfs.rollInterval=0
a2.sinks.k2.hdfs.rollSize= 131072
a2.sinks.k2.hdfs.rollCount=0
a2.sinks.k2.hdfs.idleTimeout=6000
### bind the sources and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
Parameter details:
a2.sinks.k2.hdfs.filePrefix=cmcc: prefix for the generated file names
a2.sinks.k2.hdfs.minBlockReplicas=1: number of replicas; by default the HDFS replication setting is used
a2.sinks.k2.hdfs.fileType = DataStream: write raw data rather than a SequenceFile
a2.sinks.k2.hdfs.writeFormat = Text: write event bodies as text
a2.sinks.k2.hdfs.batchSize = 10: number of events written per flush to HDFS
a2.sinks.k2.hdfs.rollInterval=0: interval, in seconds, for rolling to a new file; 0 disables time-based rolling
a2.sinks.k2.hdfs.rollSize=131072: roll to a new file once this many bytes have been written; 0 disables size-based rolling
a2.sinks.k2.hdfs.rollCount=0: roll to a new file after this many events; 0 disables count-based rolling
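The interplay of the three roll settings can be sketched as a predicate, where a value of 0 disables that trigger (an illustrative approximation of the sink's roll decision, not Flume's actual code):

```python
def should_roll(bytes_written, event_count, elapsed_secs,
                roll_size=131072, roll_count=0, roll_interval=0):
    """Approximate the HDFS sink's file-roll decision.

    Each trigger is checked only when its setting is non-zero;
    0 disables that trigger, matching the config above."""
    if roll_size and bytes_written >= roll_size:
        return True
    if roll_count and event_count >= roll_count:
        return True
    if roll_interval and elapsed_secs >= roll_interval:
        return True
    return False

# With rollSize=131072 and the time/count triggers disabled,
# only accumulated bytes cause a new file:
print(should_roll(131072, 5000, 999999))  # → True
print(should_roll(1024, 5000, 999999))    # → False
```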
The HDFS sink needs Hadoop's client jars on Flume's classpath; copy them from the Hadoop installation:
cp share/hadoop/hdfs/hadoop-hdfs-2.5.0-cdh5.3.6.jar /opt/cdh2.3.6/flume-1.5.0-cdh5.3.6-bin/lib/
cp share/hadoop/common/hadoop-common-2.5.0-cdh5.3.6.jar /opt/cdh2.3.6/flume-1.5.0-cdh5.3.6-bin/lib/
cp share/hadoop/tools/lib/hadoop-auth-2.5.0-cdh5.3.6.jar /opt/cdh2.3.6/flume-1.5.0-cdh5.3.6-bin/lib/
cp share/hadoop/tools/lib/commons-configuration-1.6.jar /opt/cdh2.3.6/flume-1.5.0-cdh5.3.6-bin/lib/
bin/flume-ng agent --conf conf --name a2 --conf-file conf/flume_tail.conf -Dflume.root.logger=INFO,console
1) Start Hive
bin/hive
2) Watch the log with a shell command
tail -f hive.log
3) Flume's output on the console
4) The data stored in HDFS
1) The exec source has low latency but weak reliability: if the source process fails or the Linux command is interrupted, data is lost, and completeness cannot be guaranteed until normal operation is restored.
2) The spooling-directory source collects logs by watching a directory for new files and reading their contents; in production it is usually combined with log4j. Files that have been fully ingested are renamed with a .completed suffix (configurable).
Example directory /app/logs/20151212:
xx.log.tmp: the application is still writing to this file; once it reaches a set size (e.g. 128 MB), a new file is started
yy.log: a complete log file that Flume can ingest
zz.log.completed: a file Flume has finished ingesting
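The lifecycle above can be checked against the source's pattern matching: files matching ignorePattern are skipped, files already renamed with the completion suffix are skipped, and only complete logs are picked up. A sketch using Python's re (the pattern mirrors the ignorePattern in the config, and .completed is the default suffix, which the config can override):

```python
import re

IGNORE = re.compile(r"^(.)*\.tmp$")   # ignorePattern: skip files still being written
SUFFIX = ".completed"                 # default fileSuffix (configurable)

def flume_would_read(filename):
    """True if the spooling-directory source would pick this file up."""
    if IGNORE.match(filename):        # application is still writing
        return False
    if filename.endswith(SUFFIX):     # already ingested and renamed
        return False
    return True

for name in ["xx.log.tmp", "yy.log", "zz.log.completed"]:
    print(name, flume_would_read(name))
# → xx.log.tmp False / yy.log True / zz.log.completed False
```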
# The configuration file needs to define the sources,
# the channels and the sinks.
## define agent
a3.sources = r3
a3.channels = c3
a3.sinks = k3
## define sources
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/cdh5.3.6/flume-1.5.0-cdh5.3.6-bin/spoollogs
a3.sources.r3.ignorePattern = ^(.)*\\.tmp$
a3.sources.r3.fileSuffix = .delete
## define channels
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
## define sinks
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://miaodonghua1.host/user/hadoop/flume/splogs/
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.writeFormat = Text
a3.sinks.k3.hdfs.batchSize = 10
### bind the sources and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
bin/flume-ng agent --conf conf --name a3 --conf-file conf/flume-app.conf -Dflume.root.logger=INFO,console
1) Monitor the directory
Ingest only complete log files from the log directory; files still being written are not ingested.
2) Use a FileChannel
Buffer on the local file system, which is safer than memory.
3) Store the data in HDFS
Write to the directory backing a Hive table, or to a plain HDFS directory.
# The configuration file needs to define the sources,
# the channels and the sinks.
## define agent
a3.sources = r3
a3.channels = c3
a3.sinks = k3
## define sources
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/cdh2.3.6/flume-1.5.0-cdh5.3.6-bin/spoollogs
a3.sources.r3.ignorePattern = ^(.)*\\.tmp$
a3.sources.r3.fileSuffix = .delete
## define channels
a3.channels.c3.type = file
a3.channels.c3.checkpointDir = /opt/cdh2.3.6/flume-1.5.0-cdh5.3.6-bin/flume/checkpoint
a3.channels.c3.dataDirs = /opt/cdh2.3.6/flume-1.5.0-cdh5.3.6-bin/flume/data
## define sinks
a3.sinks.k3.type = hdfs
#a3.sinks.k3.hdfs.path = hdfs://miaodonghua1.host/user/hadoop/flume/splogs
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.writeFormat = Text
a3.sinks.k3.hdfs.batchSize = 10
a3.sinks.k3.hdfs.useLocalTimeStamp=true
a3.sinks.k3.hdfs.path = hdfs://miaodonghua1.host/user/hadoop/flume/splogs/%y-%m-%d/
a3.sinks.k3.hdfs.filePrefix = events-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
### bind the sources and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
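The %y-%m-%d escapes together with round/roundValue/roundUnit resolve the sink path from the event timestamp (here the local time, since useLocalTimeStamp=true). A sketch of how a timestamp maps to the final directory, approximating the substitution Flume performs internally:

```python
from datetime import datetime

def resolve_path(ts,
                 pattern="hdfs://miaodonghua1.host/user/hadoop/flume/splogs/%y-%m-%d/",
                 round_unit_secs=3600, round_value=1):
    """Round the timestamp down to the bucket boundary (roundUnit=hour,
    roundValue=1), then expand the strftime-style escapes in the path."""
    step = round_unit_secs * round_value
    rounded = datetime.fromtimestamp((int(ts.timestamp()) // step) * step)
    return rounded.strftime(pattern)

print(resolve_path(datetime(2017, 3, 4, 14, 28, 59)))
# → hdfs://miaodonghua1.host/user/hadoop/flume/splogs/17-03-04/
```

Because the path only uses date escapes, the hourly rounding mainly keeps all events from the same bucket interval in one directory rather than scattering them.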
Run the following command:
bin/flume-ng agent --conf conf --name a3 --conf-file conf/flume-app.conf -Dflume.root.logger=INFO,console
After the agent starts successfully, the ingested files appear in HDFS under the date-partitioned path.