[关闭]
@sasaki 2016-01-13T07:43:23.000000Z 字数 4826 阅读 3312

流式计算与Spark Streaming

BigData Spark


版本控制

  1. @Title 流式计算与Spark Streaming
  2. @Version v1.0
  3. @Timestamp 2016-01-08 1217
  4. @Author Nicholas
  5. @Mail redskirt@outlook.com

前言

Spark是一个类似于MapReduce的分布式计算框架,其核心是弹性分布式数据集,提供了比MapReduce更丰富的模型,可以在快速在内存中对数据集进行多次迭代,以支持复杂的数据挖掘算法和图形计算算法。
Spark Streaming是一种构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。

Spark Streaming的优势:

一、流式计算的语义

  1. At-most-once:每条记录最多只能被处理一次
  2. At-least-once:每条记录最少要被处理一次
  3. Exactly-once:每条记录有且仅被处理一次

二、技术选型

  1. Spark Streaming
  2. Storm
  3. Heron
  4. Samza

    多种技术都对Kafka技术极好,Kafka几乎满足消息队列所有需求。

三、Overview

QQ截图20160108123341.png-119.1kB
Spark Streaming引擎将数据流切分为batch,每一小块为一个RDD。Spark Streaming本质为Micro Batch(微量的批处理)。

核心概念:DStream(源源不断的消息队列)

QQ截图20160108124706.png-114.3kB

Spark Streaming入口:StreamingContext

数据源:

Spark Streaming操作:

  1. Transformation-无状态
    和Spark语义一致。
    reduceByKey, map, flatMap, filter, count, reduce, etc groupByKey, sortByKey, join, etc

  2. Transformation-带状态

    • updateStateByKey(很常用)
    • window操作

    QQ截图20160108131500.png-240.9kB

    window, countByWindow, reduceByWindow, countByValueAndWindow, reduceByKeyAndWindow

离散数据流(DStream)
离散数据流(DStream)作为Spark Streaming中的一个基本抽象,代表一个数据流,这个数据流即可以从外部获得,也可以通过对输入流的转换获得。
DStream是通过一级时间序列上的连续的RDD来表示的,每一个RDD都包含了特定时间间隔内的数据流。

DStream的操作:

应用场景:

构建一个Spark Streaming程序主要分为以下步骤:
1)创建StreamingContext对象

  1. val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))

2)创建InputStream

  1. val lines = ssc.socketTextStream(args(1), args(2).toInt)

SparkStreaming支持多种不同的数据源,包括socketTextStream、kafkaStream、flumeStream、fileStream、networkStream等。

3)操作DStream
4)启动Spark Streaming

时间和窗口

  1. 批处理间隔(batch duration)
    Spark Streaming中处理数据的单位是一批而非一条,而数据采集是逐条进行的。Spark Streaming系统会设置间隔使得数据汇总到一定的量后再一并操作,这个间隔为批处理间隔。
    批处理间隔决定了Spark Streaming提交作业的频率和数据处理的延迟,同时也影响着数据处理的吞吐量和性能。

  2. 滑动间隔(slide duration)和窗口间隔(window duration)
    默认情况下滑动间隔被设置为与批处理间隔相同,而窗口间隔可以设置为更大的时间窗口。
    滑动间隔和窗口间隔的设置必须是批处理的整数倍。

例1. 运行NetwordCount例子

Spark example目录中包含多种Streaming测试例子,取其中NetworkWordCount拷贝内容到自己的Scala项目中打成jar包。

  1. [root@master streaming]# pwd /usr/application/tmp/spark-1.3.0/dist/examples/src/main/scala/org/apache/spark/examples/streaming
  2. [root@master streaming]# cat NetworkWordCount.scala
  3. package org.apache.spark.examples.streaming
  4. import org.apache.spark.SparkConf
  5. import org.apache.spark.streaming.{Seconds, StreamingContext}
  6. import org.apache.spark.storage.StorageLevel
  7. object NetworkWordCount {
  8. def main(args: Array[String]) {
  9. if (args.length < 3) {
  10. System.err.println("Usage: NetworkWordCount <hostname> <port> <seconds>")
  11. System.exit(1)
  12. }
  13. // StreamingExamples.setStreamingLogLevels()
  14. val sparkConf = new SparkConf().setAppName("NetworkWordCount")
  15. val ssc = new StreamingContext(sparkConf, Seconds(args(3).toInt))
  16. val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
  17. val words = lines.flatMap(_.split(" "))
  18. val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  19. wordCounts.print()
  20. ssc.start()
  21. ssc.awaitTermination()
  22. }
  23. }

提交Spark Streaming程序。
(spark-submit是提交到standalone集群的方式)

  1. [root@master /]# cd /opt/cloudera/parcels/CDH-5.3.8-1.cdh5.3.8.p0.5/lib/spark/bin
  2. [root@master hadoop]# export HADOOP_CONF_DIR=/opt/cloudera/parcels/CDH-5.3.8-1.cdh5.3.8.p0.5/lib/hadoop/etc/hadoop
  3. [root@master hadoop]# SPARK_JAR=/opt/cloudera/parcels/CDH-5.3.8-1.cdh5.3.8.p0.5/jars/spark-assembly-1.2.0-cdh5.3.8-hadoop2.5.0-cdh5.3.8.jar
  4. [root@master bin]# ./bin/spark-submit \
  5. --class org.apache.spark.examples.streaming.NetworkWordCount \
  6. --master spark://192.168.53.230:7077 \
  7. --executor-memory 512m \
  8. --total-executor-cores 1 \
  9. /usr/application/tmp/NetworkWordCount.jar 192.168.53.230 9999 10
  10. # 此处提交时遇到如下问题,怀疑是
  11. [root@master spark-1.3.1-bin-hadoop2.6]# ./bin/spark-submit \
  12. > --class org.apache.spark.examples.streaming.NetworkWordCount \
  13. > --master spark://master:7077 \
  14. > --executor-memory 512m \
  15. > --total-executor-cores 1 \
  16. > /usr/application/tmp/NetworkWordCount.jar master 9999 10
  17. Exception in thread "main" java.lang.NoSuchMethodException: org.apache.spark.examples.streaming.NetworkWordCount.main([Ljava.lang.String;)
  18. at java.lang.Class.getMethod(Class.java:1670)
  19. at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:554)
  20. at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
  21. at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
  22. at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
  23. at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
  24. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
  25. [root@master ~]# nc -lk 9999
  26. # spark-submit提交方式
  27. ./bin/spark-submit \
  28. --class <main-class>
  29. --master <master-url> \
  30. --deploy-mode <deploy-mode> \
  31. # other options
  32. <application-jar> \
  33. [application-arguments]
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注