@hadoopMan
2016-06-29T03:52:32.000000Z
字数 4698
阅读 1176
spark
Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展、设计内在就是分布式的,分区的和可复制的提交日志服务。
Apache Kafka与传统消息系统相比有以下不同:

metadata.broker.list=miaodonghua.host:9092
host.name=miaodonghua.host
log.dirs=/opt/cdh5.3.6/kafka_2.10-0.8.2.1/kafka-logs
zookeeper.connect=miaodonghua.host:2181
# Start the Kafka broker with the settings in config/server.properties.
bin/kafka-server-start.sh config/server.properties

# Create the topic `test` (1 partition, replication factor 1) via ZooKeeper at miaodonghua.host:2181.
bin/kafka-topics.sh --create --zookeeper miaodonghua.host:2181 --replication-factor 1 --partitions 1 --topic test
查看topic
# List the existing topics, querying ZooKeeper at miaodonghua.host:2181.
bin/kafka-topics.sh --list --zookeeper miaodonghua.host:2181

# Start a console producer that sends each typed line to broker miaodonghua.host:9092.
# The topic is `test` so that it matches the topic created with kafka-topics.sh and the one
# read by the console consumer and the Spark Streaming jobs in this document.
bin/kafka-console-producer.sh --broker-list miaodonghua.host:9092 --topic test

# Consume the topic `test` from its beginning using the console consumer.
# NOTE(review): ZooKeeper is addressed as localhost:2181 here, while the other commands in this
# document use miaodonghua.host:2181 — presumably the same instance; confirm.
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

# Launch spark-shell on a local[2] master with the Kafka integration jars passed via --jars.
# Continuation lines of the jar list start at column 0 so the comma-separated list stays one word.
# NOTE(review): this launch uses spark-streaming-kafka 1.5.2 / kafka 0.8.2.2 under externallibs/,
# whereas the later launches in this document use 1.3.0 / 0.8.2.1 under externaljars/ — confirm
# which set matches the installed Spark 1.3.0.
bin/spark-shell \
--jars /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/spark-streaming-kafka_2.10-1.5.2.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka_2.10-0.8.2.2.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka-clients-0.8.2.2.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/metrics-core-2.2.0.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/zkclient-0.3.jar \
--master local[2]
Approach 1: Receiver-based Approach
// Receiver-based Kafka word count, meant to be run inside spark-shell
// (it relies on the shell-provided SparkContext `sc`).
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
// Streaming context with a 5-second batch interval.
val ssc = new StreamingContext(sc, Seconds(5))
// Topic name -> per-topic count handed to the receiver
// (per the Spark Kafka integration guide: number of consumer threads for that topic).
val topicMap = Map("test" -> 1)
// Receiver stream of (key, message) pairs; connects through ZooKeeper at miaodonghua.host:2181
// using the consumer group "testWordCountGroup".
val kafkaStream = KafkaUtils.createStream(ssc, "miaodonghua.host:2181", "testWordCountGroup", topicMap)
// Keep only the message part of each record.
val lines = kafkaStream.map { case (_, message) => message }
// Split every message on single spaces and count the occurrences of each word per batch.
val words = lines.flatMap(line => line.split(" "))
val counts = words.map(word => (word, 1L)).reduceByKey((left, right) => left + right)
counts.print()
ssc.start()
ssc.awaitTermination()
:load /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/kakaWordCount.scala

# Launch spark-shell on a local[2] master with the Kafka integration jars passed via --jars.
# Continuation lines of the jar list start at column 0 so the comma-separated list stays one word.
bin/spark-shell \
--jars /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/spark-streaming-kafka_2.10-1.3.0.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/kafka_2.10-0.8.2.1.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/kafka-clients-0.8.2.1.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/metrics-core-2.2.0.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/zkclient-0.3.jar \
--master local[2]
Approach 2: Direct Approach (No Receivers)
// Direct (receiver-less) Kafka word count, meant to be run inside spark-shell
// (it relies on the shell-provided SparkContext `sc`).
import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
// Streaming context with a 5-second batch interval.
val ssc = new StreamingContext(sc, Seconds(5))
// Kafka parameters: the direct stream talks to the broker list, not to ZooKeeper.
val kafkaMapParams = Map("metadata.broker.list" -> "miaodonghua.host:9092")
val topicsSet = Set("test")
// Direct stream of (key, message) pairs, with both key and message decoded as strings.
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaMapParams, topicsSet)
// Keep only the message part of each record.
val lines = directStream.map { case (_, message) => message }
// Split every message on single spaces and count the occurrences of each word per batch.
val words = lines.flatMap(line => line.split(" "))
val counts = words.map(word => (word, 1L)).reduceByKey((left, right) => left + right)
counts.print()
ssc.start()
ssc.awaitTermination()
:load /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/kakaWordCount2.scala

# Launch spark-shell on a local[2] master with the Kafka integration jars passed via --jars.
# Continuation lines of the jar list start at column 0 so the comma-separated list stays one word.
bin/spark-shell \
--jars /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/spark-streaming-kafka_2.10-1.3.0.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/kafka_2.10-0.8.2.1.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/kafka-clients-0.8.2.1.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/metrics-core-2.2.0.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/zkclient-0.3.jar \
--master local[2]
// Stateful Kafka word count (running totals across batches) using updateStateByKey,
// meant to be run inside spark-shell (it relies on the shell-provided SparkContext `sc`).
import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
// Streaming context with a 5-second batch interval.
val ssc = new StreamingContext(sc, Seconds(5))
// Checkpoint into the current working directory; the Spark Streaming guide requires
// checkpointing to be configured for updateStateByKey.
ssc.checkpoint(".")
val kafkaMapParams = Map("metadata.broker.list" -> "miaodonghua.host:9092")
val topicsSet = Set("test")
// State update: add this batch's values for a key to its previous total (0 when there is no state yet).
val updateFunc = (values: Seq[Int], state: Option[Int]) => Some(state.getOrElse(0) + values.sum)
// Direct stream of (key, message) pairs, with both key and message decoded as strings.
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaMapParams, topicsSet)
// Keep only the message part of each record.
val lines = directStream.map { case (_, message) => message }
val words = lines.flatMap(line => line.split(" "))
// One (word, 1) pair per occurrence; a key's values within a batch reach updateFunc as a Seq.
val counts = words.map(word => (word, 1))
// Running count per word, carried across batches.
val state = counts.updateStateByKey[Int](updateFunc)
state.print()
ssc.start()
ssc.awaitTermination()
:load /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/UpdateStateByKey.scala
