[关闭]
@EVA001 2017-11-03T09:48:12.000000Z 字数 3941 阅读 291

2017-3-16 使用flume完成数据的接收

有道云笔记


使用flume完成数据的接收
场景:source是通过tcp发送,channel处理过滤字段,sink存在集群中

适合①[注意,syslog需要特定环境,也可用telnet发送数据]

  1. source[syslogtcp],sink[hdfs]
  2. a1.sources = r1
  3. a1.sinks = k1
  4. a1.channels = c1
  5. # Describe/configure the source
  6. a1.sources.r1.type = syslogtcp
  7. a1.sources.r1.port = 12345
  8. a1.sources.r1.host =hadoop01
  9. a1.sources.r1.channels = c1
  10. # Describe the sink
  11. a1.sinks.k1.type = hdfs
  12. a1.sinks.k1.channel = c1
  13. ###HDFS的目录路径
  14. a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flume
  15. a1.sinks.k1.hdfs.filePrefix = Syslog
  16. a1.sinks.k1.hdfs.round = true
  17. a1.sinks.k1.hdfs.roundValue = 1
  18. a1.sinks.k1.hdfs.roundUnit = minute
  19. # Use a channel which buffers events in memory
  20. a1.channels.c1.type = memory
  21. a1.channels.c1.capacity = 1000
  22. a1.channels.c1.transactionCapacity = 100
  23. # Bind the source and sink to the channel
  24. a1.sources.r1.channels = c1
  25. a1.sinks.k1.channel = c1
  1. [hadoop@hadoop01 flume]$ start-all.sh
  2. [hadoop@hadoop01 flume]$ hadoop fs -mkdir flume
  3. [hadoop@hadoop01 flume]$ hadoop fs -ls
  4. drwxr-xr-x - hadoop supergroup 0 2017-03-12 17:14 flume
  5. 接收端:bin/flume-ng agent --conf conf --conf-file conf/syslog.conf --name a1 -Dflume.root.logger=INFO,console
  6. 发送端:telnet hadoop01 12345,,,
  7. 结果:
  8. [hadoop@hadoop01 flume]$ hadoop fs -ls /flume //注意:sink写入的是HDFS根目录下的/flume,查看时路径必须以“/”开头(绝对路径)
  9. Found 13 items
  10. -rw-r--r-- 3 hadoop supergroup 177 2017-03-12 18:09 /flume/My_netcat_log.1489313346930
  11. -rw-r--r-- 3 hadoop supergroup 224 2017-03-12 18:16 /flume/My_netcat_log.1489313794747
  12. -rw-r--r-- 3 hadoop supergroup 185 2017-03-12 17:21 /flume/Syslog.1489310474526
  13. -rw-r--r-- 3 hadoop supergroup 149 2017-03-12 17:21 /flume/Syslog.1489310474527
  14. [hadoop@hadoop01 flume]$ hadoop fs -ls flume //没有“/”是相对路径,实际查看的是用户主目录下的/user/hadoop/flume,所以看不到sink写入的文件
  15. [hadoop@hadoop01 flume]$
  16. [hadoop@hadoop01 flume]$

适合②[使用telnet来发送数据]

  1. source[netcat],sink[hdfs](a1.sources = r1、a1.sinks = k1、a1.channels = c1 三行声明与①相同,此处省略,实际配置文件中不能少)
  2. # Describe/configure the source
  3. a1.sources.r1.type = netcat
  4. a1.sources.r1.port = 12321
  5. a1.sources.r1.bind = hadoop01
  6. a1.sources.r1.channels = c1
  7. # Describe the sink
  8. a1.sinks.k1.type = hdfs
  9. a1.sinks.k1.channel = c1
  10. ###HDFS的目录路径
  11. a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flume
  12. a1.sinks.k1.hdfs.filePrefix = My_netcat_log
  13. a1.sinks.k1.hdfs.round = true
  14. a1.sinks.k1.hdfs.roundValue = 1
  15. a1.sinks.k1.hdfs.roundUnit = minute
  16. # Use a channel which buffers events in memory
  17. a1.channels.c1.type = memory
  18. a1.channels.c1.capacity = 1000
  19. a1.channels.c1.transactionCapacity = 100
  20. # Bind the source and sink to the channel
  21. a1.sources.r1.channels = c1
  22. a1.sinks.k1.channel = c1
  23. 接收端:bin/flume-ng agent --conf conf --conf-file conf/netcat.conf --name a1 -Dflume.root.logger=INFO,console
  24. 发送端:telnet hadoop01 12321,,,
  25. 结果:
  26. [hadoop@hadoop01 flume]$ hadoop fs -ls /flume
  27. Found 13 items
  28. -rw-r--r-- 3 hadoop supergroup 177 2017-03-12 18:09 /flume/My_netcat_log.1489313346930
  29. -rw-r--r-- 3 hadoop supergroup 224 2017-03-12 18:16 /flume/My_netcat_log.1489313794747
  30. [hadoop@hadoop01 flume]$ hadoop fs -ls flume
  31. [hadoop@hadoop01 flume]$
  32. [hadoop@hadoop01 flume]$

适合③[使用curl来发送数据]

  1. source[http],sink[hdfs]
  2. a1.sources = r1
  3. a1.sinks = k1
  4. a1.channels = c1
  5. # Describe/configure the source
  6. a1.sources.r1.type = http
  7. a1.sources.r1.port = 50000
  8. a1.sources.r1.bind = hadoop01
  9. a1.sources.r1.channels = c1
  10. # Describe the sink
  11. a1.sinks.k1.type = hdfs
  12. a1.sinks.k1.channel = c1
  13. ###HDFS
  14. a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flume
  15. a1.sinks.k1.hdfs.filePrefix = Http_log
  16. a1.sinks.k1.hdfs.round = true
  17. a1.sinks.k1.hdfs.roundValue = 1
  18. a1.sinks.k1.hdfs.roundUnit = minute
  19. # Use a channel which buffers events in memory
  20. a1.channels.c1.type = memory
  21. a1.channels.c1.capacity = 1000
  22. a1.channels.c1.transactionCapacity = 100
  23. # Bind the source and sink to the channel
  24. a1.sources.r1.channels = c1
  25. a1.sinks.k1.channel = c1
  26. 接收端:bin/flume-ng agent -c conf -f conf/http.conf -n a1 -Dflume.root.logger=INFO,console
  27. 发送端:
  28. [hadoop@hadoop01 flume]$ curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]' http://hadoop01:50000
  29. [hadoop@hadoop01 flume]$ curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"asdascfascas"}]' http://hadoop01:50000
  30. [hadoop@hadoop01 flume]$ curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"xxxxxxxxxxx"}]' http://hadoop01:50000
  31. 结果:
  32. [hadoop@hadoop01 flume]$ hadoop fs -ls /flume
  33. Found 16 items
  34. -rw-r--r-- 3 hadoop supergroup 145 2017-03-12 18:49 /flume/Http_log.1489315734229
  35. -rw-r--r-- 3 hadoop supergroup 147 2017-03-12 18:49 /flume/Http_log.1489315785602
  36. -rw-r--r-- 3 hadoop supergroup 161 2017-03-12 18:49 /flume/Http_log.1489315785603
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注