[关闭]
@gekeshi 2016-04-30T15:00:38.000000Z 字数 2721 阅读 605

Spark本地模式部署与实例运行

Spark 本地模式


实验环境

安装Spark

  1. 解压
    此处解压到/home/
  2. 修改配置文件
    安装后,需要在 ./conf/spark-env.sh 中修改 Spark 的 Classpath,执行如下命令拷贝一个配置文件:
  1. cp ./conf/spark-env.sh.template ./conf/spark-env.sh

编辑./conf/spark-env.sh(vim ./conf/spark-env.sh),在最后面加上如下内容:

  1. export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
  1. 检查安装是否成功
    进入spark目录下的bin/文件夹,执行spark-shell命令

scala

spark-shell
4.运行Wordcount实例
启动hdfs,在hdfs中新建test/目录,将spark目录下的README.md文件复制到test/下。在spark-shell中执行如下 scala语句:

  1. scala> val input = sc.textFile("hdfs://master:9000/user/spark/test/README.md"
  2. scala> val words = input.flatMap(x => x.split(" "))
  3. scala> val reslut = words.map(x => (x,1)).reduceByKey((x,y) => x+y)

以上语句的解释可以参考官方文档:

执行结果如下:
wordcount
5.运行pagerank实例

  1. Intellij IDEA中新建scala工程,在project
    structure中为src/目录下新建main/scala目录,设置为sources类型,并添加scala和spark-assembly-xxx-hadoopxxx.jar两个Library
  2. 新建test包,添加pageranktest对象文件,代码见PageRank code
  3. 打包程序
    在项目结构界面中选择"Artifacts",在右边操作界面选择绿色"+"号,选择添加JAR包的"From modules with dependencies"方式,出现如下界面,在该界面中选择主函数入口;
    点击菜单Build->Build Artifacts,弹出选择动作,选择Build或者Rebuild动作;
    复制打包文件到Spark根目录下;
  4. 通过spark-submit运行程序
    本地模式下提交程序的例子:
  1. ## Run application locally on 8 cores
  2. ./bin/spark-submit \
  3. --class org.apache.spark.examples.SparkPi \
  4. --master local[8] \
  5. /path/to/examples.jar \
  6. 100

将打包的PageRank程序提交

  1. spark-submit --master local[2] --class tset.pageranktest ~/spark-1.6.1/pagerank.jar

按照Rank值排序打印
PageRank

PageRank code

  1. package tset
  2. import org.apache.log4j.{Level, Logger}
  3. import org.apache.spark.{SparkContext, SparkConf}
  4. import org.apache.spark.graphx._
  5. import org.apache.spark.rdd.RDD
  6. object pageranktest {
  7. def main(args: Array[String]) {
  8. //屏蔽日志
  9. Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  10. Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
  11. //设置运行环境
  12. val conf = new SparkConf().setAppName("PageRank").setMaster("local")
  13. val sc = new SparkContext(conf)
  14. //读入数据文件
  15. val articles: RDD[String] = sc.textFile("hdfs://master:9000/user/spark/test/graphx-wiki-vertices.txt")
  16. val links: RDD[String] = sc.textFile("hdfs://master:9000/user/spark/test/graphx-wiki-edges.txt")
  17. //装载顶点和边
  18. val vertices = articles.map { line =>
  19. val fields = line.split('\t')
  20. (fields(0).toLong, fields(1))
  21. }
  22. val edges = links.map { line =>
  23. val fields = line.split('\t')
  24. Edge(fields(0).toLong, fields(1).toLong, 0)
  25. }
  26. val graph = Graph(vertices, edges, "").persist()
  27. println("**********************************************************")
  28. println("PageRank计算,获取最有价值的数据")
  29. println("**********************************************************")
  30. val prGraph = graph.pageRank(0.001).cache()
  31. val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {
  32. (v, title, rank) => (rank.getOrElse(0.0), title)
  33. }
  34. titleAndPrGraph.vertices.top(10) {
  35. Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)
  36. }.foreach(t => println(t._2._2 + ": " + t._2._1))
  37. sc.stop()
  38. }
  39. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注