@Arslan6and6
2016-10-10T06:53:15.000000Z
-1, real-time statistics with accumulation
kafka + sparkstreaming (updateStateByKey)
-2, real-time statistics over a recent time window
e.g. view, in real time, the user click counts of the last hour, by province or for key cities
window
Official example
http://spark.apache.org/docs/1.6.1/streaming-programming-guide.html
[root@hadoop-senior softwares]# rpm -ivh nc-1.84-22.el6.x86_64.rpm
Preparing... ########################################### [100%]
1:nc ########################################### [100%]
[root@hadoop-senior softwares]# which nc
/usr/bin/nc
[beifeng@hadoop-senior softwares]$ nc -lk 9999
./bin/run-example streaming.NetworkWordCount hadoop-senior.ibeifeng.com 9999
Official template
[beifeng@hadoop-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell --master local[3] --jars /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/inputJars/mysql-connector-java-5.1.27-bin.jar
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(5))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("hadoop-senior.ibeifeng.com", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
sc.stop()
Monitor port 9999 on "hadoop-senior.ibeifeng.com"; whenever data is written to that port, print() the word counts.
scala> import org.apache.spark._
import org.apache.spark._
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._
scala> import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.StreamingContext._
scala> val ssc = new StreamingContext(sc, Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@31d1e747
// Step 1: Receive Data From Where
scala> val lines = ssc.socketTextStream("hadoop-senior.ibeifeng.com", 9999)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@33d32a0d
// Step 2: Process Data Based on the DStream
scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@3c9df3cb
scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@336283a
scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@55c528c5
// Step 3: Output Result
wordCounts.print()
scala> ssc.start() // Start the computation
scala> ssc.awaitTermination()
[beifeng@hadoop-senior ~]$ nc -lk 9999
spark context streaming spark
-------------------------------------------
Time: 1473141620000 ms
-------------------------------------------
(streaming,1)
(spark,2)
(context,1)
-1, Receiver
block interval: the interval at which received data is chunked into blocks for storage; default 200ms
the received data is split into blocks and stored on the Executors
-2, StreamingContext
batch interval: the interval at which data is processed, i.e. how long a time range each batch covers
ssc = new StreamingContext(sc, Seconds(1))
with a 1-second batch, the blocks received between t-0 and t-1000 (blk-01 to blk-05) are combined into one RDD, which is then processed
the 5 blocks of this RDD correspond to 5 tasks, so they are processed in parallel
t-0
t-200 [blk-01
t-400 blk-02
t-600 blk-03
t-800 blk-04
t-1000 blk-05]
t-1200 blk-06
t-1400 blk-07
http://spark.apache.org/docs/1.6.1/configuration.html
spark.streaming.blockInterval: default 200ms, recommended minimum 50ms
Interval at which data received by Spark Streaming receivers is chunked into blocks of data before storing them in Spark. Minimum recommended - 50 ms.
spark.streaming.ui.retainedBatches: by default the Streaming tab of the 4040 UI retains and displays at most 1000 batches
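As a rough sketch (not from the original notes), both settings can be applied on the SparkConf before the StreamingContext is created; with a 1-second batch interval and a 100ms block interval, each batch RDD would contain roughly 10 blocks, i.e. about 10 tasks per receiver:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical tuning example: the values below are illustrative only
val sparkConf = new SparkConf()
  .setAppName("BlockIntervalDemo")
  .setMaster("local[2]")
  .set("spark.streaming.blockInterval", "100ms")     // default 200ms, recommended minimum 50ms
  .set("spark.streaming.ui.retainedBatches", "1000") // batches kept in the 4040 UI
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(1))       // 1s batch / 100ms blocks => ~10 tasks per batch per receiver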
-3. Output operations commonly used for testing
print()
saveAsTextFiles(prefix, [suffix])
foreachRDD(func): push the RDDs of the stream to an external system, or write them over the network to a database
The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
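A minimal sketch of the foreachRDD pattern (createConnection() and send() are hypothetical placeholders for whatever database client you use); creating the connection inside foreachPartition keeps the connection on the executors rather than the driver:
wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // createConnection()/send() are hypothetical helpers for your own database client
    val connection = createConnection()
    partition.foreach(record => connection.send(record))
    connection.close()
  }
}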
/user/beifeng/sparkstreaming/hdfsfiles
Monitor this directory; when new data is written into it, print() the word counts.
Replace val lines = ssc.socketTextStream("hadoop-senior.ibeifeng.com", 9999) with
val socketDStream = ssc.textFileStream("/user/beifeng/sparkstreaming/hdfsfiles")
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(5))
// Create a DStream that monitors the HDFS directory for new files
val socketDStream = ssc.textFileStream("/user/beifeng/sparkstreaming/hdfsfiles")
// Split each line into words
val words = socketDStream.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
sc.stop()
Development approaches
1. spark shell code: develop and test interactively
2. spark shell load script
write the code above into a .scala script file and run it in spark-shell by loading the script (load script)
scala> :load <absolute path of the script>
[beifeng@hadoop-senior hadoop-2.5.0-cdh5.3.6]$ vi WordCount.scala
3. IDE: develop, test, and package a JAR (production), then submit the application with spark-submit (see the spark-submit sketch after the example code below)
Add the dependency to pom.xml in IDEA
<!-- Spark STREAMING -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
<scope>compile</scope>
</dependency>
Application code:
package com.ibeifeng.bigdata.spark.app.streaming

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming._

object SocketWordCount {
  def main(args: Array[String]) {
    // step 0: SparkContext
    val sparkConf = new SparkConf()
      .setAppName("SocketWordCount Application") // name
      .setMaster("local[2]") // --master local[2] | spark://xx:7077 | yarn
    // Create SparkContext
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // Step 1: Receive Data From Where
    val socketDStream = ssc.socketTextStream("hadoop-senior.ibeifeng.com", 9999)

    // Step 2: Process Data Based on the DStream
    // Split each line into words
    val words = socketDStream.flatMap(_.split(" "))
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // Step 3: Output Result
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    ssc.start()            // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
    sc.stop()
  }
}
After typing data into nc -lk 9999, the IDEA console displays the counts for the input data in real time.
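For approach 3, the packaged JAR would be submitted roughly like this (the JAR path and name are placeholders; when submitting to a real cluster, drop the hard-coded setMaster("local[2]") from the code and pass --master on the command line instead):
bin/spark-submit \
  --master local[2] \
  --class com.ibeifeng.bigdata.spark.app.streaming.SocketWordCount \
  /path/to/sparkstreaming-app.jar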
http://spark.apache.org/docs/1.6.1/streaming-programming-guide.html#input-dstreams-and-receivers
Spark Streaming provides two categories of built-in streaming sources.
* Basic sources: Sources directly available in the StreamingContext API. Examples: file systems, socket connections, and Akka actors.
* Advanced sources: Sources like Kafka, Flume, Kinesis, Twitter, etc. are available through extra utility classes. These require linking against extra dependencies as discussed in the linking section.
For the advanced sources, make sure the connector version matches the Spark version; for Flume, use the CDH build.
* Kafka: Spark Streaming 1.6.1 is compatible with Kafka 0.8.2.1. See the Kafka Integration Guide for more details.
* Flume: Spark Streaming 1.6.1 is compatible with Flume 1.6.0. See the Flume Integration Guide for more details.
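A hedged sketch of linking one advanced source, Kafka, matching the "kafka + sparkstreaming" setup mentioned at the top (the ZooKeeper address, consumer group, and topic name below are made-up placeholders; the spark-streaming-kafka_2.10 dependency must match the Spark version):
// pom.xml: add org.apache.spark:spark-streaming-kafka_2.10:${spark.version}
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-based Kafka stream: elements are (key, message) pairs
val kafkaDStream = KafkaUtils.createStream(
  ssc,                                // the existing StreamingContext
  "hadoop-senior.ibeifeng.com:2181",  // ZooKeeper quorum (placeholder)
  "sparkstreaming-wc",                // consumer group id (placeholder)
  Map("weblogs" -> 1)                 // topic -> number of receiver threads (placeholder topic)
)
val lines = kafkaDStream.map(_._2)    // keep only the message text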
Implementing real-time accumulation:
http://spark.apache.org/docs/1.6.1/streaming-programming-guide.html#transformations-on-dstreams
* updateStateByKey(func)
Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.
The PairDStreamFunctions class provides the following methods:
updateStateByKey()  the classic approach, with 6 overloads
mapWithState()      introduced in Spark 1.6
One of the updateStateByKey() overloads is shown below.
Also note checkpoint("<checkpoint path>"), i.e. the directory where the state is stored, which keeps the state data highly available.
If the program shows no results, a previously started program may still be occupying the same port; stop all submitted processes and run the program again.
/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of each key.
 * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
 * @param updateFunc State update function. If `this` function returns None, then
 *                   corresponding state key-value pair will be eliminated.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)] = ssc.withScope {
  updateStateByKey(updateFunc, defaultPartitioner())
}
Reading the signature: updateStateByKey is a higher-order function that takes a function parameter updateFunc. Its Seq[V] parameter holds the values counted for the key in the current batch, Option[S] holds the previous state (the result so far) for the key, and the returned Option[S] is the accumulated result.
package com.ibeifeng.bigdata.spark.app.streaming

import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by beifeng on 9/6/16.
 */
object UpdataStateWordCount {
  def main(args: Array[String]) {
    // step 0: SparkContext
    val sparkConf = new SparkConf()
      .setAppName("SocketWordCount Application") // name
      .setMaster("local[2]") // --master local[2] | spark://xx:7077 | yarn
    // Create SparkContext
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))
    ssc.checkpoint("sparkstreaming/socketwc3")

    // Step 1: Receive Data From Where
    val socketDStream = ssc.socketTextStream("hadoop-senior.ibeifeng.com", 9999)

    // Step 2: Process Data Based on the DStream
    // Split each line into words
    val words = socketDStream.flatMap(_.split(" "))
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.updateStateByKey(
      (values: Seq[Int], state: Option[Int]) => {
        val currentCount = values.sum
        val previousCount = state.getOrElse(0)
        // update the state and return it
        Some(currentCount + previousCount)
      }
    )

    // Step 3: Output Result
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    ssc.start()            // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
    sc.stop()
  }
}
/**
 * :: Experimental ::
 * Return a [[MapWithStateDStream]] by applying a function to every key-value element of
 * `this` stream, while maintaining some state data for each unique key. The mapping function
 * and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this
 * transformation can be specified using [[StateSpec]] class. The state data is accessible in
 * as a parameter of type [[State]] in the mapping function.
 *
 * Example of using `mapWithState`:
 * {{{
 *   // A mapping function that maintains an integer state and return a String
 *   def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
 *     // Use state.exists(), state.get(), state.update() and state.remove()
 *     // to manage state, and return the necessary string
 *   }
 *
 *   val spec = StateSpec.function(mappingFunction).numPartitions(10)
 *
 *   val mapWithStateDStream = keyValueDStream.mapWithState[StateType, MappedType](spec)
 * }}}
 *
 * @param spec        Specification of this transformation
 * @tparam StateType  Class type of the state data
 * @tparam MappedType Class type of the mapped data
 */
@Experimental
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType] = {
  new MapWithStateDStreamImpl[K, V, StateType, MappedType](
    self,
    spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
  )
}
mapWithState parameter analysis:
1. mapWithState needs a StateSpec instance; spec: StateSpec describes the specification of this transformation.
2. The StateSpec companion object contains 4 methods named function. We pick the one whose parameters match what mapWithState needs and use it to create the StateSpec instance.
/**
 * Create a [[org.apache.spark.streaming.StateSpec StateSpec]] for setting all the specifications
 * of the `mapWithState` operation on a
 * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair DStream]].
 *
 * @param mappingFunction The function applied on every data item to manage the associated state
 *                        and generate the mapped data
 * @tparam ValueType  Class of the values
 * @tparam StateType  Class of the states data
 * @tparam MappedType Class of the mapped data
 */
def function[KeyType, ValueType, StateType, MappedType](
    mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
  ): StateSpec[KeyType, ValueType, StateType, MappedType] = {
  ClosureCleaner.clean(mappingFunction, checkSerializable = true)
  val wrappedFunction =
    (time: Time, key: KeyType, value: Option[ValueType], state: State[StateType]) => {
      Some(mappingFunction(key, value, state))
    }
  new StateSpecImpl(wrappedFunction)
}
3. Now we pass a function parameter mappingFunction to the function method; this function we have to write ourselves.
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala (official example code)
// Update the cumulative count using mapWithState
// This will give a DStream made of state (which is the cumulative count of the words)
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  val output = (word, sum)
  state.update(sum)
  output
}
Our own mappingFunction:
def mappingFunction(key: String, value: Option[Int],
                    state: State[Int]): (String, Int) = {
  // Use state.exists(), state.get(), state.update() and state.remove()
  var previousCount = 0
  if (state.exists()) {
    previousCount = state.get()
  }
  // get the value arriving in the current batch
  val currentCount = value.getOrElse(0)
  // update the state
  state.update(previousCount + currentCount)
  (key, previousCount + currentCount)
}
Complete code:
package com.ibeifeng.bigdata.spark.app.streaming

import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by XuanYu on 2016/8/7.
 */
object MapStateWordCount {
  def main(args: Array[String]) {
    // step 0: SparkContext
    val sparkConf = new SparkConf()
      .setAppName("LogAnalyzer Application") // name
      .setMaster("local[2]") // --master local[2] | spark://xx:7077 | yarn
    // Create SparkContext
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("sparkstreaming/socketwc2/")

    // Step 1: Receive Data From Where
    val socketDStream = ssc.socketTextStream("hadoop-senior01.ibeifeng.com", 9999)

    // Step 2: Process Data Based on the DStream
    // Split each line into words
    val words = socketDStream.flatMap(_.split(" "))
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))

    /**
     * spec: StateSpec[K, V, StateType, MappedType]
     *
     * mappingFunction:
     * (KeyType, Option[ValueType], State[StateType]) => MappedType
     */
    // A mapping function that maintains an integer state and returns a (word, count) pair
    def mappingFunction(key: String, value: Option[Int],
                        state: State[Int]): (String, Int) = {
      /**
       * val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
       * val output = (word, sum)
       * state.update(sum)
       * output
       */
      // Use state.exists(), state.get(), state.update() and state.remove()
      var previousCount = 0
      if (state.exists()) {
        previousCount = state.get()
      }
      // get the value arriving in the current batch
      val currentCount = value.getOrElse(0)
      // update the state
      state.update(previousCount + currentCount)
      // return the updated key and count
      (key, previousCount + currentCount)
    }

    val spec = StateSpec.function(mappingFunction _)
    val wordCounts = pairs.mapWithState(spec)

    // Step 3: Output Result
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    ssc.start()            // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
    sc.stop()
  }
}
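As the Scaladoc above mentions, the StateSpec can also carry partitioner, timeout, and initial-state settings. A small sketch, reusing mappingFunction and pairs from the code above (the concrete values are illustrative only):
import org.apache.spark.streaming.Minutes

val specWithOptions = StateSpec.function(mappingFunction _)
  .numPartitions(10)    // number of partitions for the state RDDs (illustrative)
  .timeout(Minutes(30)) // evict state for keys idle longer than 30 minutes (illustrative)
val wordCountsWithTimeout = pairs.mapWithState(specWithOptions)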
Use the reduceByKeyAndWindow method in class PairDStreamFunctions; one of its overloads is shown below.
/**
 * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
 * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
 * generate the RDDs with Spark's default number of partitions.
 * @param reduceFunc associative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 *                       batching interval
 * @param slideDuration  sliding interval of the window (i.e., the interval after which
 *                       the new DStream will generate RDDs); must be a multiple of this
 *                       DStream's batching interval
 */
def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[(K, V)] = ssc.withScope {
  reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
}
http://spark.apache.org/docs/1.6.1/streaming-programming-guide.html
windowDuration is the length of the window, i.e. how far back from the latest computation the data is counted
slideDuration is the interval between two consecutive computations
both must be multiples of the batch interval
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
In the official example, a count is computed every 10 seconds over the most recent 30 seconds of data.
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(2))
// Step 1: Receive Data From Where
val socketDStream = ssc.socketTextStream("hadoop-senior.ibeifeng.com", 9999)
// Step 2: Process Data Based on the DStream
// Split each line into words
val words = socketDStream.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKeyAndWindow((x: Int, y: Int) => (x + y), Seconds(30), Seconds(2))
// Step 3: Output Result
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination()
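Coming back to the use case from the top of the notes (user clicks of the last hour, by province or key city), the same window operator covers it. A sketch under the assumption that a DStream of (province, 1) pairs called provinceClicks already exists and that the batch interval divides both durations:
import org.apache.spark.streaming.Minutes

// Every 10 minutes, recompute the click counts over the most recent hour
val hourlyClicks = provinceClicks.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // reduce function
  Minutes(60),               // windowDuration: look back one hour
  Minutes(10)                // slideDuration: recompute every 10 minutes
)
hourlyClicks.print()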