
Tags: spark streaming, spark


Common uses of Spark Streaming

-1, Real-time counting with accumulation across batches
    Kafka + Spark Streaming (updateStateByKey)
-2, Real-time metrics over a recent time range
    e.g. view the user click counts of the last hour, per province or per key city
    window operations

Official example
http://spark.apache.org/docs/1.6.1/streaming-programming-guide.html

    [root@hadoop-senior softwares]# rpm -ivh nc-1.84-22.el6.x86_64.rpm
    Preparing...       ########################################### [100%]
       1:nc            ########################################### [100%]
    [root@hadoop-senior softwares]# which nc
    /usr/bin/nc
    [beifeng@hadoop-senior softwares]$ nc -lk 9999
    bin/run-example streaming.NetworkWordCount hadoop-senior.ibeifeng.com 9999

Official template

    [beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell --master local[3] --jars /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/inputJars/mysql-connector-java-5.1.27-bin.jar

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._

    val ssc = new StreamingContext(sc, Seconds(5))
    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream("hadoop-senior.ibeifeng.com", 9999)
    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
    sc.stop()

监视 "hadoop-senior.ibeifeng.com", 9999 ,当该端口有数据写入时,print()

    scala> import org.apache.spark._
    import org.apache.spark._
    scala> import org.apache.spark.streaming._
    import org.apache.spark.streaming._
    scala> import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.StreamingContext._
    scala> val ssc = new StreamingContext(sc, Seconds(5))
    ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@31d1e747
    // Step 1: Receive Data From Where
    scala> val lines = ssc.socketTextStream("hadoop-senior.ibeifeng.com", 9999)
    lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@33d32a0d
    // Step 2: Process Data Based on the DStream
    scala> val words = lines.flatMap(_.split(" "))
    words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@3c9df3cb
    scala> val pairs = words.map(word => (word, 1))
    pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@336283a
    scala> val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@55c528c5
    // Step 3: Output Result
    scala> wordCounts.print()
    scala> ssc.start()             // Start the computation
    scala> ssc.awaitTermination()

    [beifeng@hadoop-senior ~]$ nc -lk 9999
    spark context streaming spark

    -------------------------------------------
    Time: 1473141620000 ms
    -------------------------------------------
    (streaming,1)
    (spark,2)
    (context,1)

-1, Receiver
block interval: the interval at which received data is chunked into blocks before being stored; default 200 ms.
The received data is split into blocks and stored on the Executors.

-2, StreamingContext
batch interval: the interval at which data is processed, i.e. how large a time range each batch covers.
ssc = new StreamingContext(sc, Seconds(1))
With a 1-second batch interval, the data received between t-0 and t-1000 (blocks blk-01 to blk-05) is merged into one RDD, which is then processed.
The 5 blocks in that RDD become 5 partitions and hence 5 tasks, so the batch is processed in parallel.

    t-0
    t-200   [blk-01
    t-400    blk-02
    t-600    blk-03
    t-800    blk-04
    t-1000   blk-05]
    t-1200   blk-06
    t-1400   blk-07

http://spark.apache.org/docs/1.6.1/configuration.html
spark.streaming.blockInterval: default 200 ms, recommended minimum 50 ms.
Interval at which data received by Spark Streaming receivers is chunked into blocks of data before storing them in Spark. Minimum recommended - 50 ms.

spark.streaming.ui.retainedBatches: by default the 4040 web UI retains and displays at most the last 1000 batches (jobs).
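A minimal sketch of how these settings could be applied when building the context; the app name, master, and values below are illustrative assumptions, not from these notes:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // a minimal sketch, assuming local mode; values are illustrative
    val conf = new SparkConf()
      .setAppName("BlockIntervalDemo")
      .setMaster("local[2]")
      // chunk received data into 200 ms blocks (the default) before storing
      .set("spark.streaming.blockInterval", "200ms")
      // keep at most 1000 completed batches on the 4040 web UI
      .set("spark.streaming.ui.retainedBatches", "1000")

    val sc  = new SparkContext(conf)
    // 1-second batches: 1000 ms / 200 ms = 5 blocks, i.e. ~5 partitions (tasks) per batch per receiver
    val ssc = new StreamingContext(sc, Seconds(1))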

-3, Output operations commonly used for testing
print()
saveAsTextFiles(prefix, [suffix])

foreachRDD(func): push the RDDs to an external system, e.g. save them to files or write them to a database over the network.
The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
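For illustration, a minimal foreachRDD sketch following the per-partition connection pattern recommended in the official guide; createConnection and its send/close methods are hypothetical placeholders for a real database client, not an API from these notes:

    // a minimal sketch: createConnection / send / close are hypothetical placeholders
    wordCounts.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val connection = createConnection()      // hypothetical: open one connection per partition
        partitionOfRecords.foreach { case (word, count) =>
          connection.send(s"$word\t$count")      // hypothetical: write one record
        }
        connection.close()
      }
    }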

/user/beifeng/sparkstreaming/hdfsfiles
Monitor this directory; whenever new files are written into it, print() the counts. Note that textFileStream only picks up files created in (or moved into) the directory after the streaming application starts.
Replace
    val lines = ssc.socketTextStream("hadoop-senior.ibeifeng.com", 9999)
with
    val lines = ssc.textFileStream("/user/beifeng/sparkstreaming/hdfsfiles")

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._

    val ssc = new StreamingContext(sc, Seconds(5))
    // Create a DStream that monitors the given HDFS directory for new files
    val lines = ssc.textFileStream("/user/beifeng/sparkstreaming/hdfsfiles")
    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
    sc.stop()

Development workflows
1. Develop and test code directly in spark-shell.
2. Load a script in spark-shell.
Write the code above into a .scala script file and execute it in spark-shell by loading the script:
scala> :load <absolute path of the script>

    [beifeng@hadoop-senior hadoop-2.5.0-cdh5.3.6]$ vi WordCount.scala

3. Develop, test, and package a JAR with an IDE (for production), then submit the application with spark-submit.
Add the dependency to the IDEA pom.xml:

    <!-- Spark STREAMING -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>${spark.version}</version>
        <scope>compile</scope>
    </dependency>

Application code:

    package com.ibeifeng.bigdata.spark.app.streaming

    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.streaming._

    object SocketWordCount {
      def main(args: Array[String]) {
        // step 0: SparkContext
        val sparkConf = new SparkConf()
          .setAppName("SocketWordCount Application") // name
          .setMaster("local[2]") // --master local[2] | spark://xx:7077 | yarn
        // Create SparkContext
        val sc = new SparkContext(sparkConf)
        val ssc = new StreamingContext(sc, Seconds(10))

        // Step 1: Receive Data From Where
        val socketDStream = ssc.socketTextStream("hadoop-senior.ibeifeng.com", 9999)

        // Step 2: Process Data Based on the DStream
        // Split each line into words
        val words = socketDStream.flatMap(_.split(" "))
        // Count each word in each batch
        val pairs = words.map(word => (word, 1))
        val wordCounts = pairs.reduceByKey(_ + _)

        // Step 3: Output Result
        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print()

        ssc.start() // Start the computation
        ssc.awaitTermination() // Wait for the computation to terminate
        sc.stop()
      }
    }

After typing data into nc -lk 9999, the input is counted and displayed in real time in the IDEA console.

Spark Streaming provides two categories of data sources:

http://spark.apache.org/docs/1.6.1/streaming-programming-guide.html#input-dstreams-and-receivers
Spark Streaming provides two categories of built-in streaming sources.
* Basic sources: Sources directly available in the StreamingContext API. Examples: file systems, socket connections, and Akka actors.
* Advanced sources: Sources like Kafka, Flume, Kinesis, Twitter, etc. are available through extra utility classes. These require linking against extra dependencies as discussed in the linking section.
For advanced sources, make sure the connector matches your Spark version; for Flume, use the CDH build. (A minimal Kafka sketch follows the list below.)
* Kafka: Spark Streaming 1.6.1 is compatible with Kafka 0.8.2.1. See the Kafka Integration Guide for more details.
* Flume: Spark Streaming 1.6.1 is compatible with Flume 1.6.0. See the Flume Integration Guide for more details.
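As a hedged illustration of an advanced source, a minimal direct-approach Kafka sketch for Spark 1.6. It assumes an existing StreamingContext `ssc` (as in the examples above) and the spark-streaming-kafka_2.10 dependency on the classpath; the broker address and topic name are placeholders, not values from these notes:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // a minimal sketch: broker list and topic are placeholders
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "hadoop-senior.ibeifeng.com:9092")
    val topics = Set("wordcount")

    val kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // the direct stream yields (key, message) pairs; count words in the message values
    val kafkaWordCounts = kafkaDStream
      .flatMap { case (_, message) => message.split(" ") }
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    kafkaWordCounts.print()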

The examples above only display the counts of each individual batch; they do not accumulate results across batches.

-1, Real-time counting with accumulation

Implementing real-time accumulation:
http://spark.apache.org/docs/1.6.1/streaming-programming-guide.html#transformations-on-dstreams
* updateStateByKey(func)
Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.

The class PairDStreamFunctions provides the following methods:
updateStateByKey(): the traditional approach, with 6 overloads
mapWithState(): introduced in Spark 1.6

Below is one of the updateStateByKey() overloads.
Note that checkpoint("<checkpoint path>") sets the path where the state is stored, keeping the state highly available.
If the program produces no output, a previously started run may still be occupying the same port; kill all submitted processes and run the program again.

    /**
     * Return a new "state" DStream where the state for each key is updated by applying
     * the given function on the previous state of the key and the new values of each key.
     * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
     * @param updateFunc State update function. If `this` function returns None, then
     *                   corresponding state key-value pair will be eliminated.
     * @tparam S State type
     */
    def updateStateByKey[S: ClassTag](
        updateFunc: (Seq[V], Option[S]) => Option[S]
      ): DStream[(K, S)] = ssc.withScope {
      updateStateByKey(updateFunc, defaultPartitioner())
    }

Reading the signature: updateStateByKey is a higher-order function that takes a function parameter updateFunc. Its Seq[V] parameter holds the values of the current batch, the Option[S] parameter holds the previous state (the result so far), and the returned Option[S] is the accumulated result.
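To make the signature concrete, the update function could also be written as a standalone value (a sketch; the full program below inlines the same logic):

    // a minimal sketch of an update function matching (Seq[V], Option[S]) => Option[S], with V = S = Int
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount  = values.sum          // sum of this batch's values for the key
      val previousCount = state.getOrElse(0)  // accumulated count from earlier batches
      Some(currentCount + previousCount): Option[Int]
    }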

    package com.ibeifeng.bigdata.spark.app.streaming

    import org.apache.spark.streaming._
    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * Created by beifeng on 9/6/16.
     */
    object UpdateStateWordCount {
      def main(args: Array[String]) {
        // step 0: SparkContext
        val sparkConf = new SparkConf()
          .setAppName("SocketWordCount Application") // name
          .setMaster("local[2]") // --master local[2] | spark://xx:7077 | yarn
        // Create SparkContext
        val sc = new SparkContext(sparkConf)
        val ssc = new StreamingContext(sc, Seconds(10))
        // checkpoint directory where the per-key state is stored
        ssc.checkpoint("sparkstreaming/socketwc3")

        // Step 1: Receive Data From Where
        val socketDStream = ssc.socketTextStream("hadoop-senior.ibeifeng.com", 9999)

        // Step 2: Process Data Based on the DStream
        // Split each line into words
        val words = socketDStream.flatMap(_.split(" "))
        // Count each word in each batch
        val pairs = words.map(word => (word, 1))
        val wordCounts = pairs.updateStateByKey(
          (values: Seq[Int], state: Option[Int]) => {
            val currentCount = values.sum
            val previousCount = state.getOrElse(0)
            // update state and return
            Some(currentCount + previousCount)
          }
        )

        // Step 3: Output Result
        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print()

        ssc.start() // Start the computation
        ssc.awaitTermination() // Wait for the computation to terminate
        sc.stop()
      }
    }

The signature and Scaladoc of mapWithState():

    /**
     * :: Experimental ::
     * Return a [[MapWithStateDStream]] by applying a function to every key-value element of
     * `this` stream, while maintaining some state data for each unique key. The mapping function
     * and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this
     * transformation can be specified using [[StateSpec]] class. The state data is accessible in
     * as a parameter of type [[State]] in the mapping function.
     *
     * Example of using `mapWithState`:
     * {{{
     *    // A mapping function that maintains an integer state and return a String
     *    def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
     *      // Use state.exists(), state.get(), state.update() and state.remove()
     *      // to manage state, and return the necessary string
     *    }
     *
     *    val spec = StateSpec.function(mappingFunction).numPartitions(10)
     *
     *    val mapWithStateDStream = keyValueDStream.mapWithState[StateType, MappedType](spec)
     * }}}
     *
     * @param spec         Specification of this transformation
     * @tparam StateType   Class type of the state data
     * @tparam MappedType  Class type of the mapped data
     */
    @Experimental
    def mapWithState[StateType: ClassTag, MappedType: ClassTag](
        spec: StateSpec[K, V, StateType, MappedType]
      ): MapWithStateDStream[K, V, StateType, MappedType] = {
      new MapWithStateDStreamImpl[K, V, StateType, MappedType](
        self,
        spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
      )
    }

mapWithState parameters:
1. mapWithState takes a StateSpec instance; spec: StateSpec describes the specification of the transformation.
2. The StateSpec companion object provides four methods named function. Pick the overload whose parameters match what mapWithState needs and use it to create the StateSpec instance.

    /**
     * Create a [[org.apache.spark.streaming.StateSpec StateSpec]] for setting all the specifications
     * of the `mapWithState` operation on a
     * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair DStream]].
     *
     * @param mappingFunction The function applied on every data item to manage the associated state
     *                        and generate the mapped data
     * @tparam ValueType      Class of the values
     * @tparam StateType      Class of the states data
     * @tparam MappedType     Class of the mapped data
     */
    def function[KeyType, ValueType, StateType, MappedType](
        mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
      ): StateSpec[KeyType, ValueType, StateType, MappedType] = {
      ClosureCleaner.clean(mappingFunction, checkSerializable = true)
      val wrappedFunction =
        (time: Time, key: KeyType, value: Option[ValueType], state: State[StateType]) => {
          Some(mappingFunction(key, value, state))
        }
      new StateSpecImpl(wrappedFunction)
    }

3. Now pass a function argument mappingFunction to the function method; you define this function yourself.
Official example code: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala

    // Update the cumulative count using mapWithState
    // This will give a DStream made of state (which is the cumulative count of the words)
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }

Custom mappingFunction:

    def mappingFunction(key: String, value: Option[Int],
        state: State[Int]): (String, Int) = {
      // Use state.exists(), state.get(), state.update() and state.remove()
      var previousCount = 0
      if (state.exists()) {
        previousCount = state.get()
      }
      // value of the current batch
      val currentCount = value.getOrElse(0)
      // update the state
      state.update(previousCount + currentCount)
      (key, previousCount + currentCount)
    }

Complete code:

    package com.ibeifeng.bigdata.spark.app.streaming

    import org.apache.spark.streaming._
    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * Created by XuanYu on 2016/8/7.
     */
    object MapStateWordCount {
      def main(args: Array[String]) {
        // step 0: SparkContext
        val sparkConf = new SparkConf()
          .setAppName("LogAnalyzer Application") // name
          .setMaster("local[2]") // --master local[2] | spark://xx:7077 | yarn
        // Create SparkContext
        val sc = new SparkContext(sparkConf)
        val ssc = new StreamingContext(sc, Seconds(5))
        // checkpoint directory where the per-key state is stored
        ssc.checkpoint("sparkstreaming/socketwc2/")

        // Step 1: Receive Data From Where
        val socketDStream = ssc.socketTextStream("hadoop-senior01.ibeifeng.com", 9999)

        // Step 2: Process Data Based on the DStream
        // Split each line into words
        val words = socketDStream.flatMap(_.split(" "))
        // Count each word in each batch
        val pairs = words.map(word => (word, 1))

        /**
         * spec: StateSpec[K, V, StateType, MappedType]
         *
         * mappingFunction:
         *   (KeyType, Option[ValueType], State[StateType]) => MappedType
         */
        // A mapping function that maintains an integer state and returns (word, cumulative count)
        def mappingFunction(key: String, value: Option[Int],
            state: State[Int]): (String, Int) = {
          // Use state.exists(), state.get(), state.update() and state.remove() to manage state
          var previousCount = 0
          if (state.exists()) {
            previousCount = state.get()
          }
          // value of the current batch
          val currentCount = value.getOrElse(0)
          // update the state
          state.update(previousCount + currentCount)
          // return the updated (key, count) pair
          (key, previousCount + currentCount)
        }

        val spec = StateSpec.function(mappingFunction _)
        val wordCounts = pairs.mapWithState(spec)

        // Step 3: Output Result
        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print()

        ssc.start() // Start the computation
        ssc.awaitTermination() // Wait for the computation to terminate
        sc.stop()
      }
    }
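The Scaladoc quoted above mentions that a StateSpec can also carry partitioner, timeout, and initial-state settings. A hedged sketch of how that could look; the 10-partition and 30-minute values are arbitrary examples, not from these notes:

    // a minimal sketch: numPartitions and the idle timeout are arbitrary example values
    val spec = StateSpec.function(mappingFunction _)
      .numPartitions(10)       // number of partitions for the state RDDs
      .timeout(Minutes(30))    // drop a key's state if it receives no data for 30 minutes

    val wordCounts = pairs.mapWithState(spec)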

-2, Real-time metrics over a recent time range

Use the reduceByKeyAndWindow method of class PairDStreamFunctions. One of its overloads:

    /**
     * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
     * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
     * generate the RDDs with Spark's default number of partitions.
     * @param reduceFunc     associative reduce function
     * @param windowDuration width of the window; must be a multiple of this DStream's
     *                       batching interval
     * @param slideDuration  sliding interval of the window (i.e., the interval after which
     *                       the new DStream will generate RDDs); must be a multiple of this
     *                       DStream's batching interval
     */
    def reduceByKeyAndWindow(
        reduceFunc: (V, V) => V,
        windowDuration: Duration,
        slideDuration: Duration
      ): DStream[(K, V)] = ssc.withScope {
      reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
    }

http://spark.apache.org/docs/1.6.1/streaming-programming-guide.html
(Figure from the official guide: a windowed DStream sliding over the original DStream.)
windowDuration is the length of time each computation covers, i.e. how far back from the latest batch the window extends.
slideDuration is the interval between two consecutive computations.
Both must be multiples of the batch interval.

    // Reduce last 30 seconds of data, every 10 seconds
    val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(30), Seconds(10))

This official example computes the counts every 10 seconds, over the most recent 30 seconds of data.

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._

    val ssc = new StreamingContext(sc, Seconds(2))
    // Step 1: Receive Data From Where
    val socketDStream = ssc.socketTextStream("hadoop-senior.ibeifeng.com", 9999)
    // Step 2: Process Data Based on the DStream
    // Split each line into words
    val words = socketDStream.flatMap(_.split(" "))
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKeyAndWindow((x: Int, y: Int) => (x + y), Seconds(30), Seconds(2))
    // Step 3: Output Result
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
    ssc.start() // Start the computation
    ssc.awaitTermination()
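For long windows the official guide also documents a more efficient reduceByKeyAndWindow overload that takes an inverse reduce function and incrementally subtracts the data leaving the window; it requires checkpointing. A hedged sketch as an alternative to the wordCounts line above (the checkpoint path is an arbitrary example, and ssc.checkpoint must be called before ssc.start()):

    // a minimal sketch: the incremental variant needs a checkpoint directory (path is arbitrary)
    ssc.checkpoint("sparkstreaming/windowwc")

    val windowedWordCounts = pairs.reduceByKeyAndWindow(
      (x: Int, y: Int) => x + y,   // add counts entering the window
      (x: Int, y: Int) => x - y,   // subtract counts leaving the window
      Seconds(30),                 // window duration
      Seconds(2)                   // slide duration
    )
    windowedWordCounts.print()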