[关闭]
@peerslee 2017-05-11T02:52:01.000000Z 字数 4961 阅读 1871

zookeeper+kafka+flume

Spark

ufw使用

1、启用
sudo ufw enable
sudo ufw default deny
作用:开启了防火墙并随系统启动同时关闭所有外部对本机的访问(本机访问外部正常)。
2、关闭
sudo ufw disable
3、查看防火墙状态
sudo ufw status

Zookeeper 3.4.0

1.解压
2.配置

tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
dataDir:顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
syncLimit:这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 2*2000=4 秒
server.A=B:C:D:其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号

zk
server.1=localhost:2881:3881
server.2=localhost:2882:3882
server.3=localhost:2883:3883
clientPort=2181(2182,2183)

3.伪分布式
3.1

$ tree -dL 2
.
├── server_1
│   ├── data
│   └── zookeeper
├── server_2
│   ├── data
│   └── zookeeper
└── server_3
    ├── data
    └── zookeeper

9 directories

3.2

# peerslee @ peersleeLoveJiaLee in ~/opt/zk_servers [22:46:42] 
$ cat server_1/data/myid 
1

# peerslee @ peersleeLoveJiaLee in ~/opt/zk_servers [22:47:03] 
$ cat server_2/data/myid
2

# peerslee @ peersleeLoveJiaLee in ~/opt/zk_servers [22:47:07] 
$ cat server_3/data/myid
3

3.3

bin/zkServer.sh start

3.4

#!/bin/bash
for ((i=1; i<=3; i++))
do
   ~/opt/zk_servers/server_$i/zookeeper/bin/zkServer.sh start 
done

kafka_2.11-0.9.0.1

1.

$ cat server.properties-0 
broker.id=0
port=9092
log.dirs=/home/peerslee/opt/kafka/tmp/kafka-logs-0
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
delete.topic.enable=true  

$ cat server.properties-1
broker.id=1
port=9093
log.dirs=/home/peerslee/opt/kafka/tmp/kafka-logs-1
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
delete.topic.enable=true  

$ cat server.properties-2
broker.id=2
port=9094
log.dirs=/home/peerslee/opt/kafka/tmp/kafka-logs-2
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
# 删除
delete.topic.enable=true  

2.

$ cat kafka_start.sh 
    #!/bin/bash
    cd ~/opt/kafka/
    for ((i=0; i<=2; i++))
    do
        ./bin/kafka-server-start.sh ./config/server.properties-$i
done
cd ~/

3.

$ kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --topic test --replication-factor 1 --partitions 3
Created topic "test".

$ kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test
hello
word 

$ kafka-console-consumer.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --topic test --from-beginning
hello
word

4. 删除topic

kafka-topics.sh --delete --zookeeper
localhost:2181,localhost:2182,localhost:2183 --topic test

  1. 查看topic

kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --list
aboutyunlog

flume1.6

1.简介
flume是个日志收集系统,这个日志收集系统由一个或多个agent(代理)构成,每个agent由三部分构成:Source、Channel、Sink。

source为水源,是aent获取数据的入口;
channel为管道,是数据(由resource获得)流动的通道,主要作用是用来传输和存储数据;
sink为水槽,用来接收channel传入的数据并将数据输出到指定地方。

大家可以把agent看作一个水管,source就是水管的入口,sink就是水管的出口,把数据当作水来看,数据流也就意味着水流。数据由source获得流经channel,最后传给sink。

2.三大组件的详细介绍

Flume Source

Avro Source 支持Avro协议(实际上是Avro RPC),内置支持
Thrift Source 支持Thrift协议,内置支持
Exec Source 基于Unix的command在标准输出上生产数据
JMS Source 从JMS系统(消息、主题)中读取数据,ActiveMQ已经测试过
Spooling Directory Source 监控指定目录内数据变更
Twitter 1% firehose Source 通过API持续下载Twitter数据,试验性质
Netcat Source 监控某个端口,将流经端口的每一个文本行数据作为Event输入
Sequence Generator Source 序列生成器数据源,生产序列数据
Syslog Sources 读取syslog数据,产生Event,支持UDP和TCP两种协议
HTTP Source 基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式
Legacy Sources 兼容老的Flume OG中Source(0.9.x版本)

Flume Channel

Memory Channel Event数据存储在内存中
JDBC Channel Event数据存储在持久化存储中,当前Flume Channel内置支持Derby
File Channel Event数据存储在磁盘文件中
Spillable Memory Channel Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用)
Pseudo Transaction Channel 测试用途
Custom Channel 自定义Channel实现

Flume Sink

HDFS Sink 数据写入HDFS
Logger Sink 数据写入日志文件
Avro Sink 数据被转换成Avro Event,然后发送到配置的RPC端口上
Thrift Sink 数据被转换成Thrift Event,然后发送到配置的RPC端口上
IRC Sink 数据在IRC上进行回放
File Roll Sink 存储数据到本地文件系统
Null Sink 丢弃到所有数据
HBase Sink 数据写入HBase数据库
Morphline Solr Sink 数据发送到Solr搜索服务器(集群)
ElasticSearch Sink 数据发送到Elastic Search搜索服务器(集群)
Kite Dataset Sink 写数据到Kite Dataset,试验性质的
Custom Sink 自定义Sink实现

3.配置

# agent 的名称为a1
a1.sources=source1
a1.channels=channel1
a1.sinks=sink1

# set Sources
a1.sources.source1.type=spooldir
a1.sources.source1.spoolDir=/home/peerslee/flume_data/aboutyunlog

# set channel
a1.channels.channel1.type=file
a1.channels.channel1.checkpointDir=/home/peerslee/flume_data/checkpoint
a1.channels.channel1.dataDirs=/home/peerslee/flume_data/data

# set sink
a1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.brokerList=localhost:9092,localhost:9093,localhost:9094
a1.sinks.sink1.topic=aboutyunlog

# bind
a1.sources.source1.channels=channel1
a1.sinks.sink1.channel=channel1

4.启动flume

flume-ng agent --conf-file ~/opt/flume/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console

5.创建一个kafka的消费者

$ kafka-console-consumer.sh --zookeeper
localhost:2181,localhost:2182,localhost:2183 --topic aboutyunlog
--from-beginning

6.
向 a1.sources.source1.spoolDir=/home/peerslee/flume_data/aboutyunlog 这个目录复制文件,消费者控制台就会进行消费

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注