@gekeshi
2016-04-30T15:00:38.000000Z
Spark Local Mode
Modify Spark's classpath in ./conf/spark-env.sh under the Spark installation directory (here under /home/). First run the following command to copy the configuration file from its template:
cp ./conf/spark-env.sh.template ./conf/spark-env.sh
Edit ./conf/spark-env.sh (vim ./conf/spark-env.sh) and append the following line at the end:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
Launch the interactive shell with the spark-shell command.
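A minimal launch sketch, assuming it is run from the Spark home directory (the local[2] master setting is only an illustrative choice; plain ./bin/spark-shell also works):
./bin/spark-shell --master local[2]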
4. Running the WordCount example
Start HDFS, create a test/ directory in HDFS, and copy the README.md file from the Spark directory into it.
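A minimal sketch of these HDFS steps, assuming Hadoop lives under /usr/local/hadoop (as in the classpath setting above) and Spark under /usr/local/spark (an assumed path; adjust to your installation):
/usr/local/hadoop/sbin/start-dfs.sh
/usr/local/hadoop/bin/hdfs dfs -mkdir -p /user/spark/test
/usr/local/hadoop/bin/hdfs dfs -put /usr/local/spark/README.md /user/spark/test/
With README.md in HDFS, run the following Scala statements in spark-shell: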
scala> val input = sc.textFile("hdfs://master:9000/user/spark/test/README.md")
scala> val words = input.flatMap(x => x.split(" "))
scala> val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
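The statements above only define transformations, which Spark evaluates lazily. To inspect the counts in the same shell, a standard RDD action can be used (the take(10) sample size is just illustrative):
scala> result.take(10).foreach(println)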
An explanation of the statements above can be found in the official documentation:
The results of the run are as follows:
5. Running the PageRank example
Applications are submitted with spark-submit; the general form, taken from the official docs, runs the bundled SparkPi example locally on 8 cores:
# Run application locally on 8 cores
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[8] \
  /path/to/examples.jar \
  100
Submit the packaged PageRank program:
spark-submit --master local[2] --class tset.pageranktest ~/spark-1.6.1/pagerank.jar
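Before submitting, the two input files read by the program listed below must already be in HDFS. A sketch, assuming graphx-wiki-vertices.txt and graphx-wiki-edges.txt sit in the current local directory (obtaining them is not covered here):
hdfs dfs -put graphx-wiki-vertices.txt /user/spark/test/
hdfs dfs -put graphx-wiki-edges.txt /user/spark/test/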
The complete program, which prints the top pages sorted by rank value:

package tset

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object pageranktest {
  def main(args: Array[String]) {
    // Suppress logging
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Set up the runtime environment
    val conf = new SparkConf().setAppName("PageRank").setMaster("local")
    val sc = new SparkContext(conf)

    // Read the input data files
    val articles: RDD[String] = sc.textFile("hdfs://master:9000/user/spark/test/graphx-wiki-vertices.txt")
    val links: RDD[String] = sc.textFile("hdfs://master:9000/user/spark/test/graphx-wiki-edges.txt")

    // Load the vertices and edges
    val vertices = articles.map { line =>
      val fields = line.split('\t')
      (fields(0).toLong, fields(1))
    }
    val edges = links.map { line =>
      val fields = line.split('\t')
      Edge(fields(0).toLong, fields(1).toLong, 0)
    }
    val graph = Graph(vertices, edges, "").persist()

    println("**********************************************************")
    println("PageRank computation: finding the most valuable pages")
    println("**********************************************************")
    val prGraph = graph.pageRank(0.001).cache()
    val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {
      (v, title, rank) => (rank.getOrElse(0.0), title)
    }
    titleAndPrGraph.vertices.top(10) {
      Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)
    }.foreach(t => println(t._2._2 + ": " + t._2._1))
    sc.stop()
  }
}
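For reference, the pagerank.jar submitted earlier has to be packaged from this source first. A minimal sbt build sketch (the use of sbt, the file layout, and the version numbers below are assumptions matching Spark 1.6.1 with Scala 2.10; the text above only shows the submit step), placed in build.sbt:
name := "pagerank"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-graphx" % "1.6.1" % "provided"
)
Running sbt package then produces a jar under target/scala-2.10/ that can be passed to spark-submit as shown above.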