[关闭]
@contribute 2016-09-08T01:35:45.000000Z 字数 13881 阅读 3053

Kafka 0.9.0.0 官方文档

kafka



0 相关资料参考

  1. kafka入门:简介、使用场景、设计原理、主要配置及集群搭建(转)
  2. Apache kafka 工作原理介绍
  3. 分布式发布订阅消息系统 Kafka 架构设计 - 目前见到的最好的Kafka中文文章

1 开始

1.1 介绍

kafka是一个分布式的、分区的、可复制的日志提交服务。它提供了消息系统的功能,但设计完全不同。
这些是什么意思呢?
首先规定一些基本的消息术语:

从整体看,producers发送消息通过网络向kafka集群发送消息,这些集群faguolai反过来服务consumers,如下图:
producer_consumer.png-8.5kB
客户端和fuwq服务器之间的tongxin通信是通过简单高性能且语言无关的TCP protocol,我们提供的java客户端,但是也提供了其他语言的客户端。

主题和日志
我们首先深入liaojie了解kafka的topics。
一个topic是一个分类或用于表示消息发布的一个名字。kafka集群维护一个分区日志如xiatu下图:
log_anatomy.png-19.1kB
每个分区中的消息是顺序的、不可修改的序列。分区中的每个消息都被分配一个序列id称为offset,用于唯一确定这个分区中的每个消息。
无论发布的消息是否被消费,kafka集群保留消息一段时间,这个时间段是可配置的。
In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the "offset". This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads messages, but in fact the position is controlled by the consumer and it can consume messages in any order it likes. For example a consumer can reset to an older offset to reprocess.
日志服务中的分区有以下几个目的:
1. 他们允许消息的规模超过适合在一个服务器上的规模。每个单独的分区必须适合每个服务器,但是主题可以有多个分区,因此能处理更多的数据。
2. 可以把分区看作一个并行单元。

分区

消息的分区分布在kafka集群中的服务器上,每个服务器处理数据和请求共享分区。每个分区的复制因子可配置以备容错。
每个分区都有一个主节点服务器和0到多个从节点服务器。主节点服务器处理对分区的所有读写请求,从节点会积极的替代主节点,一旦主节点失败了,多个主节点中的一个从节点会自动成为主节点,每个服务器都会成为一些分区的主节点和一个写分区的从节点,这样集群才能保持平衡。

producers
生产者有选择的为某个topics发布消息。生产者负责选择并并将消息分配到一个主题中的某个分区。这也可以以循环的方式完成来保持负载均衡或根据一些语义分区功能来完成(例如基于消息中的某个key)。
消费者

1.2 用户案例

kafka比较流行的使用。
- Messaging
- Website Activity Tracking
- Metrics
- Log Aggregation
- Stream Processing
- Event Sourcing
- Commit Log

2 API

kafka包含了新的java客户端(在org.apache.kafka.clients中)。这意味着支持老的scala客户端,但是为了兼容,它们还会共存一段时间。这些客户端封装了jar并且依赖很少,然而老的scala客户端会打包到服务端中。

2.1 生产者API

我们鼓励开发者使用新的java生产者。比老的scala客户端更加告诉且功能更丰富。添加一下依赖就可以:

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>0.9.0.0</version>
  5. </dependency>

4 设计

4.1 motivation

  1. kafka想希望设计为作为数据数据提供处理的交互平台。
  2. 支持大容量事件流的高吞吐。
  3. 优雅的处理大数据积压,支持间歇性的从线下系统加载数据。
  4. 低延迟处理数据,支持更多传统使用场景。
  5. 支持分区的,分布式的,实时处理消息。
  6. 最后在这种情况下,输入其他数据流服务系统,我们知道系统必须能够保证容错的机器故障。

4.4 生产者

负载均衡

The producer sends data directly to the broker that is the leader for the partition without any intervening routing tier. To help the producer do this all Kafka nodes can answer a request for metadata about which servers are alive and where the leaders for the partitions of a topic are at any given time to allow the producer to appropriate direct its requests.

The client controls which partition it publishes messages to. This can be done at random, implementing a kind of random load balancing, or it can be done by some semantic partitioning function. We expose the interface for semantic partitioning by allowing the user to specify a key to partition by and using this to hash to a partition (there is also an option to override the partition function if need be). For example if the key chosen was a user id then all data for a given user would be sent to the same partition. This in turn will allow consumers to make locality assumptions about their consumption. This style of partitioning is explicitly designed to allow locality-sensitive processing in consumers.

异步发送

批处理是提高效率的一个重要因素,kafka为了支持批处理,将接受到的数据累积在缓存中,用一个请求以批量的形式发送出去。在内存中累积的数据大小是可以配置的,可以配置数据条数、数据总大小或最大延迟时间。批处理能减少IO操作次数。偶尔牺牲延迟换取更大的吞吐量也不失为一个不错的选择。

4.5 消费者

consumer端向broker发送"fetch"请求,并告知其获取消息的offset;此后consumer将会获得一定条数的消息;consumer端也可以重置offset来重新消费消息.

在JMS实现中,Topic模型基于push方式,即broker将消息推送给consumer端.不过在kafka中,采用了pull方式,即consumer在和broker建立连接之后,主动去pull(或者说fetch)消息;这中模式有些优点,首先consumer端可以根据自己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset);此外,消费者可以良好的控制消息消费的数量,batch fetch.

其他JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker需要太多额外的工作.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的.当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.由此可见,consumer客户端也很轻量级.

Push vs. pull

作为一个消息系统,Kafka遵循了传统的方式,选择由Producer向broker push消息并由Consumer从broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事实上,push模式和pull模式各有优劣。

push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成Consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据Consumer的消费能力以适当的速率消费消息。

对于Kafka而言,pull模式更合适。pull模式可简化broker的设计,Consumer可自主控制消费消息的速率,同时Consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

4.6 Message Delivery Semantics

有这么几种可能的delivery guarantee:

    当Producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果Producer发送数据给broker后,遇到网络问题而造成通信中断,那Producer就无法判断该条消息是否已经commit。虽然Kafka无法确定网络故障期间发生了什么,但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),这一Feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从Producer到broker是确保了At least once,可通过设置Producer异步发送实现At most once)。

    接下来讨论的是消息从broker到Consumer的delivery guarantee语义。(仅针对Kafka consumer high level API)。Consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中保存该Consumer在该Partition中读取的消息的offset。该Consumer下一次再读该Partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将Consumer设置为autocommit,即Consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际使用中应用程序并非在Consumer读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。

读完消息先commit再处理消息。这种模式下,如果Consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once

读完消息先处理再commit。这种模式下,如果在处理完消息之后commit之前Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。在很多使用场景下,消息都有一个主键,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是Exactly once。(笔者认为这种说法比较牵强,毕竟它不是Kafka本身提供的机制,主键本身也并不能完全保证操作的幂等性。而且实际上我们说delivery guarantee 语义是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们不应该把处理过程的特性——如是否幂等性,当成Kafka本身的Feature)

如果一定要做到Exactly once,就需要协调offset和实际操作的输出。精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,Consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)

总之,Kafka默认保证At least once,并且允许通过设置Producer异步提交来实现At most once。而Exactly once要求与外部存储系统协作,幸运的是Kafka提供的offset可以非常直接非常容易得使用这种方式。

4.7 分区

在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group
中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的.事实上,从Topic角度来说,消息仍不是有序的 。

为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。

分区机制partition:Kafka的broker端支持消息分区,Producer可以决定把消息发到哪个分区,在一个分区中消息的顺序就是Producer发送消息的顺序,一个主题中可以有多个分区,具体分区的数量是可配置的。 

一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写操作;此外kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性. 

4.8 常用配置文件解释

  1. ############################# System #############################
  2. #唯一标识在集群中的ID,要求是正数。
  3. broker.id=0
  4. #服务端口,默认9092
  5. port=9092
  6. #监听地址,不设为所有地址
  7. host.name=debugo01
  8. # 处理网络请求的最大线程数
  9. num.network.threads=2
  10. # 处理磁盘I/O的线程数
  11. num.io.threads=8
  12. # 一些后台线程数
  13. background.threads = 4
  14. # 等待IO线程处理的请求队列最大数
  15. queued.max.requests = 500
  16. # socket的发送缓冲区(SO_SNDBUF)
  17. socket.send.buffer.bytes=1048576
  18. # socket的接收缓冲区 (SO_RCVBUF)
  19. socket.receive.buffer.bytes=1048576
  20. # socket请求的最大字节数。为了防止内存溢出,message.max.bytes必然要小于
  21. socket.request.max.bytes = 104857600
  22. ############################# Topic #############################
  23. # 每个topic的分区个数,更多的partition会产生更多的segment file
  24. num.partitions=2
  25. # 是否允许自动创建topic ,若是false,就需要通过命令创建topic
  26. auto.create.topics.enable =true
  27. # 一个topic ,默认分区的replication个数 ,不能大于集群中broker的个数。
  28. default.replication.factor =1
  29. # 消息体的最大大小,单位是字节
  30. message.max.bytes = 1000000
  31. ############################# ZooKeeper #############################
  32. # Zookeeper quorum设置。如果有多个使用逗号分割
  33. zookeeper.connect=debugo01:2181,debugo02,debugo03
  34. # 连接zk的超时时间
  35. zookeeper.connection.timeout.ms=1000000
  36. # ZooKeeper集群中leader和follower之间的同步实际
  37. zookeeper.sync.time.ms = 2000
  38. ############################# Log #############################
  39. #日志存放目录,多个目录使用逗号分割
  40. log.dirs=/var/log/kafka
  41. # 当达到下面的消息数量时,会将数据flush到日志文件中。默认10000
  42. #log.flush.interval.messages=10000
  43. # 当达到下面的时间(ms)时,执行一次强制的flush操作。interval.ms和interval.messages无论哪个达到,都会flush。默认3000ms
  44. #log.flush.interval.ms=1000
  45. # 检查是否需要将日志flush的时间间隔
  46. log.flush.scheduler.interval.ms = 3000
  47. # 日志清理策略(delete|compact)
  48. log.cleanup.policy = delete
  49. # 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。
  50. log.retention.hours=168
  51. # 日志数据存储的最大字节数。超过这个时间会根据policy处理数据。
  52. #log.retention.bytes=1073741824
  53. # 控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
  54. log.segment.bytes=536870912
  55. # 当达到下面时间,会强制新建一个segment
  56. log.roll.hours = 24*7
  57. # 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(log.retention.hours或log.retention.bytes)
  58. log.retention.check.interval.ms=60000
  59. # 是否开启压缩
  60. log.cleaner.enable=false
  61. # 对于压缩的日志保留的最长时间
  62. log.cleaner.delete.retention.ms = 1 day
  63. # 对于segment日志的索引文件大小限制
  64. log.index.size.max.bytes = 10 * 1024 * 1024
  65. #y索引计算的一个缓冲区,一般不需要设置。
  66. log.index.interval.bytes = 4096
  67. ############################# replica #############################
  68. # partition management controller 与replicas之间通讯的超时时间
  69. controller.socket.timeout.ms = 30000
  70. # controller-to-broker-channels消息队列的尺寸大小
  71. controller.message.queue.size=10
  72. # replicas响应leader的最长等待时间,若是超过这个时间,就将replicas排除在管理之外
  73. replica.lag.time.max.ms = 10000
  74. # 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker
  75. controlled.shutdown.enable = false
  76. # 控制器关闭的尝试次数
  77. controlled.shutdown.max.retries = 3
  78. # 每次关闭尝试的时间间隔
  79. controlled.shutdown.retry.backoff.ms = 5000
  80. # 如果relicas落后太多,将会认为此partition relicas已经失效。而一般情况下,因为网络延迟等原因,总会导致replicas中消息同步滞后。如果消息严重滞后,leader将认为此relicas网络延迟较大或者消息吞吐能力有限。在broker数量较少,或者网络不足的环境中,建议提高此值.
  81. replica.lag.max.messages = 4000
  82. #leader与relicas的socket超时时间
  83. replica.socket.timeout.ms= 30 * 1000
  84. # leader复制的socket缓存大小
  85. replica.socket.receive.buffer.bytes=64 * 1024
  86. # replicas每次获取数据的最大字节数
  87. replica.fetch.max.bytes = 1024 * 1024
  88. # replicas同leader之间通信的最大等待时间,失败了会重试
  89. replica.fetch.wait.max.ms = 500
  90. # 每一个fetch操作的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会等待直到数据达到这个大小
  91. replica.fetch.min.bytes =1
  92. # leader中进行复制的线程数,增大这个数值会增加relipca的IO
  93. num.replica.fetchers = 1
  94. # 每个replica将最高水位进行flush的时间间隔
  95. replica.high.watermark.checkpoint.interval.ms = 5000
  96. # 是否自动平衡broker之间的分配策略
  97. auto.leader.rebalance.enable = false
  98. # leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
  99. leader.imbalance.per.broker.percentage = 10
  100. # 检查leader是否不平衡的时间间隔
  101. leader.imbalance.check.interval.seconds = 300
  102. # 客户端保留offset信息的最大空间大小
  103. offset.metadata.max.bytes = 1024
  104. #############################Consumer #############################
  105. # Consumer端核心的配置是group.id、zookeeper.connect
  106. # 决定该Consumer归属的唯一组ID,By setting the same group id multiple processes indicate that they are all part of the same consumer group.
  107. group.id
  108. # 消费者的ID,若是没有设置的话,会自增
  109. consumer.id
  110. # 一个用于跟踪调查的ID ,最好同group.id相同
  111. client.id = <group_id>
  112. # 对于zookeeper集群的指定,必须和broker使用同样的zk配置
  113. zookeeper.connect=debugo01:2182,debugo02:2182,debugo03:2182
  114. # zookeeper的心跳超时时间,查过这个时间就认为是无效的消费者
  115. zookeeper.session.timeout.ms = 6000
  116. # zookeeper的等待连接时间
  117. zookeeper.connection.timeout.ms = 6000
  118. # zookeeper的follower同leader的同步时间
  119. zookeeper.sync.time.ms = 2000
  120. # 当zookeeper中没有初始的offset时,或者超出offset上限时的处理方式 。
  121. # smallest :重置为最小值
  122. # largest:重置为最大值
  123. # anything else:抛出异常给consumer
  124. auto.offset.reset = largest
  125. # socket的超时时间,实际的超时时间为max.fetch.wait + socket.timeout.ms.
  126. socket.timeout.ms= 30 * 1000
  127. # socket的接收缓存空间大小
  128. socket.receive.buffer.bytes=64 * 1024
  129. #从每个分区fetch的消息大小限制
  130. fetch.message.max.bytes = 1024 * 1024
  131. # true时,Consumer会在消费消息后将offset同步到zookeeper,这样当Consumer失败后,新的consumer就能从zookeeper获取最新的offset
  132. auto.commit.enable = true
  133. # 自动提交的时间间隔
  134. auto.commit.interval.ms = 60 * 1000
  135. # 用于消费的最大数量的消息块缓冲大小,每个块可以等同于fetch.message.max.bytes中数值
  136. queued.max.message.chunks = 10
  137. # 当有新的consumer加入到group时,将尝试reblance,将partitions的消费端迁移到新的consumer中, 该设置是尝试的次数
  138. rebalance.max.retries = 4
  139. # 每次reblance的时间间隔
  140. rebalance.backoff.ms = 2000
  141. # 每次重新选举leader的时间
  142. refresh.leader.backoff.ms
  143. # server发送到消费端的最小数据,若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。
  144. fetch.min.bytes = 1
  145. # 若是不满足fetch.min.bytes时,等待消费端请求的最长等待时间
  146. fetch.wait.max.ms = 100
  147. # 如果指定时间内没有新消息可用于消费,就抛出异常,默认-1表示不受限
  148. consumer.timeout.ms = -1
  149. #############################Producer#############################
  150. # 核心的配置包括:
  151. # metadata.broker.list
  152. # request.required.acks
  153. # producer.type
  154. # serializer.class
  155. # 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip
  156. metadata.broker.list
  157. #消息的确认模式
  158. # 0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP
  159. # 1:发送消息,并会等待leader 收到确认后,一定的可靠性
  160. # -1:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性
  161. request.required.acks = 0
  162. # 消息发送的最长等待时间
  163. request.timeout.ms = 10000
  164. # socket的缓存大小
  165. send.buffer.bytes=100*1024
  166. # key的序列化方式,若是没有设置,同serializer.class
  167. key.serializer.class
  168. # 分区的策略,默认是取模
  169. partitioner.class=kafka.producer.DefaultPartitioner
  170. # 消息的压缩模式,默认是none,可以有gzip和snappy
  171. compression.codec = none
  172. # 可以针对默写特定的topic进行压缩
  173. compressed.topics=null
  174. # 消息发送失败后的重试次数
  175. message.send.max.retries = 3
  176. # 每次失败后的间隔时间
  177. retry.backoff.ms = 100
  178. # 生产者定时更新topic元信息的时间间隔 ,若是设置为0,那么会在每个消息发送后都去更新数据
  179. topic.metadata.refresh.interval.ms = 600 * 1000
  180. # 用户随意指定,但是不能重复,主要用于跟踪记录消息
  181. client.id=""
  182. # 异步模式下缓冲数据的最大时间。例如设置为100则会集合100ms内的消息后发送,这样会提高吞吐量,但是会增加消息发送的延时
  183. queue.buffering.max.ms = 5000
  184. # 异步模式下缓冲的最大消息数,同上
  185. queue.buffering.max.messages = 10000
  186. # 异步模式下,消息进入队列的等待时间。若是设置为0,则消息不等待,如果进入不了队列,则直接被抛弃
  187. queue.enqueue.timeout.ms = -1
  188. # 异步模式下,每次发送的消息数,当queue.buffering.max.messages或queue.buffering.max.ms满足条件之一时producer会触发发送。
  189. batch.num.messages=200

理解

  1. 分区中的数据都是不一样的。
  2. 在一段时间(可配置)内,无论有没有被消费,所有发布的消息都可以被集群获取到。
  3. 一条消息的id就是offset,一般都是顺序读,但也可以重读,由消费者决定。所以消费者的增加和减少对集群或其他消费者并没什么大的影响。
  4. 分区有两个作用:1,用于无限扩展。2,并行处理。
  5. 分区有复制因子。
  6. 分区有主节点和从节点之分,主节点负责所有的读和写,从节点负责同步。
  7. 主题有多分区,生产者在发布消息到某个主题时,需保证公平和均衡。
  8. kafka是靠消费组来实现队列发布订阅两个消息模式的。一个消费组只能获取一条消息,假设有五个组同时订阅一个主题,如果前四组每组中只有一个消费者,那么这4组对应的4个消费者就属于发布订阅模式,如果第5个组中有10个消费者,那个这个10个属于队列模式。
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注