@caoo
2020-12-01T08:04:13.000000Z
字数 11367
阅读 423
KAKFA
Kafka是分布式发布-订阅消息系统,它最初是由LinkedIn公司开发的,之后成为Apache项目的一部分,Kafka是一个分布式,可划分的,冗余备份的持久性的日志服务,它主要用于处理流式数据。
kafka应用场景:
①异步处理 ②应用解耦 ③流量削峰 ④日志处理 ⑤消息通讯等。
Kafka性能特点:
①顺序读写 ②零拷贝 ③分区 ④批量发送 ⑤数据压缩 :
优点:
①支持跨数据中心的消息复制;
②单机吞吐量:十万级,最大的优点,就是吞吐量高;
③topic数量都吞吐量的影响:topic从几十个到几百个的时候,吞吐量会大幅度下降。所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源;
④时效性:ms级;
⑤可用性:非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用;
⑥消息可靠性:经过参数优化配置,消息可以做到0丢失;
⑦功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用。
缺点:
①由于是批量发送,数据并非真正的实时; 仅支持统一分区内消息有序,无法实现全局消息有序;
②有可能消息重复消费;
③依赖zookeeper进行元数据管理,等等。
缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。
冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
ISR:In-Sync Replicas 副本同步队列
AR:Assigned Replicas 所有副本
ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。
broker 是消息的代理,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉取指定Topic的消息,然后进行业务处理,broker在中间起到一个代理保存消息的中转站。
zookeeper 是一个分布式的协调组件,早期版本的kafka用zk做meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖,
但是broker依然依赖于ZK,zookeeper在kafka中还用来选举controller 和 检测broker是否存活等等。
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。完全同步复制要求All Alive Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下,如果leader挂掉,会丢失数据,kafka使用ISR的方式很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,而且Leader充分利用磁盘顺序读以及send file(zero copy)机制,这样极大的提高复制性能,内部批量写磁盘,大幅减少了Follower与Leader的消息量差。
leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护 ,如果一个follower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除 。
Cache Filesystem Cache PageCache缓存
顺序写 由于现代的操作系统提供了预读和写技术,磁盘的顺序写大多数情况下比随机写内存还要快。
Zero-copy 零拷技术减少拷贝次数
Batching of Messages 批量量处理。合并小的请求,然后以流的方式进行交互,直顶网络上限。
Pull 拉模式 使用拉模式进行消息的获取消费,与消费端处理能力相符。
增加线程
提高 batch.size
增加更多 producer 实例
增加 partition 数
设置 acks=-1 时,如果延迟增大:可以增大 num.replica.fetchers(follower 同步数据的线程数)来调解;
跨数据中心的传输:增加 socket 缓冲区设置以及 OS tcp 缓冲区设置。
1:(默认) 数据发送到Kafka后,经过leader成功接收消息的的确认,就算是发送成功了。在这种情况下,如果leader宕机了,则会丢失数据。
0: 生产者将数据发送出去就不管了,不去等待任何返回。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
-1: producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。当ISR中所有Replica都向Leader发送ACK时,leader才commit,这时候producer才能认为一个请求中的消息都commit了。
unclean.leader.election.enable 为true的话,意味着非ISR集合的broker 也可以参与选举,这样有可能就会丢数据,spark streaming在消费过程中拿到的 end offset 会突然变小,导致 spark streaming job挂掉。如果unclean.leader.election.enable参数设置为true,就有可能发生数据丢失和数据不一致的情况,Kafka的可靠性就会降低;而如果unclean.leader.election.enable参数设置为false,Kafka的可用性就会降低。
kafka在Broker端提供了一个配置参数:unclean.leader.election,这个参数有两个值:
true(默认):允许不同步副本成为leader,由于不同步副本的消息较为滞后,此时成为leader,可能会出现消息不一致的情况。
false:不允许不同步副本成为leader,此时如果发生ISR列表为空,会一直等待旧leader恢复,降低了可用性。
一个Kafka的Message由一个固定长度的header和一个变长的消息体body组成
header部分由一个字节的magic(文件格式)和四个字节的CRC32(用于判断body消息体是否正常)构成。
当magic的值为1的时候,会在magic和crc32之间多一个字节的数据:attributes(保存一些相关属性,
比如是否压缩、压缩格式等等);如果magic的值为0,那么不存在attributes属性
body是由N个字节构成的一个消息体,包含了具体的key/value消息
同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。同一个topic的数据,会广播给不同的group;同一个group中的worker,只有一个worker能拿到这个数据。换句话说,对于同一个topic,每个group都可以拿到同样的所有数据,但是数据进入group后只能被其中的一个worker消费。group内的worker可以使用多线程或多进程来实现,也可以将进程分散在多台机器上,worker的数量通常不超过partition的数量,且二者最好保持整数倍关系,因为Kafka在设计时假定了一个partition只能被一个worker消费(同一group内)。
要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费。
1、消息发送
Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置。Kafka通过配置request.required.acks属性来确认消息的生产:
0: 表示不进行消息接收是否成功的确认;
1: 表示当Leader接收成功时确认;
-1: 表示Leader和Follower都接收成功时确认;
综上所述,有6种消息生产的情况,下面分情况来分析消息丢失的场景:
(1)acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失;
(2)acks=1、同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失;
2、消息消费
Kafka消息消费有两个consumer接口,Low-level API和High-level API:
Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制;
High-level API:封装了对parition和offset的管理,使用简单;
如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了;
解决办法:
针对消息丢失:同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;
针对消息重复:将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。
消息重复消费及解决参考:https://www.javazhiyin.com/22910.html
在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。
Kafka 并不支持主写从读,因为主写从读有 2 个很明 显的缺点:
(1)数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
(2)延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经 历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。
kafka每个partition中的消息在写入时都是有序的,消费时,每个partition只能被每一个group中的一个消费者消费,保证了消费时也是有序的。
整个topic不保证有序。如果为了保证topic整个有序,那么将partition调整为1.
offset+1
Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。时间轮的应用并非Kafka独有,其应用场景还有很多,在Netty、Akka、Quartz、Zookeeper等组件中都存在时间轮的踪影。
底层使用数组实现,数组中的每个元素可以存放一个TimerTaskList对象。TimerTaskList是一个环形双向链表,在其中的链表项TimerTaskEntry中封装了真正的定时任务TimerTask.
Kafka中到底是怎么推进时间的呢?Kafka中的定时器借助了JDK中的DelayQueue来协助推进时间轮。具体做法是对于每个使用到的TimerTaskList都会加入到DelayQueue中。Kafka中的TimingWheel专门用来执行插入和删除TimerTaskEntry的操作,而DelayQueue专门负责时间推进的任务。再试想一下,DelayQueue中的第一个超时任务列表的expiration为200ms,第二个超时任务为840ms,这里获取DelayQueue的队头只需要O(1)的时间复杂度。如果采用每秒定时推进,那么获取到第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取到第二个超时任务时有需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用DelayQueue来辅助以少量空间换时间,从而做到了“精准推进”。Kafka中的定时器真可谓是“知人善用”,用TimingWheel做最擅长的任务添加和删除操作,而用DelayQueue做最擅长的时间推进工作,相辅相成。
参考:https://blog.csdn.net/u013256816/article/details/80697456
参考:https://blog.csdn.net/u013256816/article/details/89135417
https://blog.csdn.net/yanshu2012/article/details/54894629
传统的消息传递方法包括两种:
排队:在队列中,一组用户可以从服务器中读取消息,每条消息都发送给其中一个人。
发布-订阅:在这个模型中,消息被广播给所有的用户。
①快速:单一的Kafka代理可以处理成千上万的客户端,每秒处理数兆字节的读写操作。
②可伸缩:在一组机器上对数据进行分区
③和简化,以支持更大的数据
④持久:消息是持久性的,并在集群中进
⑤行复制,以防止数据丢失。
⑥设计:它提供了容错保证和持久性
在Kafka中传递消息是通过使用sendfile API完成的。它支持将字节从套接口转移到磁盘,通过内核空间保存副本,并在内核用户之间调用内核。
如果用户位于与broker不同的数据中心,则可能需要调优套接口缓冲区大小,以对长网络延迟进行摊销。
在数据中,为了精确地获得Kafka的消息,你必须遵循两件事: 在数据消耗期间避免重复,在数据生产过程中避免重复。
这里有两种方法,可以在数据生成时准确地获得一个语义:
每个分区使用一个单独的写入器,每当你发现一个网络错误,检查该分区中的最后一条消息,以查看您的最后一次写入是否成功
在消息中包含一个主键(UUID或其他),并在用户中进行反复制
ISR是一组与leaders完全同步的消息副本,也就是说ISR中包含了所有提交的消息。ISR应该总是包含所有的副本,直到出现真正的故障。如果一个副本从leader中脱离出来,将会从ISR中删除。
Kafka的信息复制确保了任何已发布的消息不会丢失,并且可以在机器错误、程序错误或更常见些的软件升级中使用。
如果一个副本在ISR中保留了很长一段时间,那么它就表明,跟踪器无法像在leader收集数据那样快速地获取数据。
如果首选的副本不在ISR中,控制器将无法将leadership转移到首选的副本。
在大多数队列系统中,作为生产者的类无法做到这一点,它的作用是触发并忘记消息。broker将完成剩下的工作,比如使用id进行适当的元数据处理、偏移量等。
作为消息的用户,你可以从Kafka broker中获得补偿。如果你注视SimpleConsumer类,你会注意到它会获取包括偏移量作为列表的MultiFetchResponse对象。此外,当你对Kafka消息进行迭代时,你会拥有包括偏移量和消息发送的MessageAndOffset对象。
Kafka将消息以topic为单位进行归纳 将向Kafka topic发布消息的程序成为producers. 将预订topics并消费消息的程序成为consumer. Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker. producers通过网络将消息发送到Kafka集群,集群向消费者提供消息
(1)最多一次:消息不会被重复发送,最多被传输一次,但也有可能一次不传输
(2)最少一次:消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
(3)精确的一次(Exactly once):不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的
(1)节点必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个节点的连接
(2)如果节点是个follower,他必须能及时的同步leader的写操作,延时不能太久
producer直接将数据发送到broker的leader(主节点),不需要在多个节点进行分发,为了帮助producer做到这点,所有的Kafka节点都可以及时的告知:哪些节点是活动的,目标topic目标分区的leader在哪。这样producer就可以直接将消息发送到目的地了。
Kafa consumer消费消息时,向broker发出"fetch"请求去消费特定分区的消息,consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer拥有了offset的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的
Kafka最初考虑的问题是,customer应该从brokes拉取消息还是brokers将消息推送到consumer,也就是pull还push。在这方面,Kafka遵循了一种大部分消息系统共同的传统的设计:producer将消息推送到broker,consumer从broker拉取消息一些消息系统比如Scribe和Apache Flume采用了push模式,将消息推送到下游的consumer。这样做有好处也有坏处:由broker决定消息推送的速率,对于不同消费速率的consumer就不太好处理了。消息系统都致力于让consumer以最大的速率最快速的消费消息,但不幸的是,push模式下,当broker推送的速率远大于consumer消费的速率时,consumer恐怕就要崩溃了。最终Kafka还是选取了传统的pull模式
Pull模式的另外一个好处是consumer可以自主决定是否批量的从broker拉取数据。Push模式必须在不知道下游consumer消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。如果为了避免consumer崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。Pull模式下,consumer就可以根据自己的消费能力去决定这些策略
Pull有个缺点是,如果broker没有可供消费的消息,将导致consumer不断在循环中轮询,直到新消息到t达。为了避免这点,Kafka有个参数可以让consumer阻塞知道新消息到达(当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发
消息由一个固定长度的头部和可变长度的字节数组组成。头部包含了一个版本号和CRC32校验码。
消息长度: 4 bytes (value: 1+4+n)
版本号: 1 byte
CRC校验码: 4 bytes
具体的消息: n bytes
(1).Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
(2).通过索引信息可以快速定位message和确定response的最大大小。
(3).通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
(4).通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。
(1).Kafka 持久化日志,这些日志可以被重复读取和无限期保留
(2).Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据提升容错能力和高可用性
(3).Kafka 支持实时的流式处理
副本因子不能大于 Broker 的个数;
第一个分区(编号为0)的第一个副本放置位置是随机从 brokerList 选择的;
其他分区的第一个副本放置位置相对于第0个分区依次往后移。也就是如果我们有5个 Broker,5个分区,假设第一个分区放在第四个 Broker 上,那么第二个分区将会放在第五个 Broker 上;第三个分区将会放在第一个 Broker 上;第四个分区将会放在第二个 Broker 上,依次类推;
剩余的副本相对于第一个副本放置位置其实是由 nextReplicaShift 决定的,而这个数也是随机产生的
在启动 Kafka 集群之前,我们需要配置好 log.dirs 参数,其值是 Kafka 数据的存放目录,这个参数可以配置多个目录,目录之间使用逗号分隔,通常这些目录是分布在不同的磁盘上用于提高读写性能。 当然我们也可以配置 log.dir 参数,含义一样。只需要设置其中一个即可。 如果 log.dirs 参数只配置了一个目录,那么分配到各个 Broker 上的分区肯定只能在这个目录下创建文件夹用于存放数据。 但是如果 log.dirs 参数配置了多个目录,那么 Kafka 会在哪个文件夹中创建分区目录呢?答案是:Kafka 会在含有分区目录最少的文件夹中创建新的分区目录,分区目录名为 Topic名+分区ID。注意,是分区文件夹总数最少的目录,而不是磁盘使用量最少的目录!也就是说,如果你给 log.dirs 参数新增了一个新的磁盘,新的分区目录肯定是先在这个新的磁盘上创建直到这个新的磁盘目录拥有的分区目录不是最少为止。
topic中的多个partition以文件夹的形式保存到broker,每个分区序号从0递增, 且消息有序 Partition文件下有多个segment(xxx.index,xxx.log) segment 文件里的 大小和配置文件大小一致可以根据要求修改 默认为1g 如果大小大于1g时,会滚动一个新的segment并且以上一个segment最后一条消息的偏移量命名
request.required.acks有三个值 0 1 -1
0:生产者不会等待broker的ack,这个延迟最低但是存储的保证最弱当server挂掉的时候就会丢数据
1:服务端会等待ack值 leader副本确认接收到消息后发送ack但是如果leader挂掉后他不确保是否复制完成新leader也会导致数据丢失
-1:同样在1的基础上 服务端会等所有的follower的副本受到数据后才会受到leader发出的ack,这样数据不会丢失
消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费。同时也可以按照指定的offset进行重新消费。
结合consumer的加入和退出进行再平衡策略。
消费者组里某具体分区是有序的,所以要保证有序只能建一个分区,但是实际这样会存在性能问题,具体业务具体分析后确认。
每一条消息都是以(key,value)格式 Key是由生产者发送数据传入 所以生产者(key)决定了数据产生到集群的哪个partition
①一旦消费者加入或退出消费组,导致消费组成员列表发生变化,消费组中的所有消费者都要执行再平衡。
②订阅主题分区发生变化,所有消费者也都要再平衡。
(1)关闭数据拉取线程,情空队列和消息流,提交偏移量;
(2)释放分区所有权,删除zk中分区和消费者的所有者关系;
(3)将所有分区重新分配给每个消费者,每个消费者都会分到不同分区;
(4)将分区对应的消费者所有关系写入ZK,记录分区的所有权信息;
(5)重启消费者拉取线程管理器,管理每个分区的拉取线程。