[关闭]
@yudesong 2018-02-12T03:19:54.000000Z 字数 4083 阅读 747

Spark计算引擎

spark


Spark产生背景

➢ MapReduce局限性
• 仅支持Map和Reduce两种语义操作
• 处理效率低,耗费时间长
• 不适合处理迭代计算、交互式处理、实时流处理等
• 更多的应用于大规模批处理场景

➢ 计算处理框架种类多,选型复杂
• 批处理:MapReduce、Hive、Pig
• 流式计算:Storm
• 交互式计算:Impala、Presto
• 机器学习算法:Mahout

➢ 希望能够简化技术选型,在一个统一的框架下,能够完成批处理、流式计算、交互式计算、机器学习算法等

Spark简介

➢ 由加州大学伯克利分校的AMP实验室开源
➢ 大规模分布式通用计算引擎
➢ 具有高吞吐、低延时、通用易扩展、高容错等特点
➢ 使用Scala语言开发,提供了丰富的开发API,支持Scala、Java、Python、R开发语言
➢ Spark提供多种运行模式

Spark特点

➢ 计算高效
• 使用内存计算引擎,提供Cache缓存机制支持迭代计算或多次数据共享,减少数据读取的IO开销
• DAG引擎,减少多次计算之间中间结果写到HDFS的开销
• 使用多线程池模型来减少task启动开销,shuffle过程中避免不必要的sort操作以及减少磁盘IO操作
➢ 通用易用
• 提供了丰富的开发API,支持Scala、Java、Python、R开发语言 • 集成批处理、流处理、交互式计算、机器学习算法、图计算
➢ 运行模式多样
• Local、Standalone、Yarn、Mesos

Spark核心概念RDD

Resilient Distributed Datasets弹性分布式数据集
• Spark基于RDD进行计算
• 分布在集群中的只读对象集合(由多个Partition构成)
• 可以存储在磁盘或内存中
• 可以通过并行转换操作构造
• 失效后自动重构

RDD操作

➢ Transformation
• 将Scala集合或者Hadoop数据集构造一个新的RDD
• 通过已有的RDD产生新的RDD
• 只记录转换关系,不触发计算
• 如:map、filter等
➢ Action
• 通过RDD计算得到一个或者一组值
• 真正触发执行
• 如:count、collect、saveAsTextFile

rdd1.map(_+1).saveAsTextFile("hdfs://node01:9000/")

RDD常用Transformation

Transformation与Action对比

➢ 接口定义方式不同
• Transformation:RDD[X] -> RDD[Y]
• Action:RDD[X] -> Z
➢ 执行计算方式不同
• Transformation采用惰性执行方式,只记录RDD转化关系,不会触发真正计算执行
• Action真正触发计算执行

  1. val rdd1 = sc.textFile("hdfs://192.168.183.101:9000/data/wc/in")
  2. val rdd2 = rdd2.flatMap(_.split("\t"))
  3. val rdd3= rdd3.map((_,1))
  4. val rdd4 = rdd3.reduceByKey(_ + _)
  5. rdd4.saveAsTextFile(“hdfs://192.168.183.100:9000/data/wc/out”)

RDD Dependency依赖

➢ Narrow Dependency窄依赖
• 父RDD中的分区最多只能被一个子
RDD的一个分区使用
• 子RDD如果只有部分分区数据丢失
或者损坏只需要从对应的父RDD重 新计算恢复
➢ Shuffle Dependency宽依赖
• 子RDD分区依赖父RDD所有分区
• 子RDD如果部分分区或者全部分区 数据丢失或者损坏需要从所有父 RDD重新计算,相对窄依赖付出的 代价更高,尽量避免宽依赖的使用

实例

  1. # Word Count
  2. val textFile = sc.textFile("hdfs://...")
  3. val counts = textFile.flatMap(line => line.split(" "))
  4. .map(word => (word, 1))
  5. .reduceByKey(_ + _)
  6. counts.saveAsTextFile("hdfs://...")
  7. # PI
  8. val count = sc.parallelize(1 to NUM_SAMPLES).filter { _ =>
  9. val x = math.random
  10. val y = math.random
  11. x*x + y*y < 1
  12. }.count()
  13. println(s"Pi is roughly ${4.0 * count / NUM_SAMPLES}")
  14. val rdd = sc.parallelize(1 to 100,2)
  15. #默认分区大小为该程序所分配的资源CPU核数
  16. val rdd = sc.parallelize(1 to 100)
  17. print red.paratition.size

案例

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.SparkContext._
  4. object SparkWordCount {
  5. def FILE_NAME:String = "word_count_results_";
  6. def main(args:Array[String]) {
  7. if (args.length < 1) {
  8. println("Usage:SparkWordCount FileName");
  9. System.exit(1);
  10. }
  11. val conf = new SparkConf().setAppName("Spark Exercise:
  12. Spark Version Word Count Program");
  13. val sc = new SparkContext(conf);
  14. val textFile = sc.textFile(args(0));
  15. val wordCounts = textFile.flatMap(line => line.split(" "))
  16. .map(word => (word, 1))
  17. .reduceByKey((a, b) => a + b)
  18. //print the results,for debug use.
  19. //println("Word Count program running results:");
  20. //wordCounts.collect().foreach(e => {
  21. //val (k,v) = e
  22. //println(k+"="+v)
  23. //});
  24. wordCounts.saveAsTextFile(FILE_NAME+System.currentTimeMillis());
  25. println("Word Count program running results are successfully saved.");
  26. }
  27. }
  28. // 提交任务
  29. ./spark-submit \
  30. --class com.ibm.spark.exercise.basic.SparkWordCount \
  31. --master spark://hadoop036166:7077 \
  32. --num-executors 3 \
  33. --driver-memory 6g --executor-memory 2g \
  34. --executor-cores 2 \
  35. /home/fams/sparkexercise.jar \
  36. hdfs://hadoop036166:9000/user/fams/*.txt

1. Spark 基础(上篇)
2.Spark 基础(下篇)
3.Spark运行原理
4.许鹏:从零开始学习,Apache Spark源码走读(一)
5.Hadoop经典案例Spark实现

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注