@1234567890
2017-05-22T13:27:51.000000Z
字数 10457
阅读 1588
消息队列
# The id of the broker. This must be set to a unique integer for each broker.# 非常重要的一个属性,在Kafka集群中每一个brocker的id一定要不一样,否则启动时会报错broker.id=2# The port the socket server listens onport=9092# Hostname the broker will bind to. If not set, the server will bind to all interfaces#host.name=localhost# The number of threads handling network requestsnum.network.threads=2# The number of threads doing disk I/O# 故名思议,就是有多少个线程同时进行磁盘IO操作。# 这个值实际上并不是设置得越大性能越好。# 在我后续的“存储”专题会讲到,如果您提供给Kafka使用的文件系统物理层只有一个磁头在工作# 那么这个值就变得没有任何意义了num.io.threads=8# The send buffer (SO_SNDBUF) used by the socket serversocket.send.buffer.bytes=1048576# The receive buffer (SO_RCVBUF) used by the socket serversocket.receive.buffer.bytes=1048576# The maximum size of a request that the socket server will accept (protection against OOM)socket.request.max.bytes=104857600# A comma seperated list of directories under which to store log files# 很多开发人员在使用Kafka时,不重视这个属性。# 实际上Kafka的工作性能绝大部分就取决于您提供什么样的文件系统log.dirs=/tmp/kafka-logs# The default number of log partitions per topic. More partitions allow greater# parallelism for consumption, but this will also result in more files across the brokers.num.partitions=2# The number of messages to accept before forcing a flush of data to disk# 从Page Cache中将消息正式写入磁盘上的阀值:以待转储消息数量为依据#log.flush.interval.messages=10000# The maximum amount of time a message can sit in a log before we force a flush# 从Page Cache中将消息正式写入磁盘上的阀值:以转储间隔时间为依据#log.flush.interval.ms=1000# The minimum age of a log file to be eligible for deletion# log消息信息保存时长,默认为168个小时log.retention.hours=168# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining# segments don't drop below log.retention.bytes.# 默认为1GB,在此之前log文件不会执行删除策略# 实际环境中,由于磁盘空间根本不是问题,并且内存空间足够大。所以笔者会将这个值设置的较大,例如100GB。#log.retention.bytes=1073741824# The maximum size of a log segment file.# When this size is reached a new log segment will be created.# 默认为512MB,当达到这个大小,Kafka将为这个Partition创建一个新的分段文件log.segment.bytes=536870912# The interval at which log segments are checked to see if they can be deleted according# to the retention policies# 文件删除的保留策略,多久被检查一次(单位毫秒)# 实际生产环境中,6-12小时检查一次就够了log.retention.check.interval.ms=60000# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.log.cleaner.enable=false############################# Zookeeper ############################## Zookeeper connection string (see zookeeper docs for details).# root directory for all kafka znodes.# 到zookeeper的连接信息,如果有多个zookeeper服务节点,则使用“,”进行分割# 例如:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002zookeeper.connect=192.168.61.140:2181# Timeout in ms for connecting to zookeeper# zookeeper连接超时时间zookeeper.connection.timeout.ms=1000000
启动kafka
//1.进入zookeeper目录,启动zkbin/zkServer.sh start//2.进入kafka目录,启动kafkabin/kafka-server-start.sh config/server.properties
创建topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
生产者
kafka-console-producer.sh --broker-list localhost:9092 --topic test
消费者
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
查看topic状态
kafka-topics.sh --describe --zookeeper 192.168.61.139:2181 --topic my_topic2

整个Kafka集群中,可以有多个消息生产者。这些消息生产者可能在同一个物理节点上,也可能在不同的物理节点。它们都必须知道哪些Kafka Broker List是将要发送的目标:消息生产者会决定发送的消息将会送入Topic的哪一个分区(Partition)。
消费者都是按照“组”的单位进行消息隔离:在同一个Topic下,消息生产者发送一条消息后,同一个Topic下不同组的消费者都会收到这条信息。
同一组下的消息消费者可以消费Topic下一个分区或者多个分区中的消息,但是一个分区中的消息只能被同一组下的某一个消息消费者所处理。
由于存在以上的操作规则,所以Kafka集群中Consumer(消费者)需要和Kafka集群中的Server Broker进行协调工作:这个协调工作者交给了Zookeeper集群。zookeeper集群需要记录/协调的工作包括:当前整个Kafka集群中有哪些Broker节点以及每一个节点处于什么状态(活动/离线/状态)、当前集群中所有已创建的Topic以及分区情况、当前集群中所有活动的消费者组/消费者、每一个消费者组针对每个topic的索引位置等。
如果当前消费者连接时,发现整个Kafka集群中存在一个消费者(记为消费者A)关联Topic下多个分区的情况,且消费者A处于繁忙无法处理这些分区下新的消息(即消费者A的上一批Pull的消息还没有处理完成)。这时新的消费者将接替原消费者A所关联的一个(或者多个)分区,并且一直保持和这个分区的关联。
由于Kafka集群中只保证同一个分区(Partition)下消息队列中消息的顺序。所以当一个或者多个消费者分别Pull一个Topic下的多个消息分区时,您在消费者端观察的现象可能就是消息顺序是混乱的。这里我们一直在说消费者端的Pull行为,是指的Topic下分区中的消息并不是由Broker主动推送到(Push)到消费者端,而是由消费者端主动拉取(Pull)。
Kafka将分区的多个副本分为两种角色:Leader和Follower,Leader Broker是主要服务节点,消息只会从消息生产者发送给Leader Broker,消息消费者也只会从Leader Broker中Pull消息。Follower Broker为副本服务节点,正常情况下不会公布给生产者或者消费者直接进行操作。Follower Broker服务节点将会主动从Leader Broker上Pull消息。
在这种工作机制下,Follower和Leader的消息复制过程由于Follower服务节点的性能、压力、网络等原因,它们和Leader服务节点会有一个消息差异性。当这个差异性扩大到一定的范围,Leader节点就会认为这个Follower节点再也跟不上自己的节奏,导致的结果就是Leader节点会将这个Follower节点移出“待同步副本集”ISR(in-sync replicas),不再关注这个Follower节点的同步问题。
只有当ISR中所有分区副本全部完成了某一条消息的同步过程,这条消息才算真正完成了“记录”操作。只有这样的消息才会发送给消息消费者。至于这个真正完成“记录”操作的通知是否能返回给消息生产者,完全取决于消息生产者采用的acks模式。----->强一致性复制和弱一致性复制
消息生产这还可以决定是以同步方式向Broker发送消息还是以异步方式向Broker发送消息。只需要使用生产者配置中的“producer.type”属性进行指定。当该属性值为“sync”时,表示使用同步发送的方式;当该属性值为“async”时,表示使用异步发送方式。
在异步发送方式下,开发人员调用send方法发送消息时,这个消息并不会立即被发送到topic指定的Leader partition所在的Broker,而是会存储在本地的一个缓冲区域(一定注意是客户端本地)。当缓冲区的状态满足最长等待时间或者最大数据量条数时,消息会以一个设置值批量发送给Broker。
在Kafka的实现中,强一致性复制是指当Leader Partition收到消息后,将在所有Follower partition完成这条消息的复制后才认为消息处理成功,并向消息生产者返回ack信息;弱一致性复制是指当Leader partition收到消息后,只要Leader Broker自己完成了消息的存储就认为消息处理成立,并向消息生产者返回ack信息(复制过程随后由Broker节点自行完成);
// 可以通过这个属性控制复制过程的一致性规则 //props.put("request.required.acks", "1");
当acks设置为0时,生产者端不会等待Server Broker回执任何的ACK确认信息。只是将要发送的消息交给网络层。这种情况下,消息是否真的到达了Server Broker,实际上生产者端并不知道。由于生产者端并不等待Server Broker回执任何的ACK确认信息,那么消息一旦传输失败(例如,等待超时的情况)“重试”过程就无从谈起了。由于生产者端在这种情况下发送的消息,很可能Server Broker还没来得及处理,甚至更有可能Server Broker都没有接收到,所以Server Broker也无法告知生产者这条消息在分区中的偏移位置。
当acks设置为1时,生产者发送消息将等待这个分区的Leader Server Broker 完成它本地的消息记录操作,但不会等待这个分区下其它Follower Server Brokers的操作。在这种情况下,虽然Leader Server Broker对消息的处理成功了,也返回了ACK信息给生产者端,但是在进行副本复制时,还是可能失败。
当acks设置为“all”时,消息生产者发送消息时将会等待目标分区的Leader Server Broker以及所有的Follower Server Brokers全部处理完,才会得到ACK确认信息。这样的处理逻辑下牺牲了一部分性能,但是消息存储可靠性是最高的。
# 脚本命令范例kafka-topics.sh --describe --zookeeper 192.168.61.139:2181 --topic my_topic2# 显示的结果Topic:my_topic2 PartitionCount:4 ReplicationFactor:2 Configs:Topic: my_topic2 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1Topic: my_topic2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2Topic: my_topic2 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1Topic: my_topic2 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
package cn.wht.kafka;import java.util.Date;import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;/*** Created by haoting.wang on 2016/11/29.*/public class KafkaProducer {public static void main(String[] args) {Properties props = new Properties();// 指定kafka节点列表,不需要由zookeeper进行协调// 并且连接的目的也不是为了发送消息,而是为了在这些节点列表中选取一个,来获取topic的分区状况props.put("metadata.broker.list", "127.0.0.1:9092");// 使用这个属性可以指定“将消息送到topic的哪一个partition中”,如果业务规则比较复杂的话可以指定分区控制器// 不过开发者最好要清楚topic有多少个分区,这样才好进行多线程(负载均衡)发送//props.put("partitioner.class", "kafkaTQ.PartitionerController");// 可以通过这个参数控制是异步发送还是同步发送(默认为“同步”)//props.put("producer.type", "async");// 可以通过这个属性控制复制过程的一致性规则//props.put("request.required.acks", "1");ProducerConfig config = new ProducerConfig(props);// 创建消费者Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config);// 由于我们为topic创建了四个partition,所以将数据分别发往这四个分区for (Integer partitionIndex = 0; ; partitionIndex++) {Date time = new Date();// 创建和发送消息,可以指定这条消息的key,producer根据这个key来决定这条消息发送到哪个parition中// 另外一个可以决定parition的方式是实现kafka.producer.Partitioner接口String messageContext_Value = new Date().toString()+"this message from producer 由producer指的partitionIndex:[" + partitionIndex % 4 + "]" + time.getTime();System.out.println(messageContext_Value);byte[] messageContext = messageContext_Value.getBytes();byte[] key = partitionIndex.toString().getBytes();// 这是消息对象,请注意第二个参数和第三个参数,如果第三个参数没有被赋值,则使用第二个参数作为分区依据。所以在使用KeyedMessage类的构造函数时,您只需要指定其中的一个就完全够了。详情看下面KeyedMessage类(scala语言)KeyedMessage<byte[], byte[]> message = new KeyedMessage<byte[], byte[]>("my_topic2", key , partitionIndex % 4 , messageContext);producer.send(message);// 休息0.5秒钟,循环发synchronized (KafkaProducer.class) {try {KafkaProducer.class.wait(500);} catch (InterruptedException e) {e.printStackTrace(System.out);}}}}}
package kafka.producer/*** A topic, key, and value.* If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.*/case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {if(topic == null)throw new IllegalArgumentException("Topic cannot be null.")def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)def this(topic: String, key: K, message: V) = this(topic, key, key, message)def partitionKey = {if(partKey != null)partKeyelse if(hasKey)keyelsenull}def hasKey = key != null}
package cn.wht.kafka;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import kafka.consumer.Consumer;import kafka.consumer.ConsumerConfig;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;import kafka.message.MessageAndMetadata;/*** 这是Kafka的topic消费者* @author yinwenjie*/public class KafkaConsumer_GroupOne {public static void main(String[] args) throws RuntimeException {// ==============首先各种连接属性// Kafka消费者的完整连接属性在Apache Kafka官网http://kafka.apache.org/documentation.html#consumerconfigs// 有详细介绍(请参看Old Consumer Configs。New Consumer Configs是给Kafka V0.9.0.0+使用的)// 这里我们设置几个关键属性Properties props = new Properties();// zookeeper相关的,如果有多个zk节点,这里以“,”进行分割props.put("zookeeper.connect", "127.0.0.1:2181");props.put("zookeeper.connection.timeout.ms", "10000");// 还记得上文的说明吗:对于一个topic而言,同一用户组内的所有用户只被允许访问一个分区。// 所以要让多个Consumer实现对一个topic的负载均衡,每个groupid的名称都要一样String groupname = "group2";props.put("group.id", groupname);//==============ConsumerConfig consumerConfig = new ConsumerConfig(props);ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);// 我们只创建一个消费者HashMap<String, Integer> map = new HashMap<String, Integer>();map.put("my_topic2", 1);Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(map);// 获取并启动消费线程,注意看关键就在这里,一个消费线程可以负责消费一个topic中的多个partition// 但是一个partition只能分配到一个消费线程去KafkaStream<byte[], byte[]> stream = topicMessageStreams.get("my_topic2").get(0);new Thread(new ConsumerThread(stream)).start();// 接着锁住主线程,让其不退出synchronized (KafkaConsumer_GroupOne.class) {try {KafkaConsumer_GroupOne.class.wait();} catch (InterruptedException e) {e.printStackTrace(System.out);}}}/*** @author yinwenjie*/private static class ConsumerThread implements Runnable {private KafkaStream<byte[], byte[]> stream;/*** @param stream*/public ConsumerThread(KafkaStream<byte[], byte[]> stream) {this.stream = stream;}public void run() {ConsumerIterator<byte[], byte[]> iterator = this.stream.iterator();//============这个消费者获取的数据在这里while(iterator.hasNext()){MessageAndMetadata<byte[], byte[]> message = iterator.next();int partition = message.partition();String topic = message.topic();String messageT = new String(message.message());System.out.println("接收到: " + messageT + "来自于topic:[" + topic + "] + 第partition[" + partition + "]");}}}}