@yanglfyangl
2018-05-22T10:09:21.000000Z
字数 4973
阅读 485
void handle(){
ServiceA.Methond1(para1, para2, ...);
ServiceB.Methond1(para1, para2, ...);
String key = ServiceC.Methond(para1, para2, ...);
if(key){
ServiceA.Methond2(para1, para2, ...);
} else {
ServiceB.Methond2(para1, para2, ...);
}
int case = ServiceD.Methond(para1, para2, ...);
switch(case):{
ECase1:
ServiceE.Methond2(para1, para2, ...);
ECase1:
ServiceF.Methond2(para1, para2, ...);
ECase1:
ServiceG.Methond2(para1, para2, ...);
ECase1:
ServiceH.Methond2(para1, para2, ...);
default:
log("..");
}
}
- 所有接口最好是幂等的,虽然我们的逻辑中可以不需要,但重要功能函数幂等是推荐的。
- 不必要的接口不必支持事务,每个事务都会占用内存或缓存,所以比较简单的无状态的接口,不要加缓存。
- TCC(强一致性使用)
- 基于外部事件表的"可靠事件模式" (周期长,时间要求不高但一致性要求高时使用)
- 基于技术异常的“补偿模式”(最大重试+恢复重试+人工异常单处理)
对于TCC,我们可以使用开源的tcc-transaction或tx-LCN( 目前使用的公司还是有点少 )来实现,所以我们的系统的重点就是实现另外两种
- 任何地方都可能出错,比如放如队列,但队列没有及时返回给调用方这样的错误。
- 大多数情况下调用之间是有上下文关系的,后续的操作是需要等待上次操作的完成的。
- 往往发现事务问题时,前期的代码都已成型,改动的成本很高,但同时压力还很大。
期望的代码逻辑变化,与其它的框架不同的是,并不是走注解的方式来创建,而是代码显示的创建,每个需要支持的方法里,都需要显示的传入dtsSession,如果不传或传空的话,将不会支持DTS
class DTSSession:{
}
class DTSClient:{
}
void test_aSyncDTS(){
DTSSession dtsSession = new DTSSession();
DTSClient client.push(dtsSession);
boolean ret = client.run({
ServiceA.Methond1(dtsSession, para1, para2, ...);
ServiceB.Methond1(dtsSession, para1, para2, ...);
String key = ServiceC.Methond(dtsSession, para1, para2, ...);
if(key){
ServiceA.Methond2(dtsSession, para1, para2, ...);
} else {
ServiceB.Methond2(dtsSession, para1, para2, ...);
}
});
if(ret ) {
// 只要返回成功,则可以返回用户成功了,系统会在未来一段时间内尽量保证
// 这个操作被成功的执行完成。
}
}
调用的代码逻辑大概是这样的:
//////////////////////////////////
void method() {//调用事务的主方法
DTSClient client = XXX;
client.run(dtsSession, {
ServiceA.Methond1(dtsSession, para1, para2, ...);
{//调用Methond1的时候的逻辑是
1. 全局的filter会过滤所有dtsSession不为空的方法。
2. 根据dtsSession来从缓存中查看是否有结果
3. 如果有之前同样请求调用的结果,则从缓存中直接返回。
4. 否则会调用相关的方法。
5. 调用方法的返回值,存入dtsSession对应的缓存中
这时候,为了防止插入多次等,所以返回值即使只是简单的正常,也会放入缓存中
,只要是同一事务ID,则会返回相对的返回值。。
}
ServiceA.Methond1(dtsSession, para1, para2, ...);
if(key){
ServiceA.Methond2(dtsSession, para1, para2, ...);
} else {
ServiceB.Methond2(dtsSession, para1, para2, ...);
}
}, null);
//null 表示暂时不提供rollback方法。当不提供rollback方法的时候,会进入异常单处理流程。
return success; //针对可以保证最终一致性的方法。
}
//////////////////////////////////
class DTSClient{
List(dtsSession, function(), function()) callbackArray = List();
boolean run(DTSSession dtsSession, function execFunc, function rollbackFunc) {
//将function与DTS绑定后放入队列。
callbackArray.push(Object<dtsSession, execFunc, rollbackFunc>);
try{
execFunc();
} catch(Exception e){
//打日志等等。。。
}
只要放到了队列中,就返回true。
return true;
}
boolean handleDTSEvent(Event){
//接收从Rocket队列中传来的消息,
Function f = callbackArray.get(Event.SessionID());
if(f!=null){
f();
}
Ack(“消息处理完毕”);
}
}
//用于普通Java代码的过滤。
class DTSAspectJFilter:{
@Before("execution(* **(DTSSession, ..))")
public void onMethodCalledBefore(JoinPoint joinPoint) throws Throwable {
//1. 判断方法中的DTSSession是否为空,如果为空,则直接调用方法体。
//2. 从缓存里读取数据,如果读到,不调用方法,直接返回数据。
//3. 如果取不到,则调用方法。
if(DTSSession !=null ){
if(DTSSession.hasData()){
return DTSSession.Data()
} else {
调用对应的方法
}
}
}
@After("execution(*(DTSSession, ..))")
public void onMethodCalledAfter(JoinPoint joinPoint) throws Throwable {
//1. 方法调用完后,将数据写入本地缓存或Redis。(使用JetCache)
//2. 缓存的key为 DTS_sessionID_class_method_parastring
//3. 超时时间为65s,也就是说,事务的有效期为60秒,事务过期后,如果还没成功,
// 则进入异常单处理流程。
}
}
//用于Dubbo接口的过滤器,这样业务端的dubbo接口不需要做任何变化。
class DTSDubboFilter:{
// 与上面逻辑基本一致。
// 但所有Dubbo调用的缓存,必须要写入全局缓存。
}
/*
*/
class DTSSession:{
private long sessionID;
savaData(Object param, Object results){}
Boolean hasData(Object param){}
Object getData(Object param){}
public void init(){
//将SessionID写入Redis中的待处理事务区(1分钟过期)。
//DTSServer需要定期将这部分待处理事务进行处理
//这就需要用到Redis的“过期通知机制”
}
public long GetID(){};
....
}
class DTSServer:{
void handle(Event e){
if(e.Timeout){
// 当Redis中某个事件过期时调用。
// 当读到某个事件过期时,将内容读出来,放入业务异常日志(日志还会单独处理)。
// 因为内容所在区比信息所在区时间长,所以肯定可以取到。
}
if(e.Created) {
// save info to DB, 主要是为了防止一旦redis出问题,数据还在。
}
if(e.Changed){
if(e.Data().Status = false) {
//表示执行失败了,这时候的逻辑是:
1. 通过MQ发送消息.
2. 对应的DTSClient得到消息后,进行重试。
3. 重试的时间间隔等逻辑,由DTSClient控制
sendEventToClient();
}
}
}
//这个如果要开源出去或做成公共组件供各种组来使用,可以做成plugin模式。
void exceptionRepaire(){
//通过读取业务异常日志文件进行继续分析
switch(type) //对于不同的业务类型处理方式不同
{
case bz1:
比如:
1. 重试几次,如果全失败,写入异常单;
2. 调用撤回操作,异步通知用户。
3. 如果提供了异常回滚接口,则调用回滚接口。。。
4. 。。。
case bz2:
default:
...
}
}
}
//////////////////////////////////
如果需要强一致性:
@Translation(...);
boolean handle() {
//用数据库事务来保证回滚。
ServiceA.Saved(...);
if(!handle(...)){
rollback();
return false;
}
return true;
}
boolean handle(...){
DTSSession dtsSession = new DTSSession();
client.push(dtsSession);
boolean ret = client.run({
ServiceA.Methond1(dtsSession, para1, para2, ...);
ServiceB.Methond1(dtsSession, para1, para2, ...);
String key = ServiceC.Methond(dtsSession, para1, para2, ...);
if(key){
ServiceA.Methond2(dtsSession, para1, para2, ...);
} else {
ServiceB.Methond2(dtsSession, para1, para2, ...);
}
});
return true;
}
如果不太需要强一致性
boolean handle() {
//用数据库事务来保证回滚。
boolean ret = true;
try{
ServiceA.Saved(...);
} catch {
ret = false;
return ret;
}
ret = client.run({
ServiceA.Methond1(dtsSession, para1, para2, ...);
....
....
})
return ret;
}
- 各个接口不是必需要实现幂等性;当然,最好是实现;
- 如果有逻辑的话(if条件,switch,甚至多线程,actor模型。。。),当某个接口重试恢复后,还会继续原来的操作继续实现。代码逻辑不会因为逻辑过于复杂而改动非常大。
- 大量的使用JVM 内存和缓存,最好提前预估好容量。
- 对于rollback不太容易处理,如果需要rollback的,还是需要与TCC框架和本地事务配合,业务端需要控制好粒度。
- 一旦JVM在某个事务的执行中挂掉了,需要额外的业务补偿。
- 需要本地事务保证的地方,还是需要使用相关的本地事务框架。