@hadoopMan 2017-03-04T14:26:14.000000Z

Flume Integration with Spark Streaming

spark


If you want to learn big data frameworks such as Spark, Hadoop and Kafka, please join QQ group 459898801; if it is full, join the second group 224209501. Follow-up articles will be published there over time.

1. Introduction to Spark Streaming

1. Streaming: a data-transmission technique in which the data received by the client is turned into a steady, continuous stream that is delivered without interruption, so that the audio or video the user hears or sees plays back smoothly, and the user can start viewing the content before the whole file has finished transferring.
2. Streaming compute: performing computation continuously on such a stream as the data arrives, rather than waiting for a complete data set; this is what Spark Streaming provides (see the sketch below).
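
To make the Spark Streaming model concrete: the input is cut into small batches (here every 5 seconds) and a normal Spark job runs over each batch; a DStream is simply that sequence of batches. The sketch below is only an illustration and assumes a spark-shell session (so sc already exists) and a text source on localhost:7777, e.g. started with "nc -lk 7777"; both the source and the port are my assumptions, not part of the original setup.

  import org.apache.spark.streaming.{Seconds, StreamingContext}
  // wrap the existing SparkContext from spark-shell; 5-second batch interval
  val ssc = new StreamingContext(sc, Seconds(5))
  // a DStream is a sequence of RDDs, one per 5-second batch
  val lines = ssc.socketTextStream("localhost", 7777)
  // ordinary RDD-style transformations are applied to every batch
  lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
  ssc.start()              // begin receiving and processing
  ssc.awaitTermination()   // block until the job is stopped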

2. Reading Files from HDFS

  $ bin/hdfs dfs -mkdir -p streaming/inputhdfs
  $ bin/hdfs dfs -put wc.txt streaming/inputhdfs
  $ bin/hdfs dfs -put /opt/datas/wc.input /user/hadoop/streaming/inputhdfs/1

The reader program (note that textFileStream only processes files added to the directory after the streaming context has started):

  import org.apache.spark._
  import org.apache.spark.streaming._
  // reuse spark-shell's SparkContext; each batch covers 5 seconds
  val ssc = new StreamingContext(sc, Seconds(5))
  // watch the HDFS directory for newly added files
  val lines = ssc.textFileStream("/user/hadoop/streaming/inputhdfs")
  val words = lines.flatMap(_.split("\t"))
  val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
  wordCounts.print()
  ssc.start()
  ssc.awaitTermination()

(screenshot: spark.png, word-count output)

The snippet can also be saved to a file and loaded into spark-shell:

  :load /opt/modules/spark-1.3.0-bin-2.5.0/wordcount.scala
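
If you prefer not to paste code into spark-shell, the same logic can also be packaged as a standalone application and launched with spark-submit. The sketch below is only an outline under that assumption; the object name HdfsWordCount is a placeholder, and the HDFS path is the one used above.

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  object HdfsWordCount {
    def main(args: Array[String]): Unit = {
      // standalone app: build our own SparkContext instead of reusing spark-shell's sc
      val conf = new SparkConf().setAppName("HdfsWordCount")
      val ssc = new StreamingContext(conf, Seconds(5))
      // only files added to the directory after the context starts are processed
      val lines = ssc.textFileStream("/user/hadoop/streaming/inputhdfs")
      lines.flatMap(_.split("\t")).map((_, 1)).reduceByKey(_ + _).print()
      ssc.start()
      ssc.awaitTermination()
    }
  }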

3. Integration with Flume

1. Building Spark against Hadoop 2.5.0-cdh5.3.6

  ./make-distribution.sh --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.6 -Phive -Phive-thriftserver -Phive-0.13.1

(screenshot: 编译成功.png, successful build)

2. Configuring the Maven Mirror

Before building, configure a Maven mirror and DNS resolvers to speed up dependency downloads and therefore the build. Open /opt/compileHadoop/apache-maven-3.0.5/conf/settings.xml with Notepad++ (already connected to the machine over SFTP) and add the following mirrors:

  <mirror>
    <id>nexus-spring</id>
    <mirrorOf>cdh.repo</mirrorOf>
    <name>spring</name>
    <url>http://repo.spring.io/repo/</url>
  </mirror>
  <mirror>
    <id>nexus-spring2</id>
    <mirrorOf>cdh.releases.repo</mirrorOf>
    <name>spring2</name>
    <url>http://repo.spring.io/repo/</url>
  </mirror>

3. Configuring DNS Resolvers

  sudo vi /etc/resolv.conf
  # add the following lines:
  nameserver 8.8.8.8
  nameserver 8.8.4.4

4. Testing the Cluster

1. FlumeEventCount.scala

  import org.apache.spark._
  import org.apache.spark.streaming._
  import org.apache.spark.streaming.flume._
  val ssc = new StreamingContext(sc, Seconds(5))
  // Create a Flume stream; host and port must match the avro sink defined below
  val stream = FlumeUtils.createStream(ssc, "miaodonghua.host", 9999)
  stream.count().map(cnt => "Received " + cnt + " flume events.").print()
  ssc.start()
  ssc.awaitTermination()
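
FlumeEventCount.scala only counts events. If you also want to look at the payload, each element of the stream is a SparkFlumeEvent wrapping an Avro event whose body is a ByteBuffer. Below is a minimal sketch, assuming UTF-8 text payloads; these transformations would have to be added before the ssc.start() call in the script above.

  import java.nio.charset.StandardCharsets
  // decode each Flume event body into a string (UTF-8 assumed)
  val bodies = stream.map(e => StandardCharsets.UTF_8.decode(e.event.getBody).toString)
  // reuse the word-count logic from section 2 on the Flume payloads
  bodies.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()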

2. Writing flume-spark-push.conf

  # The configuration file needs to define the sources,
  # the channels and the sinks.
  ## define agent
  a2.sources = r2
  a2.channels = c2
  a2.sinks = k2
  ## define sources
  a2.sources.r2.type = exec
  a2.sources.r2.command = tail -f /opt/datas/spark-flume/wctotal.log
  a2.sources.r2.shell = /bin/bash -c
  ## define channels
  a2.channels.c2.type = memory
  a2.channels.c2.capacity = 1000
  a2.channels.c2.transactionCapacity = 100
  ## define sinks
  a2.sinks.k2.type = avro
  a2.sinks.k2.hostname = miaodonghua.host
  a2.sinks.k2.port = 9999
  ### bind the sources and sink to the channel
  a2.sources.r2.channels = c2
  a2.sinks.k2.channel = c2

3. Starting spark-shell

  bin/spark-shell \
    --jars /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/spark-streaming-flume_2.10-1.3.0.jar,/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/flume-ng-sdk-1.5.0-cdh5.3.6.jar,/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/flume-avro-source-1.5.0-cdh5.3.6.jar \
    --master local[2]

4. Running the Script

  :load /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/FlumeEventCount.scala

5. Running the Flume Agent

Start the agent only after the Spark Streaming job above is running: in this push-based setup the avro sink has to connect to the receiver listening on miaodonghua.host:9999.

  bin/flume-ng agent --conf conf --name a2 --conf-file conf/flume-spark-push.conf -Dflume.root.logger=DEBUG,console

6. Appending a Line to wctotal.log

  echo hadoop spark shell >> wctotal.log

(screenshot: 集成成功.png, successful integration)
