@hadoopMan
2017-03-04T14:25:53.000000Z
字数 2332
阅读 1047
spark
想学习spark,hadoop,kafka等大数据框架,请加群459898801,满了之后请加2群224209501。后续文章会陆续公开
// Classic word count: read a text file from HDFS, split into words,
// and sum the occurrences of each word.
val rdd = sc.textFile("hdfs://spark.learn.com:8020/user/hadoop/spark/input/wc.input")

// One-liner version: words -> (word, 1) pairs -> per-word sums.
val kvRdd = rdd.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a,b) => (a + b))
kvRdd.collect

// Same computation, one step at a time.
// NOTE: the original re-declared `kvRdd` here, which only works because the
// REPL allows shadowing; renamed to `pairRdd` so the script stays valid.
val wordRdd = rdd.flatMap(line => line.split(" "))
val pairRdd = wordRdd.map((_,1))
val wordcountRdd = pairRdd.reduceByKey((_ + _))
wordcountRdd.collect

// Top 3 words by count: swap (word, count) -> (count, word) so the count is
// the key, sort descending, swap back, take the first three.
wordcountRdd.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).take(3)
// Demonstrate the element-fetching actions: take, first, top, takeOrdered.
// (The original had this list as a bare prose line, which is not valid
// Scala; it is now a comment so the snippet can be pasted as a script.)
val disRDD = sc.parallelize(Array(23, 67, 12, 90, 8, 67))
disRDD.take(3)        // first 3 elements in partition order: 23, 67, 12
disRDD.first          // first element: 23
disRDD.top(3)         // 3 largest, descending: 90, 67, 67
disRDD.takeOrdered(3) // 3 smallest, ascending: 8, 12, 23
先分组,然后对每组中的数据进行排序(降序),取前K个值
数据集
aa 78
bb 98
aa 80
cc 98
aa 69
cc 87
bb 97
cc 86
aa 97
bb 78
bb 34
cc 85
bb 92
cc 72
bb 32
bb 23
// REPL transcript building the top-K-per-group query one transformation at a
// time. Each command is followed by the element type and one sample element
// of its result (as captured from the shell).
scala> val rdd = sc.textFile("/user/hadoop/spark/input/spark.input")
// Split each "key value" line on spaces -> RDD of Array[String].
scala> rdd.map(line => line.split(" ")).collect
Array[String] : Array(aa, 78)
// Keep the first two fields as a (key, value) pair of strings.
scala> rdd.map(line => line.split(" ")).map(x => (x(0),x(1))).collect
(String, String) :(aa,78)
// Parse the value into an Int so it can be sorted numerically.
scala> rdd.map(line => line.split(" ")).map(x => (x(0),x(1).toInt)).collect
(String, Int) : (aa,78)
// Group all values under their key -> (key, Iterable[Int]).
scala> rdd.map(line => line.split(" ")).map(x => (x(0),x(1).toInt)).groupByKey.collect
(String, Iterable[Int]) : (aa,CompactBuffer(78, 80, 69, 97))
// Materialize each group's CompactBuffer as a List for easier manipulation.
rdd.map(line => line.split(" ")).map(x => (x(0),x(1).toInt)).groupByKey.map(
x => (x._1,x._2.toList)
).collect
(String, List[Int]) : (aa,List(78, 80, 69, 97))
// Sort each group's values ascending.
rdd.map(line => line.split(" ")).map(x => (x(0),x(1).toInt)).groupByKey.map(
x => (x._1,x._2.toList.sorted)
).collect
(String, List[Int]): (aa,List(69, 78, 80, 97))
// Reverse to get descending order.
rdd.map(line => line.split(" ")).map(x => (x(0),x(1).toInt)).groupByKey.map(
x => (x._1,x._2.toList.sorted.reverse)
).collect
(String, List[Int]): (aa,List(97, 80, 78, 69))
// Finally take the top 3 values per key.
rdd.map(line => line.split(" ")).map(x => (x(0),x(1).toInt)).groupByKey.map(
x => (x._1,x._2.toList.sorted.reverse.take(3))
).collect
res26: Array[(String, List[Int])] = Array((aa,List(97, 80, 78)), (bb,List(98, 97, 92)), (cc,List(98, 87, 86)))
// Top 3 scores per key: parse "key value" lines, group values by key,
// then keep each group's three largest values in descending order.
// (sortWith(_ > _).take(3) yields the same list as sorted.takeRight(3).reverse.)
rdd.map(line => line.split(" "))
  .map(fields => (fields(0), fields(1).toInt))
  .groupByKey
  .map { case (key, scores) =>
    (key, scores.toList.sortWith(_ > _).take(3))
  }
  .collect
// Same top-3-per-key computation, but additionally sort the resulting
// (key, top-3 list) pairs by key in ascending order before collecting.
// mapValues keeps the key untouched, so the collected result is identical
// to rebuilding the pair by hand.
rdd.map(line => line.split(" "))
  .map(fields => (fields(0), fields(1).toInt))
  .groupByKey
  .mapValues(scores => scores.toList.sorted.reverse.take(3))
  .sortByKey(true)
  .collect