@hadoopMan
2017-03-04T14:26:14.000000Z
spark
1, Streaming: a data-transfer technique that turns the data a client receives into a steady, continuous stream and sends it out without interruption, so that the audio or video the user hears or sees plays back smoothly, and the user can begin viewing the file on screen before the entire file has finished transferring.
2, Streaming Compute
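Streaming computation applies the same idea to data processing: instead of waiting for a complete dataset, the program computes over small batches of records as they arrive. As a minimal sketch in spark-shell (the host and port are placeholders; you could feed it with, say, nc -lk 9999, and local mode needs at least two threads, e.g. --master local[2], because the receiver occupies one):

import org.apache.spark._
import org.apache.spark.streaming._

// Count the lines received on the socket in each 5-second batch.
val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
lines.count().print()
ssc.start()
ssc.awaitTermination()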
Create an input directory in HDFS and upload some test data:

$ bin/hdfs dfs -mkdir -p streaming/inputhdfs
$ bin/hdfs dfs -put wc.txt streaming/inputhdfs

textFileStream only picks up files that appear after the job starts, so once the program below is running, putting a new file into the directory triggers the next batch:

bin/hdfs dfs -put /opt/datas/wc.input /user/hadoop/streaming/inputhdfs/1
The program that reads the stream: it monitors the HDFS directory and word-counts each new file.

import org.apache.spark._
import org.apache.spark.streaming._

// Check the directory for new files every 5 seconds.
val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.textFileStream("/user/hadoop/streaming/inputhdfs")
// Split each line on tabs and count word occurrences per batch.
val words = lines.flatMap(_.split("\t"))
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

Save the snippet above as wordcount.scala and run it in spark-shell:

:load /opt/modules/spark-1.3.0-bin-2.5.0/wordcount.scala
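The same job can also be packaged as a standalone application and run with spark-submit instead of :load. A rough sketch, where the object name HdfsWordCount is made up for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HdfsWordCount {
  def main(args: Array[String]): Unit = {
    // Outside the shell there is no implicit sc, so build the
    // streaming context from a SparkConf instead.
    val conf = new SparkConf().setAppName("HdfsWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.textFileStream("/user/hadoop/streaming/inputhdfs")
    val wordCounts = lines.flatMap(_.split("\t")).map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}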
3, Compiling Spark Against CDH

Build a Spark 1.3.0 distribution against CDH 5.3.6 Hadoop, with Hive and the thrift server enabled:

./make-distribution.sh --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.6 -Phive -Phive-thriftserver -Phive-0.13.1

Before building, configure a Maven mirror and DNS servers to speed up dependency downloads and therefore the build. Open /opt/compileHadoop/apache-maven-3.0.5/conf/settings.xml with Notepad++ (already connected to the machine over SFTP) and add the mirrors:
<mirror>
  <id>nexus-spring</id>
  <mirrorOf>cdh.repo</mirrorOf>
  <name>spring</name>
  <url>http://repo.spring.io/repo/</url>
</mirror>
<mirror>
  <id>nexus-spring2</id>
  <mirrorOf>cdh.releases.repo</mirrorOf>
  <name>spring2</name>
  <url>http://repo.spring.io/repo/</url>
</mirror>
Then point the machine at public DNS servers:

sudo vi /etc/resolv.conf

Add:

nameserver 8.8.8.8
nameserver 8.8.4.4
4, Spark Streaming with Flume

Spark Streaming can also receive events pushed from Flume. FlumeEventCount.scala below starts an Avro receiver on miaodonghua.host:9999 and prints how many Flume events arrive in each 5-second batch:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._

val ssc = new StreamingContext(sc, Seconds(5))
// Create a flume stream listening on miaodonghua.host:9999
val stream = FlumeUtils.createStream(ssc, "miaodonghua.host", 9999)
stream.count().map(cnt => "Received " + cnt + " flume events.").print()
ssc.start()
ssc.awaitTermination()
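Continuing the same session, the event contents can be recovered as well: each SparkFlumeEvent wraps an Avro event whose body is a byte buffer. A hedged sketch, assuming the log lines are UTF-8 text:

// Decode each Flume event body into a line of text and print it.
val bodies = stream.map(e => new String(e.event.getBody.array(), "UTF-8"))
bodies.print()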
The matching Flume agent configuration (conf/flume-spark-push.conf) tails a log file and pushes each new line to the Avro port the receiver listens on:

# The configuration file needs to define the sources,
# the channels and the sinks.

## define agent
a2.sources = r2
a2.channels = c2
a2.sinks = k2

## define sources
a2.sources.r2.type = exec
a2.sources.r2.command = tail -f /opt/datas/spark-flume/wctotal.log
a2.sources.r2.shell = /bin/bash -c

## define channels
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

## define sinks
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = miaodonghua.host
a2.sinks.k2.port = 9999

## bind the sources and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
Start spark-shell with the Flume integration jars on the classpath:

bin/spark-shell \
--jars /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/spark-streaming-flume_2.10-1.3.0.jar,/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/flume-ng-sdk-1.5.0-cdh5.3.6.jar,/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/flume-avro-source-1.5.0-cdh5.3.6.jar \
--master local[2]
Inside the shell, load the receiver program first, so the Avro sink has something to connect to:

:load /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/FlumeEventCount.scala
Then start the Flume agent:

bin/flume-ng agent --conf conf --name a2 --conf-file conf/flume-spark-push.conf -Dflume.root.logger=DEBUG,console
Finally, append a test line to the tailed log:

echo hadoop spark shell >> wctotal.log
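Each appended line becomes one Flume event, so on the next 5-second batch the shell should print a line like "Received 1 flume events."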
