@yanglfyangl
2018-05-22T10:09:21.000000Z
字数 4973
阅读 535
void handle(){ServiceA.Methond1(para1, para2, ...);ServiceB.Methond1(para1, para2, ...);String key = ServiceC.Methond(para1, para2, ...);if(key){ServiceA.Methond2(para1, para2, ...);} else {ServiceB.Methond2(para1, para2, ...);}int case = ServiceD.Methond(para1, para2, ...);switch(case):{ECase1:ServiceE.Methond2(para1, para2, ...);ECase1:ServiceF.Methond2(para1, para2, ...);ECase1:ServiceG.Methond2(para1, para2, ...);ECase1:ServiceH.Methond2(para1, para2, ...);default:log("..");}}
- 所有接口最好是幂等的,虽然我们的逻辑中可以不需要,但重要功能函数幂等是推荐的。
- 不必要的接口不必支持事务,每个事务都会占用内存或缓存,所以比较简单的无状态的接口,不要加缓存。
- TCC(强一致性使用)
- 基于外部事件表的"可靠事件模式" (周期长,时间要求不高但一致性要求高时使用)
- 基于技术异常的“补偿模式”(最大重试+恢复重试+人工异常单处理)
对于TCC,我们可以使用开源的tcc-transaction或tx-LCN( 目前使用的公司还是有点少 )来实现,所以我们的系统的重点就是实现另外两种
- 任何地方都可能出错,比如放如队列,但队列没有及时返回给调用方这样的错误。
- 大多数情况下调用之间是有上下文关系的,后续的操作是需要等待上次操作的完成的。
- 往往发现事务问题时,前期的代码都已成型,改动的成本很高,但同时压力还很大。
期望的代码逻辑变化,与其它的框架不同的是,并不是走注解的方式来创建,而是代码显示的创建,每个需要支持的方法里,都需要显示的传入dtsSession,如果不传或传空的话,将不会支持DTS
class DTSSession:{}class DTSClient:{}void test_aSyncDTS(){DTSSession dtsSession = new DTSSession();DTSClient client.push(dtsSession);boolean ret = client.run({ServiceA.Methond1(dtsSession, para1, para2, ...);ServiceB.Methond1(dtsSession, para1, para2, ...);String key = ServiceC.Methond(dtsSession, para1, para2, ...);if(key){ServiceA.Methond2(dtsSession, para1, para2, ...);} else {ServiceB.Methond2(dtsSession, para1, para2, ...);}});if(ret ) {// 只要返回成功,则可以返回用户成功了,系统会在未来一段时间内尽量保证// 这个操作被成功的执行完成。}}
调用的代码逻辑大概是这样的:
//////////////////////////////////void method() {//调用事务的主方法DTSClient client = XXX;client.run(dtsSession, {ServiceA.Methond1(dtsSession, para1, para2, ...);{//调用Methond1的时候的逻辑是1. 全局的filter会过滤所有dtsSession不为空的方法。2. 根据dtsSession来从缓存中查看是否有结果3. 如果有之前同样请求调用的结果,则从缓存中直接返回。4. 否则会调用相关的方法。5. 调用方法的返回值,存入dtsSession对应的缓存中这时候,为了防止插入多次等,所以返回值即使只是简单的正常,也会放入缓存中,只要是同一事务ID,则会返回相对的返回值。。}ServiceA.Methond1(dtsSession, para1, para2, ...);if(key){ServiceA.Methond2(dtsSession, para1, para2, ...);} else {ServiceB.Methond2(dtsSession, para1, para2, ...);}}, null);//null 表示暂时不提供rollback方法。当不提供rollback方法的时候,会进入异常单处理流程。return success; //针对可以保证最终一致性的方法。}//////////////////////////////////class DTSClient{List(dtsSession, function(), function()) callbackArray = List();boolean run(DTSSession dtsSession, function execFunc, function rollbackFunc) {//将function与DTS绑定后放入队列。callbackArray.push(Object<dtsSession, execFunc, rollbackFunc>);try{execFunc();} catch(Exception e){//打日志等等。。。}只要放到了队列中,就返回true。return true;}boolean handleDTSEvent(Event){//接收从Rocket队列中传来的消息,Function f = callbackArray.get(Event.SessionID());if(f!=null){f();}Ack(“消息处理完毕”);}}//用于普通Java代码的过滤。class DTSAspectJFilter:{@Before("execution(* **(DTSSession, ..))")public void onMethodCalledBefore(JoinPoint joinPoint) throws Throwable {//1. 判断方法中的DTSSession是否为空,如果为空,则直接调用方法体。//2. 从缓存里读取数据,如果读到,不调用方法,直接返回数据。//3. 如果取不到,则调用方法。if(DTSSession !=null ){if(DTSSession.hasData()){return DTSSession.Data()} else {调用对应的方法}}}@After("execution(*(DTSSession, ..))")public void onMethodCalledAfter(JoinPoint joinPoint) throws Throwable {//1. 方法调用完后,将数据写入本地缓存或Redis。(使用JetCache)//2. 缓存的key为 DTS_sessionID_class_method_parastring//3. 超时时间为65s,也就是说,事务的有效期为60秒,事务过期后,如果还没成功,// 则进入异常单处理流程。}}//用于Dubbo接口的过滤器,这样业务端的dubbo接口不需要做任何变化。class DTSDubboFilter:{// 与上面逻辑基本一致。// 但所有Dubbo调用的缓存,必须要写入全局缓存。}/**/class DTSSession:{private long sessionID;savaData(Object param, Object results){}Boolean hasData(Object param){}Object getData(Object param){}public void init(){//将SessionID写入Redis中的待处理事务区(1分钟过期)。//DTSServer需要定期将这部分待处理事务进行处理//这就需要用到Redis的“过期通知机制”}public long GetID(){};....}class DTSServer:{void handle(Event e){if(e.Timeout){// 当Redis中某个事件过期时调用。// 当读到某个事件过期时,将内容读出来,放入业务异常日志(日志还会单独处理)。// 因为内容所在区比信息所在区时间长,所以肯定可以取到。}if(e.Created) {// save info to DB, 主要是为了防止一旦redis出问题,数据还在。}if(e.Changed){if(e.Data().Status = false) {//表示执行失败了,这时候的逻辑是:1. 通过MQ发送消息.2. 对应的DTSClient得到消息后,进行重试。3. 重试的时间间隔等逻辑,由DTSClient控制sendEventToClient();}}}//这个如果要开源出去或做成公共组件供各种组来使用,可以做成plugin模式。void exceptionRepaire(){//通过读取业务异常日志文件进行继续分析switch(type) //对于不同的业务类型处理方式不同{case bz1:比如:1. 重试几次,如果全失败,写入异常单;2. 调用撤回操作,异步通知用户。3. 如果提供了异常回滚接口,则调用回滚接口。。。4. 。。。case bz2:default:...}}}//////////////////////////////////
如果需要强一致性:
@Translation(...);boolean handle() {//用数据库事务来保证回滚。ServiceA.Saved(...);if(!handle(...)){rollback();return false;}return true;}boolean handle(...){DTSSession dtsSession = new DTSSession();client.push(dtsSession);boolean ret = client.run({ServiceA.Methond1(dtsSession, para1, para2, ...);ServiceB.Methond1(dtsSession, para1, para2, ...);String key = ServiceC.Methond(dtsSession, para1, para2, ...);if(key){ServiceA.Methond2(dtsSession, para1, para2, ...);} else {ServiceB.Methond2(dtsSession, para1, para2, ...);}});return true;}
如果不太需要强一致性
boolean handle() {//用数据库事务来保证回滚。boolean ret = true;try{ServiceA.Saved(...);} catch {ret = false;return ret;}ret = client.run({ServiceA.Methond1(dtsSession, para1, para2, ...);........})return ret;}
- 各个接口不是必需要实现幂等性;当然,最好是实现;
- 如果有逻辑的话(if条件,switch,甚至多线程,actor模型。。。),当某个接口重试恢复后,还会继续原来的操作继续实现。代码逻辑不会因为逻辑过于复杂而改动非常大。
- 大量的使用JVM 内存和缓存,最好提前预估好容量。
- 对于rollback不太容易处理,如果需要rollback的,还是需要与TCC框架和本地事务配合,业务端需要控制好粒度。
- 一旦JVM在某个事务的执行中挂掉了,需要额外的业务补偿。
- 需要本地事务保证的地方,还是需要使用相关的本地事务框架。