@hadoopMan 2017-03-04

flume+kafka+spark+streaming

spark


If you want to learn big data frameworks such as Spark, Hadoop, and Kafka, please join QQ group 459898801; once it is full, join the second group 224209501. Follow-up articles will be published there.

1,flume+kafka

After the Flume source receives data, it passes through the memory channel to the sink. We use a KafkaSink so that the data the sink takes from the channel is published, with the sink acting as a Kafka producer, to the Kafka consumers.

1, Writing the flume agent configuration

    #agent section
    producer.sources = s
    producer.channels = c
    producer.sinks = r

    #source section
    #producer.sources.s.type = seq
    producer.sources.s.type = netcat
    producer.sources.s.bind = miaodonghua.host
    producer.sources.s.port = 5555
    producer.sources.s.channels = c

    # Each sink's type must be defined
    producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
    producer.sinks.r.topic = mytopic
    producer.sinks.r.brokerList = miaodonghua.host:9092
    producer.sinks.r.requiredAcks = 1
    producer.sinks.r.batchSize = 20
    #Specify the channel the sink should use
    producer.sinks.r.channel = c

    # Each channel's type is defined.
    producer.channels.c.type = memory
    producer.channels.c.capacity = 1000
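
For reference, what the KafkaSink does is conceptually the same as running a standalone producer against the broker and topic configured above. The following is only a minimal sketch assuming the old Kafka 0.8.x Scala producer API (kafka_2.10-0.8.2.1 on the classpath); it is not part of Flume itself:

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    // Minimal sketch of a Kafka 0.8.x producer sending one message to the
    // same broker and topic that the KafkaSink above is configured with.
    object ManualKafkaProducer {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("metadata.broker.list", "miaodonghua.host:9092")
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        props.put("request.required.acks", "1")

        val producer = new Producer[String, String](new ProducerConfig(props))
        producer.send(new KeyedMessage[String, String]("mytopic", "hello from a manual producer"))
        producer.close()
      }
    }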

2, Starting flume

    bin/flume-ng agent \
    --conf conf \
    --name producer \
    --conf-file conf/kafka_flume.conf \
    -Dflume.root.logger=INFO,console

3, Starting ZooKeeper

    bin/zkServer.sh start

4, Starting Kafka

    bin/kafka-server-start.sh config/server.properties

5, Running the Kafka console consumer

    bin/kafka-console-consumer.sh --zookeeper miaodonghua.host:2181 --topic mytopic --from-beginning

6, Testing

Send data to the port Flume is listening on:

    telnet miaodonghua.host 5555
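
If telnet is not available, the same test can be done with a small Scala client. This is only a sketch: it opens a socket to the netcat source configured above and writes one line (each line becomes one Flume event):

    import java.io.PrintWriter
    import java.net.Socket

    // Minimal sketch: send one test line to the Flume netcat source
    // (host and port taken from the agent configuration above).
    object NetcatTestClient {
      def main(args: Array[String]): Unit = {
        val socket = new Socket("miaodonghua.host", 5555)
        val out = new PrintWriter(socket.getOutputStream, true)
        out.println("hello flume kafka")   // one line = one Flume event
        out.close()
        socket.close()
      }
    }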

(Screenshot: Flume and Kafka integration succeeded)

2,kafka+flume+spark streaming

After the Flume source receives data, it passes through the memory channel to the sink. The KafkaSink acts as a Kafka producer and publishes the data the sink takes from the channel, and Spark Streaming consumes it from Kafka.

1, Writing the Spark Streaming word count

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._

    // sc is the SparkContext provided by spark-shell; process a batch every 5 seconds
    val ssc = new StreamingContext(sc, Seconds(5))
    // topic -> number of receiver threads
    val topicMap = Map("mytopic" -> 1)
    // receiver-based Kafka stream; keep only the message value of each (key, message) pair
    val lines = KafkaUtils.createStream(ssc, "miaodonghua.host:2181", "testWordCountGroup", topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val counts = words.map((_, 1L)).reduceByKey(_ + _)
    counts.print()
    ssc.start()
    ssc.awaitTermination()
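
The counts printed above are per 5-second batch. If running totals across batches are wanted, Spark Streaming's updateStateByKey can be used instead of reduceByKey; a minimal sketch (the checkpoint path is only a hypothetical example, and these lines would replace the reduceByKey step before ssc.start()):

    // Running word totals across all batches; stateful streaming requires a checkpoint directory
    ssc.checkpoint("hdfs:///tmp/kafka-wordcount-checkpoint")  // hypothetical path

    val totals = words.map((_, 1L)).updateStateByKey[Long] {
      (newValues: Seq[Long], running: Option[Long]) =>
        Some(newValues.sum + running.getOrElse(0L))
    }
    totals.print()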

2, Starting the spark-shell

    bin/spark-shell \
    --jars /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/spark-streaming-kafka_2.10-1.3.0.jar,\
    /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/kafka_2.10-0.8.2.1.jar,\
    /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/kafka-clients-0.8.2.1.jar,\
    /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/metrics-core-2.2.0.jar,\
    /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/zkclient-0.3.jar \
    --master local[2]

3, Running the word count

    :load /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/kafkaWordCount.scala

4, Testing

Input:
(Screenshot: input)
The word counts are produced successfully:
(Screenshot: output)
Note: the output currently shows a formatting bug.
