Seata-Saga模式 原理

Seata-Saga模式 原理1Saga模式示例1.1Saga状态机工具状态机设计组件:seata-saga-statemachine-designer状态机在线画图工具:saga_designer1.2代码示例github上Seata-sample有完整的示例代码,SeataSaga模式中有此示例的完整介绍和分析。这里仅摘取部分和介绍原理有关的代码进行分析。1.2.1初始化dbmysql示例:CREATETABLEIFNOTEXISTS`seata_state_machine_def`(

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE使用 1年只要46元 售后保障 童叟无欺

1 Saga模式示例

1.1 Saga状态机工具

状态机设计组件:seata-saga-statemachine-designer
状态机在线画图工具:saga_designer

1.2 代码示例

github上Seata-sample有完整的示例代码,Seata Saga模式中有此示例的完整介绍和分析。这里仅摘取部分和介绍原理有关的代码进行分析。

1.2.1 初始化db

mysql示例:

CREATE TABLE IF NOT EXISTS `seata_state_machine_def`
(
    `id`               VARCHAR(32)  NOT NULL COMMENT 'id',
    `name`             VARCHAR(128) NOT NULL COMMENT 'name',
    `tenant_id`        VARCHAR(32)  NOT NULL COMMENT 'tenant id',
    `app_name`         VARCHAR(32)  NOT NULL COMMENT 'application name',
    `type`             VARCHAR(20)  COMMENT 'state language type',
    `comment_`         VARCHAR(255) COMMENT 'comment',
    `ver`              VARCHAR(16)  NOT NULL COMMENT 'version',
    `gmt_create`       DATETIME(3)  NOT NULL COMMENT 'create time',
    `status`           VARCHAR(2)   NOT NULL COMMENT 'status(AC:active|IN:inactive)',
    `content`          TEXT COMMENT 'content',
    `recover_strategy` VARCHAR(16) COMMENT 'transaction recover strategy(compensate|retry)',
    PRIMARY KEY (`id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

CREATE TABLE IF NOT EXISTS `seata_state_machine_inst`
(
    `id`                  VARCHAR(128)            NOT NULL COMMENT 'id',
    `machine_id`          VARCHAR(32)             NOT NULL COMMENT 'state machine definition id',
    `tenant_id`           VARCHAR(32)             NOT NULL COMMENT 'tenant id',
    `parent_id`           VARCHAR(128) COMMENT 'parent id',
    `gmt_started`         DATETIME(3)             NOT NULL COMMENT 'start time',
    `business_key`        VARCHAR(48) COMMENT 'business key',
    `start_params`        TEXT COMMENT 'start parameters',
    `gmt_end`             DATETIME(3) COMMENT 'end time',
    `excep`               BLOB COMMENT 'exception',
    `end_params`          TEXT COMMENT 'end parameters',
    `status`              VARCHAR(2) COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    `compensation_status` VARCHAR(2) COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    `is_running`          TINYINT(1) COMMENT 'is running(0 no|1 yes)',
    `gmt_updated`         DATETIME(3) NOT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `unikey_buz_tenant` (`business_key`, `tenant_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

CREATE TABLE IF NOT EXISTS `seata_state_inst`
(
    `id`                       VARCHAR(48)  NOT NULL COMMENT 'id',
    `machine_inst_id`          VARCHAR(128) NOT NULL COMMENT 'state machine instance id',
    `name`                     VARCHAR(128) NOT NULL COMMENT 'state name',
    `type`                     VARCHAR(20)  COMMENT 'state type',
    `service_name`             VARCHAR(128) COMMENT 'service name',
    `service_method`           VARCHAR(128) COMMENT 'method name',
    `service_type`             VARCHAR(16) COMMENT 'service type',
    `business_key`             VARCHAR(48) COMMENT 'business key',
    `state_id_compensated_for` VARCHAR(50) COMMENT 'state compensated for',
    `state_id_retried_for`     VARCHAR(50) COMMENT 'state retried for',
    `gmt_started`              DATETIME(3)  NOT NULL COMMENT 'start time',
    `is_for_update`            TINYINT(1) COMMENT 'is service for update',
    `input_params`             TEXT COMMENT 'input parameters',
    `output_params`            TEXT COMMENT 'output parameters',
    `status`                   VARCHAR(2)   NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
    `excep`                    BLOB COMMENT 'exception',
    `gmt_updated`              DATETIME(3) COMMENT 'update time',
    `gmt_end`                  DATETIME(3) COMMENT 'end time',
    PRIMARY KEY (`id`, `machine_inst_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

1.2.2 bean配置

<bean id="dataSource" class="org.h2.jdbcx.JdbcConnectionPool" destroy-method="dispose">
    <constructor-arg>
        <bean class="org.h2.jdbcx.JdbcDataSource">
            <property name="URL" value="jdbc:h2:mem:seata_saga" />
            <property name="user" value="sa" />
            <property name="password" value="sa" />
        </bean>
    </constructor-arg>
</bean>

<jdbc:initialize-database data-source="dataSource">
    <jdbc:script location="classpath:sql/h2_init.sql" />
</jdbc:initialize-database>

<bean id="stateMachineEngine" class="io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine">
    <property name="stateMachineConfig" ref="dbStateMachineConfig"></property>
</bean>
<bean id="dbStateMachineConfig" class="io.seata.saga.engine.config.DbStateMachineConfig">
    <property name="dataSource" ref="dataSource"></property>
    <property name="resources" value="statelang/*.json"></property>
    <property name="enableAsync" value="true"></property>
    <property name="threadPoolExecutor" ref="threadExecutor"></property>
    <property name="applicationId" value="saga_sample"></property>
    <property name="txServiceGroup" value="my_test_tx_group"></property>
</bean>
<bean id="threadExecutor"
      class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
    <property name="threadNamePrefix" value="SAGA_ASYNC_EXE_" />
    <property name="corePoolSize" value="1" />
    <property name="maxPoolSize" value="20" />
</bean>
<bean class="io.seata.saga.rm.StateMachineEngineHolder">
    <property name="stateMachineEngine" ref="stateMachineEngine"/>
</bean>

1.2.3 Saga状态机配置示例

{
    "Name": "reduceInventoryAndBalance",
    "Comment": "reduce inventory then reduce balance in a transaction",
    "StartState": "ReduceInventory",
    "Version": "0.0.1",
    "States": {
        "ReduceInventory": {
            "Type": "ServiceTask",
            "ServiceName": "inventoryAction",
            "ServiceMethod": "reduce",
            "CompensateState": "CompensateReduceInventory",
            "Next": "ChoiceState",
            "Input": [
                "$.[businessKey]",
                "$.[count]"
            ],
            "Output": {
                "reduceInventoryResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exception{java.lang.Throwable}": "UN"
            }
        },
        "ChoiceState":{
            "Type": "Choice",
            "Choices":[
                {
                    "Expression":"[reduceInventoryResult] == true",
                    "Next":"ReduceBalance"
                }
            ],
            "Default":"Fail"
        },
        "ReduceBalance": {
            "Type": "ServiceTask",
            "ServiceName": "balanceAction",
            "ServiceMethod": "reduce",
            "CompensateState": "CompensateReduceBalance",
            "Input": [
                "$.[businessKey]",
                "$.[amount]",
                {
                    "throwException" : "$.[mockReduceBalanceFail]"
                }
            ],
            "Output": {
                "compensateReduceBalanceResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exception{java.lang.Throwable}": "UN"
            },
            "Catch": [
                {
                    "Exceptions": [
                        "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"
                }
            ],
            "Next": "Succeed"
        },
        "CompensateReduceInventory": {
            "Type": "ServiceTask",
            "ServiceName": "inventoryAction",
            "ServiceMethod": "compensateReduce",
            "Input": [
                "$.[businessKey]"
            ]
        },
        "CompensateReduceBalance": {
            "Type": "ServiceTask",
            "ServiceName": "balanceAction",
            "ServiceMethod": "compensateReduce",
            "Input": [
                "$.[businessKey]"
            ]
        },
        "CompensationTrigger": {
            "Type": "CompensationTrigger",
            "Next": "Fail"
        },
        "Succeed": {
            "Type":"Succeed"
        },
        "Fail": {
            "Type":"Fail",
            "ErrorCode": "PURCHASE_FAILED",
            "Message": "purchase failed"
        }
    }
}

1.2.4 启动状态机

创建StateMachineEngine
StateMachineEngine stateMachineEngine = (StateMachineEngine) applicationContext.getBean("stateMachineEngine");

执行-同步
StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null, businessKey, startParams);

执行-异步
StateMachineInstance inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams, CALL_BACK);

2 状态机设置

2.1 状态机属性

  • Name: 表示状态机的名称,必须唯一
  • Comment: 状态机的描述
  • Version: 状态机定义版本
  • StartState: 启动时运行的第一个”状态”
  • States: 状态列表,是一个map结构,key是”状态”的名称,在状态机内必须唯一

2.2 状态属性

  • Type: “状态” 的类型,比如有:
    • ServiceTask: 执行调用服务任务
    • Choice: 单条件选择路由
    • CompensationTrigger: 触发补偿流程
    • Succeed: 状态机正常结束
    • Fail: 状态机异常结束
    • SubStateMachine: 调用子状态机
    • CompensateSubMachine: 用于补偿一个子状态机
  • ServiceName: 服务名称,通常是服务的beanId
  • ServiceMethod: 服务方法名称
  • CompensateState: 该”状态”的补偿”状态”
  • Input: 调用服务的输入参数列表, 是一个数组, 对应于服务方法的参数列表, $.表示使用表达式从状态机上下文中取参数,表达使用的SpringEL, 如果是常量直接写值即可
  • Ouput: 将服务返回的参数赋值到状态机上下文中, 是一个map结构,key为放入到状态机上文时的key(状态机上下文也是一个map),value中$.是表示SpringEL表达式,表示从服务的返回参数中取值,#root表示服务的整个返回参数
  • Status: 服务执行状态映射,框架定义了三个状态,SU 成功、FA 失败、UN 未知, 我们需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个map结构,key是条件表达式,一般是取服务的返回值或抛出的异常进行判断,默认是SpringEL表达式判断服务返回参数,带$Exception{开头表示判断异常类型。value是当这个条件表达式成立时则将服务执行状态映射成这个值
  • Catch: 捕获到异常后的路由
  • Next: 服务执行完成后下一个执行的”状态”
  • Choices: Choice类型的”状态”里, 可选的分支列表, 分支中的Expression为SpringEL表达式, Next为当表达式成立时执行的下一个”状态”
  • ErrorCode: Fail类型”状态”的错误码
  • Message: Fail类型”状态”的错误信息

3 原理分析

Saga模式是一种长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者。

3.1 Saga状态机引擎架构

在这里插入图片描述
注:此图来自seata官网的博客。

状态机引擎的设计主要分成三层, 上层依赖下层,从下往上分别是:

  • Eventing层:
    实现事件驱动架构,可以压入事件并由消费端消费事件,本层不关心事件是什么消费端执行什么,由上层实现。

  • ProcessController层:
    由于上层的Eventing驱动一个“空”流程执行的执行,”state”的行为和路由都未实现,由上层实现;基于以上两层理论上可以自定义扩展任何”流程”引擎。

  • StateMachineEngine层:
    实现状态机引擎每种state的行为和路由逻辑;提供API、状态机语言仓库。

3.2 Saga状态机引擎

在这里插入图片描述
注:此图来自seata官网的博客。

在这里插入图片描述
注:此图来自seata官网的博客。
Saga模式下,事务会根据json配置的state来执行,如果前一个state的正向服务执行成功,那么就路由到下一个state并执行下一个state的正向服务,如果执行失败,那么基于CompensateState属性执行补偿服务。

从代码的角度来看,Saga执行过程如下:
在这里插入图片描述

3.3 分布式事务的时序图

在这里插入图片描述

从时序图上可以看到,Saga模式和AT、TCC模式有较大的差异:

  • Saga模式下TM、RM均由开启事务的微服务承担,AT、TCC模式的TM、RM一般是分开的。
  • Saga模式由一个状态机实现,其实现过程主要包括:路由到正确的state + 执行state。

4 源码分析

4.1 TM开启事务

4.1.1 通过StateMachineEngine开启事务

Saga的分布式事务由StateMachineEngine开启。目前支持同步、异步方式,示例代码如下:

StateMachineInstance startWithBusinessKey(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams) ;

StateMachineInstance startAsync(String stateMachineName, String tenantId, Map<String, Object> startParams, AsyncCallback callback) throws EngineExecutionException;

StateMachineEngine开启事务,主要包括以下几件事情:

  • 创建执行上线文ProcessContext。
  • 向TC开启全局事务,并返回xid。
  • 将state持久化到本地(DB)。
  • 向EventBus中发布Event。
private StateMachineInstance startInternal(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, boolean async, AsyncCallback callback) throws EngineExecutionException {

    if (async && !stateMachineConfig.isEnableAsync()) {
        throw new EngineExecutionException(
            "Asynchronous start is disabled. please set StateMachineConfig.enableAsync=true first.",
            FrameworkErrorCode.AsynchronousStartDisabled);
    }

    if (StringUtils.isEmpty(tenantId)) {
        tenantId = stateMachineConfig.getDefaultTenantId();
    }

    StateMachineInstance instance = createMachineInstance(stateMachineName, tenantId, businessKey, startParams);

    ProcessContextBuilder contextBuilder = ProcessContextBuilder.create().withProcessType(ProcessType.STATE_LANG)
        .withOperationName(DomainConstants.OPERATION_NAME_START).withAsyncCallback(callback).withInstruction(
            new StateInstruction(stateMachineName, tenantId)).withStateMachineInstance(instance)
        .withStateMachineConfig(getStateMachineConfig()).withStateMachineEngine(this);

    Map<String, Object> contextVariables;
    if (startParams != null) {
        contextVariables = new ConcurrentHashMap<>(startParams.size());
        nullSafeCopy(startParams, contextVariables);
    } else {
        contextVariables = new ConcurrentHashMap<>();
    }
    instance.setContext(contextVariables);

    contextBuilder.withStateMachineContextVariables(contextVariables);

    contextBuilder.withIsAsyncExecution(async);

    ProcessContext processContext = contextBuilder.build();

    if (instance.getStateMachine().isPersist() && stateMachineConfig.getStateLogStore() != null) {
        stateMachineConfig.getStateLogStore().recordStateMachineStarted(instance, processContext);
    }
    if (StringUtils.isEmpty(instance.getId())) {
        instance.setId(
            stateMachineConfig.getSeqGenerator().generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));
    }

    if (async) {
        stateMachineConfig.getAsyncProcessCtrlEventPublisher().publish(processContext);
    } else {
        stateMachineConfig.getProcessCtrlEventPublisher().publish(processContext);
    }

    return instance;
}

4.1.2 状态机的流程处理

状态机的流程处理主要包括两个部分:执行state、路由到下一个state。在ProcessController中定义了状态机的执行流程:

  • 找到合适的Processor,然后执行state中指定的方法。
  • 路由到下一个State,并执行state
    ProcessControllerImpl代码如下所示:
public void process(ProcessContext context) throws FrameworkException {
    try {
        businessProcessor.process(context);

        businessProcessor.route(context);
    } catch (FrameworkException fex) {
        throw fex;
    } catch (Exception ex) {
        LOGGER.error("Unknown exception occurred, context = {}", context, ex);
        throw new FrameworkException(ex, "Unknown exception occurred", FrameworkErrorCode.UnknownAppError);
    }
}

4.2 处理器

4.2.1 根据状态机json配置找到对应的Processor

StateMachineProcessHandler对状态机的每一种type都配置了handler,如下图所示:

public void initDefaultHandlers() {
    if (stateHandlers.isEmpty()) {
        stateHandlers.put(DomainConstants.STATE_TYPE_SERVICE_TASK, new ServiceTaskStateHandler());

        stateHandlers.put(DomainConstants.STATE_TYPE_SCRIPT_TASK, new ScriptTaskStateHandler());

        stateHandlers.put(DomainConstants.STATE_TYPE_SUB_MACHINE_COMPENSATION, new ServiceTaskStateHandler());

        stateHandlers.put(DomainConstants.STATE_TYPE_SUB_STATE_MACHINE, new SubStateMachineHandler());

        stateHandlers.put(DomainConstants.STATE_TYPE_CHOICE, new ChoiceStateHandler());
        stateHandlers.put(DomainConstants.STATE_TYPE_SUCCEED, new SucceedEndStateHandler());
        stateHandlers.put(DomainConstants.STATE_TYPE_FAIL, new FailEndStateHandler());
        stateHandlers.put(DomainConstants.STATE_TYPE_COMPENSATION_TRIGGER, new CompensationTriggerStateHandler());
    }
}

结合前面的示例数据,分析状态机是如何找到合适的State处理器的:

"States": {
    "ReduceInventory": {
        "Type": "ServiceTask",  -> 使用ServiceTaskStateHandler处理
        "ServiceName": "inventoryAction", -> 对应到bean的名字
        "ServiceMethod": "reduce",          ->对应到bean中的具体方法
        "CompensateState": "CompensateReduceInventory", -> 补偿的state
        "Next": "ChoiceState",      ->下一个state
        "Input": [
            "$.[businessKey]",
            "$.[count]"
        ],
        "Output": {
            "reduceInventoryResult": "$.#root"
        },
        "Status": {
            "#root == true": "SU",
            "#root == false": "FA",
            "$Exception{java.lang.Throwable}": "UN"
        }
    }
}

public void process(ProcessContext context) throws FrameworkException {
    StateInstruction instruction = context.getInstruction(StateInstruction.class);
    State state = instruction.getState(context);
    String stateType = state.getType();
    StateHandler stateHandler = stateHandlers.get(stateType);

    List<StateHandlerInterceptor> interceptors = null;
    if (stateHandler instanceof InterceptableStateHandler) {
        interceptors = ((InterceptableStateHandler)stateHandler).getInterceptors();
    }

    List<StateHandlerInterceptor> executedInterceptors = null;
    Exception exception = null;
    try {
        if (CollectionUtils.isNotEmpty(interceptors)) {
            executedInterceptors = new ArrayList<>(interceptors.size());
            for (StateHandlerInterceptor interceptor : interceptors) {
                executedInterceptors.add(interceptor);
                interceptor.preProcess(context);
            }
        }

        stateHandler.process(context);

    } catch (Exception e) {
        exception = e;
        throw e;
    } finally {
        if (CollectionUtils.isNotEmpty(executedInterceptors)) {
            for (int i = executedInterceptors.size() - 1; i >= 0; i--) {
                StateHandlerInterceptor interceptor = executedInterceptors.get(i);
                interceptor.postProcess(context, exception);
            }
        }
    }
}

4.2.2 执行前置拦截器

主要包括以下内容:

  • 注册分支事务
  • 向本地数据库插入state(state_inst表)
    代码见DbAndReportTcStateLogStore#recordStateStarted:
public void recordStateStarted(StateInstance stateInstance, ProcessContext context) {
    if (stateInstance != null) {

        boolean isUpdateMode = isUpdateMode(stateInstance, context);

        // if this state is for retry, do not register branch
        if (StringUtils.hasLength(stateInstance.getStateIdRetriedFor())) {
            if (isUpdateMode) {
                stateInstance.setId(stateInstance.getStateIdRetriedFor());
            } else {
                // generate id by default
                stateInstance.setId(generateRetryStateInstanceId(stateInstance));
            }
        }
        // if this state is for compensation, do not register branch
        else if (StringUtils.hasLength(stateInstance.getStateIdCompensatedFor())) {
            stateInstance.setId(generateCompensateStateInstanceId(stateInstance, isUpdateMode));
        } else {
            branchRegister(stateInstance, context);
        }

        if (StringUtils.isEmpty(stateInstance.getId()) && seqGenerator != null) {
            stateInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_INST));
        }

        stateInstance.setSerializedInputParams(paramsSerializer.serialize(stateInstance.getInputParams()));
        if (!isUpdateMode) {
            executeUpdate(stateLogStoreSqls.getRecordStateStartedSql(dbType),
                STATE_INSTANCE_TO_STATEMENT_FOR_INSERT, stateInstance);
        } else {
            // if this retry/compensate state do not need persist, just update last inst
            executeUpdate(stateLogStoreSqls.getUpdateStateExecutionStatusSql(dbType),
                stateInstance.getStatus().name(), new Timestamp(System.currentTimeMillis()),
                stateInstance.getMachineInstanceId(), stateInstance.getId());
        }
    }
}

4.2.3 Processor执行State

当type=ServiceTask时,将会由ServiceTaskStateHandler处理,具体逻辑如下:

  • 从state中获取beanname以及methodName。
  • 创建serviceInvoker对象。
  • serviceInvoker首先从applicationContext获取bean,并通过反射找到method。
  • 调用method方法。
  • 返回结果。
    代码如下:
public void process(ProcessContext context) throws EngineExecutionException {
    StateInstruction instruction = context.getInstruction(StateInstruction.class);
    ServiceTaskStateImpl state = (ServiceTaskStateImpl) instruction.getState(context);

    String serviceName = state.getServiceName();
    String methodName = state.getServiceMethod();
    StateInstance stateInstance = (StateInstance) context.getVariable(DomainConstants.VAR_NAME_STATE_INST);

    Object result;
    try {

        List<Object> input = (List<Object>) context.getVariable(DomainConstants.VAR_NAME_INPUT_PARAMS);

        //Set the current task execution status to RU (Running)
        stateInstance.setStatus(ExecutionStatus.RU);

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(">>>>>>>>>>>>>>>>>>>>>> Start to execute State[{}], ServiceName[{}], Method[{}], Input:{}",
                    state.getName(), serviceName, methodName, input);
        }

        if (state instanceof CompensateSubStateMachineState) {
            //If it is the compensation of the substate machine,
            // directly call the state machine's compensate method
            result = compensateSubStateMachine(context, state, input, stateInstance,
                    (StateMachineEngine) context.getVariable(DomainConstants.VAR_NAME_STATEMACHINE_ENGINE));
        } else {
            StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable(
                    DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);

            ServiceInvoker serviceInvoker = stateMachineConfig.getServiceInvokerManager().getServiceInvoker(
                    state.getServiceType());
            if (serviceInvoker == null) {
                throw new EngineExecutionException("No such ServiceInvoker[" + state.getServiceType() + "]",
                        FrameworkErrorCode.ObjectNotExists);
            }
            if (serviceInvoker instanceof ApplicationContextAware) {
                ((ApplicationContextAware) serviceInvoker).setApplicationContext(
                        stateMachineConfig.getApplicationContext());
            }

            result = serviceInvoker.invoke(state, input.toArray());
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("<<<<<<<<<<<<<<<<<<<<<< State[{}], ServiceName[{}], Method[{}] Execute finish. result: {}",
                    state.getName(), serviceName, methodName, result);
        }

        if (result != null) {
            stateInstance.setOutputParams(result);
            ((HierarchicalProcessContext) context).setVariableLocally(DomainConstants.VAR_NAME_OUTPUT_PARAMS,
                    result);
        }
    } catch (Throwable e) {
        LOGGER.error("<<<<<<<<<<<<<<<<<<<<<< State[{}], ServiceName[{}], Method[{}] Execute failed.",
                state.getName(), serviceName, methodName, e);

        ((HierarchicalProcessContext) context).setVariableLocally(DomainConstants.VAR_NAME_CURRENT_EXCEPTION, e);

        EngineUtils.handleException(context, state, e);
    }
}

4.2.4 执行后置拦截器

执行完业务代码以后,会进入拦截器后置处理流程,主要包括以下内容:

  • 更新数据库中state状态(state_inst表)
  • 向TC报告Branch事务执行结果

4.3 路由

4.3.1 找到下一个state

路由的具体过程如下:

  • 在state中找到Next节点
  • 然后执行Expression表达式找到下一个state。
StateMachineProcessRouter代码:
public Instruction route(ProcessContext context) throws FrameworkException {

    StateInstruction stateInstruction = context.getInstruction(StateInstruction.class);

    State state;
    if (stateInstruction.getTemporaryState() != null) {
        state = stateInstruction.getTemporaryState();
        stateInstruction.setTemporaryState(null);
    } else {
        StateMachineConfig stateMachineConfig = (StateMachineConfig)context.getVariable(
            DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
        StateMachine stateMachine = stateMachineConfig.getStateMachineRepository().getStateMachine(
            stateInstruction.getStateMachineName(), stateInstruction.getTenantId());
        state = stateMachine.getStates().get(stateInstruction.getStateName());
    }

    String stateType = state.getType();

    StateRouter router = stateRouters.get(stateType);

    Instruction instruction = null;

    List<StateRouterInterceptor> interceptors = null;
    if (router instanceof InterceptableStateRouter) {
        interceptors = ((InterceptableStateRouter)router).getInterceptors();
    }

    List<StateRouterInterceptor> executedInterceptors = null;
    Exception exception = null;
    try {
        if (CollectionUtils.isNotEmpty(interceptors)) {
            executedInterceptors = new ArrayList<>(interceptors.size());
            for (StateRouterInterceptor interceptor : interceptors) {
                executedInterceptors.add(interceptor);
                interceptor.preRoute(context, state);
            }
        }

        instruction = router.route(context, state);

    } catch (Exception e) {
        exception = e;
        throw e;
    } finally {
        if (CollectionUtils.isNotEmpty(executedInterceptors)) {
            for (int i = executedInterceptors.size() - 1; i >= 0; i--) {
                StateRouterInterceptor interceptor = executedInterceptors.get(i);
                interceptor.postRoute(context, state, instruction, exception);
            }
        }

        //if 'Succeed' or 'Fail' State did not configured, we must end the state machine
        if (instruction == null && !stateInstruction.isEnd()) {
            EngineUtils.endStateMachine(context);
        }
    }

    return instruction;
}


TaskStateRouter代码:
public Instruction route(ProcessContext context, State state) throws EngineExecutionException {

    StateInstruction stateInstruction = context.getInstruction(StateInstruction.class);
    if (stateInstruction.isEnd()) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info(
                "StateInstruction is ended, Stop the StateMachine executing. StateMachine[{}] Current State[{}]",
                stateInstruction.getStateMachineName(), state.getName());
        }
        return null;
    }

    //The current CompensationTriggerState can mark the compensation process is started and perform compensation
    // route processing.
    State compensationTriggerState = (State)context.getVariable(
        DomainConstants.VAR_NAME_CURRENT_COMPEN_TRIGGER_STATE);
    if (compensationTriggerState != null) {
        return compensateRoute(context, compensationTriggerState);
    }

    //There is an exception route, indicating that an exception is thrown, and the exception route is prioritized.
    String next = (String)context.getVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION_ROUTE);

    if (StringUtils.hasLength(next)) {
        context.removeVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION_ROUTE);
    } else {
        next = state.getNext();
    }

    //If next is empty, the state selected by the Choice state was taken.
    if (!StringUtils.hasLength(next) && context.hasVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE)) {
        next = (String)context.getVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE);
        context.removeVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE);
    }

    if (!StringUtils.hasLength(next)) {
        return null;
    }

    StateMachine stateMachine = state.getStateMachine();

    State nextState = stateMachine.getState(next);
    if (nextState == null) {
        throw new EngineExecutionException("Next state[" + next + "] is not exits",
            FrameworkErrorCode.ObjectNotExists);
    }

    stateInstruction.setStateName(next);

    return stateInstruction;
}

4.3.2 结束事务

在StateMachineProcessRouter#route方法中,我们可以看到,当没有下一个state时,将会通过以下代码结束事务,

if (CollectionUtils.isNotEmpty(executedInterceptors)) {
    for (int i = executedInterceptors.size() - 1; i >= 0; i--) {
        StateRouterInterceptor interceptor = executedInterceptors.get(i);
        interceptor.postRoute(context, state, instruction, exception);
    }
}

//if 'Succeed' or 'Fail' State did not configured, we must end the state machine
if (instruction == null && !stateInstruction.isEnd()) {
    EngineUtils.endStateMachine(context);
}

结束事务的具体过程:

  • 更新状态机(state_machine_inst)状态为成功或失败。
  • 向TC汇报分布式事务的状态,即:通知TC进行global commit/rollback。

4.4 TC接收到通知全局事务Global Commit/Rollback

Saga模式下TC中执行的内容和AT模式非常相似,不过在TC收到Global Commit/Rollback时,TC仅修改全局事务状态,而不会立即进行回滚操作。具体是通过DefaultCoordinator中retryRollbacking、retryCommitting定时任务完成。

4.5 RM处理Global Commit/Rollback

Saga中分支事务参与这不管理分支事务的状态,所有均在TM中基于state进行管理,所以TC通知Global Commit/Rollback时,TM会作为RM来完成state的状态管理(Commit/Rollback)。
Commit流程:

  • 通过xid找到全局事务(状态机实例记录,state_machine_inst表)
  • 还原现场,让状态机继续执行

Rollback:

  • 通过xid找到全局事务(状态机实例记录,state_machine_inst表)
  • 还原现场,让状态机执行补偿流程。

在SagaResourceManager中,有branchCommit、branchRollback的处理逻辑,如下:

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    try {
        StateMachineInstance machineInstance = StateMachineEngineHolder.getStateMachineEngine().forward(xid, null);

        if (ExecutionStatus.SU.equals(machineInstance.getStatus())
            && machineInstance.getCompensationStatus() == null) {
            return BranchStatus.PhaseTwo_Committed;
        } else if (ExecutionStatus.SU.equals(machineInstance.getCompensationStatus())) {
            return BranchStatus.PhaseTwo_Rollbacked;
        } else if (ExecutionStatus.FA.equals(machineInstance.getCompensationStatus()) || ExecutionStatus.UN.equals(
            machineInstance.getCompensationStatus())) {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        } else if (ExecutionStatus.FA.equals(machineInstance.getStatus())
            && machineInstance.getCompensationStatus() == null) {
            return BranchStatus.PhaseOne_Failed;
        }

    } catch (ForwardInvalidException e) {
        LOGGER.error("StateMachine forward failed, xid: " + xid, e);

        //if StateMachineInstanceNotExists stop retry
        if (FrameworkErrorCode.StateMachineInstanceNotExists.equals(e.getErrcode())) {
            return BranchStatus.PhaseTwo_Committed;
        }
    } catch (Exception e) {
        LOGGER.error("StateMachine forward failed, xid: " + xid, e);
    }
    return BranchStatus.PhaseTwo_CommitFailed_Retryable;
}

@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    try {
        StateMachineInstance stateMachineInstance = StateMachineEngineHolder.getStateMachineEngine().reloadStateMachineInstance(xid);
        if (stateMachineInstance == null) {
            return BranchStatus.PhaseTwo_Rollbacked;
        }
        if (RecoverStrategy.Forward.equals(stateMachineInstance.getStateMachine().getRecoverStrategy())
            && (GlobalStatus.TimeoutRollbacking.name().equals(applicationData)
                    || GlobalStatus.TimeoutRollbackRetrying.name().equals(applicationData))) {
            LOGGER.warn("Retry by custom recover strategy [Forward] on timeout, SAGA global[{}]", xid);
            return BranchStatus.PhaseTwo_CommitFailed_Retryable;
        }

        stateMachineInstance = StateMachineEngineHolder.getStateMachineEngine().compensate(xid,
            null);
        if (ExecutionStatus.SU.equals(stateMachineInstance.getCompensationStatus())) {
            return BranchStatus.PhaseTwo_Rollbacked;
        }
    } catch (EngineExecutionException e) {
        LOGGER.error("StateMachine compensate failed, xid: " + xid, e);

        //if StateMachineInstanceNotExists stop retry
        if (FrameworkErrorCode.StateMachineInstanceNotExists.equals(e.getErrcode())) {
            return BranchStatus.PhaseTwo_Rollbacked;
        }
    } catch (Exception e) {
        LOGGER.error("StateMachine compensate failed, xid: " + xid, e);
    }
    return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}

5 最佳实践

  • 适用场景
    业务流程长、业务流程多

  • 优势
    一阶段提交本地事务,无锁,高性能。
    事件驱动架构,参与者可异步执行,高吞吐。
    补偿服务易于实现。

  • 缺点
    不保证隔离性

  • 注意
    因为需要自己实现正向服务和逆向补偿服务,所以TCC模式遇到的问题,此模式一样存在,即:

  • 允许空补偿

  • 防悬挂控制

  • 幂等控制

6 参考文档

Seata Saga模式

分布式事务 Seata 及其三种模式详解 | Meetup#3 回顾

Seata 原理

Seata-AT模式 原理

Seata-TCC模式 原理

Seata-Saga模式 原理

Seata-XA模式 原理

TCC-Transaction原理

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/191742.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • Hook 技术「建议收藏」

    Hook 技术「建议收藏」一、原理钩子(Hook),是Windows消息处理机制的一个平台,应用程序可以在上面设置子程以监视指定窗口的某种消息,而且所监视的窗口可以是其他进程所创建的。当消息到达后,在目标窗口处理函数之前处理它。钩子机制允许应用程序截获处理window消息或特定事件。  钩子实际上是一个处理消息的程序段,通过系统调用,把它挂入系统。每当特定的消息发出,在没有到达目的窗口前,钩子程序就先捕获该消息

  • vim编辑器,可以实现保存退出()_vim进入编辑模式如何保存并退出

    vim编辑器,可以实现保存退出()_vim进入编辑模式如何保存并退出目录1.Vim模式2.在Vim/Vi中打开文件3.在Vim/Vi中保存文件4.保存文件并退出Vim/Vi5.退出Vim/Vi而不保存文件1.Vim模式启动Vim编辑器时,您处于正常模式。在这种模式下,您可以使用vim命令并浏览文件。要输入文字,您需要按i键进入插入模式。使用此模式,您可以像在常规文本编辑器中一样插入和删除字符。要从其他任何模式返回正常模式,只需按Esc键。2.在Vim/Vi中打开文件要使用Vim打开文件,请键入vim,然后输入要编辑或创建的文件的.

  • java环境配置——cmd命令行输入java正常显示而javac则显示不是内部或者外部命令[通俗易懂]

    java环境配置——cmd命令行输入java正常显示而javac则显示不是内部或者外部命令[通俗易懂]重装系统之后第二发,配置pycharm,意料之外的错误,启动的时候竟然要求配置jdk,瞬时蒙蔽,不记得之前有这么一出啊,上图:于是乎就去官网下载了jdk:找到适合自己的版本,我选的最后一个windows64位的,下载好之后直接安装就行,貌似这里不会有啥问题:安装完成需要在配置环境变量,这样系统才能找到我们的java命令,具体的原理大家可以搜一下,网上讲的很清楚~~环境变…

  • 使用JWT实现单点登录(完全跨域方案)

    首先介绍一下什么是JSONWebToken(JWT)?官方文档是这样解释的:JSONWebToken(JWT)是一个开放标准(RFC7519),它定义了一种紧凑且独立的方式,可以在各方之间作为JSON对象安全地传输信息。此信息可以通过数字签名进行验证和信任。JWT可以使用秘密(使用HMAC算法)或使用RSA或ECDSA的公钥/私钥对进行签名。虽然JWT可以加密以在各方之间提供保密…

  • golang 2021激活码【2021免费激活】

    (golang 2021激活码)2021最新分享一个能用的的激活码出来,希望能帮到需要激活的朋友。目前这个是能用的,但是用的人多了之后也会失效,会不定时更新的,大家持续关注此网站~IntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,下面是详细链接哦~https://javaforall.cn/100143.html…

  • mysql图形化工具使用教程_mysql图形化管理工具介绍

    mysql图形化工具使用教程_mysql图形化管理工具介绍MySQL有许多图形化的管理工具,我们在此介绍二个官方的工具「MySQLAdministrator」及「MySQLQueryBrowser」。MySQLAdministrator是用来管理MySQLServer用的,您可以查看目前系统状态、新增使用者等。而MySQLQueryBrowser可以用来查看数据库内容。我们可以在一台Windows的机器上使用图形化的管理工具…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号