[关闭]
@spiritnotes 2016-03-11T16:05:29.000000Z 字数 7007 阅读 3334

Spark快速大数据分析

Spark


Spark快速大数据分析图书封面

第1章 Spark数据分析导论

1.1 Spark是什么

实现快速而通用的集群计算的平台

1.2 一个大一统的软件栈

Spark core
实现了Spark的基本功能,包括任务调度、内存管理、错误恢复、与存储系统交互,对弹性分布式数据集(resilient distribution dataset,RDD)的API定义。RDD表示分布在多个计算节点上可以并行操作的元素集合。是Spark的主要编程抽象。
Spark Sql
Spark操作结构化数据的程序包。支持多种数据源,如Hive表、Parquet以及JSON等。
Spark Streaming
对实时数据进行流式计算的组件。
MLlib
机器学习包,分类、回归、聚类、协同过滤
GraphX
操作图的程序库,可以进行并行的图计算
集群管理器
支持在各种集群管理器上工作

1.3 Spark的用户与用途

数据科学任务
分析数据,交互性Shell
数据处理应用
为开发用于集群并行执行的程序提供了一条捷径

1.4 Spark简史

第2章 Spark下载与入门

2.1 下载Spark

2.2 Spark中Python和Scala的Shell

2.3 Spark核心概念简介

每个Spark应用由一个驱动程序(driver program)来发起集群上的各种并行操作。驱动器程序通过一个SparkContext对象来访问Spark。这个对象代表计算集群的一个连接。Shell启动时已经自动创建一个sc变量。驱动器程序一般要管理多个执行器(executor)节点。

2.4 独立应用

Python中需要使用 bin/spark-submit my_script.py 执行,才会对spark相关组件进行加载

  1. from pyspark import SparkConf, SparkContext
  2. conf = SparkConf().setMaster("local").setAppName("My App")
  3. sc = SparkContext(conf= conf)

第3章 RDD编程

RDD其实就是分布式的元素集合,在Spark中,对数据的所有操作不外乎创建RDD、转换已有RDD以及调用RDD操作进行求值。

3.1 RDD基础

RDD支持两种类型操作

3.2 创建RDD

3.3 RDD操作

3.4 向Spark传递函数

  1. rdd.filter(lambda x: a.name in x) # 整个a对象被序列化并传递
  2. rdd.filter(lambda x: name in x) # 传递name

3.5 常见的转换操作和行动操作

  1. get_max_min = x.aggregate((-inf,inf),
  2. (lambda acc, value:(max(acc[0],value),min(acc[1],value))),
  3. (lambda acc1,acc2:(max(acc1[0],acc2[0]),min(acc1[1],acc2[1]))))
  4. x.takeOrdered(10,key=lambda x:-x)
  5. x.fold(0, (lambda acc,x: x+acc))

3.6 持久化(缓存)

默认的缓存级别,scale和java会将数据以序列化的形式缓存在JVM的堆空间中。在Python中,我们会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列化后的对象存储在JVM堆空间中。

  1. # rdd.persisit()/rdd.cache()
  2. rdd.count()
  3. rdd.collect().mkstring(',') ## 会导致两次rdd计算,除非前面添加

可以通过unpersist来取消缓存

第4章 键值对操作

4.1 动机

为包含键值对类型的RDD提供了一些专有的操作。这些RDD被称为pair RDD。PairRDD提供了并行操作各个键或者跨节点重新进行数据分组的操作接口。

4.2 创建Pair RDD

Python中需要返回一个二元组组成的RDD。只能是元组,字典不是,底层操作应该是采用迭代器协议遍历的。

  1. x = sc.parallelize([(x,y) for x in 'abcde' for y in range(6,10)])

4.3 pair RDD的转换操作

针对单个pair RDD的操作

聚合操作

  1. x = sc.parallelize([(1,2),(3,4),(3,6)])
  2. x.reduceByKey(lambda x,y: x+y).collect() #[(1, 2), (3, 10)]
  3. x.foldByKey(2, lambda acc,x: x+acc).collect() #[(1, 4), (3, 14)]
  4. x.mapValues(lambda x:x+1).collect() #[(1, 3), (3, 5), (3, 7)]
  5. # 求键的平均
  6. x.groupByKey().mapValues(lambda x:(sum(x), len(x))).collect() #[(1, (2, 1)), (3, (10, 2))]
  7. x.mapValues(lambda x:(x,1)).reduceByKey(lambda x,y:(x[0]+y[0], x[1]+y[1])).collect()
  8. x.combineByKey(
  9. (lambda x: (x,1)),
  10. (lambda acc, x:(acc[0]+x, acc[1]+1)),
  11. (lambda acc,acc2:(acc[0]+acc2[0], acc[1]+acc2[1]))
  12. ).map(lambda x: (x[0],x[1][0]/x[1][1])).collectAsMap()

数据分组

连接

排序

行动操作

其他

每个RDD都有固定数目的分区,分区数决定了在RDD上执行操作时的并行度。大多数操作符都支持接受第二个参数,该参数用来指定分组结果或聚合结果的RDD的分区数

  1. r.reduceByKey((lambda x,y: x+y), 10)
  2. r.getNumPartitions() #查看当前分区数
  3. r.coalesce(num)
  4. r.repartition(num)

4.5 数据分区

如果给定RDD只需要被扫描一次,完全没有必要对其预先进行分区处理。只有当数据集多次在诸如连接这种基于键的操作中使用时,分区才会有帮助。

Spark没有给出显式控制每个键具体落在哪一个节点上的方法(原因是节点可能失败),但其确保同一组的键出现在同一个节点上。

例如Join操作,如果有个用户信息表,不变化,而产生新的交易记录需要对表进行join,如果不控制,则可能对两者都进行混洗后再连接,如果将信息表先Hash分区并持久化后,则不再需要对信息表进行混洗,只需要对交易数据进行混洗

  1. user_data = user_data_old.partitionBy(num).persist()

获取RDD分区的方式

python中未提供API

从分区中获益的操作

类似cogroup/groupwith/join/leftouterJoin/rightOuterJoin/groupByKey/reduceByKey/combineByKey/lookup

影响分区方式的操作

Spark知道各操作会如何影响分区方式,并将会对数据进行分区的操作的结果RDD自动设置为对应的分区器。例如join结果会自动按照哈希分区,后续reduceByKey操作则会变快。

转换操作不一定会保留分区方式,例如map是可以改变键的。Spark提供了mapValues/flatMapValues可以保持键值不变

会设置分区方式的操作:cogroup/groupwith/join/left../right../groupByKey/reduceByKey/combineByKey/partitionBy/sort
取决于父RDD的分区方式:mapValues/flatMapValues/filter
二元操作符:取决于父RDD的分区方式,默认情况下会采用hash,数目与并行度一致。如何一个父亲设置过分区方式,则以它为准,如果两者都设过,则以第一个为准

实例:pagerank

  1. pages = [
  2. ['a',['b','c','d','e']],
  3. ['b',['a','c','e']],
  4. ['c',['a','d','e']],
  5. ['d',['a']],
  6. ['e',['c','d']]
  7. ]
  8. links = sc.parallelize(pages)
  9. ranks = links.mapValues(lambda x:1.0)
  10. for i in range(20):
  11. contributes = links.join(ranks).flatMap(lambda x:((i, x[1][1]/len(x[1][0])) for i in x[1][0])).reduceByKey(lambda x,y:x+y)
  12. ranks = contributes.mapValues(lambda x:0.15+0.85*x)
  13. ranks.collect()

自定义分区方式

第5章

第6章 Spark编程进阶

6.1 简介

两种类型的共享变量:

6.2 累加器

使用map或者filter传条件的时候,可以使用驱动器程序中定义的变量,但是集群中的每个任务都会得到这些变量的一份新的副本。更新这些副本的值不会影响到驱动器中的对应变量

比如说统计空行

  1. blanklines = sc.accumulator(0)
  2. def extr(line):
  3. global blanklines
  4. if (line == ''):
  5. blanklines += 1
  6. return line.split(' ')
  7. result = file.flatMap(extr)

可以通过map和reduce进行聚合,但是这种操作更方便

Spark有时候任务会失败或者会有较慢机器,因此可以会多次计算累计器

6.3 广播变量

Spark会自动将闭包中所有引用到的变量发送到工作节点上。方便但是低效:1)默认的任务发射机制是专门为小任务进行优化的;2)事实上在多个并行操作中使用同一个变量,则在第二次操作会再发送一次该变量,广播变量的类型为spark.broadcast.Broadcast[T],可以对该变量调用value属性来获取实际的对象的值,只会发送到节点一次。

6.4 基于分区进行操作

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注