@hadoopMan
2017-03-04T14:26:32.000000Z
spark
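After the Flume source receives data, the data passes through the memory channel to the sink. We need a KafkaSink so that the sink acts as a Kafka producer and forwards the data it takes from the channel to Kafka consumers. The agent configuration, saved as conf/kafka_flume.conf, is: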
#agent section
producer.sources = s
producer.channels = c
producer.sinks = r
#source section
#producer.sources.s.type = seq
producer.sources.s.type = netcat
producer.sources.s.bind = miaodonghua.host
producer.sources.s.port = 5555
producer.sources.s.channels = c
# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.r.topic = mytopic
producer.sinks.r.brokerList = miaodonghua.host:9092
producer.sinks.r.requiredAcks = 1
producer.sinks.r.batchSize = 20
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000
Start the Flume agent:
bin/flume-ng agent \
--conf conf \
--name producer \
--conf-file conf/kafka_flume.conf \
-Dflume.root.logger=INFO,console
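Start ZooKeeper (ZooKeeper and the Kafka broker must be running before the agent can deliver events):
bin/zkServer.sh start
Start the Kafka broker:
bin/kafka-server-start.sh config/server.properties
Start a console consumer to check that messages arrive on the topic:
bin/kafka-console-consumer.sh --zookeeper miaodonghua.host:2181 --topic mytopic --from-beginning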
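The console consumer assumes that the topic mytopic exists. Brokers often create a topic automatically on first use, but that depends on the broker setting auto.create.topics.enable, so the topic can also be created explicitly. A minimal sketch, assuming the kafka-topics.sh tool shipped with Kafka 0.8.2, a single broker, and the ZooKeeper address used above; the function names and the partition and replication values are illustrative and not part of the original setup:

```shell
#!/bin/sh
# Assumed values, copied from the commands above; adjust for your cluster.
ZK="miaodonghua.host:2181"
TOPIC="mytopic"

# Create the topic with one partition and one replica (single-broker assumption).
create_topic() {
  bin/kafka-topics.sh --create --zookeeper "$ZK" \
    --replication-factor 1 --partitions 1 --topic "$TOPIC"
}

# Show the partition and replica assignment to confirm the topic exists.
describe_topic() {
  bin/kafka-topics.sh --describe --zookeeper "$ZK" --topic "$TOPIC"
}
```

Call create_topic and then describe_topic from the Kafka installation directory once ZooKeeper and the broker are up.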
Send data to the port that Flume is listening on:
telnet miaodonghua.host 5555
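telnet is interactive; for repeatable tests the same lines can be piped to the port instead. A sketch, assuming an nc (netcat) client is installed and supports the -w timeout flag; send_lines is a hypothetical helper name, and the host and port are the ones from the source configuration:

```shell
#!/bin/sh
# Assumed values, copied from the netcat source settings above.
FLUME_HOST="miaodonghua.host"
FLUME_PORT=5555

# Forward every line read from stdin to the Flume netcat source.
# -w 2 gives up after 2 seconds of inactivity so the command does not hang.
send_lines() {
  nc -w 2 "$FLUME_HOST" "$FLUME_PORT"
}

# Example (requires the agent to be running):
#   printf 'hello spark\nhello kafka\n' | send_lines
```

With default settings the netcat source replies OK for each line it accepts, and the lines should then appear in the console consumer.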
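Next, the same pipeline feeds Spark Streaming: the Flume source receives data, the data passes through the memory channel to the sink, and the KafkaSink publishes it to Kafka as a producer, where Spark Streaming consumes it. The word count script, saved as kafkaWordCount.scala, is: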
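import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
// sc is the SparkContext provided by spark-shell; each batch covers 5 seconds
val ssc = new StreamingContext(sc, Seconds(5))
// topic name -> number of consumer threads for that topic
val topicMap = Map("mytopic" -> 1)
// createStream yields (key, message) pairs; keep only the message text
val lines = KafkaUtils.createStream(ssc, "miaodonghua.host:2181", "testWordCountGroup", topicMap).map(_._2)
// split each message into words on single spaces
val words = lines.flatMap(_.split(" "))
// count the occurrences of each word within the batch
val counts = words.map((_, 1L)).reduceByKey(_ + _)
// print the first results of every batch to the console
counts.print()
ssc.start()
ssc.awaitTermination()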
Start spark-shell with the Kafka dependencies on the classpath:
bin/spark-shell \
--jars /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/spark-streaming-kafka_2.10-1.3.0.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/kafka_2.10-0.8.2.1.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/kafka-clients-0.8.2.1.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/metrics-core-2.2.0.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externaljars/zkclient-0.3.jar \
--master local[2]
Then load the script inside the shell:
:load /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/kafkaWordCount.scala
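To cross-check what the job prints for a batch, the same per-word counting can be reproduced locally on the text that was sent. A rough sketch using standard Unix tools, assuming words are separated by single spaces; count_words is a hypothetical helper, and the output is formatted to resemble the (word,count) pairs that counts.print() shows, although Spark does not guarantee any particular order:

```shell
#!/bin/sh
# Split stdin into one word per line, drop empty lines, then count each
# distinct word and print it as (word,count), sorted alphabetically.
count_words() {
  tr ' ' '\n' | grep -v '^$' | sort | uniq -c | awk '{print "(" $2 "," $1 ")"}'
}

printf 'hello spark\nhello kafka\n' | count_words
# prints:
# (hello,2)
# (kafka,1)
# (spark,1)
```

The comparison only holds when all of the lines fall into the same 5-second batch, because the streaming job counts each batch separately.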
Input:
The word count succeeds.
Note:
A formatting bug currently appears in the output.