Hmily(3)

Hmily(3)5.提供端的方法也需要Hmily注解,当然也会有确认取消方法,执行切面方法DubboHmilyTransactionInterceptor#interceptor这个时候的context不会为空,转成对象HmilyTransactionContext,HmilyTransactionAspectServiceImpl#invoke找出合适的处理类HmilyTransactionFactorySe…

大家好,又见面了,我是你们的朋友全栈君。

5. 提供端的方法也需要Hmily注解,当然也会有确认取消方法,执行切面方法DubboHmilyTransactionInterceptor#interceptor这个时候的context不会为空,转成对象HmilyTransactionContext,HmilyTransactionAspectServiceImpl#invoke找出合适的处理类HmilyTransactionFactoryServiceImpl#factoryOf即ParticipantHmilyTransactionHandler

public Object handler(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable {
        HmilyTransaction hmilyTransaction = null;
        HmilyTransaction currentTransaction;
        switch (HmilyActionEnum.acquireByCode(context.getAction())) {
            case TRYING:
                try {
                    hmilyTransaction = hmilyTransactionExecutor.preTryParticipant(context, point);
                    final Object proceed = point.proceed();
                    hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode());
                    //update log status to try
                    hmilyTransactionExecutor.updateStatus(hmilyTransaction);
                    return proceed;
                } catch (Throwable throwable) {
                    //if exception ,delete log.
                    hmilyTransactionExecutor.deleteTransaction(hmilyTransaction);
                    throw throwable;
                } finally {
                    HmilyTransactionContextLocal.getInstance().remove();
                }
            case CONFIRMING:
                currentTransaction = HmilyTransactionGuavaCacheManager
                        .getInstance().getHmilyTransaction(context.getTransId());
                hmilyTransactionExecutor.confirm(currentTransaction);
                break;
            case CANCELING:
                currentTransaction = HmilyTransactionGuavaCacheManager
                        .getInstance().getHmilyTransaction(context.getTransId());
                hmilyTransactionExecutor.cancel(currentTransaction);
                break;
            default:
                break;
        }
        Method method = ((MethodSignature) (point.getSignature())).getMethod();
        return DefaultValueUtils.getDefaultValue(method.getReturnType());
    }

刚开始时TRYING,参与者执行tey方法,构建HmilyTransaction,并且保存在Guava内存缓存中。然后发布保存事件保存在本服务所在的数据库中。最后保存上下文到ThreadLocal中返回。执行本地的业务方法,最后更新事务状态,清除ThreadLocal返回。

 public HmilyTransaction preTryParticipant(final HmilyTransactionContext context, final ProceedingJoinPoint point) {
        LogUtil.debug(LOGGER, "participant hmily transaction start..:{}", context::toString);
        final HmilyTransaction hmilyTransaction = buildHmilyTransaction(point, HmilyRoleEnum.PROVIDER.getCode(), context.getTransId());
        //cache by guava
        HmilyTransactionGuavaCacheManager.getInstance().cacheHmilyTransaction(hmilyTransaction);
        //publishEvent
        hmilyTransactionEventPublisher.publishEvent(hmilyTransaction, EventTypeEnum.SAVE.getCode());
        //Nested transaction support
        context.setRole(HmilyRoleEnum.LOCAL.getCode());
        HmilyTransactionContextLocal.getInstance().set(context);
        return hmilyTransaction;
    }

6. 继续回到消费方的StarterHmilyTransactionHandler,因为远程的rpc已经调用完毕。也就是returnValue = point.proceed();执行完毕,更新本地事务状态为TRYING(1, “try阶段完成”),开始执行确认操作,如果发生异常则执行取消操作,两者类似,只是操作不一样。都是通过线程池来异步执行,HmilyTransactionExecutor#confirm,更新本地事务状态,因为在每次在执行dubbo的filter的时候都会把参与方的信息记录下来,即updateParticipant,所以这块就用反射操作调用该确认方法,这个也是个RPC调用,之前的流程也会再来一遍。请求到达提供端,首先从HmilyTransactionGuavaCacheManager中获取事务信息,如果没有的话就会从数据库中查询。最后执行相应的操作。

 public void confirm(final HmilyTransaction currentTransaction) throws HmilyRuntimeException {
        LogUtil.debug(LOGGER, () -> "hmily transaction confirm .......!start");
        if (Objects.isNull(currentTransaction) || CollectionUtils.isEmpty(currentTransaction.getHmilyParticipants())) {
            return;
        }
        currentTransaction.setStatus(HmilyActionEnum.CONFIRMING.getCode());
        updateStatus(currentTransaction);
        final List<HmilyParticipant> hmilyParticipants = currentTransaction.getHmilyParticipants();
        List<HmilyParticipant> failList = Lists.newArrayListWithCapacity(hmilyParticipants.size());
        boolean success = true;
        if (CollectionUtils.isNotEmpty(hmilyParticipants)) {
            for (HmilyParticipant hmilyParticipant : hmilyParticipants) {
                try {
                    HmilyTransactionContext context = new HmilyTransactionContext();
                    context.setAction(HmilyActionEnum.CONFIRMING.getCode());
                    context.setRole(HmilyRoleEnum.START.getCode());
                    context.setTransId(hmilyParticipant.getTransId());
                    HmilyTransactionContextLocal.getInstance().set(context);
                    executeParticipantMethod(hmilyParticipant.getConfirmHmilyInvocation());
                } catch (Exception e) {
                    LogUtil.error(LOGGER, "execute confirm :{}", () -> e);
                    success = false;
                    failList.add(hmilyParticipant);
                } finally {
                    HmilyTransactionContextLocal.getInstance().remove();
                }
            }
            executeHandler(success, currentTransaction, failList);
        }
    }

7. 如果出现非一致性异常的话,需要保证事务的事务的最后一致性,通过HmilyTransactionSelfRecoveryScheduled定时程序来实现,获取延迟多长时间后的事务信息,只要为了防止并发的时候,刚新增的数据被执行.判断事务信息的角色,如果是提供者并且状态是try刚开始的话,说明本地事务都执行失败等,也不会影响消费方,直接删除日志即可,判断重试次数是否达到上限,判断分布式事务模式为TCC还是CC,如果事务角色是提供者,重试只能由消费执行。最后更新重试次数,继续执行确认或者取消方法。HmilyTransactionRecoveryService具体方法和HmilyTransactionExecutor类似

public void onApplicationEvent(final ContextRefreshedEvent event) {
        hmilyCoordinatorRepository = SpringBeanUtils.getInstance().getBean(HmilyCoordinatorRepository.class);
        this.scheduledExecutorService =
                new ScheduledThreadPoolExecutor(1,
                        HmilyThreadFactory.create("hmily-transaction-self-recovery", true));
        hmilyTransactionRecoveryService = new HmilyTransactionRecoveryService(hmilyCoordinatorRepository);
        selfRecovery();
    }

 /**
     * if have some exception by schedule execute hmily transaction log.
     */
    private void selfRecovery() {
        scheduledExecutorService
                .scheduleWithFixedDelay(() -> {
                    LogUtil.info(LOGGER, "self recovery execute delayTime:{}", () -> hmilyConfig.getScheduledDelay());
                    try {
                        final List<HmilyTransaction> hmilyTransactions = hmilyCoordinatorRepository.listAllByDelay(acquireData());
                        if (CollectionUtils.isEmpty(hmilyTransactions)) {
                            return;
                        }
                        for (HmilyTransaction hmilyTransaction : hmilyTransactions) {
                            // if the try is not completed, no compensation will be provided (to prevent various exceptions in the try phase)
                            if (hmilyTransaction.getRole() == HmilyRoleEnum.PROVIDER.getCode()
                                    && hmilyTransaction.getStatus() == HmilyActionEnum.PRE_TRY.getCode()) {
                                hmilyCoordinatorRepository.remove(hmilyTransaction.getTransId());
                                continue;
                            }
                            if (hmilyTransaction.getRetriedCount() > hmilyConfig.getRetryMax()) {
                                LogUtil.error(LOGGER, "This transaction exceeds the maximum number of retries and no retries will occur:{}", () -> hmilyTransaction);
                                continue;
                            }
                            if (Objects.equals(hmilyTransaction.getPattern(), PatternEnum.CC.getCode())
                                    && hmilyTransaction.getStatus() == HmilyActionEnum.TRYING.getCode()) {
                                continue;
                            }
                            // if the transaction role is the provider, and the number of retries in the scope class cannot be executed, only by the initiator
                            if (hmilyTransaction.getRole() == HmilyRoleEnum.PROVIDER.getCode()
                                    && (hmilyTransaction.getCreateTime().getTime()
                                    + hmilyConfig.getRecoverDelayTime() * hmilyConfig.getLoadFactor() * 1000
                                    > System.currentTimeMillis())) {
                                continue;
                            }
                            try {
                                hmilyTransaction.setRetriedCount(hmilyTransaction.getRetriedCount() + 1);
                                final int rows = hmilyCoordinatorRepository.update(hmilyTransaction);
                                // determine that rows>0 is executed to prevent concurrency when the business side is in cluster mode
                                if (rows > 0) {
                                    if (hmilyTransaction.getStatus() == HmilyActionEnum.TRYING.getCode()
                                            || hmilyTransaction.getStatus() == HmilyActionEnum.PRE_TRY.getCode()
                                            || hmilyTransaction.getStatus() == HmilyActionEnum.CANCELING.getCode()) {
                                        hmilyTransactionRecoveryService.cancel(hmilyTransaction);
                                    } else if (hmilyTransaction.getStatus() == HmilyActionEnum.CONFIRMING.getCode()) {
                                        hmilyTransactionRecoveryService.confirm(hmilyTransaction);
                                    }
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                                LogUtil.error(LOGGER, "execute recover exception:{}", e::getMessage);
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }, hmilyConfig.getScheduledInitDelay(), hmilyConfig.getScheduledDelay(), TimeUnit.SECONDS);

    }

    private Date acquireData() {
        return new Date(LocalDateTime.now().atZone(ZoneId.systemDefault())
                .toInstant().toEpochMilli() - (hmilyConfig.getRecoverDelayTime() * 1000));
    }

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/143204.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号