@peerslee · 2017-04-14

Common Spark RDD APIs Explained



map

val rdd = sc.parallelize(1 to 10)
/*
map:
1. Applies the function f to every element of the RDD, producing a new RDD (mapRdd).
2. Does not change the number of partitions.
 */
val mapRdd = rdd.map(_*2)

println(mapRdd.collect().toBuffer)

ArrayBuffer(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

reduce

reduce passes the elements of the RDD to the input function two at a time, producing a new value; that value is then passed to the function together with the next element, and so on until only a single value remains. Because each partition is reduced locally before the partial results are merged, the function should be associative and commutative.

scala> val c = sc.parallelize(1 to 10)
scala> c.reduce((x, y) => x + y)
res4: Int = 55

flatMap

val rdd = sc.textFile("file:/home/peerslee/data.txt")

// Iterates over the elements (lines) like map, but each element can produce
// multiple output elements (here, the words obtained by splitting on whitespace)
val flatMapRdd = rdd.flatMap(line => line.split("\\s"))

println(flatMapRdd.collect().toBuffer)

ArrayBuffer(hello, spark, rdd, fire, fight)

filter

val rdd = sc.parallelize(1 to 10)
/*
The function f returns a Boolean; the elements that satisfy it form a new RDD.
 */
val filterRdd = rdd.filter(_%2 == 0)
println(filterRdd.collect().toBuffer)

ArrayBuffer(2, 4, 6, 8, 10)

mapPartitions

val rdd = sc.parallelize(1 to 10)
/*
1. Like map, except that
2. f receives an iterator over all the elements of one partition.
 */
val mapPartitionsRdd = rdd.mapPartitions(iter => iter.filter(_%2 == 1))
println(mapPartitionsRdd.collect().toBuffer)

ArrayBuffer(1, 3, 5, 7, 9)
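
The practical benefit of receiving the whole partition as an iterator is that per-partition setup runs once per partition rather than once per element. A minimal sketch, where ExpensiveResource is a hypothetical stand-in for something costly to construct (a database connection, a parser, ...):

// Hypothetical: stands in for a resource that is expensive to create.
class ExpensiveResource extends Serializable {
  def transform(x: Int): Int = x * 10
}

val rdd = sc.parallelize(1 to 10)
val result = rdd.mapPartitions { iter =>
  val resource = new ExpensiveResource // built once per partition, not per element
  iter.map(resource.transform)
}
println(result.collect().toBuffer)
// ArrayBuffer(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)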

glom

val rdd = sc.parallelize(1 to 10)

/*
glom gathers the elements of each partition into an array, yielding an RDD[Array[Int]].
 */
val glomRdd = rdd.glom()
glomRdd.foreach(arr => {
  arr.foreach(print)
  println()
})

Because foreach runs on the executors, the partition arrays print in nondeterministic order and their lines can interleave (Spark log output trimmed):

16872
9
10
345

distinct

val rdd = sc.parallelize(Array(1, 1, 1, 2, 3, 3, 4, 4, 4, 4, 5))
/*
1. Removes duplicate elements.
2. Can change the number of partitions (here set to 2).
 */
val distinctRdd = rdd.distinct(2)

println(distinctRdd.collect().toBuffer)

ArrayBuffer(4, 2, 1, 3, 5)
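
Under the hood, distinct is built from the key-value operations covered later in this article; it is roughly equivalent to this sketch:

val rdd = sc.parallelize(Array(1, 1, 1, 2, 3, 3, 4, 4, 4, 4, 5))
// Pair each element with a dummy value, collapse duplicates per key
// (2 output partitions drive the shuffle), then drop the dummy value.
val manualDistinct = rdd.map(x => (x, null)).reduceByKey((x, _) => x, 2).map(_._1)
println(manualDistinct.collect().toBuffer)
// Same elements as distinct(2) above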

cartesian

val rdd1 = sc.parallelize(Array('a', 'b', 'c'))
val rdd2 = sc.parallelize(Array('A', 'B', 'C'))
/*
Computes the Cartesian product of the two RDDs and returns it as an RDD of pairs.
 */
val cartesianRdd = rdd1.cartesian(rdd2)
println(cartesianRdd.collect().toBuffer)

ArrayBuffer((a,A), (a,B), (a,C), (b,A), (b,B), (b,C), (c,A), (c,B), (c,C))

union

val rdd1 = sc.parallelize(Array('a', 'b', 'c'))
val rdd2 = sc.parallelize(Array('A', 'B', 'C'))
/*
Concatenates the two RDDs into one (duplicates are kept).
 */
val unionRdd = rdd1.union(rdd2)
println(unionRdd.collect().toBuffer)

ArrayBuffer(a, b, c, A, B, C)

mapValues

 val rdd = sc.parallelize(Array(("A", 1), ("B", 2), ("C", 3)))
/*
对kv形式的rdd中的value进行操作,返回一个Rdd
 */
val mapValuesRdd = rdd.mapValues(num => num *2)
println(mapValuesRdd.collect().toBuffer)

ArrayBuffer((A,2), (B,4), (C,6))

subtract

val rdd1 = sc.parallelize(Array("A", "B", "C", "a", "b", "c"))
val rdd2 = sc.parallelize(Array("A", "B", "C", "D"))

/*
Returns the elements that are in rdd1 but not in rdd2.
 */
val subtractRdd1 = rdd1.subtract(rdd2)
println(subtractRdd1.collect().toBuffer)
// ArrayBuffer(a, b, c)
val subtractRdd2 = rdd2.subtract(rdd1)
println(subtractRdd2.collect().toBuffer)
// ArrayBuffer(D)

sample

 val rdd = sc.parallelize(Array("A", "B", "C", "a", "b"))
/*
随机抽取元素,
百分比,随机种子
返回rdd
 */
val sampleRdd = rdd.sample(true, 0.6, 2)
println(sampleRdd.collect().toBuffer)
// ArrayBuffer(B, C, b)

takeSample

val rdd = sc.parallelize(Array("A", "B", "C", "a", "b"))
/*
Like sample, but takes an exact number of elements
(withReplacement, num, seed)
and returns an Array on the driver instead of an RDD.
 */
val takeSampleRdd = rdd.takeSample(true, 2, 1)
println(takeSampleRdd.toBuffer)
// ArrayBuffer(a, b)

groupBy

val rdd = sc.parallelize(Array("A1", "B1", "C1", "A2", "B2", "A3"))
/*
Groups the elements by the key produced by f (here, the first character).
 */
val groupByRdd = rdd.groupBy(_.substring(0, 1))
println(groupByRdd.collect().toBuffer)

ArrayBuffer((A,CompactBuffer(A1, A2, A3)), (B,CompactBuffer(B1, B2)), (C,CompactBuffer(C1)))

partitionBy

  1. Applies to key-value RDDs.
  2. Repartitions the RDD according to the given Partitioner (see the sketch below).
  3. If the RDD is already partitioned by an equal partitioner, it is returned unchanged.
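
A minimal sketch using Spark's built-in HashPartitioner (the data here is made up for illustration):

import org.apache.spark.HashPartitioner

val rdd = sc.parallelize(Array(('a', 1), ('b', 2), ('c', 3), ('a', 4)))
// Redistribute the records so that equal keys land in the same partition.
val partitioned = rdd.partitionBy(new HashPartitioner(2))
println(partitioned.partitions.length) // 2
// Repartitioning with an equal partitioner is a no-op: the same RDD comes back.
val same = partitioned.partitionBy(new HashPartitioner(2))
println(same eq partitioned) // true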

cogroup

val rdd1 = sc.parallelize(Array(('a', 1), ('a', 2), ('b', 1), ('b', 2), ('c', 1)))
val rdd2 = sc.parallelize(Array(('a', 11), ('a', 22), ('b', 11), ('b', 22), ('c', 11)))
/*
1. For key-value RDDs:
2. groups the values of both RDDs by key, returning a new RDD of
   (key, (values from rdd1, values from rdd2)).
 */
val cogroupRdd = rdd1.cogroup(rdd2)
println(cogroupRdd.collect().toBuffer)
// ArrayBuffer((a,(CompactBuffer(1, 2),CompactBuffer(11, 22))), (b,(CompactBuffer(1, 2),CompactBuffer(11, 22))), (c,(CompactBuffer(1),CompactBuffer(11))))

combineByKey

val rdd = sc.parallelize(Array(('a', 1), ('a', 2), ('b', 3), ('b', 4), ('c', 5)))
/*
Merges the values of each partition by key, using three functions:
createCombiner, mergeValue, and mergeCombiners (labeled below).
 */
val combineByKeyRdd = rdd.combineByKey(
  (v: Int) => List(v),                          // createCombiner: first value seen for a key in a partition
  (c: List[Int], v: Int) => v :: c,             // mergeValue: fold another value in, within a partition
  (c1: List[Int], c2: List[Int]) => c1 ::: c2)  // mergeCombiners: merge results across partitions

println(combineByKeyRdd.collect().toBuffer)
// ArrayBuffer((a,List(1, 2)), (b,List(3, 4)), (c,List(5)))
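
combineByKey pays off when the combiner type differs from the value type. A minimal sketch (with made-up data) that computes a per-key average by carrying a (sum, count) pair:

val scores = sc.parallelize(Array(("a", 1.0), ("a", 3.0), ("b", 4.0)))
val avgByKey = scores.combineByKey(
  (v: Double) => (v, 1),                                                    // start a (sum, count) pair
  (c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1),                    // add a value within a partition
  (c1: (Double, Int), c2: (Double, Int)) => (c1._1 + c2._1, c1._2 + c2._2)  // merge pairs across partitions
).mapValues { case (sum, count) => sum / count }
println(avgByKey.collect().toBuffer)
// e.g. ArrayBuffer((a,2.0), (b,4.0))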

reduceByKey

// For matching keys, folds the values together with f
val rdd = sc.parallelize(Array(('a', 1), ('a', 1), ('a', 1), ('b', 1), ('b', 1), ('c', 1)))
val reduceByKeyRdd = rdd.reduceByKey((v1 : Int, v2 : Int) => v1+v2)

println(reduceByKeyRdd.collect().toBuffer)
// ArrayBuffer((a,3), (b,2), (c,1))

join

val rdd1 = sc.parallelize(Array(('a', 1), ('a', 2), ('b', 1), ('b', 2), ('c', 1)))
val rdd2 = sc.parallelize(Array(('a', 11), ('a', 22), ('b', 11), ('b', 22), ('c', 11)))

// For each matching key, pairs every value from rdd1 with every value from rdd2
// (a per-key Cartesian product of the values)
val joinRdd = rdd1.join(rdd2)
println(joinRdd.collect().toBuffer)

ArrayBuffer((a,(1,11)), (a,(1,22)), (a,(2,11)), (a,(2,22)), (b,(1,11)), (b,(1,22)), (b,(2,11)), (b,(2,22)), (c,(1,11)))
