@spiritnotes
2016-03-11T16:05:29.000000Z
字数 7007
阅读 3490
Spark
实现快速而通用的集群计算的平台
每个Spark应用由一个驱动程序(driver program)来发起集群上的各种并行操作。驱动器程序通过一个SparkContext对象来访问Spark。这个对象代表计算集群的一个连接。Shell启动时已经自动创建一个sc变量。驱动器程序一般要管理多个执行器(executor)节点。
Python中需要使用 bin/spark-submit my_script.py 执行,才会对spark相关组件进行加载
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf= conf)
RDD其实就是分布式的元素集合,在Spark中,对数据的所有操作不外乎创建RDD、转换已有RDD以及调用RDD操作进行求值。
RDD支持两种类型操作
rdd.filter(lambda x: a.name in x) # 整个a对象被序列化并传递
rdd.filter(lambda x: name in x) # 传递name
get_max_min = x.aggregate((-inf,inf),
(lambda acc, value:(max(acc[0],value),min(acc[1],value))),
(lambda acc1,acc2:(max(acc1[0],acc2[0]),min(acc1[1],acc2[1]))))
x.takeOrdered(10,key=lambda x:-x)
x.fold(0, (lambda acc,x: x+acc))
默认的缓存级别,scale和java会将数据以序列化的形式缓存在JVM的堆空间中。在Python中,我们会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列化后的对象存储在JVM堆空间中。
# rdd.persisit()/rdd.cache()
rdd.count()
rdd.collect().mkstring(',') ## 会导致两次rdd计算,除非前面添加
可以通过unpersist来取消缓存
为包含键值对类型的RDD提供了一些专有的操作。这些RDD被称为pair RDD。PairRDD提供了并行操作各个键或者跨节点重新进行数据分组的操作接口。
Python中需要返回一个二元组组成的RDD。只能是元组,字典不是,底层操作应该是采用迭代器协议遍历的。
x = sc.parallelize([(x,y) for x in 'abcde' for y in range(6,10)])
针对单个pair RDD的操作
x = sc.parallelize([(1,2),(3,4),(3,6)])
x.reduceByKey(lambda x,y: x+y).collect() #[(1, 2), (3, 10)]
x.foldByKey(2, lambda acc,x: x+acc).collect() #[(1, 4), (3, 14)]
x.mapValues(lambda x:x+1).collect() #[(1, 3), (3, 5), (3, 7)]
# 求键的平均
x.groupByKey().mapValues(lambda x:(sum(x), len(x))).collect() #[(1, (2, 1)), (3, (10, 2))]
x.mapValues(lambda x:(x,1)).reduceByKey(lambda x,y:(x[0]+y[0], x[1]+y[1])).collect()
x.combineByKey(
(lambda x: (x,1)),
(lambda acc, x:(acc[0]+x, acc[1]+1)),
(lambda acc,acc2:(acc[0]+acc2[0], acc[1]+acc2[1]))
).map(lambda x: (x[0],x[1][0]/x[1][1])).collectAsMap()
每个RDD都有固定数目的分区,分区数决定了在RDD上执行操作时的并行度。大多数操作符都支持接受第二个参数,该参数用来指定分组结果或聚合结果的RDD的分区数
r.reduceByKey((lambda x,y: x+y), 10)
r.getNumPartitions() #查看当前分区数
r.coalesce(num)
r.repartition(num)
如果给定RDD只需要被扫描一次,完全没有必要对其预先进行分区处理。只有当数据集多次在诸如连接这种基于键的操作中使用时,分区才会有帮助。
Spark没有给出显式控制每个键具体落在哪一个节点上的方法(原因是节点可能失败),但其确保同一组的键出现在同一个节点上。
例如Join操作,如果有个用户信息表,不变化,而产生新的交易记录需要对表进行join,如果不控制,则可能对两者都进行混洗后再连接,如果将信息表先Hash分区并持久化后,则不再需要对信息表进行混洗,只需要对交易数据进行混洗
user_data = user_data_old.partitionBy(num).persist()
python中未提供API
类似cogroup/groupwith/join/leftouterJoin/rightOuterJoin/groupByKey/reduceByKey/combineByKey/lookup
Spark知道各操作会如何影响分区方式,并将会对数据进行分区的操作的结果RDD自动设置为对应的分区器。例如join结果会自动按照哈希分区,后续reduceByKey操作则会变快。
转换操作不一定会保留分区方式,例如map是可以改变键的。Spark提供了mapValues/flatMapValues可以保持键值不变
会设置分区方式的操作:cogroup/groupwith/join/left../right../groupByKey/reduceByKey/combineByKey/partitionBy/sort
取决于父RDD的分区方式:mapValues/flatMapValues/filter
二元操作符:取决于父RDD的分区方式,默认情况下会采用hash,数目与并行度一致。如何一个父亲设置过分区方式,则以它为准,如果两者都设过,则以第一个为准
pages = [
['a',['b','c','d','e']],
['b',['a','c','e']],
['c',['a','d','e']],
['d',['a']],
['e',['c','d']]
]
links = sc.parallelize(pages)
ranks = links.mapValues(lambda x:1.0)
for i in range(20):
contributes = links.join(ranks).flatMap(lambda x:((i, x[1][1]/len(x[1][0])) for i in x[1][0])).reduceByKey(lambda x,y:x+y)
ranks = contributes.mapValues(lambda x:0.15+0.85*x)
ranks.collect()
两种类型的共享变量:
使用map或者filter传条件的时候,可以使用驱动器程序中定义的变量,但是集群中的每个任务都会得到这些变量的一份新的副本。更新这些副本的值不会影响到驱动器中的对应变量
比如说统计空行
blanklines = sc.accumulator(0)
def extr(line):
global blanklines
if (line == ''):
blanklines += 1
return line.split(' ')
result = file.flatMap(extr)
可以通过map和reduce进行聚合,但是这种操作更方便
Spark有时候任务会失败或者会有较慢机器,因此可以会多次计算累计器
Spark会自动将闭包中所有引用到的变量发送到工作节点上。方便但是低效:1)默认的任务发射机制是专门为小任务进行优化的;2)事实上在多个并行操作中使用同一个变量,则在第二次操作会再发送一次该变量,广播变量的类型为spark.broadcast.Broadcast[T],可以对该变量调用value属性来获取实际的对象的值,只会发送到节点一次。