Seata-Saga模式 原理

Seata-Saga模式 原理1Saga模式示例1.1Saga状态机工具状态机设计组件:seata-saga-statemachine-designer状态机在线画图工具:saga_designer1.2代码示例github上Seata-sample有完整的示例代码,SeataSaga模式中有此示例的完整介绍和分析。这里仅摘取部分和介绍原理有关的代码进行分析。1.2.1初始化dbmysql示例:CREATETABLEIFNOTEXISTS`seata_state_machine_def`(

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE使用 1年只要46元 售后保障 童叟无欺

1 Saga模式示例

1.1 Saga状态机工具

状态机设计组件:seata-saga-statemachine-designer
状态机在线画图工具:saga_designer

1.2 代码示例

github上Seata-sample有完整的示例代码,Seata Saga模式中有此示例的完整介绍和分析。这里仅摘取部分和介绍原理有关的代码进行分析。

1.2.1 初始化db

mysql示例:

CREATE TABLE IF NOT EXISTS `seata_state_machine_def`
(
    `id`               VARCHAR(32)  NOT NULL COMMENT 'id',
    `name`             VARCHAR(128) NOT NULL COMMENT 'name',
    `tenant_id`        VARCHAR(32)  NOT NULL COMMENT 'tenant id',
    `app_name`         VARCHAR(32)  NOT NULL COMMENT 'application name',
    `type`             VARCHAR(20)  COMMENT 'state language type',
    `comment_`         VARCHAR(255) COMMENT 'comment',
    `ver`              VARCHAR(16)  NOT NULL COMMENT 'version',
    `gmt_create`       DATETIME(3)  NOT NULL COMMENT 'create time',
    `status`           VARCHAR(2)   NOT NULL COMMENT 'status(AC:active|IN:inactive)',
    `content`          TEXT COMMENT 'content',
    `recover_strategy` VARCHAR(16) COMMENT 'transaction recover strategy(compensate|retry)',
    PRIMARY KEY (`id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

CREATE TABLE IF NOT EXISTS `seata_state_machine_inst`
(
    `id`                  VARCHAR(128)            NOT NULL COMMENT 'id',
    `machine_id`          VARCHAR(32)             NOT NULL COMMENT 'state machine definition id',
    `tenant_id`           VARCHAR(32)             NOT NULL COMMENT 'tenant id',
    `parent_id`           VARCHAR(128) COMMENT 'parent id',
    `gmt_started`         DATETIME(3)             NOT NULL COMMENT 'start time',
    `business_key`        VARCHAR(48) COMMENT 'business key',
    `start_params`        TEXT COMMENT 'start parameters',
    `gmt_end`             DATETIME(3) COMMENT 'end time',
    `excep`               BLOB COMMENT 'exception',
    `end_params`          TEXT COMMENT 'end parameters',
    `status`              VARCHAR(2) COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    `compensation_status` VARCHAR(2) COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    `is_running`          TINYINT(1) COMMENT 'is running(0 no|1 yes)',
    `gmt_updated`         DATETIME(3) NOT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `unikey_buz_tenant` (`business_key`, `tenant_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

CREATE TABLE IF NOT EXISTS `seata_state_inst`
(
    `id`                       VARCHAR(48)  NOT NULL COMMENT 'id',
    `machine_inst_id`          VARCHAR(128) NOT NULL COMMENT 'state machine instance id',
    `name`                     VARCHAR(128) NOT NULL COMMENT 'state name',
    `type`                     VARCHAR(20)  COMMENT 'state type',
    `service_name`             VARCHAR(128) COMMENT 'service name',
    `service_method`           VARCHAR(128) COMMENT 'method name',
    `service_type`             VARCHAR(16) COMMENT 'service type',
    `business_key`             VARCHAR(48) COMMENT 'business key',
    `state_id_compensated_for` VARCHAR(50) COMMENT 'state compensated for',
    `state_id_retried_for`     VARCHAR(50) COMMENT 'state retried for',
    `gmt_started`              DATETIME(3)  NOT NULL COMMENT 'start time',
    `is_for_update`            TINYINT(1) COMMENT 'is service for update',
    `input_params`             TEXT COMMENT 'input parameters',
    `output_params`            TEXT COMMENT 'output parameters',
    `status`                   VARCHAR(2)   NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    `excep`                    BLOB COMMENT 'exception',
    `gmt_updated`              DATETIME(3) COMMENT 'update time',
    `gmt_end`                  DATETIME(3) COMMENT 'end time',
    PRIMARY KEY (`id`, `machine_inst_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

1.2.2 bean配置

<bean id="dataSource" class="org.h2.jdbcx.JdbcConnectionPool" destroy-method="dispose">
    <constructor-arg>
        <bean class="org.h2.jdbcx.JdbcDataSource">
            <property name="URL" value="jdbc:h2:mem:seata_saga" />
            <property name="user" value="sa" />
            <property name="password" value="sa" />
        </bean>
    </constructor-arg>
</bean>

<jdbc:initialize-database data-source="dataSource">
    <jdbc:script location="classpath:sql/h2_init.sql" />
</jdbc:initialize-database>

<bean id="stateMachineEngine" class="io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine">
    <property name="stateMachineConfig" ref="dbStateMachineConfig"></property>
</bean>
<bean id="dbStateMachineConfig" class="io.seata.saga.engine.config.DbStateMachineConfig">
    <property name="dataSource" ref="dataSource"></property>
    <property name="resources" value="statelang/*.json"></property>
    <property name="enableAsync" value="true"></property>
    <property name="threadPoolExecutor" ref="threadExecutor"></property>
    <property name="applicationId" value="saga_sample"></property>
    <property name="txServiceGroup" value="my_test_tx_group"></property>
</bean>
<bean id="threadExecutor"
      class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
    <property name="threadNamePrefix" value="SAGA_ASYNC_EXE_" />
    <property name="corePoolSize" value="1" />
    <property name="maxPoolSize" value="20" />
</bean>
<bean class="io.seata.saga.rm.StateMachineEngineHolder">
    <property name="stateMachineEngine" ref="stateMachineEngine"/>
</bean>

1.2.3 Saga状态机配置示例

{
    "Name": "reduceInventoryAndBalance",
    "Comment": "reduce inventory then reduce balance in a transaction",
    "StartState": "ReduceInventory",
    "Version": "0.0.1",
    "States": {
        "ReduceInventory": {
            "Type": "ServiceTask",
            "ServiceName": "inventoryAction",
            "ServiceMethod": "reduce",
            "CompensateState": "CompensateReduceInventory",
            "Next": "ChoiceState",
            "Input": [
                "$.[businessKey]",
                "$.[count]"
            ],
            "Output": {
                "reduceInventoryResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exception{java.lang.Throwable}": "UN"
            }
        },
        "ChoiceState":{
            "Type": "Choice",
            "Choices":[
                {
                    "Expression":"[reduceInventoryResult] == true",
                    "Next":"ReduceBalance"
                }
            ],
            "Default":"Fail"
        },
        "ReduceBalance": {
            "Type": "ServiceTask",
            "ServiceName": "balanceAction",
            "ServiceMethod": "reduce",
            "CompensateState": "CompensateReduceBalance",
            "Input": [
                "$.[businessKey]",
                "$.[amount]",
                {
                    "throwException" : "$.[mockReduceBalanceFail]"
                }
            ],
            "Output": {
                "compensateReduceBalanceResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exception{java.lang.Throwable}": "UN"
            },
            "Catch": [
                {
                    "Exceptions": [
                        "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"
                }
            ],
            "Next": "Succeed"
        },
        "CompensateReduceInventory": {
            "Type": "ServiceTask",
            "ServiceName": "inventoryAction",
            "ServiceMethod": "compensateReduce",
            "Input": [
                "$.[businessKey]"
            ]
        },
        "CompensateReduceBalance": {
            "Type": "ServiceTask",
            "ServiceName": "balanceAction",
            "ServiceMethod": "compensateReduce",
            "Input": [
                "$.[businessKey]"
            ]
        },
        "CompensationTrigger": {
            "Type": "CompensationTrigger",
            "Next": "Fail"
        },
        "Succeed": {
            "Type":"Succeed"
        },
        "Fail": {
            "Type":"Fail",
            "ErrorCode": "PURCHASE_FAILED",
            "Message": "purchase failed"
        }
    }
}

1.2.4 启动状态机

创建StateMachineEngine
StateMachineEngine stateMachineEngine = (StateMachineEngine) applicationContext.getBean("stateMachineEngine");

执行-同步
StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null, businessKey, startParams);

执行-异步
StateMachineInstance inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams, CALL_BACK);

2 状态机设置

2.1 状态机属性

  • Name: 表示状态机的名称,必须唯一
  • Comment: 状态机的描述
  • Version: 状态机定义版本
  • StartState: 启动时运行的第一个”状态”
  • States: 状态列表,是一个map结构,key是”状态”的名称,在状态机内必须唯一

2.2 状态属性

  • Type: “状态” 的类型,比如有:
    • ServiceTask: 执行调用服务任务
    • Choice: 单条件选择路由
    • CompensationTrigger: 触发补偿流程
    • Succeed: 状态机正常结束
    • Fail: 状态机异常结束
    • SubStateMachine: 调用子状态机
    • CompensateSubMachine: 用于补偿一个子状态机
  • ServiceName: 服务名称,通常是服务的beanId
  • ServiceMethod: 服务方法名称
  • CompensateState: 该”状态”的补偿”状态”
  • Input: 调用服务的输入参数列表, 是一个数组, 对应于服务方法的参数列表, $.表示使用表达式从状态机上下文中取参数,表达使用的SpringEL, 如果是常量直接写值即可
  • Ouput: 将服务返回的参数赋值到状态机上下文中, 是一个map结构,key为放入到状态机上文时的key(状态机上下文也是一个map),value中$.是表示SpringEL表达式,表示从服务的返回参数中取值,#root表示服务的整个返回参数
  • Status: 服务执行状态映射,框架定义了三个状态,SU 成功、FA 失败、UN 未知, 我们需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个map结构,key是条件表达式,一般是取服务的返回值或抛出的异常进行判断,默认是SpringEL表达式判断服务返回参数,带$Exception{开头表示判断异常类型。value是当这个条件表达式成立时则将服务执行状态映射成这个值
  • Catch: 捕获到异常后的路由
  • Next: 服务执行完成后下一个执行的”状态”
  • Choices: Choice类型的”状态”里, 可选的分支列表, 分支中的Expression为SpringEL表达式, Next为当表达式成立时执行的下一个”状态”
  • ErrorCode: Fail类型”状态”的错误码
  • Message: Fail类型”状态”的错误信息

3 原理分析

Saga模式是一种长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者。

3.1 Saga状态机引擎架构

在这里插入图片描述
注:此图来自seata官网的博客。

状态机引擎的设计主要分成三层, 上层依赖下层,从下往上分别是:

  • Eventing层:
    实现事件驱动架构,可以压入事件并由消费端消费事件,本层不关心事件是什么消费端执行什么,由上层实现。

  • ProcessController层:
    由于上层的Eventing驱动一个“空”流程执行的执行,”state”的行为和路由都未实现,由上层实现;基于以上两层理论上可以自定义扩展任何”流程”引擎。

  • StateMachineEngine层:
    实现状态机引擎每种state的行为和路由逻辑;提供API、状态机语言仓库。

3.2 Saga状态机引擎

在这里插入图片描述
注:此图来自seata官网的博客。

在这里插入图片描述
注:此图来自seata官网的博客。
Saga模式下,事务会根据json配置的state来执行,如果前一个state的正向服务执行成功,那么就路由到下一个state并执行下一个state的正向服务,如果执行失败,那么基于CompensateState属性执行补偿服务。

从代码的角度来看,Saga执行过程如下:
在这里插入图片描述

3.3 分布式事务的时序图

在这里插入图片描述

从时序图上可以看到,Saga模式和AT、TCC模式有较大的差异:

  • Saga模式下TM、RM均由开启事务的微服务承担,AT、TCC模式的TM、RM一般是分开的。
  • Saga模式由一个状态机实现,其实现过程主要包括:路由到正确的state + 执行state。

4 源码分析

4.1 TM开启事务

4.1.1 通过StateMachineEngine开启事务

Saga的分布式事务由StateMachineEngine开启。目前支持同步、异步方式,示例代码如下:

StateMachineInstance startWithBusinessKey(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams) ;

StateMachineInstance startAsync(String stateMachineName, String tenantId, Map<String, Object> startParams, AsyncCallback callback) throws EngineExecutionException;

StateMachineEngine开启事务,主要包括以下几件事情:

  • 创建执行上线文ProcessContext。
  • 向TC开启全局事务,并返回xid。
  • 将state持久化到本地(DB)。
  • 向EventBus中发布Event。
private StateMachineInstance startInternal(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, boolean async, AsyncCallback callback) throws EngineExecutionException {

    if (async && !stateMachineConfig.isEnableAsync()) {
        throw new EngineExecutionException(
            "Asynchronous start is disabled. please set StateMachineConfig.enableAsync=true first.",
            FrameworkErrorCode.AsynchronousStartDisabled);
    }

    if (StringUtils.isEmpty(tenantId)) {
        tenantId = stateMachineConfig.getDefaultTenantId();
    }

    StateMachineInstance instance = createMachineInstance(stateMachineName, tenantId, businessKey, startParams);

    ProcessContextBuilder contextBuilder = ProcessContextBuilder.create().withProcessType(ProcessType.STATE_LANG)
        .withOperationName(DomainConstants.OPERATION_NAME_START).withAsyncCallback(callback).withInstruction(
            new StateInstruction(stateMachineName, tenantId)).withStateMachineInstance(instance)
        .withStateMachineConfig(getStateMachineConfig()).withStateMachineEngine(this);

    Map<String, Object> contextVariables;
    if (startParams != null) {
        contextVariables = new ConcurrentHashMap<>(startParams.size());
        nullSafeCopy(startParams, contextVariables);
    } else {
        contextVariables = new ConcurrentHashMap<>();
    }
    instance.setContext(contextVariables);

    contextBuilder.withStateMachineContextVariables(contextVariables);

    contextBuilder.withIsAsyncExecution(async);

    ProcessContext processContext = contextBuilder.build();

    if (instance.getStateMachine().isPersist() && stateMachineConfig.getStateLogStore() != null) {
        stateMachineConfig.getStateLogStore().recordStateMachineStarted(instance, processContext);
    }
    if (StringUtils.isEmpty(instance.getId())) {
        instance.setId(
            stateMachineConfig.getSeqGenerator().generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));
    }

    if (async) {
        stateMachineConfig.getAsyncProcessCtrlEventPublisher().publish(processContext);
    } else {
        stateMachineConfig.getProcessCtrlEventPublisher().publish(processContext);
    }

    return instance;
}

4.1.2 状态机的流程处理

状态机的流程处理主要包括两个部分:执行state、路由到下一个state。在ProcessController中定义了状态机的执行流程:

  • 找到合适的Processor,然后执行state中指定的方法。
  • 路由到下一个State,并执行state
    ProcessControllerImpl代码如下所示:
public void process(ProcessContext context) throws FrameworkException {
    try {
        businessProcessor.process(context);

        businessProcessor.route(context);
    } catch (FrameworkException fex) {
        throw fex;
    } catch (Exception ex) {
        LOGGER.error("Unknown exception occurred, context = {}", context, ex);
        throw new FrameworkException(ex, "Unknown exception occurred", FrameworkErrorCode.UnknownAppError);
    }
}

4.2 处理器

4.2.1 根据状态机json配置找到对应的Processor

StateMachineProcessHandler对状态机的每一种type都配置了handler,如下图所示:

public void initDefaultHandlers() {
    if (stateHandlers.isEmpty()) {
        stateHandlers.put(DomainConstants.STATE_TYPE_SERVICE_TASK, new ServiceTaskStateHandler());

        stateHandlers.put(DomainConstants.STATE_TYPE_SCRIPT_TASK, new ScriptTaskStateHandler());

        stateHandlers.put(DomainConstants.STATE_TYPE_SUB_MACHINE_COMPENSATION, new ServiceTaskStateHandler());

        stateHandlers.put(DomainConstants.STATE_TYPE_SUB_STATE_MACHINE, new SubStateMachineHandler());

        stateHandlers.put(DomainConstants.STATE_TYPE_CHOICE, new ChoiceStateHandler());
        stateHandlers.put(DomainConstants.STATE_TYPE_SUCCEED, new SucceedEndStateHandler());
        stateHandlers.put(DomainConstants.STATE_TYPE_FAIL, new FailEndStateHandler());
        stateHandlers.put(DomainConstants.STATE_TYPE_COMPENSATION_TRIGGER, new CompensationTriggerStateHandler());
    }
}

结合前面的示例数据,分析状态机是如何找到合适的State处理器的:

"States": {
    "ReduceInventory": {
        "Type": "ServiceTask",  -> 使用ServiceTaskStateHandler处理
        "ServiceName": "inventoryAction", -> 对应到bean的名字
        "ServiceMethod": "reduce",          ->对应到bean中的具体方法
        "CompensateState": "CompensateReduceInventory", -> 补偿的state
        "Next": "ChoiceState",      ->下一个state
        "Input": [
            "$.[businessKey]",
            "$.[count]"
        ],
        "Output": {
            "reduceInventoryResult": "$.#root"
        },
        "Status": {
            "#root == true": "SU",
            "#root == false": "FA",
            "$Exception{java.lang.Throwable}": "UN"
        }
    }
}

public void process(ProcessContext context) throws FrameworkException {
    StateInstruction instruction = context.getInstruction(StateInstruction.class);
    State state = instruction.getState(context);
    String stateType = state.getType();
    StateHandler stateHandler = stateHandlers.get(stateType);

    List<StateHandlerInterceptor> interceptors = null;
    if (stateHandler instanceof InterceptableStateHandler) {
        interceptors = ((InterceptableStateHandler)stateHandler).getInterceptors();
    }

    List<StateHandlerInterceptor> executedInterceptors = null;
    Exception exception = null;
    try {
        if (CollectionUtils.isNotEmpty(interceptors)) {
            executedInterceptors = new ArrayList<>(interceptors.size());
            for (StateHandlerInterceptor interceptor : interceptors) {
                executedInterceptors.add(interceptor);
                interceptor.preProcess(context);
            }
        }

        stateHandler.process(context);

    } catch (Exception e) {
        exception = e;
        throw e;
    } finally {
        if (CollectionUtils.isNotEmpty(executedInterceptors)) {
            for (int i = executedInterceptors.size() - 1; i >= 0; i--) {
                StateHandlerInterceptor interceptor = executedInterceptors.get(i);
                interceptor.postProcess(context, exception);
            }
        }
    }
}

4.2.2 执行前置拦截器

主要包括以下内容:

  • 注册分支事务
  • 向本地数据库插入state(state_inst表)
    代码见DbAndReportTcStateLogStore#recordStateStarted:
public void recordStateStarted(StateInstance stateInstance, ProcessContext context) {
    if (stateInstance != null) {

        boolean isUpdateMode = isUpdateMode(stateInstance, context);

        // if this state is for retry, do not register branch
        if (StringUtils.hasLength(stateInstance.getStateIdRetriedFor())) {
            if (isUpdateMode) {
                stateInstance.setId(stateInstance.getStateIdRetriedFor());
            } else {
                // generate id by default
                stateInstance.setId(generateRetryStateInstanceId(stateInstance));
            }
        }
        // if this state is for compensation, do not register branch
        else if (StringUtils.hasLength(stateInstance.getStateIdCompensatedFor())) {
            stateInstance.setId(generateCompensateStateInstanceId(stateInstance, isUpdateMode));
        } else {
            branchRegister(stateInstance, context);
        }

        if (StringUtils.isEmpty(stateInstance.getId()) && seqGenerator != null) {
            stateInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_INST));
        }

        stateInstance.setSerializedInputParams(paramsSerializer.serialize(stateInstance.getInputParams()));
        if (!isUpdateMode) {
            executeUpdate(stateLogStoreSqls.getRecordStateStartedSql(dbType),
                STATE_INSTANCE_TO_STATEMENT_FOR_INSERT, stateInstance);
        } else {
            // if this retry/compensate state do not need persist, just update last inst
            executeUpdate(stateLogStoreSqls.getUpdateStateExecutionStatusSql(dbType),
                stateInstance.getStatus().name(), new Timestamp(System.currentTimeMillis()),
                stateInstance.getMachineInstanceId(), stateInstance.getId());
        }
    }
}

4.2.3 Processor执行State

当type=ServiceTask时,将会由ServiceTaskStateHandler处理,具体逻辑如下:

  • 从state中获取beanname以及methodName。
  • 创建serviceInvoker对象。
  • serviceInvoker首先从applicationContext获取bean,并通过反射找到method。
  • 调用method方法。
  • 返回结果。
    代码如下:
public void process(ProcessContext context) throws EngineExecutionException {
    StateInstruction instruction = context.getInstruction(StateInstruction.class);
    ServiceTaskStateImpl state = (ServiceTaskStateImpl) instruction.getState(context);

    String serviceName = state.getServiceName();
    String methodName = state.getServiceMethod();
    StateInstance stateInstance = (StateInstance) context.getVariable(DomainConstants.VAR_NAME_STATE_INST);

    Object result;
    try {

        List<Object> input = (List<Object>) context.getVariable(DomainConstants.VAR_NAME_INPUT_PARAMS);

        //Set the current task execution status to RU (Running)
        stateInstance.setStatus(ExecutionStatus.RU);

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(">>>>>>>>>>>>>>>>>>>>>> Start to execute State[{}], ServiceName[{}], Method[{}], Input:{}",
                    state.getName(), serviceName, methodName, input);
        }

        if (state instanceof CompensateSubStateMachineState) {
            //If it is the compensation of the substate machine,
            // directly call the state machine's compensate method
            result = compensateSubStateMachine(context, state, input, stateInstance,
                    (StateMachineEngine) context.getVariable(DomainConstants.VAR_NAME_STATEMACHINE_ENGINE));
        } else {
            StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable(
                    DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);

            ServiceInvoker serviceInvoker = stateMachineConfig.getServiceInvokerManager().getServiceInvoker(
                    state.getServiceType());
            if (serviceInvoker == null) {
                throw new EngineExecutionException("No such ServiceInvoker[" + state.getServiceType() + "]",
                        FrameworkErrorCode.ObjectNotExists);
            }
            if (serviceInvoker instanceof ApplicationContextAware) {
                ((ApplicationContextAware) serviceInvoker).setApplicationContext(
                        stateMachineConfig.getApplicationContext());
            }

            result = serviceInvoker.invoke(state, input.toArray());
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("<<<<<<<<<<<<<<<<<<<<<< State[{}], ServiceName[{}], Method[{}] Execute finish. result: {}",
                    state.getName(), serviceName, methodName, result);
        }

        if (result != null) {
            stateInstance.setOutputParams(result);
            ((HierarchicalProcessContext) context).setVariableLocally(DomainConstants.VAR_NAME_OUTPUT_PARAMS,
                    result);
        }
    } catch (Throwable e) {
        LOGGER.error("<<<<<<<<<<<<<<<<<<<<<< State[{}], ServiceName[{}], Method[{}] Execute failed.",
                state.getName(), serviceName, methodName, e);

        ((HierarchicalProcessContext) context).setVariableLocally(DomainConstants.VAR_NAME_CURRENT_EXCEPTION, e);

        EngineUtils.handleException(context, state, e);
    }
}

4.2.4 执行后置拦截器

执行完业务代码以后,会进入拦截器后置处理流程,主要包括以下内容:

  • 更新数据库中state状态(state_inst表)
  • 向TC报告Branch事务执行结果

4.3 路由

4.3.1 找到下一个state

路由的具体过程如下:

  • 在state中找到Next节点
  • 然后执行Expression表达式找到下一个state。
StateMachineProcessRouter代码:
public Instruction route(ProcessContext context) throws FrameworkException {

    StateInstruction stateInstruction = context.getInstruction(StateInstruction.class);

    State state;
    if (stateInstruction.getTemporaryState() != null) {
        state = stateInstruction.getTemporaryState();
        stateInstruction.setTemporaryState(null);
    } else {
        StateMachineConfig stateMachineConfig = (StateMachineConfig)context.getVariable(
            DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
        StateMachine stateMachine = stateMachineConfig.getStateMachineRepository().getStateMachine(
            stateInstruction.getStateMachineName(), stateInstruction.getTenantId());
        state = stateMachine.getStates().get(stateInstruction.getStateName());
    }

    String stateType = state.getType();

    StateRouter router = stateRouters.get(stateType);

    Instruction instruction = null;

    List<StateRouterInterceptor> interceptors = null;
    if (router instanceof InterceptableStateRouter) {
        interceptors = ((InterceptableStateRouter)router).getInterceptors();
    }

    List<StateRouterInterceptor> executedInterceptors = null;
    Exception exception = null;
    try {
        if (CollectionUtils.isNotEmpty(interceptors)) {
            executedInterceptors = new ArrayList<>(interceptors.size());
            for (StateRouterInterceptor interceptor : interceptors) {
                executedInterceptors.add(interceptor);
                interceptor.preRoute(context, state);
            }
        }

        instruction = router.route(context, state);

    } catch (Exception e) {
        exception = e;
        throw e;
    } finally {
        if (CollectionUtils.isNotEmpty(executedInterceptors)) {
            for (int i = executedInterceptors.size() - 1; i >= 0; i--) {
                StateRouterInterceptor interceptor = executedInterceptors.get(i);
                interceptor.postRoute(context, state, instruction, exception);
            }
        }

        //if 'Succeed' or 'Fail' State did not configured, we must end the state machine
        if (instruction == null && !stateInstruction.isEnd()) {
            EngineUtils.endStateMachine(context);
        }
    }

    return instruction;
}


TaskStateRouter代码:
public Instruction route(ProcessContext context, State state) throws EngineExecutionException {

    StateInstruction stateInstruction = context.getInstruction(StateInstruction.class);
    if (stateInstruction.isEnd()) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info(
                "StateInstruction is ended, Stop the StateMachine executing. StateMachine[{}] Current State[{}]",
                stateInstruction.getStateMachineName(), state.getName());
        }
        return null;
    }

    //The current CompensationTriggerState can mark the compensation process is started and perform compensation
    // route processing.
    State compensationTriggerState = (State)context.getVariable(
        DomainConstants.VAR_NAME_CURRENT_COMPEN_TRIGGER_STATE);
    if (compensationTriggerState != null) {
        return compensateRoute(context, compensationTriggerState);
    }

    //There is an exception route, indicating that an exception is thrown, and the exception route is prioritized.
    String next = (String)context.getVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION_ROUTE);

    if (StringUtils.hasLength(next)) {
        context.removeVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION_ROUTE);
    } else {
        next = state.getNext();
    }

    //If next is empty, the state selected by the Choice state was taken.
    if (!StringUtils.hasLength(next) && context.hasVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE)) {
        next = (String)context.getVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE);
        context.removeVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE);
    }

    if (!StringUtils.hasLength(next)) {
        return null;
    }

    StateMachine stateMachine = state.getStateMachine();

    State nextState = stateMachine.getState(next);
    if (nextState == null) {
        throw new EngineExecutionException("Next state[" + next + "] is not exits",
            FrameworkErrorCode.ObjectNotExists);
    }

    stateInstruction.setStateName(next);

    return stateInstruction;
}

4.3.2 结束事务

在StateMachineProcessRouter#route方法中,我们可以看到,当没有下一个state时,将会通过以下代码结束事务,

if (CollectionUtils.isNotEmpty(executedInterceptors)) {
    for (int i = executedInterceptors.size() - 1; i >= 0; i--) {
        StateRouterInterceptor interceptor = executedInterceptors.get(i);
        interceptor.postRoute(context, state, instruction, exception);
    }
}

//if 'Succeed' or 'Fail' State did not configured, we must end the state machine
if (instruction == null && !stateInstruction.isEnd()) {
    EngineUtils.endStateMachine(context);
}

结束事务的具体过程:

  • 更新状态机(state_machine_inst)状态为成功或失败。
  • 向TC汇报分布式事务的状态,即:通知TC进行global commit/rollback。

4.4 TC接收到通知全局事务Global Commit/Rollback

Saga模式下TC中执行的内容和AT模式非常相似,不过在TC收到Global Commit/Rollback时,TC仅修改全局事务状态,而不会立即进行回滚操作。具体是通过DefaultCoordinator中retryRollbacking、retryCommitting定时任务完成。

4.5 RM处理Global Commit/Rollback

Saga中分支事务参与这不管理分支事务的状态,所有均在TM中基于state进行管理,所以TC通知Global Commit/Rollback时,TM会作为RM来完成state的状态管理(Commit/Rollback)。
Commit流程:

  • 通过xid找到全局事务(状态机实例记录,state_machine_inst表)
  • 还原现场,让状态机继续执行

Rollback:

  • 通过xid找到全局事务(状态机实例记录,state_machine_inst表)
  • 还原现场,让状态机执行补偿流程。

在SagaResourceManager中,有branchCommit、branchRollback的处理逻辑,如下:

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    try {
        StateMachineInstance machineInstance = StateMachineEngineHolder.getStateMachineEngine().forward(xid, null);

        if (ExecutionStatus.SU.equals(machineInstance.getStatus())
            && machineInstance.getCompensationStatus() == null) {
            return BranchStatus.PhaseTwo_Committed;
        } else if (ExecutionStatus.SU.equals(machineInstance.getCompensationStatus())) {
            return BranchStatus.PhaseTwo_Rollbacked;
        } else if (ExecutionStatus.FA.equals(machineInstance.getCompensationStatus()) || ExecutionStatus.UN.equals(
            machineInstance.getCompensationStatus())) {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        } else if (ExecutionStatus.FA.equals(machineInstance.getStatus())
            && machineInstance.getCompensationStatus() == null) {
            return BranchStatus.PhaseOne_Failed;
        }

    } catch (ForwardInvalidException e) {
        LOGGER.error("StateMachine forward failed, xid: " + xid, e);

        //if StateMachineInstanceNotExists stop retry
        if (FrameworkErrorCode.StateMachineInstanceNotExists.equals(e.getErrcode())) {
            return BranchStatus.PhaseTwo_Committed;
        }
    } catch (Exception e) {
        LOGGER.error("StateMachine forward failed, xid: " + xid, e);
    }
    return BranchStatus.PhaseTwo_CommitFailed_Retryable;
}

@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    try {
        StateMachineInstance stateMachineInstance = StateMachineEngineHolder.getStateMachineEngine().reloadStateMachineInstance(xid);
        if (stateMachineInstance == null) {
            return BranchStatus.PhaseTwo_Rollbacked;
        }
        if (RecoverStrategy.Forward.equals(stateMachineInstance.getStateMachine().getRecoverStrategy())
            && (GlobalStatus.TimeoutRollbacking.name().equals(applicationData)
                    || GlobalStatus.TimeoutRollbackRetrying.name().equals(applicationData))) {
            LOGGER.warn("Retry by custom recover strategy [Forward] on timeout, SAGA global[{}]", xid);
            return BranchStatus.PhaseTwo_CommitFailed_Retryable;
        }

        stateMachineInstance = StateMachineEngineHolder.getStateMachineEngine().compensate(xid,
            null);
        if (ExecutionStatus.SU.equals(stateMachineInstance.getCompensationStatus())) {
            return BranchStatus.PhaseTwo_Rollbacked;
        }
    } catch (EngineExecutionException e) {
        LOGGER.error("StateMachine compensate failed, xid: " + xid, e);

        //if StateMachineInstanceNotExists stop retry
        if (FrameworkErrorCode.StateMachineInstanceNotExists.equals(e.getErrcode())) {
            return BranchStatus.PhaseTwo_Rollbacked;
        }
    } catch (Exception e) {
        LOGGER.error("StateMachine compensate failed, xid: " + xid, e);
    }
    return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}

5 最佳实践

  • 适用场景
    业务流程长、业务流程多

  • 优势
    一阶段提交本地事务,无锁,高性能。
    事件驱动架构,参与者可异步执行,高吞吐。
    补偿服务易于实现。

  • 缺点
    不保证隔离性

  • 注意
    因为需要自己实现正向服务和逆向补偿服务,所以TCC模式遇到的问题,此模式一样存在,即:

  • 允许空补偿

  • 防悬挂控制

  • 幂等控制

6 参考文档

Seata Saga模式

分布式事务 Seata 及其三种模式详解 | Meetup#3 回顾

Seata 原理

Seata-AT模式 原理

Seata-TCC模式 原理

Seata-Saga模式 原理

Seata-XA模式 原理

TCC-Transaction原理

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/191742.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • 为什么下面老是流水出来是什么原因_integer.parseint和valueof

    为什么下面老是流水出来是什么原因_integer.parseint和valueofInteger.MAX_VALUE,十六进制位为0x7fffffff,二进制位:01111111111111111111111111111111;Integer.MIN_VALUE,即0x80000000,二进制位: 10000000000000000000000000000000;01111111111111111111111111111111+…

  • 中科院计算机生物学,中科院计算生物学重点实验室揭牌[通俗易懂]

    中科院计算机生物学,中科院计算生物学重点实验室揭牌[通俗易懂]德国马普学会副主席HerbertJaeckle和中科院副院长李家洋共同为重点实验室揭牌3月29日下午,中科院计算生物学重点实验室在上海生科院计算生物学所正式揭牌。德国马普学会副主席HerbertJaeckle教授、德国马普学会分子植物生理所所长LotharWillmitzer教授、中科院副院长李家洋院士、中科院生命科学与生物技术局局长张知彬研究员、国际合作局局长吕永龙研究员,上海生科院副院长…

  • java copyproperties_java中 BeanUtils.copyProperties的用法

    java copyproperties_java中 BeanUtils.copyProperties的用法BeanUtils提供了对java发射和自省API的包装,这里对BeanUtils.copyProperties的用法做一个小小的介绍。通过函数名可以知道,copyProperties函数是对属性进行拷贝,该函数有两个参数,一个是原始的数据,另一个是接收这些属性的数据。这里给大家介绍一个我遇到的一个坑:在不同的jar包中,该函数的两个参数的位置不一样,有一种是copyProperties(java…

  • mac。 idea 激活码2022【中文破解版】「建议收藏」

    (mac。 idea 激活码2022)JetBrains旗下有多款编译器工具(如:IntelliJ、WebStorm、PyCharm等)在各编程领域几乎都占据了垄断地位。建立在开源IntelliJ平台之上,过去15年以来,JetBrains一直在不断发展和完善这个平台。这个平台可以针对您的开发工作流进行微调并且能够提供…

  • 我的世界区块显示_我的世界怎么显示区块线

    我的世界区块显示_我的世界怎么显示区块线我的世界手游区块是一个独特的机制,很多玩家对于区块是什么不太了解,区块显示指令以及区块的产生不是很熟悉,为了帮助到大家,今天小编就为大家带来我的世界手游区块显示指令分享:区块玩法操作详解的内容,希望大家能够喜欢,下面就让我们一起来看看吧!区块相关1.出生点区块在出生点附近的区块是一块围绕世界出生点的区域中的一个区块,只要有玩家在主世界,它就不会被从内存中卸载。这意味着像红石元件和刷怪会继续,甚至所…

  • 约束条件(constraint)「建议收藏」

    约束条件(constraint)「建议收藏」1.为啥使用约束条件:约束条件也叫完整性约束条件,当对表中的数据做DML操作时会验证数据是否违反约束条件.如果违反了DML操作会失败.约束条件可以应用于表中的一列或几列,应用于整个表或几个表之间.约束条件分类:非空(NOTNULL),唯一(UNIQUE),主键(PRIMARYKEY),外键(FOREIGNKEY),检查(CHECK).其中NOTNULL只能应用于列.

    2022年10月13日

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号