[关闭]
@yanglfyangl 2018-05-22T10:09:21.000000Z 字数 4973 阅读 485

分布式事务

问题:当你的代码像下面一样,能做分布式事务吗?

  1. void handle(){
  2. ServiceA.Methond1(para1, para2, ...);
  3. ServiceB.Methond1(para1, para2, ...);
  4. String key = ServiceC.Methond(para1, para2, ...);
  5. if(key){
  6. ServiceA.Methond2(para1, para2, ...);
  7. } else {
  8. ServiceB.Methond2(para1, para2, ...);
  9. }
  10. int case = ServiceD.Methond(para1, para2, ...);
  11. switch(case):{
  12. ECase1:
  13. ServiceE.Methond2(para1, para2, ...);
  14. ECase1:
  15. ServiceF.Methond2(para1, para2, ...);
  16. ECase1:
  17. ServiceG.Methond2(para1, para2, ...);
  18. ECase1:
  19. ServiceH.Methond2(para1, para2, ...);
  20. default:
  21. log("..");
  22. }
  23. }

重要要求:

  • 所有接口最好是幂等的,虽然我们的逻辑中可以不需要,但重要功能函数幂等是推荐的。
  • 不必要的接口不必支持事务,每个事务都会占用内存或缓存,所以比较简单的无状态的接口,不要加缓存。

业界常见的支持的事务类型有:

  • TCC(强一致性使用)
  • 基于外部事件表的"可靠事件模式" (周期长,时间要求不高但一致性要求高时使用)
  • 基于技术异常的“补偿模式”(最大重试+恢复重试+人工异常单处理)

对于TCC,我们可以使用开源的tcc-transactiontx-LCN( 目前使用的公司还是有点少 )来实现,所以我们的系统的重点就是实现另外两种

事务的难点:

  • 任何地方都可能出错,比如放如队列,但队列没有及时返回给调用方这样的错误。
  • 大多数情况下调用之间是有上下文关系的,后续的操作是需要等待上次操作的完成的。
  • 往往发现事务问题时,前期的代码都已成型,改动的成本很高,但同时压力还很大。

基本流程

期望的代码逻辑变化,与其它的框架不同的是,并不是走注解的方式来创建,而是代码显示的创建,每个需要支持的方法里,都需要显示的传入dtsSession,如果不传或传空的话,将不会支持DTS

  1. class DTSSession:{
  2. }
  3. class DTSClient:{
  4. }
  5. void test_aSyncDTS(){
  6. DTSSession dtsSession = new DTSSession();
  7. DTSClient client.push(dtsSession);
  8. boolean ret = client.run({
  9. ServiceA.Methond1(dtsSession, para1, para2, ...);
  10. ServiceB.Methond1(dtsSession, para1, para2, ...);
  11. String key = ServiceC.Methond(dtsSession, para1, para2, ...);
  12. if(key){
  13. ServiceA.Methond2(dtsSession, para1, para2, ...);
  14. } else {
  15. ServiceB.Methond2(dtsSession, para1, para2, ...);
  16. }
  17. });
  18. if(ret ) {
  19. // 只要返回成功,则可以返回用户成功了,系统会在未来一段时间内尽量保证
  20. // 这个操作被成功的执行完成。
  21. }
  22. }

调用的代码逻辑大概是这样的:

  1. //////////////////////////////////
  2. void method() //调用事务的主方法
  3. DTSClient client = XXX;
  4. client.run(dtsSession, {
  5. ServiceA.Methond1(dtsSession, para1, para2, ...);
  6. {//调用Methond1的时候的逻辑是
  7. 1. 全局的filter会过滤所有dtsSession不为空的方法。
  8. 2. 根据dtsSession来从缓存中查看是否有结果
  9. 3. 如果有之前同样请求调用的结果,则从缓存中直接返回。
  10. 4. 否则会调用相关的方法。
  11. 5. 调用方法的返回值,存入dtsSession对应的缓存中
  12. 这时候,为了防止插入多次等,所以返回值即使只是简单的正常,也会放入缓存中
  13. ,只要是同一事务ID,则会返回相对的返回值。。
  14. }
  15. ServiceA.Methond1(dtsSession, para1, para2, ...);
  16. if(key){
  17. ServiceA.Methond2(dtsSession, para1, para2, ...);
  18. } else {
  19. ServiceB.Methond2(dtsSession, para1, para2, ...);
  20. }
  21. }, null);
  22. //null 表示暂时不提供rollback方法。当不提供rollback方法的时候,会进入异常单处理流程。
  23. return success; //针对可以保证最终一致性的方法。
  24. //////////////////////////////////
  25. class DTSClient{
  26. List(dtsSession, function(), function()) callbackArray = List();
  27. boolean run(DTSSession dtsSession, function execFunc, function rollbackFunc) {
  28. //将function与DTS绑定后放入队列。
  29. callbackArray.push(Object<dtsSession, execFunc, rollbackFunc>);
  30. try{
  31. execFunc();
  32. } catch(Exception e){
  33. //打日志等等。。。
  34. }
  35. 只要放到了队列中,就返回true
  36. return true;
  37. }
  38. boolean handleDTSEvent(Event){
  39. //接收从Rocket队列中传来的消息,
  40. Function f = callbackArray.get(Event.SessionID());
  41. if(f!=null){
  42. f();
  43. }
  44. Ack(“消息处理完毕”);
  45. }
  46. }
  47. //用于普通Java代码的过滤。
  48. class DTSAspectJFilter:{
  49. @Before("execution(* **(DTSSession, ..))")
  50. public void onMethodCalledBefore(JoinPoint joinPoint) throws Throwable {
  51. //1. 判断方法中的DTSSession是否为空,如果为空,则直接调用方法体。
  52. //2. 从缓存里读取数据,如果读到,不调用方法,直接返回数据。
  53. //3. 如果取不到,则调用方法。
  54. if(DTSSession !=null ){
  55. if(DTSSession.hasData()){
  56. return DTSSession.Data()
  57. } else {
  58. 调用对应的方法
  59. }
  60. }
  61. }
  62. @After("execution(*(DTSSession, ..))")
  63. public void onMethodCalledAfter(JoinPoint joinPoint) throws Throwable {
  64. //1. 方法调用完后,将数据写入本地缓存或Redis。(使用JetCache)
  65. //2. 缓存的key为 DTS_sessionID_class_method_parastring
  66. //3. 超时时间为65s,也就是说,事务的有效期为60秒,事务过期后,如果还没成功,
  67. // 则进入异常单处理流程。
  68. }
  69. }
  70. //用于Dubbo接口的过滤器,这样业务端的dubbo接口不需要做任何变化。
  71. class DTSDubboFilter:{
  72. // 与上面逻辑基本一致。
  73. // 但所有Dubbo调用的缓存,必须要写入全局缓存。
  74. }
  75. /*
  76. */
  77. class DTSSession:{
  78. private long sessionID;
  79. savaData(Object param, Object results){}
  80. Boolean hasData(Object param){}
  81. Object getData(Object param){}
  82. public void init(){
  83. //将SessionID写入Redis中的待处理事务区(1分钟过期)。
  84. //DTSServer需要定期将这部分待处理事务进行处理
  85. //这就需要用到Redis的“过期通知机制”
  86. }
  87. public long GetID(){};
  88. ....
  89. }
  90. class DTSServer:{
  91. void handle(Event e){
  92. if(e.Timeout){
  93. // 当Redis中某个事件过期时调用。
  94. // 当读到某个事件过期时,将内容读出来,放入业务异常日志(日志还会单独处理)。
  95. // 因为内容所在区比信息所在区时间长,所以肯定可以取到。
  96. }
  97. if(e.Created) {
  98. // save info to DB, 主要是为了防止一旦redis出问题,数据还在。
  99. }
  100. if(e.Changed){
  101. if(e.Data().Status = false) {
  102. //表示执行失败了,这时候的逻辑是:
  103. 1. 通过MQ发送消息.
  104. 2. 对应的DTSClient得到消息后,进行重试。
  105. 3. 重试的时间间隔等逻辑,由DTSClient控制
  106. sendEventToClient();
  107. }
  108. }
  109. }
  110. //这个如果要开源出去或做成公共组件供各种组来使用,可以做成plugin模式。
  111. void exceptionRepaire(){
  112. //通过读取业务异常日志文件进行继续分析
  113. switch(type) //对于不同的业务类型处理方式不同
  114. {
  115. case bz1:
  116. 比如:
  117. 1. 重试几次,如果全失败,写入异常单;
  118. 2. 调用撤回操作,异步通知用户。
  119. 3. 如果提供了异常回滚接口,则调用回滚接口。。。
  120. 4. 。。。
  121. case bz2:
  122. default:
  123. ...
  124. }
  125. }
  126. }
  127. //////////////////////////////////

建议的代码风格

如果需要强一致性:

  1. @Translation(...);
  2. boolean handle() {
  3. //用数据库事务来保证回滚。
  4. ServiceA.Saved(...);
  5. if(!handle(...)){
  6. rollback();
  7. return false;
  8. }
  9. return true;
  10. }
  11. boolean handle(...){
  12. DTSSession dtsSession = new DTSSession();
  13. client.push(dtsSession);
  14. boolean ret = client.run({
  15. ServiceA.Methond1(dtsSession, para1, para2, ...);
  16. ServiceB.Methond1(dtsSession, para1, para2, ...);
  17. String key = ServiceC.Methond(dtsSession, para1, para2, ...);
  18. if(key){
  19. ServiceA.Methond2(dtsSession, para1, para2, ...);
  20. } else {
  21. ServiceB.Methond2(dtsSession, para1, para2, ...);
  22. }
  23. });
  24. return true;
  25. }

如果不太需要强一致性

  1. boolean handle() {
  2. //用数据库事务来保证回滚。
  3. boolean ret = true;
  4. try{
  5. ServiceA.Saved(...);
  6. } catch {
  7. ret = false;
  8. return ret;
  9. }
  10. ret = client.run({
  11. ServiceA.Methond1(dtsSession, para1, para2, ...);
  12. ....
  13. ....
  14. })
  15. return ret;
  16. }

技术点:

优势

  • 各个接口不是必需要实现幂等性;当然,最好是实现;
  • 如果有逻辑的话(if条件,switch,甚至多线程,actor模型。。。),当某个接口重试恢复后,还会继续原来的操作继续实现。代码逻辑不会因为逻辑过于复杂而改动非常大。

劣势

  • 大量的使用JVM 内存和缓存,最好提前预估好容量。
  • 对于rollback不太容易处理,如果需要rollback的,还是需要与TCC框架和本地事务配合,业务端需要控制好粒度。
  • 一旦JVM在某个事务的执行中挂掉了,需要额外的业务补偿。

使用建议

  • 需要本地事务保证的地方,还是需要使用相关的本地事务框架。
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注