[关闭]
@yanglfyangl 2018-08-09T07:36:41.000000Z 字数 2177 阅读 454

数据一致性修复器(错误补偿)


【前言】

大型项目发展到一定阶段,往往为了1%的事做的工作,并不会是简单的1%,可能是多的多。
对于我们目前的项目,一致性的问题可能会体现在如下几个地方

  • Kafka的某个时刻不可用
  • Redis的某个时刻不可用
  • 与其它系统的配合过程中,某个时刻的不同步。(异构)

如果说涉及到的数据源,按照优先级,可以如下排列

  • 缓存不同步
  • 图库不同步
  • ES不同步
  • Mongo或Mysql不同步

【我们的解决方案】

Created with Raphaël 2.1.2用户用户服务服务数据库数据库队列队列日志日志计算中心计算中心缓存缓存修复引擎修复引擎发朋友圈入库如果写库失败,则:返回失败我们对写库失败,接失败处理成功则:插入队列如果写队列失败,则:写日志:{optype:"queue", id:"业务ID",data:"业务数据"}处理数据如果失败,则:{optype:"calccenter", id:"业务ID",data:"业务数据"}成功则写入缓存如果写缓存失败失败,则:{optype:"cache", id:"业务ID",data:"业务数据"}读取返回按类型进行处理,循环如果写队列失败,则:一旦队列修复,重新入队如果计算中心失败,则:重新入队如果写缓存失败,则:一旦缓存修复,重新入队

示例

  1. @nomedeng;
  2. void add(a, b){
  3. return a+b;
  4. }
  5. void publicSomthingB(){
  6. add();
  7. if(para1){
  8. if(doA(para1)){
  9. BService.save();
  10. BService.countIncrease(3)
  11. } else {
  12. BService.updated();
  13. BService.countIncrease(4)
  14. Log.Error("adfasdf");
  15. }
  16. }
  17. }
  18. public void publicSomthing(){
  19. if(para1){
  20. if(doA(para1)){
  21. BService.save();
  22. BService.countIncrease(2)
  23. if(error){
  24. Log.Error();
  25. }
  26. } else {
  27. BService.updated();
  28. BService.countIncrease(1)
  29. if(publicSomthingB()){
  30. Log.Error("failed", "asdfas");
  31. }
  32. }
  33. }
  34. ...
  35. ...
  36. ...
  37. }
  38. Log.Fatel("", "补偿API");

public void addForError(String userId,Integer num){
Integer moneyResult= userService.addMoney(userId,num);
}

如果方法抛出异常,则根据@Idempotent来进行处理。
@buchang()
public void SerC::add(String userId,Integer num){
Integer moneyResult= userService.addMoney(userId,num);
if(moneyResult%2==1){
Log.Fatal("", isRetry = true); //如果没有定义,则是当前函数
return success;
}
Integer sorceResult=userService.addSorce(userId,num);
if(sorceResult%2==0){
Log.Fatal("", "addForError", parm1, parm2);
}
}

public void SerC::addB(String userId,Integer num){
userService.addMoney(userId,num);
if(moneyResult%2==1){
Log.Fatal("");

}


throws Exception

}

int test1(){
alwaysSuccess(SerC->add());
return success;
}

  1. 如果有@Idempotent,则调用端会假设调用肯定成功,不会异常。只是成功的时机可能会是最终一致的。
  2. 如果没有,则调用端需要自己处理是否需要重试。
  3. 如果代码中有Log.Fatal("", isRetry = true),则可函数内返回成功,错误也会重试。

后面补偿怎么做

大概的语句:

  1. fromMysql("dbname").
  2. get("sql语句", resultsA).
  3. fromMongo("dbname").
  4. get("mongo语句", resultsB).
  5. calc().
  6. filter(resultsA, "去掉的item").
  7. filter(resultsA, "去掉的item").
  8. merge(resultsA, resultsB).
  9. intersection(resultsA, resultsB).
  10. then().
  11. putToRedis().
  12. asZSet(“语句”, [resultsA.id, resultsA.scroe]).
  13. asSet(“语句”).
  14. as...
  15. putToTuDB().
  16. asNode(nodeType, [resultsA.id, ...]).
  17. asRelation(relation, [resultsA.id, resultsB.id]).
  18. putToAnalysis().
  19. to...
  20. putToRelationEngine().
  21. to...

这样,未来的整个

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注