@yudesong
2018-02-12T03:19:54.000000Z
字数 4083
阅读 747
spark
➢ MapReduce局限性
• 仅支持Map和Reduce两种语义操作
• 处理效率低,耗费时间长
• 不适合处理迭代计算、交互式处理、实时流处理等
• 更多的应用于大规模批处理场景
➢ 计算处理框架种类多,选型复杂
• 批处理:MapReduce、Hive、Pig
• 流式计算:Storm
• 交互式计算:Impala、Presto
• 机器学习算法:Mahout
➢ 希望能够简化技术选型,在一个统一的框架下,能够完成批处理、流式计算、交互式计算、机器学习算法等
➢ 由加州大学伯克利分校的AMP实验室开源
➢ 大规模分布式通用计算引擎
➢ 具有高吞吐、低延时、通用易扩展、高容错等特点
➢ 使用Scala语言开发,提供了丰富的开发API,支持Scala、Java、Python、R开发语言
➢ Spark提供多种运行模式
➢ 计算高效
• 使用内存计算引擎,提供Cache缓存机制支持迭代计算或多次数据共享,减少数据读取的IO开销
• DAG引擎,减少多次计算之间中间结果写到HDFS的开销
• 使用多线程池模型来减少task启动开销,shuffle过程中避免不必要的sort操作以及减少磁盘IO操作
➢ 通用易用
• 提供了丰富的开发API,支持Scala、Java、Python、R开发语言 • 集成批处理、流处理、交互式计算、机器学习算法、图计算
➢ 运行模式多样
• Local、Standalone、Yarn、Mesos
Resilient Distributed Datasets弹性分布式数据集
• Spark基于RDD进行计算
• 分布在集群中的只读对象集合(由多个Partition构成)
• 可以存储在磁盘或内存中
• 可以通过并行转换操作构造
• 失效后自动重构
➢ Transformation
• 将Scala集合或者Hadoop数据集构造一个新的RDD
• 通过已有的RDD产生新的RDD
• 只记录转换关系,不触发计算
• 如:map、filter等
➢ Action
• 通过RDD计算得到一个或者一组值
• 真正触发执行
• 如:count、collect、saveAsTextFile
rdd1.map(_+1).saveAsTextFile("hdfs://node01:9000/")
➢ 接口定义方式不同
• Transformation:RDD[X] -> RDD[Y]
• Action:RDD[X] -> Z
➢ 执行计算方式不同
• Transformation采用惰性执行方式,只记录RDD转化关系,不会触发真正计算执行
• Action真正触发计算执行
val rdd1 = sc.textFile("hdfs://192.168.183.101:9000/data/wc/in")val rdd2 = rdd2.flatMap(_.split("\t"))val rdd3= rdd3.map((_,1))val rdd4 = rdd3.reduceByKey(_ + _)rdd4.saveAsTextFile(“hdfs://192.168.183.100:9000/data/wc/out”)
➢ Narrow Dependency窄依赖
• 父RDD中的分区最多只能被一个子
RDD的一个分区使用
• 子RDD如果只有部分分区数据丢失
或者损坏只需要从对应的父RDD重 新计算恢复
➢ Shuffle Dependency宽依赖
• 子RDD分区依赖父RDD所有分区
• 子RDD如果部分分区或者全部分区 数据丢失或者损坏需要从所有父 RDD重新计算,相对窄依赖付出的 代价更高,尽量避免宽依赖的使用
# Word Countval textFile = sc.textFile("hdfs://...")val counts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)counts.saveAsTextFile("hdfs://...")# PIval count = sc.parallelize(1 to NUM_SAMPLES).filter { _ =>val x = math.randomval y = math.randomx*x + y*y < 1}.count()println(s"Pi is roughly ${4.0 * count / NUM_SAMPLES}")val rdd = sc.parallelize(1 to 100,2)#默认分区大小为该程序所分配的资源CPU核数val rdd = sc.parallelize(1 to 100)print red.paratition.size
import org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.SparkContext._object SparkWordCount {def FILE_NAME:String = "word_count_results_";def main(args:Array[String]) {if (args.length < 1) {println("Usage:SparkWordCount FileName");System.exit(1);}val conf = new SparkConf().setAppName("Spark Exercise:Spark Version Word Count Program");val sc = new SparkContext(conf);val textFile = sc.textFile(args(0));val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)//print the results,for debug use.//println("Word Count program running results:");//wordCounts.collect().foreach(e => {//val (k,v) = e//println(k+"="+v)//});wordCounts.saveAsTextFile(FILE_NAME+System.currentTimeMillis());println("Word Count program running results are successfully saved.");}}// 提交任务./spark-submit \--class com.ibm.spark.exercise.basic.SparkWordCount \--master spark://hadoop036166:7077 \--num-executors 3 \--driver-memory 6g --executor-memory 2g \--executor-cores 2 \/home/fams/sparkexercise.jar \hdfs://hadoop036166:9000/user/fams/*.txt
1. Spark 基础(上篇)
2.Spark 基础(下篇)
3.Spark运行原理
4.许鹏:从零开始学习,Apache Spark源码走读(一)
5.Hadoop经典案例Spark实现