@hadoopMan 2016-06-29T03:52:32.000000Z

Integrating Kafka with Spark Streaming

spark


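Introduction

Apache Kafka is a distributed publish-subscribe messaging system. It was originally developed at LinkedIn and later became part of the Apache project. Kafka is a fast, scalable commit log service that is distributed, partitioned, and replicated by design.
Apache Kafka also differs from traditional messaging systems in several respects.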
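To make "partitioned commit log" concrete, here is a toy in-memory model in plain Scala (no Kafka dependency). It is only an illustration under simplifying assumptions: the names `ToyTopic`, `append`, and `read` are hypothetical and are not Kafka's API, replication is not modeled, and choosing a partition from the key's hash is a simplification that may differ from Kafka's own partitioner. Each partition is an append-only sequence, a message's offset is its position in that partition, and reading does not remove anything, so a consumer simply remembers the offset to continue from.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of a Kafka-style topic: a fixed number of append-only partitions.
// Hypothetical names; this is not the Kafka client API, and replication is omitted.
final class ToyTopic(val numPartitions: Int) {
  require(numPartitions > 0, "a topic needs at least one partition")

  // One append-only buffer per partition (Vector.fill creates a distinct buffer for each).
  private val partitions = Vector.fill(numPartitions)(ArrayBuffer.empty[String])

  // Pick a partition from the key's hash; return (partition, offset) of the new message.
  def append(key: String, value: String): (Int, Long) = {
    val p = Math.floorMod(key.hashCode, numPartitions)
    partitions(p) += value
    (p, (partitions(p).size - 1).toLong)
  }

  // Read one partition starting at `fromOffset` (inclusive). Nothing is deleted,
  // so several consumers can read the same messages independently.
  def read(partition: Int, fromOffset: Long): List[String] =
    partitions(partition).drop(fromOffset.toInt).toList
}

object ToyTopicDemo {
  def main(args: Array[String]): Unit = {
    val topic = new ToyTopic(numPartitions = 1)
    topic.append("k", "hello spark")
    topic.append("k", "hello kafka")
    // Comparable to --from-beginning: start at offset 0.
    println(topic.read(0, 0L))
    // A consumer that already processed offset 0 resumes at offset 1.
    println(topic.read(0, 1L))
  }
}
```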

1. Configure Kafka

1. producer.properties

  metadata.broker.list=miaodonghua.host:9092

2. server.properties

  host.name=miaodonghua.host
  log.dirs=/opt/cdh5.3.6/kafka_2.10-0.8.2.1/kafka-logs
  zookeeper.connect=miaodonghua.host:2181
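These are the broker settings this walkthrough changes. As an optional sanity check, the sketch below parses key=value text with `java.util.Properties` (the format server.properties uses) and reports which of these three keys are missing or blank. The object `BrokerConfigCheck` and its helpers are hypothetical, and only the three keys above are checked; this is not a validator for the full broker configuration.

```scala
import java.io.StringReader
import java.util.Properties

object BrokerConfigCheck {
  // The keys edited in server.properties for this single-host setup.
  val requiredKeys: Seq[String] = Seq("host.name", "log.dirs", "zookeeper.connect")

  // Parse key=value text the same way java.util.Properties loads a .properties file.
  def parse(text: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(text))
    props
  }

  // Return the required keys that are absent or have a blank value.
  def missingKeys(props: Properties): Seq[String] =
    requiredKeys.filter(k => Option(props.getProperty(k)).forall(_.trim.isEmpty))

  def main(args: Array[String]): Unit = {
    val text =
      """host.name=miaodonghua.host
        |log.dirs=/opt/cdh5.3.6/kafka_2.10-0.8.2.1/kafka-logs
        |zookeeper.connect=miaodonghua.host:2181
        |""".stripMargin
    val props = parse(text)
    println(s"missing: ${missingKeys(props)}")
    println(s"zookeeper.connect = ${props.getProperty("zookeeper.connect")}")
  }
}
```

To check the real file instead of an inline string, load it with `props.load(new java.io.FileReader("config/server.properties"))`.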

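3. Use Kafka

Start the Kafka broker (ZooKeeper must already be running at the address set in zookeeper.connect):

  bin/kafka-server-start.sh config/server.properties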

(Screenshot: Kafka started successfully)

1) Create a topic

  bin/kafka-topics.sh --create --zookeeper miaodonghua.host:2181 --replication-factor 1 --partitions 1 --topic test

(Screenshot: topic created successfully)
List the topics:

  bin/kafka-topics.sh --list --zookeeper miaodonghua.host:2181

(Screenshot: topic list)

2) Publish messages to the topic

Each line typed into the console producer is sent to the topic test as one message:

  bin/kafka-console-producer.sh --broker-list miaodonghua.host:9092 --topic test

(Screenshot: console producer)

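3) Consume messages as a subscriber

--from-beginning replays the topic from its earliest available message instead of showing only new ones:

  bin/kafka-console-consumer.sh --zookeeper miaodonghua.host:2181 --topic test --from-beginning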

(Screenshot: console consumer)

2. Receiver-based Approach

1. Start spark-shell

  # local[2]: the Kafka receiver occupies one thread, so at least two are needed to also process the data
  # The spark-streaming-kafka jar should match the Spark version (1.3.0)
  EXT=/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars
  bin/spark-shell \
    --jars $EXT/spark-streaming-kafka_2.10-1.3.0.jar,$EXT/kafka_2.10-0.8.2.1.jar,$EXT/kafka-clients-0.8.2.1.jar,$EXT/metrics-core-2.2.0.jar,$EXT/zkclient-0.3.jar \
    --master local[2]

2. Write kafkaWordCount.scala

Approach 1: Receiver-based Approach

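  // Receiver-based approach: a receiver uses Kafka's high-level consumer (offsets kept in ZooKeeper)
  import org.apache.spark._
  import org.apache.spark.streaming._
  import org.apache.spark.streaming.kafka._
  // Reuse the shell's SparkContext `sc`; one batch every 5 seconds
  val ssc = new StreamingContext(sc, Seconds(5))
  // topic -> number of consumer threads used by the receiver
  val topicMap = Map("test" -> 1)
  // Connect through ZooKeeper as consumer group "testWordCountGroup"; keep only the message value
  val lines = KafkaUtils.createStream(ssc, "miaodonghua.host:2181", "testWordCountGroup", topicMap).map(_._2)
  val words = lines.flatMap(_.split(" "))
  // Count each word within the batch
  val counts = words.map((_, 1L)).reduceByKey(_ + _)
  // Print the first elements of each batch's result
  counts.print()
  ssc.start()
  ssc.awaitTermination()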
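Each 5-second batch goes through the same three steps as an ordinary word count: split every message into words, pair each word with 1, and sum the pairs per word. The sketch below reproduces that per-batch logic on a plain Scala collection so it can be tried without Spark or Kafka. `BatchWordCount` and `countBatch` are hypothetical names, and `groupBy` followed by `sum` only stands in for the result `reduceByKey(_ + _)` produces within one batch; it says nothing about how Spark distributes the work.

```scala
object BatchWordCount {
  // Mirrors lines.flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _)
  // for the messages of a single batch, using local collections instead of a DStream.
  def countBatch(messages: Seq[String]): Map[String, Long] =
    messages
      .flatMap(_.split(" "))   // one element per word
      .map(word => (word, 1L)) // (word, 1)
      .groupBy(_._1)           // gather the pairs that share a word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // add up the 1s

  def main(args: Array[String]): Unit = {
    // Messages typed into kafka-console-producer during one batch interval.
    val batch = Seq("hello spark", "hello kafka")
    println(countBatch(batch))
  }
}
```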

3. Run the script

  :load /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/kafkaWordCount.scala

(Screenshot: data received from Kafka)

3. Direct Approach

1. Start spark-shell

  EXT=/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars
  bin/spark-shell \
    --jars $EXT/spark-streaming-kafka_2.10-1.3.0.jar,$EXT/kafka_2.10-0.8.2.1.jar,$EXT/kafka-clients-0.8.2.1.jar,$EXT/metrics-core-2.2.0.jar,$EXT/zkclient-0.3.jar \
    --master local[2]

2. Write kafkaWordCount2.scala

Approach 2: Direct Approach (No Receivers)

  // Direct approach: no receiver; each batch reads a range of offsets straight from the brokers
  import kafka.serializer.StringDecoder
  import org.apache.spark._
  import org.apache.spark.streaming._
  import org.apache.spark.streaming.kafka._
  // Reuse the shell's SparkContext `sc`; one batch every 5 seconds
  val ssc = new StreamingContext(sc, Seconds(5))
  // Brokers are addressed directly, not through ZooKeeper
  val kafkaMapParams = Map("metadata.broker.list" -> "miaodonghua.host:9092")
  val topicsSet = Set("test")
  // Type parameters: key type, value type, key decoder, value decoder; keep only the value
  val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaMapParams, topicsSet).map(_._2)
  val words = lines.flatMap(_.split(" "))
  val counts = words.map((_, 1L)).reduceByKey(_ + _)
  counts.print()
  ssc.start()
  ssc.awaitTermination()
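The direct approach has no receiver: at each batch interval Spark asks the brokers for the latest offset of every partition of the subscribed topics and processes the range between where the previous batch stopped and that offset. The plain Scala sketch below models only this bookkeeping so it can run without Spark or Kafka. `PartitionRange` and `nextRanges` are hypothetical names (Spark exposes its own per-batch ranges as `OffsetRange` through `HasOffsetRanges`), the offsets in `main` are made-up inputs, and starting an unseen partition at offset 0 is a simplifying assumption.

```scala
object DirectOffsetSketch {
  // One partition's slice of a batch: offsets in [fromOffset, untilOffset).
  // Hypothetical stand-in for Spark's OffsetRange; no Spark or Kafka classes are used.
  final case class PartitionRange(partition: Int, fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset
  }

  // Given where the previous batch stopped (`consumed`) and the latest offsets the brokers
  // report now (`latest`), compute this batch's ranges and the new stopping positions.
  // Assumption: a partition with no recorded position starts at offset 0.
  def nextRanges(
      consumed: Map[Int, Long],
      latest: Map[Int, Long]): (Seq[PartitionRange], Map[Int, Long]) = {
    val ranges = latest.toSeq.sortBy(_._1).map { case (p, until) =>
      PartitionRange(p, consumed.getOrElse(p, 0L), until)
    }
    (ranges, ranges.map(r => r.partition -> r.untilOffset).toMap)
  }

  def main(args: Array[String]): Unit = {
    // Topic "test" was created with a single partition (partition 0).
    var consumed = Map(0 -> 0L)
    // Latest offsets at three successive batch boundaries (illustrative numbers only).
    for (latest <- Seq(Map(0 -> 2L), Map(0 -> 2L), Map(0 -> 5L))) {
      val (ranges, next) = nextRanges(consumed, latest)
      ranges.foreach { r =>
        println(s"partition ${r.partition}: [${r.fromOffset}, ${r.untilOffset}) -> ${r.count} messages")
      }
      consumed = next
    }
  }
}
```

An empty range (from equal to until) simply yields an empty batch, which is why the stream keeps producing batches even when no new messages arrive.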

3. Run the script

  :load /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/kafkaWordCount2.scala

(Screenshot: data received from Kafka, direct approach)

4. UpdateStateByKey

1. Start spark-shell

  EXT=/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars
  bin/spark-shell \
    --jars $EXT/spark-streaming-kafka_2.10-1.3.0.jar,$EXT/kafka_2.10-0.8.2.1.jar,$EXT/kafka-clients-0.8.2.1.jar,$EXT/metrics-core-2.2.0.jar,$EXT/zkclient-0.3.jar \
    --master local[2]

2. Write UpdateStateByKey.scala

  import kafka.serializer.StringDecoder
  import org.apache.spark._
  import org.apache.spark.streaming._
  import org.apache.spark.streaming.kafka._
  val ssc = new StreamingContext(sc, Seconds(5))
  // updateStateByKey keeps state across batches, so a checkpoint directory is required
  ssc.checkpoint(".")
  val kafkaMapParams = Map("metadata.broker.list" -> "miaodonghua.host:9092")
  val topicsSet = Set("test")
  // Called once per key in every batch:
  //   values = the key's new values in this batch (empty if the key did not appear)
  //   state  = the running total from earlier batches (None the first time)
  // Returns the new state as an Option[S]; here S = Int
  val updateFunc = (values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.sum
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }
  val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaMapParams, topicsSet).map(_._2)
  val words = lines.flatMap(_.split(" "))
  // (word, 1) pairs, e.g. (hello,1) (hello,1) (spark,1); updateStateByKey groups them per key,
  // so updateFunc receives values = Seq(1, 1) for hello in that batch
  val counts = words.map((_, 1))
  // Add each batch's counts to the running totals
  val state = counts.updateStateByKey[Int](updateFunc)
  state.print()
  ssc.start()
  ssc.awaitTermination()
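`updateStateByKey` calls `updateFunc` once per key in every batch, passing that batch's new values for the key and the state kept from earlier batches (`None` the first time the key is seen); returning `None` drops the key. The plain Scala sketch below replays that contract on two hand-written batches so the accumulation can be checked without Spark. `StateSketch` and `runBatch` are hypothetical names, and the sketch ignores checkpointing and partitioning; it does keep one real behavior: keys with no new values in a batch are still updated, with an empty `Seq`.

```scala
object StateSketch {
  // Same update function as in the script: add this batch's counts to the running total.
  val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => Some(values.sum + state.getOrElse(0))

  // Apply updateFunc the way updateStateByKey does for one batch: every key that is
  // new in this batch or already in the state is updated, and a None result drops the key.
  def runBatch(state: Map[String, Int], words: Seq[String]): Map[String, Int] = {
    // Group this batch's (word, 1) pairs per word, e.g. hello -> Seq(1, 1).
    val newValues: Map[String, Seq[Int]] =
      words.map(w => (w, 1)).groupBy(_._1).map { case (w, ps) => (w, ps.map(_._2)) }
    val keys = state.keySet ++ newValues.keySet
    keys.toSeq.flatMap { k =>
      updateFunc(newValues.getOrElse(k, Seq.empty), state.get(k)).map(s => (k, s))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val batch1 = "hello spark hello".split(" ").toSeq
    val batch2 = "hello kafka".split(" ").toSeq
    val afterFirst = runBatch(Map.empty, batch1)
    val afterSecond = runBatch(afterFirst, batch2)
    println(afterFirst)
    println(afterSecond)
  }
}
```

Because the state carries over, a word's total keeps growing across batches instead of restarting each interval as in the earlier word counts.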

3. Run the script

  :load /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/UpdateStateByKey.scala

(Screenshot: first output)
(Screenshot: second output)
