@peerslee
2017-04-14T13:26:29.000000Z
字数 4265
阅读 2064
Spark
val rdd = sc.parallelize(1 to 10)
/*
map:
1. Applies the function f to every element of the RDD, producing a new RDD.
2. Does not change the number of partitions.
*/
val mapRdd = rdd.map(_ * 2)
// Fixed: use println (no space before the parenthesis) for consistency with
// every other example in this file; print would not emit a trailing newline.
println(mapRdd.collect().toBuffer)
// Output: ArrayBuffer(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。
// reduce: repeatedly combines pairs of elements with the given function;
// each intermediate result is fed back in with the next element until a
// single value remains (here: the sum of 1..10 = 55).
scala> val c = sc.parallelize(1 to 10)
scala> c.reduce((x, y) => x + y)
res4: Int = 55
// Read a local text file; each RDD element is one line of the file.
val rdd = sc.textFile("file:/home/peerslee/data.txt")
// flatMap visits every line and may emit several elements per line
// (here: the individual whitespace-separated words).
val words = rdd.flatMap(_.split("\\s"))
println(words.collect().toBuffer)
// Output: ArrayBuffer(hello, spark, rdd, fire, fight)
val rdd = sc.parallelize(1 to 10)
// filter: the predicate returns a Boolean; the elements for which it is
// true are collected into a new RDD (here: the even numbers).
val evens = rdd.filter(n => n % 2 == 0)
println(evens.collect().toBuffer)
// Output: ArrayBuffer(2, 4, 6, 8, 10)
val rdd = sc.parallelize(1 to 10)
// mapPartitions:
// 1. Like map, but
// 2. the function receives the whole iterator of one partition at a time
//    instead of a single element.
val oddsPerPartition = rdd.mapPartitions(_.filter(n => n % 2 == 1))
println(oddsPerPartition.collect().toBuffer)
// Output: ArrayBuffer(1, 3, 5, 7, 9)
val rdd = sc.parallelize(1 to 10)
// glom: turns each partition into a single Array, which lets us see how
// the elements are distributed across partitions (one line per partition).
val partitions = rdd.glom()
partitions.foreach { chunk =>
  chunk.foreach(print)
  println()
}
// Sample console output (executor log lines interleaved with one printed line per partition):
// 17/03/16 15:35:57 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
// 16872
// 9
// 10
// 345
// 17/03/16 15:35:57 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 915 bytes result sent to driver
val rdd = sc.parallelize(Array(1,1,1,2,3,3,4,4,4,4,5))
// distinct:
// 1. Removes duplicate elements.
// 2. The argument changes the partition count of the result (here: 2).
val unique = rdd.distinct(2)
println(unique.collect().toBuffer)
// Output: ArrayBuffer(4, 2, 1, 3, 5)
val rdd1 = sc.parallelize(Array('a', 'b', 'c'))
val rdd2 = sc.parallelize(Array('A', 'B', 'C'))
// cartesian: returns the Cartesian product of the two RDDs, i.e. every
// pairing of an element from rdd1 with an element from rdd2.
val crossed = rdd1.cartesian(rdd2)
println(crossed.collect().toBuffer)
// Output: ArrayBuffer((a,A), (a,B), (a,C), (b,A), (b,B), (b,C), (c,A), (c,B), (c,C))
val rdd1 = sc.parallelize(Array('a', 'b', 'c'))
val rdd2 = sc.parallelize(Array('A', 'B', 'C'))
// union: concatenates the two RDDs into a single RDD (duplicates kept).
val merged = rdd1.union(rdd2)
println(merged.collect().toBuffer)
// Output: ArrayBuffer(a, b, c, A, B, C)
val rdd = sc.parallelize(Array(("A", 1), ("B", 2), ("C", 3)))
// mapValues: transforms only the value of each key/value pair; the keys
// (and the partitioning) are left untouched.
val doubled = rdd.mapValues(_ * 2)
println(doubled.collect().toBuffer)
// Output: ArrayBuffer((A,2), (B,4), (C,6))
val rdd1 = sc.parallelize(Array("A", "B", "C", "a", "b", "c"))
val rdd2 = sc.parallelize(Array("A", "B", "C", "D"))
// subtract: keeps the elements present in the receiver but absent from
// the argument (set difference).
val onlyInFirst = rdd1.subtract(rdd2)
println(onlyInFirst.collect().toBuffer)
// ArrayBuffer(a, b, c)
val onlyInSecond = rdd2.subtract(rdd1)
println(onlyInSecond.collect().toBuffer)
// ArrayBuffer(D)
val rdd = sc.parallelize(Array("A", "B", "C", "a", "b"))
// sample: draws a random subset of the elements and returns it as an RDD.
// Arguments: sample with replacement?, expected fraction, random seed.
val sampled = rdd.sample(withReplacement = true, fraction = 0.6, seed = 2)
println(sampled.collect().toBuffer)
// ArrayBuffer(B, C, b)
val rdd = sc.parallelize(Array("A", "B", "C", "a", "b"))
// takeSample: like sample, but you request an exact element count and the
// result is a local Array rather than an RDD.
val picked = rdd.takeSample(withReplacement = true, num = 2, seed = 1)
println(picked.toBuffer)
// ArrayBuffer(a, b)
val rdd = sc.parallelize(Array("A1", "B1", "C1", "A2", "B2", "A3"))
// groupBy: groups the elements by the key produced by the given function
// (here: the first character of each string).
val grouped = rdd.groupBy(word => word.substring(0, 1))
println(grouped.collect().toBuffer)
// Output: ArrayBuffer((A,CompactBuffer(A1, A2, A3)), (B,CompactBuffer(B1, B2)), (C,CompactBuffer(C1)))
val rdd1 = sc.parallelize(Array(('a', 1), ('a', 2), ('b', 1), ('b',2), ('c', 1)))
val rdd2 = sc.parallelize(Array(('a', 11), ('a', 22), ('b', 11), ('b',22), ('c', 11)))
// cogroup (key/value RDDs): for each key occurring in either RDD, pairs
// the collection of its values from rdd1 with the collection of its
// values from rdd2, producing one entry per key in the new RDD.
val cogrouped = rdd1.cogroup(rdd2)
println(cogrouped.collect().toBuffer)
// ArrayBuffer((a,(CompactBuffer(1, 2),CompactBuffer(11, 22))), (b,(CompactBuffer(1, 2),CompactBuffer(11, 22))), (c,(CompactBuffer(1),CompactBuffer(11))))
val rdd = sc.parallelize(Array(('a', 1), ('a', 2), ('b', 3), ('b',4), ('c', 5)))
// combineByKey: merges the values of each key, within each partition first
// and then across partitions. Here each key's values are gathered into a
// List via three functions:
//   createCombiner — start a one-element list,
//   mergeValue     — prepend the next value within a partition,
//   mergeCombiners — concatenate the per-partition lists.
val combined = rdd.combineByKey(
  (value: Int) => List(value),
  (acc: List[Int], value: Int) => value :: acc,
  (left: List[Int], right: List[Int]) => left ::: right)
println(combined.collect().toBuffer)
// ArrayBuffer((a,List(1, 2)), (b,List(3, 4)), (c,List(5)))
// reduceByKey: for pairs sharing the same key, folds all the values
// together with the given function (here: summing the per-key counts).
val rdd = sc.parallelize(Array(('a', 1), ('a', 1), ('a', 1), ('b', 1), ('b',1), ('c', 1)))
val counts = rdd.reduceByKey(_ + _)
println(counts.collect().toBuffer)
// ArrayBuffer((a,3), (b,2), (c,1))
val rdd1 = sc.parallelize(Array(('a', 1), ('a', 2), ('b', 1), ('b',2), ('c', 1)))
val rdd2 = sc.parallelize(Array(('a', 11), ('a', 22), ('b', 11), ('b',22), ('c', 11)))
// join: inner join on the key — for each shared key, every value from
// rdd1 is paired with every value from rdd2 (a per-key Cartesian product).
val joined = rdd1.join(rdd2)
println(joined.collect().toBuffer)
// Output: ArrayBuffer((a,(1,11)), (a,(1,22)), (a,(2,11)), (a,(2,22)), (b,(1,11)), (b,(1,22)), (b,(2,11)), (b,(2,22)), (c,(1,11)))