[关闭]
@EVA001 2018-05-08T13:18:17.000000Z 字数 14348 阅读 485

GraphX具体功能的代码使用实例-Scala实现

未分类


前言

GraphX 为整个图计算流程提供了强大的支持,先前已经有若干篇文章先后介绍了GraphX的强大功能,在GraphX官方编程指南中,提供了部分简单易懂的示例代码,其为GraphX的使用提供了一个初步的认识,作为需要用GraphX来编码实现需求的读者来说是十分宝贵的资源。

本文利用一个初始示例代码,结合部分官方文档中的说明,对GraphX的部分功能方法进行了实践,在全部亲自运行通过后,对大部分代码添加了自己的理解和认识,并且在Pregel模型编程部分结合运行结果对其运行流程做了一定梳理,来意图理解其执行机制。

下面,是ben程序代码中使用到的主要程序部分,即定义出一个简单的图结构,并构造一个图Graph[VD,ED],对具体功能的实现均放置在代码的后半部分,主要包括一下几部分:

主程序代码如下:

  1. import org.apache.log4j.{Level, Logger}
  2. import org.apache.spark.graphx._
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.{SparkConf, SparkContext}
  5. object RDD_println {
  6. def main(args: Array[String]) {
  7. //屏蔽日志
  8. Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  9. Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
  10. //设置运行环境
  11. val conf = new SparkConf().setAppName("XXXAppName").setMaster("local")
  12. val sc = new SparkContext(conf)
  13. //设置顶点和边,注意顶点和边都是用元组定义的Array
  14. //顶点的数据类型
  15. val vertexArray = Array(
  16. (1L, ("Alice", 28)),(2L, ("Bob", 27)),
  17. (3L, ("Charlie", 65)),(4L, ("David", 42)),
  18. (5L, ("Ed", 55)),(6L, ("Fran", 50))
  19. )
  20. //边的数据类型
  21. val edgeArray = Array(
  22. Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),
  23. Edge(3L, 6L, 3),Edge(4L, 1L, 1),Edge(5L, 2L, 2),
  24. Edge(5L, 3L, 8),Edge(5L, 6L, 3)
  25. )
  26. //构造vertexRDD和edgeRDD
  27. val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
  28. val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
  29. //构造图Graph[VD,ED]
  30. val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
  31. //----------------- Property Operators -----------------
  32. //----------------- Structural Operators -----------------
  33. //----------------- Computing Degree -----------------
  34. //----------------- Collecting Neighbors -----------------
  35. //----------------- Join Operators -----------------
  36. //----------------- mapReduceTriplets -----------------
  37. //----------------- aggregateMessages -----------------
  38. //----------------- Pregel API Functions -----------------
  39. }
  40. }

Property Operators

  1. graph.edges.foreach(println)
  2. println
  3. graph.vertices.foreach(println)
  4. println
  5. graph.triplets.foreach(println)
  6. println
  7. graph.triplets.foreach(e => prin(s"edge(${e.srcId},${e.dstId})\tage(${e.srcAttr._2},${e.dstAttr._2})"))
  8. //---- vertexRDD.foreach ----原始的vertexRDD保持顺序
  9. //1 name=Alice age=28
  10. //2 name=Bob age=27
  11. //3 name=Charlie age=65
  12. //4 name=David age=42
  13. //5 name=Ed age=55
  14. //6 name=Fran age=50
  15. //---- edgeRDD.foreach ----
  16. // 2 to 1 w=7
  17. //2 to 4 w=2
  18. //3 to 2 w=4
  19. //3 to 6 w=3
  20. //4 to 1 w=1
  21. //5 to 2 w=2
  22. //5 to 3 w=8
  23. //5 to 6 w=3
  24. //---- graph.vertices.foreach ----生成graph之后顺序被打乱
  25. //4 name=David age=42
  26. //1 name=Alice age=28
  27. //6 name=Fran age=50
  28. //3 name=Charlie age=65
  29. //5 name=Ed age=55
  30. //2 name=Bob age=27
  31. //---- graph.edges.foreach ----
  32. // 2 to 1 w=7
  33. //2 to 4 w=2
  34. //3 to 2 w=4
  35. //3 to 6 w=3
  36. //4 to 1 w=1
  37. //5 to 2 w=2
  38. //5 to 3 w=8
  39. //5 to 6 w=3
  40. take(n) 按与按顺序!取出前 n
  41. graph.edges.take(3).foreach(println)
  42. // mapVertices 对Vertices进行map操作
  43. // 对整个顶点集的 某一部分 进行批量操作
  44. graph.vertices.foreach( x => println(x))
  45. // 不改变顺序的进行map操作
  46. graph.mapVertices{
  47. case (id, (name, age)) => (id, (name, age+10))
  48. }.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))
  49. //collect可有可无
  50. //mapVertices{case()=> ()} 这种必须用{}
  51. //但是对于 mapVertices{ () => () } 这种也可以用 mapVertices( () => () )
  52. graph.mapEdges{
  53. e=>e.attr*2
  54. }.edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))

Structural Operators

  1. // 原始图结构
  2. graph.edges.foreach(println)
  3. println
  4. // 满足要求的子图结构 这里只用到了 参数epred
  5. val sub = graph.subgraph( epred = e => e.srcId > e.dstId )
  6. sub.edges.foreach(println)
  7. println("原图顶点数:"+graph.vertices.count()+"\t子图顶点数:"+sub.vertices.count())
  8. println("原图边数:"+graph.edges.count()+"\t子图边:"+sub.edges.count())
  9. //原图顶点数:6 子图顶点数:6
  10. //原图边数:8 子图边:5
  11. //分析:过滤掉了3条边,但是仍然包含全部顶点
  12. // 错误的写法: subgraph( epred = e => e.srcId > e.dstId, vpred = (id,(_,_)) => id > 4 )
  13. val sub2 = graph.subgraph( epred = e => e.srcId > e.dstId, vpred = (id,_) => id > 4 )
  14. println("原图顶点数:"+graph.vertices.count()+"\t子图顶点数:"+sub2.vertices.count())
  15. println("原图边数:"+graph.edges.count()+"\t子图边:"+sub2.edges.count())
  16. //原图顶点数:6 子图顶点数:2
  17. //原图边数:8 子图边:0
  18. //分析:同时有epred和vpred两个条件,最终只剩两点,且无边连接,即边数为0
  19. graph.edges.foreach(println)
  20. println
  21. graph.edges.reverse.foreach(println)
  22. //Edge(2,1,7)
  23. //Edge(2,4,2)
  24. //Edge(3,2,4)
  25. //Edge(3,6,3)
  26. //Edge(4,1,1)
  27. //Edge(5,2,2)
  28. //Edge(5,3,8)
  29. //Edge(5,6,3)
  30. // reverse之后,是边的入点出点 相互交换,即边进行翻转
  31. //Edge(1,2,7)
  32. //Edge(1,4,1)
  33. //Edge(2,3,4)
  34. //Edge(2,5,2)
  35. //Edge(3,5,8)
  36. //Edge(4,2,2)
  37. //Edge(6,3,3)
  38. //Edge(6,5,3)
  39. // 注意:graph.vertices 没有reverse这个方法,即不能对顶点进行翻转

Computing Degree

  1. //输出全部 ( 度数 , 该度的个数 )
  2. graph.outDegrees.foreach( x => println(x))
  3. graph.degrees.foreach( x => println(x))
  4. println
  5. // 为什么这样设计
  6. // 因为:跟的是graph.degrees,即格式为 (VertexId, Int),
  7. // 所以对其的处理的参数 max(参数a:参数b):(结果),均为此格式
  8. // 得到最大度的节点:首先需要一个比较两点度大小的函数 max()
  9. def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  10. if (a._2 > b._2) a else b
  11. }
  12. println(graph.degrees.reduce(max))//foreach( x => println(x))
  13. println(graph.inDegrees.reduce(max))//foreach( x => println(x))
  14. println(graph.outDegrees.reduce(max))//foreach( x => println(x))

Collecting Neighbors

  1. // GraphOps实现的,即是属于 graph 的方法
  2. graph.collectNeighborIds(EdgeDirection.Out).foreach( x => {
  3. print(x._1+" cnt=")
  4. x._2.foreach(print)
  5. println
  6. })
  7. // 按出度,找到的点的邻居节点
  8. //4 cnt=1
  9. //1 cnt=
  10. //6 cnt=
  11. //3 cnt=26
  12. //5 cnt=236
  13. //2 cnt=14
  14. println
  15. graph.collectNeighborIds(EdgeDirection.Either).foreach( x => {
  16. print(x._1+" cnt=")
  17. x._2.foreach(print)
  18. println
  19. })
  20. // 按出入度,不能用Both,要用Either
  21. //4 cnt=21
  22. //1 cnt=24
  23. //6 cnt=35
  24. //3 cnt=265
  25. //5 cnt=236
  26. //2 cnt=1435
  27. println
  28. graph.collectNeighbors(EdgeDirection.Either).foreach( x => {
  29. print(x._1+" cnt=")
  30. x._2.foreach(print) //collectNeighbors 找到的邻居是全属性,而不仅是ID
  31. println
  32. })
  33. //4 cnt=(2,(Bob,27))(1,(Alice,28))
  34. //1 cnt=(2,(Bob,27))(4,(David,42))
  35. //6 cnt=(3,(Charlie,65))(5,(Ed,55))
  36. //3 cnt=(2,(Bob,27))(6,(Fran,50))(5,(Ed,55))
  37. //5 cnt=(2,(Bob,27))(3,(Charlie,65))(6,(Fran,50))
  38. //2 cnt=(1,(Alice,28))(4,(David,42))(3,(Charlie,65))(5,(Ed,55))

Join Operators

  1. // 属性重置为0
  2. val rawGraph = graph.mapVertices((id,_) => 0)
  3. rawGraph.vertices.foreach(println)
  4. println
  5. rawGraph.joinVertices[Int](rawGraph.degrees)((_,_,outDeg) => outDeg).vertices.foreach(println)
  6. //(4,0)
  7. //(1,0)
  8. //(6,0)
  9. //(3,0)
  10. //(5,0)
  11. //(2,0)
  12. // joinVertices 按degree使用join操作 加入到 rawGraph 中
  13. //(4,2)
  14. //(1,2)
  15. //(6,2)
  16. //(3,3)
  17. //(5,3)
  18. //(2,4)
  19. rawGraph.outerJoinVertices(rawGraph.degrees)((_,_,outDeg) => outDeg).vertices.foreach(println)
  20. //(4,Some(2))
  21. //(1,Some(2))
  22. //(6,Some(2))
  23. //(3,Some(3))
  24. //(5,Some(3))
  25. //(2,Some(4)
  26. //进阶用法
  27. //顶点用 类对象来代替,利用到 outerJoinVertices
  28. val inDegrees: VertexRDD[Int] = graph.inDegrees
  29. case class User(name: String, age: Int, inDeg: Int, outDeg: Int)
  30. //创建一个新图,顶点VD的数据类型为User,并从graph做类型转换
  31. // Graph[User, Int] 即 Graph[VD,ED],意思是顶点由User类代替,ED是Int类型
  32. val ugraph: Graph[User, Int] = graph.mapVertices { case (id, (name, age)) => User(name, age, 0, }
  33. //initialUserGraph与inDegrees、outDegrees(RDD)进行连接,并修改initialUserGraph中inDeg值、outD值
  34. //outerJoinVertices 这个方法属于 Graph 类型的方法
  35. val userGraph = ugraph.outerJoinVertices(ugraph.inDegrees) {
  36. //required: (graphx.VertexId, User, Option[Int])
  37. case (id, u, in) => User(u.name, u.age, in.getOrElse(0), u.outDeg)
  38. }.outerJoinVertices(ugraph.outDegrees) {
  39. case (id, u, out) => User(u.name, u.age, u.inDeg,out.getOrElse(0))
  40. }
  41. println("连接图顶点的属性:")
  42. userGraph.vertices.collect.foreach(v => println(s"${v._2.name} inDeg: ${v._2.inDeg} outDeg: v._2.outDeg}"))
  43. println("出度和入读相同的:")
  44. userGraph.vertices.filter {
  45. //此处应该格式为 (graphx.VertexId, User)
  46. case (id, u) => u.inDeg == u.outDeg
  47. }.collect.foreach {
  48. case (id, property) => println(property.name)
  49. }

mapReduceTriplets

  1. //在早的GraphX版本中我们计算邻居聚合使用mapReduceTriplets操作;
  2. //注意:当前版本 Graph已经不存在MapReduceTriplets这个方法
  3. // libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
  4. // libraryDependencies += "org.apache.spark" %% "spark-graphx" % "2.2.0"
  5. //
  6. //mapReduceTriplets
  7. // 操作应用用户定义的map函数到每一个triplet ,使用用户定义的reduce函数聚合产生 messages。。
  8. // 然而,我们发现用户返回迭代器是昂贵的,它抑制了我们应用额外优化(例如,本地顶点的重新编号)的能
  9. // 在 aggregateMessages 中我们引进了EdgeContext,其暴露triplet属性,也明确了函数发送信息的源和顶点//。
  10. // 除此之外,我们移除了字节码检测,取而代之的是要求用户指明哪个triplet属性被需要。
  11. val graph: Graph[Int, Float] = ...
  12. def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
  13. Iterator((triplet.dstId, "Hi"))
  14. }
  15. def reduceFun(a: Int, b: Int): Int = a + b
  16. val result = graph.mapReduceTriplets[String](msgFun, reduceFun)
  17. def msgFun(t: EdgeContext[(String,Int), Int, Int]) {
  18. t.sendToSrc(100)
  19. }
  20. def reduceFun(a: Int, b: Int): Int = a + b
  21. val result = graph.aggregateMessages[Int](msgFun, reduceFun)
  22. result.foreach(println)

aggregateMessages

  1. //对每个节点的邻接点的属性进行聚合统计
  2. //老版本使用 mapReduceTriplets
  3. val oldFlowers: VertexRDD[(Int,Double)] = graph.mapReduceTriplets[(Int,Double)]{
  4. triplet => {
  5. if (triplet.srcAttr > triplet.dstAttr) {
  6. Iterator((triplet.dstId, (1, triplet.srcAttr )))
  7. } else {
  8. Iterator.empty
  9. }
  10. },
  11. (a,b) => (a._1+b._1, a._2+b._2)
  12. }
  13. }
  14. // 现在使用 aggregateMessages 代替
  15. // 首先生成一个随机图
  16. val g: Graph[Double,Int] = GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( d,_) => id.toDouble )
  17. //g.edges.foreach(println)
  18. //println
  19. def msgFun(triplet: EdgeContext[Double, Int, (Int,Double)]) {
  20. if (triplet.srcAttr > triplet.dstAttr) {
  21. triplet.sendToDst((1, triplet.srcAttr ))
  22. //注意:原先的 Iterator((triplet.dstId, (1, triplet.srcAttr ))) 上述代码替换,作用完全一样
  23. //sendToDst 意思为向 目标点 发送消息
  24. //println("Iterator(("+triplet.dstId+", (1, "+triplet.srcAttr+" )))");
  25. }else{
  26. Iterator.empty
  27. }
  28. }
  29. // 有个问题:写成函数定义的形式 下面代码总是出错
  30. // def reduceFun(a:Int,b:Double): (Int,Double) = (a._1+b._1, a._2+b._2)
  31. // aggregateMessages[OOO] OOO处的类型,会附加到图g类型之后,
  32. // 即:g为 Double,Int --> result为 Double,Int,OOO
  33. val result = g.aggregateMessages[(Int,Double)](msgFun, (a,b) => (a._1+b._1,a._2+b._2))
  34. //result.collect.foreach(println)
  35. //上述代码执行完后,形如(45,2913.0),已经是reduce完成的状态,如果下面继续计算平均值,直接后项除项即可
  36. //(19,(45,2913.0))
  37. //(39,(45,2873.0))
  38. //(34,(32,2102.0))
  39. //(4,(62,3466.0))
  40. //(71,(12,1028.0))
  41. //result现在是(Int,Double)形式,现在对Double元素即value
  42. //这里的的match-case类似于switch-case
  43. //对于avg:VertexRDD[Double],后项变量类型可以省去不写
  44. val avg:VertexRDD[Double] = result.mapValues(
  45. (_,value) => //这句话意思是前项保持不动,对后项value即形如(12,1028.0)的部分进行加工
  46. value match {
  47. case (count,total) => total/count
  48. }
  49. ) //整体返回值avg类型为(Int,Double)
  50. //avg.collect.foreach(println)
  51. //下面不使用随机图,使用开头自定义的图结构时:
  52. graph.triplets.foreach(e => intln(s"edge(${e.srcId},${e.dstId})\tage(${e.srcAttr._2},${e.dstAttr._2})"))
  53. def msgFun(triplet: EdgeContext[(String,Int), Int, (Int,Double)]) {
  54. if (triplet.srcAttr._2 > triplet.dstAttr._2) {
  55. triplet.sendToDst((1, triplet.srcAttr._2 ))
  56. }else{
  57. Iterator.empty
  58. }
  59. }
  60. val result = graph.aggregateMessages[(Int,Double)](msgFun, (a,b) => (a._1+b._1,a._2+b._2))
  61. println
  62. result.collect.foreach(println)
  63. //分析:原始图结构
  64. //((2,(Bob,27)),(1,(Alice,28)),7)
  65. //((2,(Bob,27)),(4,(David,42)),2)
  66. //((3,(Charlie,65)),(2,(Bob,27)),4) 年龄条件符合:顶点2 邻居的Age为 65
  67. //((3,(Charlie,65)),(6,(Fran,50)),3) 年龄条件符合:顶点6 邻居的Age为 65
  68. //((4,(David,42)),(1,(Alice,28)),1) 年龄条件符合:顶点1 邻居的Age为 42 = 42 1个邻居
  69. //((5,(Ed,55)),(2,(Bob,27)),2) 年龄条件符合:顶点2 邻居的Age为 65+55 = 120 2个邻居
  70. //((5,(Ed,55)),(3,(Charlie,65)),8)
  71. //((5,(Ed,55)),(6,(Fran,50)),3) 年龄条件符合:顶点6 邻居的Age为 65+55 = 120 2个邻居
  72. //上述计算的意义是:找到每个顶点用户的比自身年龄大的邻居节点用户的年龄之和,以及邻居数;
  73. //所以,result为:
  74. //(1,(1,42.0))
  75. //(6,(2,120.0))
  76. //(2,(2,120.0))
  77. //继续求平均值
  78. val avg:VertexRDD[Double] = result.mapValues(
  79. (_,value) =>
  80. value match {
  81. case (count,total) => total/count
  82. }
  83. )
  84. println
  85. avg.collect.foreach(println)
  86. //上述计算的意义是:找到每个顶点用户的比自身年龄大的邻居节点用户的平均年龄,即原本的计算目的
  87. //结果为:
  88. // (1,42.0)
  89. // (6,60.0)
  90. // (2,60.0)

Pregel API Functions

  1. // ShotPath 没有被封装成方法,需要自己实现
  2. val sourceId: VertexId = 5L // 定义源点
  3. // 用一个新图initialGraph 来初始化 顶点5 同其他顶点间的距离:本身距离为0,其他距离为MAX
  4. val initialGraph = graph.mapVertices((id,_) => if (id == sourceId) 0.0 else uble.PositiveInfinity)
  5. //initialGraph.vertices.foreach(println)
  6. //(4,Infinity)
  7. //(1,Infinity)
  8. //(6,Infinity)
  9. //(3,Infinity)
  10. //(5,0.0)
  11. //(2,Infinity)
  12. //initialGraph.triplets.foreach(println)
  13. //((2,Infinity),(1,Infinity),7)
  14. //((2,Infinity),(4,Infinity),2)
  15. //((3,Infinity),(2,Infinity),4)
  16. //((3,Infinity),(6,Infinity),3)
  17. //((4,Infinity),(1,Infinity),1)
  18. //((5,0.0),(2,Infinity),2)
  19. //((5,0.0),(3,Infinity),8)
  20. //((5,0.0),(6,Infinity),3)
  21. //Unspecified value parameters: vprog:
  22. // 第一部分:(graphx.VertexId, Double, Double) => Double,
  23. // 第二部分:sendMsg: EdgeTriplet[Double, Int] => Iterator[(graphx.VertexId, Double)],
  24. // 第三部分:mergeMsg: (Double, Double) => Double
  25. val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  26. (id, dist, newDist) => {math.min(dist, newDist)},
  27. triplet => { // 计算权重
  28. if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
  29. println("Iterator(("+triplet.dstId+", "+triplet.srcAttr+" + "+triplet.attr+"))")
  30. //triplet.attr即边的权重,不断加入triplet.attr,最后就能找到最短路径
  31. Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
  32. } else {
  33. println("Iterator.empty")
  34. Iterator.empty
  35. }
  36. },
  37. (a,b) => math.min(a,b) // 更新点⑤到该点的距离
  38. )
  39. //解释下过程,拿Triplets结构来说
  40. //((2,Infinity),(1,Infinity),7)
  41. //((2,Infinity),(4,Infinity),2)
  42. //((3,Infinity),(2,Infinity),4)
  43. //((3,Infinity),(6,Infinity),3)
  44. //((4,Infinity),(1,Infinity),1)
  45. //((5,0.0),(2,Infinity),2)
  46. //((5,0.0),(3,Infinity),8)
  47. //((5,0.0),(6,Infinity),3)
  48. //首先由于前五行,顶点的attr均为Infinity,所以sendMsg中的Iterator均是empty空迭代
  49. //第六行,0+2<Infinity,满足条件,传递(0+2=2)到入点,即变为((5,0.0),(2,2),2),意为5到点2距2
  50. //第七行,0+8<Infinity,满足条件,传递(0+8=8)到入点,即变为((5,0.0),(3,8),8),意为5到点3距8
  51. //第八行,0+2<Infinity,满足条件,传递(0+2=2)到入点,即变为((5,0.0),(6,3),3),意为5到点6距3
  52. //==进行mergeMsg步骤== 注意现在更新为:
  53. //((2,2),(1,Infinity),7)
  54. //((2,2),(4,Infinity),2)
  55. //((3,8),(2,2),4)
  56. //((3,8),(6,3),3)
  57. //((4,Infinity),(1,Infinity),1)
  58. //((5,0.0),(2,2),2)
  59. //((5,0.0),(3,8),8)
  60. //((5,0.0),(6,3),3)
  61. //第一行,2+7<Infinity,满足条件,传递(2+7=9)到入点,即变为((2,2),(1,9),7),意为5到点1距离为2=9
  62. //第二行,2+2<Infinity,满足条件,传递(2+2=4)到入点,即变为((2,2),(4,4),2),意为5到点4距离为2=4
  63. //Iterator.empty 因为 8+4>2
  64. //Iterator.empty 因为 8+3>3
  65. //Iterator.empty 因为 Infinity无法比较
  66. //Iterator.empty 因为 0+2=2
  67. //Iterator.empty 因为 0+8=8
  68. //Iterator.empty 因为 0+3=3
  69. //==进行mergeMsg步骤== 注意现在更新为:
  70. //((2,2),(1,9),7)
  71. //((2,2),(4,4),2)
  72. //((3,8),(2,2),4)
  73. //((3,8),(6,3),3)
  74. //((4,4),(1,9),1)
  75. //((5,0.0),(2,2),2)
  76. //((5,0.0),(3,8),8)
  77. //((5,0.0),(6,3),3)
  78. //第一行:Iterator.empty 因为 2+7=9
  79. //第二行:Iterator.empty 因为 2+2=4
  80. //第三行:Iterator.empty 因为 8+4>2
  81. //第四行:Iterator.empty 因为 8+3>3
  82. //第五行:4+1<9,满足条件,传递(4+1=5)到入点,即变为((4,4),(1,5),1),意为5到点1距离由9更新为5
  83. //第六行:Iterator.empty 因为 0+2=2
  84. //第七行:Iterator.empty 因为 0+8=8
  85. //第八行:Iterator.empty 因为 0+3=3
  86. //==进行mergeMsg步骤== 注意现在更新为:
  87. //((2,2),(1,5),7)
  88. //((2,2),(4,4),2)
  89. //((3,8),(2,2),4)
  90. //((3,8),(6,3),3)
  91. //((4,4),(1,5),1)
  92. //((5,0.0),(2,2),2)
  93. //((5,0.0),(3,8),8)
  94. //((5,0.0),(6,3),3)
  95. //此时,每个顶点的格式即(顶点id,顶点5到该顶点的最短距离),即最短路径算法完成
  96. sssp.vertices.foreach(println)
  97. //最终输出的结果:
  98. //(4,4.0)
  99. //(1,5.0)
  100. //(6,3.0)
  101. //(3,8.0)
  102. //(5,0.0)
  103. //(2,2.0)
  1. // PageRank 有封装好的方法
  2. //注意PageRank的参数,其实可以用精度来理解,这个值越小rank计算越精确,但计算时间也越长
  3. graph.pageRank(0.01).vertices.foreach(println)
  4. //(4,0.9727164143364966)
  5. //(1,1.7757164399923602)
  6. //(6,1.0009207604397985)
  7. //(3,0.7024005336419639)
  8. //(5,0.5473250911495823)
  9. //(2,1.0009207604397985)
  10. graph.pageRank(0.05).vertices.foreach(println)
  11. //(4,0.9993165004824702)
  12. //(1,1.700587005467996)
  13. //(6,0.9890640077195239)
  14. //(3,0.7430041814088131)
  15. //(5,0.5789642972016725)
  16. //(2,0.9890640077195239)
  17. graph.pageRank(0.1).vertices.foreach(println)
  18. //(4,1.035409289731306)
  19. //(1,1.5453030618621122)
  20. //(6,1.0247865028119143)
  21. //(3,0.7698396167465112)
  22. //(5,0.5998750260362425)
  23. //(2,1.0247865028119143)
  24. graph.pageRank(0.5).vertices.foreach(println)
  25. //(4,0.9999999999999999)
  26. //(1,0.9999999999999999)
  27. //(6,0.9999999999999999)
  28. //(3,0.9999999999999999)
  29. //(5,0.9999999999999999)
  30. //(2,0.9999999999999999)
  1. // TrangleCount 有封装好的方法
  2. // 注意,老版本GraphX需要 srcID<dstID,有些教程还是这样说的,但是2.x已经没有这个要求
  3. graph.triangleCount().vertices.foreach(println)
  4. //输出结果:
  5. //(4,1) 意思为顶点4 外接一个三角形
  6. //(1,1) ..
  7. //(6,1) ..
  8. //(3,2) 意思为顶点2 外接两个三角形
  9. //(5,2) ..
  10. //(2,2) ..
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注