@hadoopMan · 2017-03-04

RDD Operations

spark



1. Creating an RDD

1. Method 1: reading a file directly from HDFS

1. Read a file on HDFS to create an RDD

    val rdd = sc.textFile("hdfs://spark.learn.com:8020/user/hadoop/spark/input/wc.input")

(Screenshot: creating an RDD by reading an HDFS file)
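As an aside (not shown in the original session), `textFile` also accepts an optional `minPartitions` argument that sets a lower bound on how many partitions the file is split into. A hedged sketch:

    // Same file, requesting at least 4 input partitions.
    val rdd4 = sc.textFile(
      "hdfs://spark.learn.com:8020/user/hadoop/spark/input/wc.input",
      4  // minPartitions
    )
    rdd4.getNumPartitions  // >= 4, depending on the HDFS block layout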

2. Process the RDD

    val kvRdd = rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
    kvRdd.collect

(Screenshot: unordered word-count result)

3. Sort by key

    val wordRdd = rdd.flatMap(line => line.split(" "))  // split lines into words
    val kvRdd = wordRdd.map((_, 1))                     // pair each word with 1
    val wordcountRdd = kvRdd.reduceByKey(_ + _)         // sum the counts per word
    wordcountRdd.collect

(Screenshot: word-count result, still unordered)

    // Swap (word, count) to (count, word), sort descending, swap back, take the top 3.
    wordcountRdd.map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1)).take(3)

(Screenshot: sorted results, top three taken)
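The swap/sortByKey/swap pattern above works, but `sortBy` expresses the same thing directly by sorting on a key function. A minimal equivalent sketch, using the same `wordcountRdd`:

    // Sort by the count field in descending order, then take the top 3.
    wordcountRdd.sortBy(_._2, ascending = false).take(3)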

2. Method 2: parallelizing an array:

1. Create an RDD by parallelizing an array

    val disRDD = sc.parallelize(Array(23, 67, 12, 90, 8, 67))

(Screenshot: creating an RDD by parallelizing a collection)
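A side note (assumed, not shown in the original): `parallelize` also takes an explicit slice count as its second argument, which fixes the number of partitions:

    val disRDD3 = sc.parallelize(Array(23, 67, 12, 90, 8, 67), 3)  // 3 partitions
    disRDD3.getNumPartitions  // 3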
The sections below demonstrate the element-fetching actions take, top, first, and takeOrdered.

2. take

    disRDD.take(3)

(Screenshot: take(3) result)

3. first

    disRDD.first

(Screenshot: first result)

4. top:

    disRDD.top(3)

(Screenshot: top(3) result)

5. takeOrdered:

    disRDD.takeOrdered(3)

(Screenshot: takeOrdered(3) result)
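To summarize the four actions on `disRDD = Array(23, 67, 12, 90, 8, 67)`: `take(n)` and `first` return elements in partition order (no sorting), `top(n)` returns the n largest in descending order, and `takeOrdered(n)` returns the n smallest in ascending order. Both of the latter accept a custom `Ordering`:

    disRDD.take(3)        // Array(23, 67, 12) – first elements, unsorted
    disRDD.first          // 23 – same as take(1)(0)
    disRDD.top(3)         // Array(90, 67, 67) – largest, descending
    disRDD.takeOrdered(3) // Array(8, 12, 23)  – smallest, ascending
    // top(n) is takeOrdered(n) with the ordering reversed:
    disRDD.takeOrdered(3)(Ordering[Int].reverse)  // Array(90, 67, 67)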

3. Grouping and taking the top values per group

First group the data by key, then sort the values within each group in descending order and take the top K (here, top 3).

1. Method 1:

1. Input data

    aa 78
    bb 98
    aa 80
    cc 98
    aa 69
    cc 87
    bb 97
    cc 86
    aa 97
    bb 78
    bb 34
    cc 85
    bb 92
    cc 72
    bb 32
    bb 23
2. Read the file into an RDD and process it step by step

    scala> val rdd = sc.textFile("/user/hadoop/spark/input/spark.input")

    scala> rdd.map(line => line.split(" ")).collect
    // Array[Array[String]]: Array(Array(aa, 78), ...)

    scala> rdd.map(line => line.split(" ")).map(x => (x(0), x(1))).collect
    // Array[(String, String)]: Array((aa,78), ...)

    scala> rdd.map(line => line.split(" ")).map(x => (x(0), x(1).toInt)).collect
    // Array[(String, Int)]: Array((aa,78), ...)

    scala> rdd.map(line => line.split(" ")).map(x => (x(0), x(1).toInt)).groupByKey.collect
    // Array[(String, Iterable[Int])]: Array((aa,CompactBuffer(78, 80, 69, 97)), ...)

    scala> rdd.map(line => line.split(" ")).map(x => (x(0), x(1).toInt)).groupByKey.map(
         |   x => (x._1, x._2.toList)
         | ).collect
    // Array[(String, List[Int])]: Array((aa,List(78, 80, 69, 97)), ...)

    scala> rdd.map(line => line.split(" ")).map(x => (x(0), x(1).toInt)).groupByKey.map(
         |   x => (x._1, x._2.toList.sorted)
         | ).collect
    // Array[(String, List[Int])]: Array((aa,List(69, 78, 80, 97)), ...)

    scala> rdd.map(line => line.split(" ")).map(x => (x(0), x(1).toInt)).groupByKey.map(
         |   x => (x._1, x._2.toList.sorted.reverse)
         | ).collect
    // Array[(String, List[Int])]: Array((aa,List(97, 80, 78, 69)), ...)

    scala> rdd.map(line => line.split(" ")).map(x => (x(0), x(1).toInt)).groupByKey.map(
         |   x => (x._1, x._2.toList.sorted.reverse.take(3))
         | ).collect
    // res26: Array[(String, List[Int])] = Array((aa,List(97, 80, 78)), (bb,List(98, 97, 92)), (cc,List(98, 87, 86)))
3. Result:

(Screenshot: top-3 per group result)

2. Method 2:

1. Read the file into an RDD and process it

    rdd.map(line => line.split(" ")).map(x => (x(0), x(1).toInt)).groupByKey.map(
      x => {
        val xx = x._1                                   // the group key
        val yy = x._2                                   // all values for that key
        val yx = yy.toList.sorted.takeRight(3).reverse  // 3 largest, descending
        (xx, yx)
      }
    ).collect
2. Result

(Screenshot: method 2 result)
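Both methods pull every value for a key into memory at once via `groupByKey`. For large groups, a bounded alternative (my sketch, not from the original article) is `aggregateByKey`, which only ever keeps the current top 3 per key while merging:

    // Per-key top-3 without materializing whole groups.
    val top3 = rdd.map(line => line.split(" "))
      .map(x => (x(0), x(1).toInt))
      .aggregateByKey(List.empty[Int])(
        (acc, v) => (v :: acc).sorted.reverse.take(3),  // fold one value into a partial top-3
        (a, b)   => (a ++ b).sorted.reverse.take(3)     // merge two partial top-3 lists
      )
    top3.collect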

4. Implementing a secondary sort

    rdd.map(line => line.split(" ")).map(x => (x(0), x(1).toInt)).groupByKey.map(
      x => {
        val xx = x._1
        val yy = x._2
        val yx = yy.toList.sorted.takeRight(3).reverse
        (xx, yx)
      }
    ).sortByKey(true).collect  // sortByKey(true) orders the groups by key, ascending

(Screenshot: secondary-sort result)
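Strictly speaking, the `sortByKey(true)` above only orders the groups by key after the per-group top-3 has been computed. A full secondary sort (key ascending, value descending over all records) can be sketched with a composite sort key; this variant is an assumption, not from the original article:

    // Sort records by (key asc, value desc) using a tuple sort key.
    rdd.map(line => line.split(" "))
      .map(x => (x(0), x(1).toInt))
      .sortBy { case (k, v) => (k, -v) }
      .collect
    // Array((aa,97), (aa,80), (aa,78), (aa,69), (bb,98), ...)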
