[关闭]
@Catyee 2020-10-18T12:15:14.000000Z 字数 19351 阅读 405

分布式与zookeeper

面试


一、分布式

概念:分布式系统指硬件或软件分布在不同的网络计算机上,彼此之间仅仅通过消息传递来通信和协调。可以是按照业务垂直分布(微服务),也可以是同一个业务水平分布(多活)

好处:服务能力提升,互相影响减小,并发增高,容错性增加;扩展性能力强;降低成本(普通机器就可胜任);程序员只用关心一部分业务;

坏处:故障总会发生,比如通信异常,节点故障等等

二、CAP和Base理论

2.1 CAP理论

Cap理论是指一个分布式系统不可能同时满足一致性,可用性和分区容错性,最多只能同时满足其中三个基本需求。

一致性:指分布式环境中,数据在多个节点之间能否保持一致的特性。
可用性:指系统提供的服务总是处于可用的状态
分区容错性:指分布式系统在遇到网络故障或者节点故障的时候仍然能够对外提供满足一致性的服务。除非整个分布式系统所有服务都发生了故障。

由于不可兼得,所以需要进行取舍。大多数情况下都会选择放弃强一致性,而选择可用性和分区容错性。但是Zookeeper不太一样,Zookeeper尽量做到强一致性(实际上还是最终一致性),而放弃了部分可用性(选举的时候不能对外提供服务)

2.2 BASE理论

base理论是对Cap中一致性和可用性权衡的结果。核心思想是即使无法做到强一致性,但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性。
base理论中的三个概念——基本可用、软状态和最终一致性:
基本可用指分布式系统在出现故障的时候,允许损失部分可用性,比如响应时间上的损失和功能上的部分损失。
软状态:指允许系统中的数据存在中间状态,可以简单理解为允许数据在节点之间同步的时候存在延迟
最终一致性:指系统中的数据副本,在经过一段时间的同步之后,最终能够达到一个一致的状态。

Base理论面向的是大型高可用可扩展的分布式系统,和传统事务的ACID特性是相反的,它完全不同于ACID的强一致性模型,而是提出通过牺牲强一致性来获得可用性,并允许数据在一段时间内是不一致的,但最终能达到一致性的状态。

三、一致性

强一致性(线性一致性)
强一致性可以理解为当写入操作完成后,任何客户端去访问任何存储节点的值都是最新的值,将分布式的一致性过程对客户端透明,客户端操作一个强一致性的数据库时感觉自己操作的是一个单机数据库,强一致性就是CAP定理中所描述的C(Consistency)

弱一致性
弱一致性是与强一致性对立的一种一致性级别,这种一致性级别不承诺立即可以读到写入的值,也不具体承诺多久之后数据能够达到一致,但会尽可能地保证到某个时间级别(比如秒级别)后,数据能够达到一致状态。弱一致性还可以再进行细分∶
会话一致性∶该一致性级别只保证对于写入的值,在同一个客户端会话
中可以读到一致的值,但其他的会话不能保证。

用户一致性∶该一致性级别只保证对于写入的值,在同一个用户中可以
读到一致的值,但其他用户不能保证。
在某些系统下,对一致性的要求并不高,从而可以舍弃强一致策略带来的性能与可用性消耗。

最终一致性
最终一致性也可以理解成弱一致性的一种,使用这种一致性级别,依旧可能在写入后读到旧值,但做出的改进是要求数据在有限的时间窗口内最终达到一致的状态。也就说就算现在不一致,也早晚会达到一致,但狭义上的弱一致性并不对一致性做出任何保证,也许某些节点永远不会达到一致,其实最终一致性的核心就是保证同步的请求不会丢失,在请求到达时节点的状态变为最新状态,而不考虑请求传输时的不一致窗口,DNS就是典型的最终一致性系统。

四、一致性协议:

为了解决分布式一致性问题,涌现了很多经典的一致性协议和算法,最著名的就是二阶段提交、三阶段提交和Paxos算法。
二阶段提交、三阶段提交应对的是分布式系统中,不同节点协同完成同一个任务的情况;而Paxos算法、Raft算法、ZAB算法都是为了应对分布式系统中,某个任务可以单节点执行,但是每个节点要最终保持结果一致的情况,两种算法应对的场景并不一样。

4.1 二阶段提交

在一个分布式集群中,要执行一个分布式事务,最直接的方法是引入一个协调者,用于统一调度分布式事务在分布式节点上的执行逻辑,这些分布式节点叫做参与者。二阶段提交实际上就是协调者和参与者执行分布式事务的过程,其主要目的就是为了解决分布式事务的原子性问题,保证分布式事务的多个参与者要么都执行成功要么都执行失败。

从名字就可以看出二阶段提交就是将事务分成了两个阶段来处理:
第一个阶段即投票阶段:
a、事务询问:当协调者收到一个事务请求之后,先记录日志,然后通知所有参与者要执行一个事务,并询问所有的参与者是否可以执行提交操作,等待参与者回应
b、参与者开始执行事务操作,将事务操作的undo和redo信息记入事务日志,但是并不提交
c、参与者先在自己日志中记录自己是否可以提交事务,然后向协调者回应。如果参与者在执行事务的时候每个步骤都成功了,就回复可以提交,如果执行事务的时候失败了,就立刻回滚,并回应不能提交。
第二个阶段即事务提交阶段:
在第二个阶段中,协调者要根据参与者的反馈情况来决定这个事务最终是提交还是回滚。
如果所有参与者都进行了响应,并且响应结果都是可以提交事务,那协调者就会做出提交事务的决定,先记录自己的决定到自己的日志,然后发送提交事务的请求给每一个参与者。这一步只要协调者记录了日志,就可以响应客户端的请求了,不需要等到参与者都提交了之后再回应。参与者收到提交的请求之后就正式提交事务,释放资源,然后向协调者进行回复,协调者收到所有参与者的反馈之后完成事务。
如果某一个参与者向协调者反馈不能提交事务,或者超过了超时时间,还有一些参与者没有响应是否提交事务,协调者就会做出中断事务的决定,同样先往自己的日志中记录自己的决定,然后向每个参与者发送回滚事务的请求。参与者接收到事务回滚的请求之后,根据undo日志来回滚,然后释放资源并向协调者进行回复。协调者接收到所有参与者的回复消息之后,完成事务的中断。

逻辑上二阶段提交很清晰,但是由于分布式系统都靠网络来传输消息,而网络是不可靠的,网络的原因将导致各种各样状况的发生,会使得情况异常的复杂。

各种各样的超时问题:
1、第一阶段协调者询问能否提交事务,部分参与者响应超时:
协调者将做出中断事务的决定,先写日志,然后通知参与者中断事务。这种情况下,可能是协调者发送了事务请求,参与者没有接收到,也可能是参与者执行完事务并进行了回复,但是协调者每收到,如果是后者,实际上每个参与者可能都做好了准备,但是协调者没有接收到回应,只能做出中断事务的决定,这是一种保守的策略,会浪费一些资源。
2、假如某个参与者执行完了事务,并回应了协调者自己可以提交(如果不能提交,可以回复不能提交,然后直接回滚,不用等待协调者通知),然后这个参与者开始等待协调者的决定,等待过程超时:
这种情况比较复杂,可能协调者做出了提交的决定,只是自己没有收到,所以自己不能擅自回滚,但也不能擅自提交,因为协调者也可能做出中断事务的决定。这个时候可以一直阻塞,但也有一些可以优化的点,比如这个参与者是A,A可以去询问其它参与者,如果某个参与者说我没接收到协调者的决定,但是我自己的事务执行失败了,我向协调者发出了不能提交事务的回应,这个时候A也就可以直接回滚了。另外的情况,如果某一个参与者说我接受到了协调者的决定,协调者的决定是提交(回滚),那A也就可以提交(回滚)了。但如果被询问的节点也没有回复,或者回复说也没有收到协调者的命令,这个时候A别无他法只能继续阻塞。这个优化叫做"超时终止协议",它能解决一部分的超时等待问题,但不能解决全部,并且增加了编程的复杂性。

各种宕机问题:
1、如果协调者做出了决定,但是决定还没发送或者只发送了部分就宕机了:
这种情况下,协调者重启之后可以从自己的日志中知道自己之前做的决定,参与者可以通过互相询问的方式来知道这个决定,也可以重新询问协调者。
如果协调者日志中都没有自己做的决定,那直接中断事务就可以了。
2、参与者执行了事务,还没来得及回复协调者就发生了宕机:
这个时候参与者先从自己日志中读取日志,如果发现磁盘中有“yes”记录,那就可以发起超时终止协议。

二阶段提交的缺点:
主要是同步阻塞、单点问题、数据不一致,过于保守导致资源浪费
同步阻塞:二阶段提交协议存在的最明显也是最大的一个问题就是同步阻塞,在二阶段提交的执行过程中,所有参与该事务操作的逻辑都处于阻塞状态,也就是说,各个参与者在等待其他参与者响应的过程中,将无法进行其他任何操作, 这会极大的影响分布式系统的性能。
单点问题:如果协调者出现问题,整个流程将无法运转。
数据不一致:在正确记录日志的情况下,最终数据会一致,但是中间可能会有很长一段时间处于不一致的情况,比如某个参与者宕机。

五、三阶段提交

三阶段提交是二阶段提交的改进版,就是将二阶段提交的准备阶段和提交阶段更进一步划分,划分为三个阶段,也就是询问阶段(canCommit)准备阶段(preCommit)和提交阶段(commit)。

二阶段提交中,当协调者发起询问之后,参与者会先执行事务,然后响应是否可以提交,这是同一个过程。但假如某一个参与者因为某些原因一开始就知道执行会失败,那这个时候其它参与者执行事务其实是没有意义的,如果执行了还会陷入阻塞,要等待协调者发出的回滚命令。所以三阶段提交分为三个阶段,第一个阶段协调者进行询问,参与者先不执行事务,而是先回应自己能不能执行事务,如果都回答可以,协调者再发出命令执行事务,然后协调者等待参与者回应能否提交事务,如果所有参与者都回答可以提交事务,协调者再做出提交事务的决定。

三阶段提交主要减缓了二阶段提交阻塞的问题,但是并没有完全解决,也没有解决其它可能存在的问题,所以大部分分布式系统任然使用的是二阶段提交。

六、Paxos算法:

回到Paxos,Raft、ZAB协议所面对的场景中来,应对这种场景的一致性算法需要保证如下几点:
1、所有提案只有一个被选定
2、如果某个进程认为某个提案被选定了,那么这提案必须是真的被选定的那个。

Paxos算法中有三种角色:Proposer、Acceptor、和Learner。

七、Zookeeper

zookeeper是一个典型的分布式数据一致性解决方案,它可以保证如下分布式一致性特性:
1、顺序一致性:从同一个客户端发起的事务请求,最终将会严格的按照其发起的顺序被应用到zookeeper中去。
2、原子性: 所有事务请求的处理结果在整个集群中所有机器上的应用情况是一致的,也就是说要么整个集群中所有机器都成功应用了某一个事务,要么都没有应用。
3、单一视图:无论客户端连接的是哪个zookeeper服务器,其看到的服务端数据模型都是一致的。
4、可靠性:一旦服务端成功应用了一个事务,并对客户端做出了响应,那么这个事务所引起的状态变更将会一直被保留,除非另一个事务对其做了变更。

ZooKeeper 致力于提供一个高性能、高可用,且具有严格的顺序访问控制能力(主要是写操作的严格顺序性)的分布式协调服务。高性能使得ZooKeeper能够应用于那些对系统吞吐有明确要求的大型分布式系统中,高可用使得分布式的单点问题得到了很好的解决,而严格的顺序访问控制使得客户端能够基于Zoo0Keeper实现一些复杂的同步原语。

7.1 ZAB协议:

全程:崩溃恢复的原子广播协议,从名字也可以看出来这个协议主要定义了两种情况的处理流程,一是崩溃之后的恢复模式,另一个是正常工作时的消息广播模式。

7.1.1 恢复模式

恢复模式又分为领导者选举阶段和数据同步阶段,顾名思义,先要选举出领导者,选举的时候会选出当前ZXID最大的节点来作为新的领导者(不一定是整个集群最大)。选举出来之后要保证集群状态一致,需要进行同步,同步过程要保证新的Leader节点上已经提交的事务在follower节点也都提交,然后也要丢弃哪些只在原有Leader上提交,但是还未发送提交请求给Follower的事务。
崩溃恢复之快速领导者选举流程:
Zookeeper的选举和人类的选举逻辑类似,Zookeeper需要实现上面人类选举的四个基本概念;
1. 个人能力:Zookeeper是一个数据库,集群中节点的数据越新就代表此节点能力越强,而在Zookeeper中可以通事务id(zxid)来表示数据的新旧,一个节点最新的zxid越大则该节点的数据越新。所以Zookeeper选举时会根据zxid的大小来作为投票的基本规则。
2. 改票:Zookeeper集群中的某一个节点在开始进行选举时,首先认为自己的数据是最新的,会先投自己一票,并且把这张选票发送给其他服务器,这张选票里包含了两个重要信息:zxid和sid,sid表示这张选票投的服务器id,zxid表示这张选票投的服务器上最大的事务id,同时也会接收到其他服务器的选票,接收到其他服务器的选票后,可以根据选票信息中的zxid来与自己当前所投的服务器上的最大zxid来进行比较,如果其他服务器的选票中的zxid较大,则表示自己当前所投的机器数据没有接收到的选票所投的服务器上的数据新,所以本节点需要改票,改成投给和刚刚接收到的选票一样。
3. 投票箱:Zookeeper集群中会有很多节点,和人类选举不一样,Zookeeper集群并不会单独去维护一个投票箱应用,而是在每个节点内存里利用一个数组来作为投票箱。每个节点里都有一个投票箱,节点会将自己的选票以及从其他服务器接收到的选票放在这个投票箱中。因为集群节点是相互交互的,并且选票的PK规则是一致的,所以每个节点里的这个投票箱所存储的选票都会是一样的,这样也可以达到公用一个投票箱的目的。
4. 领导者:Zookeeper集群中的每个节点,开始进行领导选举后,会不断的接收其他节点的选票,然后进行选票PK,将自己的选票修改为投给数据最新的节点,这样就保证了,每个节点自己的选票代表的都是自己暂时所认为的数据最新的节点,再因为其他服务器的选票都会存储在投票箱内,所以可以根据投票箱里去统计是否有超过一半的选票和自己选择的是同一个节点,都认为这个节点的数据最新,一旦整个集群里超过一半的节点都认为某一个节点上的数据最新,则该节点就是领导者。

崩溃恢复之数据同步:
当选举出新的Leader之后就需要进行数据同步,Leader节点会为每一个Follower节点都准备一个队列,并将那些没有被各Follower节点同步的事务以Proposal消息的形式逐个发送给Follower节点,在每一个Proposal消息后面还会紧接着发送一个Commit消息,以表示该事务已经被提交。等到Follower节点将所有其尚未同步的事务都从Leader服务器上同步过来并成功应用到本地数据库中后,整个恢复模式就结束了,集群进入原子广播模式,对外开始提供服务。

同步过程中如何丢弃哪些应该丢弃的事务?
zookeeper的事务id(ZXID)是一个64位的数字,低32位是一个单调递增的计数器,每形成一个新的事务就会加1,而高32位则是代表了Leader的更新换代,也就是纪元,每选举出一个新的Leader就会进入一个新的纪元,高32位就会加1。假如有一个Leader它有一个请求自己已经提交了,但是还没有发送提交请求给Follower,自己就挂掉了,这个时候集群进入选举,选举出了新的Leader,新的Leader有最新的纪元,这个时候旧的Leader恢复了,它的纪元要小于新Leader,所以它无法再次称为Leader,同时新Leader会要求这个原来的leader回退那个它已经提交的事务,并同步到当前leader最新的事务。

详细同步过程如下:
1、统一纪元
Learner节点向Leader发送LearnerInfo数据(包含了acceptEpoch,即纪元),然后等待Leader响应。Leader不停的从Learner节点接收到发送过来的LearnerInfo数据,比较Epoch,超过过半机制后统一epoch纪元。
Leader统一Epoch后,向Learner节点,发送LeaderInfo数据(包含了新的epoch),等待接收AckEpoch数据。
Learner节点接收到LeaderInfo数据后,修改自己的epoch,然后发送AckEpoch数据给Leader。
2、进行同步
Learner节点发送AckEpoch之后就会进入阻塞,等待Leader节点发送数据过来进行同步。当Leader节点接收到了大部分的AckEpoch数据后,就开始发送同步数据
Leader节点首先整理要同步的数据,把这些数据先添加到queuedPackets队列中去,并且往队列中添加了一个NewLeader数据,然后Leader节点开启一个线程,从queuedPackets队列中获取数据发送
Learner节点接收数据后进行同步,同步完之后,会接收到一个NewLeader数据,然后给Leader返回一个ACK数据
Leader节点接收到了超过一半的ack后,则运行一个while,负责从Learner接收命令
10.Leader节点启动,向外提供服务
11.Follower节点启动,向外提供服务

7.1.2 消息广播模式:

Zookeeper的消息广播过程类似于一个二阶段提交过程,客户端发送的事务请求(写请求),Leader服务器会先生成一个对应的事务,然后发送给和Leader保持心跳的Follower节点,然后等待Follower的回复,但是和二阶段提交不一样的地方是这里只需要等待超过一半的Followe回复就可以提交了,并需要等待全部。并且也不会有中断事务的逻辑,如果失败就抛弃Leader服务器,如果大多数都失败就会进入崩溃恢复模式。

广播过程中保证事务的顺序性:
在整个消息广播过程中,Leader服务器收到一个写请求后,会为这个写请求生成对应的事务Proposal和生成一个全局单调递增的事务ID(即ZXID),并且会把事务写入自己的事务日志,然后开始进行事务广播,在广播过程中,Leader服务器会为每一个Follower服务器都各自分配一个单独的队列,然后将需要广播的事务依次放入到这些队列中去,并且按照先入先出(FIFO)策略进行消息发送。每一个Folower服务器在接收到这个事务之后,都会首先将其写入自己的事务日志之中,在成功写入后反馈给Leader服务器一个Ack响应。当 Leader服务器接收到超过半数 Follower的Ack响应后,Leader自身先写日志,然后自己提交事务并响应客户端,然后广播一个Commit消息给所有的Follower服务器来通知他们进行事务提交,每一个Follower服务器在接收到Commit消息后,也会完成对事务的提交。


详细的消息广播流程:
1. Leader节点,针对当前请求生成日志(Txn)
2. Leader节点,持久化前请求生成日志(Txn),并向自己发送一个Ack
3. Leader节点,把当前请求生成的日志(Txn)发送给其他所有的参与者节点(非Observer)
4. Leader节点,阻塞等待Follower节点发送Ack过来(超过一半则解阻塞)
5. Follower节点,接收到Leader节点发送过来的Txn
6. Follower节点,持久化当前Txn,并向Leader节点发送一个Ack
7. Leader节点,接收到了超过一半的Ack(加上自己发给自己的Ack),则解阻塞
8. Leader节点,向Follower节点发送commit命令(异步发送的,不会阻塞Leader节点)
9. Leader节点,执行Txn,更新内存(根据Txn更新DataBase)
10. Follower节点,接收到Leader节点发送过来的commit命令
11. Follower节点,执行Txn,更新内存(根据Txn更新DataBase)

7.2 Zookeeper数据模型

zookeeper的数据模型和文件系统特别相似,是一个树状结构,和文件系统不一样的是Zookeeper的整个树状结构都是在内存中的,数状结构的每个层级叫做一个ZNode。

节点类型:

节点的version:
ZooKeeper中为数据节点引入了版本的概念,每个数据节点都具有三种类型的版本信息,
version别是当前数据节点内容的版本号,cversion代表子节点的版本号,aversion代表ACL变更的版本号。这个版本号的概念和传统的版本号概念不太一样,它表示的是修改次数,比如version代表当前节点数据的修改次数,cversion表示对子节点的修改次数,aversion表示ACL的变更次数。

7.3 watcher机制——数据变更的通知

ZooKeeper的Watcher机制主要包括客户端线程、客户端WatchManager和ZooKeeper服务器三部分。在具体工作流程上,简单地讲,客户端在向ZooKeeper服务器注册Watcher的同时,会将Watcher对象存储在客户端的WatchManager中。当ZooKeeper服务器端触发Watcher事件后,会向客户端发送通知,客户端线程从WatchManager中取出对应的Watcher对象来执行回调逻辑。

zookeeper对事件的封装极其简单,只携带少量的信息,主要还是起到通知作用,客户端监听到事件之后,才会执行程序员定义好的回调逻辑。

watcher类型
老版本都是一次性watcher,新版本增加了持久性的watcher和持久性递归的watcher,通过addWatcher()方法添加,可以指定AddWatchMode。
一次性watcher
持久性watcher
持久性递归watcher

watcher的事件类型:
事件类型:(znode节点相关的)
EventType:NodeCreated //节点创建
EventType:NodeDataChanged //节点的数据变更
EventType:NodeChildrentChanged //子节点下的数据变更
EventType:NodeDeleted //节点删除

状态类型:(是跟客户端实例相关的)
KeeperState:Disconneced //连接失败
KeeperState:SyncConnected //连接成功
KeeperState:AuthFailed //认证失败
KeeperState:Expired

watcher特点:
一次性(老版本):如果不使用持久性的watcher,其余Watcher都是一次性的,一旦被触发就会移除,再次使用时需要重新注册
客户端串行执行:客户端Watcher回调的过程是一个串行同步的过程(FIFO队列),这为我们保证了顺序
轻量:watchedEvent只包含三部分内容,通知状态,事件类型和节点路径,非常轻量

7.4 ACL保障数据的安全

ACL即访问控制列表,是一种权限模式。zookeeper中ACL机制,主要包括三方面的信息:权限模式(schema)授权对象(ID)和权限(permission),通常使用"schema:id:permission"来标识一个有效的ACL信息。

权限模式(schema):权限模式用来确定权限验证过程中使用的检验策略。包括IP、Digest(即用户名密码的权限形式)、World(即没有权限控制,或者任何人都有权限)、Supper(即超级用户)共四种权限模式。
授权对象ID:指的是权限赋予的用户或一个指定的实体,例如IP地址或者机器等等。
权限:具体的权限,有Create、Delete、Read、Write、Admin五种权限。

还可以自己扩展权限,实现自定义的权限控制器。

7.5 zookeeper中的通信

序列化与反序列化:
zookeeper使用Jute这个序列化组件来进行序列化和反序列化的操作。

通信协议:
zookeeper使用nio来进行网络通信,它实现了自己的通信协议,也就是规定了zookeeper通信时的请求格式和响应格式。

请求头中有一个xid,用于记录客户端请求发起的先后序号,用来确保单个客户端请求的响应顺序。

7.6 zookeeper客户端

zookeeper客户端由zookeeper实例、ClientWatchManager、HostProvider、ClienCnxn这四个核心组件构成。zookeeper实例是客户端的入口,ClientWatchManager是客户端的watcher管理器,HostProvider是服务器地址列表管理器,ClientCnxn则代表了一个客户端和服务端的连接,其内部包含两个重要的线程,SendThread和EventThread,SendThread用于发送请求给服务器,并接收服务器的响应,而EventThread是一个事件线程,用于处理服务端的事件。

客户端启动过程:
设置默认Watcher(如果有的话)
设置Zookeeper服务器地址列表
创建ClientCnxn

一次会话的创建过程:
下面1到5为初始化阶段,从6开始进入到会话创建阶段,从11开始进入到响应处理阶段:

  1. 初始化 ZooKeeper对象。

    通过调用ZooKeeper的构造方法来实例化一个ZooKeeper对象,在初始化过程中,会创建一个客户端的Watcher管理器∶ClientwatchManager。
  2. 设置会话默认Watcher。

    如果在构造方法中传入了一个 Watcher对象,那么客户端会将这个对象作为默认Watcher保存在ClientwatchManager中。

  3. 构造ZooKeeper服务器地址列表管理器∶HostProvider。

    对于构造方法中传入的服务器地址,客户端会将其存放在服务器地址列表管理器HostProvider中。

  4. 创建并初始化客户端网络连接器∶ClientCnxn。
ZooKeeper客户端首先会创建一个网络连接器ClientCnxn,用来管理客户端与服务器的网络交互。另外,客户端在创建ClientCnxn的同时,还会初始化客户端两个核心队列outgoingQueue和pendingQueue,分别作为客户端的请求发送队列和服务端响应的等待队列。ClientCnxn连接器的底层IO处理器是ClientCnxnSocket,因此在这一步中,客户端还会同时创建CLientCnxnSocket处理器。
  5. 初始化 SendThread和EventThread。

    客户端会创建两个核心网络线程SendThread和EventThread,前者用于管理客户端和服务端之间的所有网络I/O,后者则用于进行客户端的事件处理。同时,客户端还会将ClientCnxnSocket分配给SendThread作为底层网络I/O处理器,并初始化 EventThread的待处理事件队列waitingEvents,用于存放所有等待被客户端处理的事件。
  6. 启动
    SendThread和 EventThread
SendThread首先会判断当前客户端的状态,进行一系列清理性工作,为客户端发送"会话创建"请求做准备。
  7. 获取一个服务器地址。

    在开始创建TCP连接之前,SendThread首先需要获取一个Zookeeper服务器的目标地址,这通常是从 HostProvider中随机获取出一个地址,然后委托给ClientCnxnSocket去创建与ZooKeeper服务器之间的TCP连接。
  8. 创建 TCP连接。
    
获取到一个服务器地址后,CLientCnxnSocket负责和服务器创建一个TCP长连接。
  9. 构造ConectRequest请求。

    步骤8只是纯粹地从网络TCP层面完成了客户端与服务端之间的Socket连接,但还未完成ZooKeeper客户端的会话创建。SendThread会负责根据当前客户端的实际设置,构造出一个ConnectRequest 请求,该请求代表了客户端试图与服务器创建一个会话。同时,ZooKeeper 客户端还会进一步将该请求包装成网络I/O层的Packet 对象,放入请求发送队列outgoingQueue中去。
  10. 向服务端发送ConectRequest请求。
    ClientCnxnSocket负责从outgoingQueue中取出一个待发送的Packet对象,将其序列化成ByteBuffer后,向服务端进行发送。
  11. 接收服务端响应。

    ClientCnxnSocket接收到服务端的响应后,会首先判断当前的客户端状态是否是"已初始化",如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由readConnectResult方法来处理该响应。
  12. 处理Response。
    ClientCnxnSocket会对接收到的服务端响应进行反序列化,得到ConnectResponse对象,并从中获取到 ZooKeeper服务端分配的会话sessionId。

  13. 连接成功。

    连接成功后,一方面需要通知SendThread线程,进一步对客户端进行会话参数的设置,包括 readTimeout和connectTimeout等,并更新客户端状态∶另一方面,需要通知地址管理器HostProvider当前成功连接的服务器地址。
  14. 生成事件∶SyncConnected-None。

    为了能够让上层应用感知到会话的成功创建,SendThread会生成一个事件SyncConnected-None,代表客户端与服务器会话创建成功,并将该事件传递给EventThread 线程。
  15. 查询Watcher。
EventThread线程收到事件后,会从ClientWatchManager管理器中查询出对应的Watcher,针对SyncConnected-None事件,那么就直接找出步骤2中存储的默认Watcher,然后将其放到 EventThread的waitingEvents 队列中去。
  16. 处理事件
EventThread
    不断地从waitingEvents队列中取出待处理的Watcher对象,然后直接调用该对象的process接口方法,以达到触发 Watcher的目的。


获取可用的服务器地址:
zookeeper客户端会把服务器地址随机打散,然后拼装成一个环形链表,然后如果当前服务器不可用了,就去尝试下一个连接。

请求发送
在正常情况下(即客户端与服务端之间的TCP连接正常且会话有效的情况下),会从outgoingQueue队列中提取出一个可发送的Packet对象,同时生成一个客户端请求序号XID并将其设置到Packet请求头中去,然后将其序列化后进行发送。在outgoingQueue队列中的Packet整体上是按照先进先出的顺序被处理的,但是如果检测到客户端与服务端之间正在处理 SASL权限的话,那么那些不含请求头(requestHeader)的Packet(例如会话创建请求)是可以被发送的,其余的都无法被发送。
请求发送完毕后,会立即将该Packet保存到pendingQueue队列中,以便等待服务端响应返回后进行相应的处理。

SendThread与EventThread
SendThread的作用:a、维持和服务器的心跳,如果断开了,还会进行重连操作。b、管理所有客户端发出去的请求以及接收服务端的响应,将事件放入到waitingEvents队列中去。
EventThread的作用:负责从waitingEvents中取出事件并进行处理,主要是触发客户端注册的事件监听,执行回调函数中的逻辑。

7.7 会话

在zookeeper客户端和服务端成功完成连接创建之后就建立了一个会话。
ZooKeeper会话在整个运行期间的生命周期中,会在不同的会话状态之间进行切换,这些状态一般可以分为CONNECTING、CONNECTED、RECONNECTING、RECONNECTED和CLOSE等。

SessionID的生成:
第一次生成的时候会根据机器的当前时间和机器的SID来生成,高8位代表了机器id,后56位是当前时间的毫秒进行的随机。以后的SessionID将每次增加1。

7.8 zookeeper应用场景

7.8.1 数据发布订阅(配置中心)

当用zookeeper来作为发布订阅中心的时候,首先把数据写入zookeeper的一个节点,然后每台机器从这个节点获取初始化配置并启动,然后监听这个节点的数据更改事件,一旦该节点的数据发生变更,那么zookeeper就会向相应的客户端发送Watcher事件通知,监听的服务接收到这个消息通知之后,需要主动到zookeeper获取最新的数据。可以看到基于Zookeeper的配置中心是一种推拉相结合的方式。

缺点:
1、节点数据大小的限制:对于Zookeeper来说,数据大小直接影响了它的读写性能,如果单个节点数据越大,对网络方面的吞吐就会造成影响,而zookeeper也添加了单个节点数据内容不能超过1M的限制。如果配置数据太大就只能用多个节点来存储,这样做增加了编程和维护的复杂性。
2、watcher机制:选择使用zookeeper做配置中心关键的一点就是watcher机制,但是watcher机制有一些要注意的点,一个是节点数据的版本变化会触发NodeDataChanged,注意,这里特意说明了是版本变化。存在这样的情况,只要成功执行了setData()方法,无论内容是否和之前一致,都会触发NodeDataChanged。另外最主要一点就是zookeeper并不保证每次节点的变化都会通知到客户端,原因是因为当一次数据修改,通知客户端,客户端再次注册watch,在这个过程中,可能数据已经发生了许多次数据修改。

7.8.2 负载均衡

使用Zookeeper实现负载均衡,首先每个服务将自己的信息注册到zk上,实际上就是在一个指定的持久化节点中创建一个临时子节点,这个临时子节点中记录自己的信息。客户端监听持久化节点下的子节点事件,每次事件都从zk持久化节点上获取所有的临时节点,也就是最新服务节点信息,然后在本地使用负载均衡算法,随机分配服务器。本地的负载均衡策略包括随机选择,构造成环轮询,或者按请求次数模上服务节点数量等等。

缺点:一是要在客户端实现自己的负载均衡算法。而是由于是临时节点,有可能出现闪断,这样客户端获取到最新服务节点可能并不是所有正常工作的服务节点,对负载均衡策略也有一些影响。可以用最新的服务器节点或者时间来解决这个问题。

kafka使用zookeeper来实现负载均衡。

7.8.3 命名服务

比如实现一个分布式的全局唯一递增ID,我们知道zk中有顺序节点,顺序节点的名字中就有它的序号,这个序号就是就可以作为分布式的全局唯一递增id。

顺序节点原理:在ZooKeeper中,每个父节点都会为它的第一级子节点维护一份顺序,用于记录下每个子节点创建的先后顺序。基于这个顺序特性,在创建子节点的时候,可以设置这个标记,那么在创建节点过程中,Zookeeper会自动为给定节点名加上一个数字后缀,作为一个新的、完整的节点名。另外需要注意的是,这个数字后缀的上限是整型的最大值。


7.8.4 分布式协调/通知

比如canal就是使用zookeeper来做分布式的协调

心跳检测:基于zookeeper来做心跳检测就比较容易了,创建一个临时节点,不同服务可以通过这个临时节点来判断对应的服务是不是还存在。当然也可能出现闪断,解决思路是监听到一个临时节点的删除操作之后设置一个延迟时间,如果过了一段时间临时节点依然没有起来再当作心跳失败。或者注册的时候使用服务器节点或者时间节点来解决这个问题。

工作汇报:在一个常见的任务分发系统中,通常任务被分发到不同的机器上执行后,需要实时地将自己的任务执行进度汇报给分发系统。这个时候就可以通过ZooKeeper来实现。在ZooKeeper上选择一个节点,每个任务客户端都在这个节点下面创建临时子节点,这样便可以实现两个功能∶
通过判断临时节点是否存在来确定任务机器是否存活;
各个任务机器会实时地将自己的任务执行进度写到这个临时节点上去,以便中心系统能够实时地获取到任务的执行进度。

系统调度:使用ZooKeeper,能够实现另一种系统调度模式∶一个分布式系统由控制台和一些客户端系统两部分组成,控制台的职责就是需要将一些指令信息发送给所有的客户端,以控制它们进行相应的业务逻辑。后台管理人员在控制台上做的一些操作,实际上就是修改了ZooKeeper上某些节点的数据,而ZooKeeper进一步把这些数据变更以事件通知的形式发送给了对应的订阅客户端。

7.8.5 集群管理

所谓集群管理,包括集群监控与集群控制两大块,前者侧重对集群运行时状态的收集,后者则是对集群进行操作与控制。在日常开发和运维过程中,我们经常会有类似于如下的需求。

分布式日志手机系统:
在一个典型的日志系统的架构设计中,整个日志系统会把所有需要收集的日志机器分为多个组别,每个组别对应一个收集器,这个收集器其实就是一个后台机器,用于收集日志。对于大规模的分布式日志收集系统场景,通常需要解决如下两个问题。
变化的日志源
机器
在生产环境中,伴随着机器的变动,每个应用的机器几乎每天都是在变化的(机器硬件问题、扩容、机房迁移或是网络问题等都会导致一个应用的机器变化),也就是说每个组别中的日志源机器通常是在不断变化的。
变化的收集器
机器
日志收集系统自身也会有机器的变更或扩容,于是会出现新的收集器机器加入或是老的收集器机器退出的情况。

上面两个问题,无论是日志源机器还是收集器机器的变更,最终都归结为一点∶如何快速、合理、动态地为每个收集器分配对应的日志源机器。

使用ZooKeeper来进行日志系统收集器的注册,典型做法是在ZooKeeper上创建一个节点作为收集器的根节点,每个收集器机器在启动的时候,都会在收集器节点下创建自己的节点。待所有收集器机器都创建好自己对应的节点后,系统根据收集机器的个数,将所有日志源机器分成对应的若干组,然后将分组后的机器列表分别写到这些收集器机器创建的子节点上去。这样一来,每个收集器机器都能够从自己对应的收集器节点上获取日志源机器列表,进而开始进行日志收集工作。完成收集器机器的注册以及任务分发后,我们还要考虑到这些机器随时都有挂掉的可能。因此,针对这个问题,我们需要有一个收集器的状态汇报机制∶每个收集器机器在创建完自己的专属节点后,还需要自己的节点下面创建一个状态子节点,每个收集器机器都需要定期向该节点写入自己的状态信息。我们可以把这种策略看作是一种心跳检测机制,通常收集器机器都会在这个节点中写入日志收集进度信息。日志系统根据该状态子节点的最后更新时间来判断对应的收集器机器是否存活。


如果收集器机器挂掉或是扩容了,就需要动态地进行收集任务的分配。在运行过程中,日志系统始终关注着收集器根节点下所有子节点的变更,一旦检测到有收集器机器停止汇报或是有新的收集器机器加入(这里不要用监听,因为可能会有大量的事件,所以采用定期轮询的方式),就要开始进行任务的重新分配。无论是针对收集器机器停止汇报还是新机器加入的情况,日志系统都需要将之前分配给该收集器的所有任务进行转移。为了解决这个问题,通常有两种做法:全局动态分配和局部动态分配,全局动态分配,就是把所有要收集的任务重新均分,简单粗暴但是印象范围太广。局部动态分配是指每个收集器汇报状态的时候还汇报自己的负载,如果一个收集器挂了,就把挂了的这个节点的收集任务交给其它负载低的收集机器,如果新加入一个收集机器就把负载高的机器的一部分任务交给新的机器去做。

7.8.6 Master选举

先创建一个持久节点,然后在持久节点中创建同名的临时节点,谁创建成功谁就是master,其它机器监听这个临时节点的删除事件。
masterr选举如何应对脑裂问题,脑裂问题是指某个节点本来已经是master了,但是因为GC时间过长,或者网络原因导致与ZK的连接中断,这个时候临时节点会被删除掉,就会有其它节点被选为master,然后当前节点恢复了,它依然认为自己是master,即产生了脑裂。比如Hdfs的namenode是怎么解决脑裂问题的,namenode创建临时节点成功了,这个namenode就变为active的状态,另外一个就会变为standby的状态,称为active的namenode还会创建一个临时节点,记录自己的信息,如果这个namenode正常结束会去删掉这个持久节点,但是如果是异常退出的,这个持久节点就不会被删掉,但是临时节点会被删掉,这个时候standby的节点就会创建临时节点,准备变为active状态,但是它发现持久化节点还在,就会先尝试通知另外一个namenode,让它进入standby状态,如果失败,默认就会调用一个脚本杀死原来的namenode进程,让它重启,或者如果用户自定义了隔离脚本也可以调用自定义的隔离脚本。

7.8.7 分布式锁

独占锁:先创建一个持久化节点作为锁的根节点,然后如果某个服务需要获取锁就去这个根节点下创建一个临时的顺序节点,同时判断自己是不是所有节点中序号最小的那一个,如果是就代表自己获取到锁了,开始执行业务逻辑,如果不是就监听自己的前一个节点的删除事件。如果前一个节点被删除,监听的这个节点将获取到通知,它要再次获取所有子节点,然后看自己是不是子节点中最小的,如果是就代表获取到锁了,如果不是说明前一个节点异常删除了(可能是服务挂了),这个时候它还得继续监听前一个节点。

如何实现可重入性,客户端本地实现可重入就可以了,本地记录重入次数,重入次数减到0就可以去zk中删掉临时节点。

共享锁:共享锁的实现稍微复杂一点,同样是先创建一个持久化节点作为锁的根节点,然后创建临时节点来作为读写锁,用节点名字来区分读写锁,比如读锁就是R_序号,写锁是W_序号。尝试获取锁的时候也是先创建对应类型临时顺序子节点,然后获取到所有节点,并进行排序,如果自己要获取一个读锁,然后发现自己前面也都是读锁,那就直接获取锁,如果发现前面有写锁,就监听这个写锁的删除事件,如果被删除收到通知再次检测前面是否有写锁,如果没有了就获取到了锁,开始执行,如果还有,就继续监听前一个写锁的删除事件。如果是一个写锁,只要自己不是最小的节点就没有获取锁,然后监听前一个节点,前一个节点被移除,收到通知之后再判断自己是否最小,是的化就获取锁。可重入性依然在本地实现就可以了。

分布式队列:

八、Canal

8.1 基础原理

canal的工作原理就是模仿Mysql Slave的交互协议,将自己伪装成一个Mysql的Slave节点,然后不断向Mysql的master节点发送Dump请求,Master收到dump请求之后,开始就会推送相应的binlog的数据给这个slave节点,也就是canal,canal收到相应的binlog数据之后解析出来就可以进行消费了。

canal server的执行单位叫instance,一个instance定义了需要同步哪个mysql数据库的哪些表,以及从什么位置开始同步,instance互相之间彼此隔离。canal server本身是多活的,可以安装多个canal server,但是instance只会运行在一个canal server里面,为了保证高可用性,比如一台canal server挂掉了,这台canal server上运行的instance就会自动切换到另外某一台canal server的节点上。这个切换的过程就是利用zookeeper来控制的。

8.2 instance抢占式执行的具体实现:

每个canal server启动之后都会去扫描instance的配置,如果发现了一个instence,会先去zookeeper中创建这个instance的节点,这个是一个持久化节点,这个节点里面还会记录同步的位点信息,另外会创建一个临时节点,里面会记录创建这个临时节点的canal server的ip和端口号,如果有多个canal server扫描到了同一个instance,谁先创建出临时节点并记录自己的信息,谁就执行这个instance,所以这个临时节点记录的就是正在运行这个instance的canal server。而其它的canal server发现临时节点已经创建出来了,并且上面记录的信息不是自己的,就会注册一个watcher,监听这个临时节点的删除事件,然后在这个canal server上的instance会陷入阻塞状态,直到监听到临时节点被删除,会再次尝试去创建临时节点,如果创建上了,自己就读取zookeeper上记录的这个instance的位点信息,并从这个位点继续往后解析。

8.3 这样设计存在的问题

可以看到这个设计有点类似于分布式锁,但是又不太一样,它不保证公平性,也没有重入的概念。这样设计可以保证一个instance总是只有一个canal server在运行,而且一旦崩溃可以由另外一个canal server来运行。但也有问题,实际上instance的运行是抢占式的,谁先抢占到,谁就会执行,可能部署了n个canal server,但是所有instance都第一个canal server给抢占到了。解决思路有两种,一是随机canal server的扫描时间,这样可以让每个canal server都有可能抢占到instance,但是这个只能减缓不均匀的问题,不能根除。另外一种思路就是抛弃这种抢占式的思路,进行任务的分发或者负载均衡,但是要处理canal server崩溃后的instance再次分发的情况,可能需要引入一个协调者,引入协调者有可能会出现单点问题。

8.4 canal如何解决闪断问题

闪断是指一个canal server已经开始正常运行某个instance了,但是和zookeeper发生了短时间连接中断,这个时候zookeeper会删除掉临时节点,如果没有任何处理,其它canal server会直接抢占这个instance,并开始执行,这个时候当前server重新连接上了zookeeper,结果发现临时节点记录的信息不是自己的了,它就必须停止自己正在运行的instance。如和解决这个问题呢?可以让canal server监听到删除事件之后,延迟一会儿再去抢占instance,而原先正在运行的这个canal server不会延迟,这样如果正在运行的这个canal server在延迟时间内重新连上了zookeeper,将继续占有锁,继续执行。其它canal server经过延迟时间之后,会去尝试创建临时节点,如果失败就会继续阻塞,并且重新注册监听事件。

8.5 canal client

canal client会连接canal server,并且从canal server获取数据,但是并不用显示的向canal client配置canal server的地址,只要canal client和canal server使用同一个zookeeper就可以了,canal client会去配置的instance节点下面获取到正在执行的这个instance的ip和端口号,然后进行连接,并且会注册临时节点的删除事件,这样当instance发生切换之后,client也能够及时感知到切换,然后重新连接切换之后的canal server。

canal client也可以实现自己的HA,实现原理就和canal server一样。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注