datax(9):Job和TaskGroup的通讯机制

datax(9):Job和TaskGroup的通讯机制先后看完了TaskGroupContainer和JobContainer,梳理下他们的关系与职责;一,各自职责JobContainer:Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTrackerTaskGroupContainer:TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TaskTracker(Yarn中的JobTracker和Yarn中的TaskTracker通过RPC进行通讯);二.

大家好,又见面了,我是你们的朋友全栈君。

先后看完了TaskGroupContainer 和 JobContainer,梳理下他们的关系与职责;


一、各自职责

  1. JobContainer: Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker

  2. TaskGroupContainer: TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TaskTracker(Yarn中的JobTracker和Yarn中的TaskTracker通过RPC进行通讯);


二、相互关系

  1. TaskGroupContainer向JobContainer上报信息,称JobContainer是TaskGroupContainer的上级;
  2. Task向TaskGroupContainer上报信息,称TaskGroupContainer是Task的上级。

在这里插入图片描述


三、TaskGroupContainer向JobContianer汇报过程

汇报源码逻辑是在TaskGroupContainer#reportTaskGroupCommunication这个方法中,方法的两个形参分别为lastTaskGroupContainerCommunication为上次汇报的信息,每次做数据统计的时候需要将当前communication的数据和lastTaskGroupContainerCommunication进行合并;taskCount为该TaskGroup的所有的任务数。

  1. 收集当前TaskGroupContainer对应所有Task的的communication,然后将其合并成一个communication。
    具体合并代码为步骤1,主要逻辑是在Communication#mergeFrom,它的主要功能将两communication的变量合并。主要关注下两个communication的状态合并,可以看到只要该TaskGroup中有一个Task的状态是FAILED或者KILLED就会将整个TaskGroup的状态标记为FAILED,当且仅当所有的任务的状态是SUCCEEDED,该TaskGroup的状态才能标记为SUCCEEDED。

  2. 生成新的reportCommunication作为该TaskGroupContainer上报给JobContianer的communication,
    主要是生成一些技术统计,比方说当前已经导入的记录数和字节数等。

  3. 上报给JobContianer,主要代码见步骤2,将该TaskGroupContainer最新的communication更新到StandAloneJobContainerCommunicator 能够够的得到的地方,即全局变量LocalTGCommunicationManager#taskGroupCommunicationMap中。


  private Communication reportTaskGroupCommunication(Communication lastTGContainerComm,int taskCnt) { 
   
    Communication nowTGContainerComm = this.containerCommunicator.collect();
    nowTGContainerComm.setTimestamp(System.currentTimeMillis());
    Communication reportComm = CommunicationTool
        .getReportCommunication(nowTGContainerComm, lastTGContainerComm, taskCnt);
    this.containerCommunicator.report(reportComm);
    return reportComm;
  }

步骤1


//AbstractCollector#collectFromTask
public Communication collectFromTask() { 
   
    Communication communication = new Communication();
    communication.setState(State.SUCCEEDED);

    for (Communication taskCommunication :
            this.taskCommunicationMap.values()) { 
   
    communication.mergeFrom(taskCommunication);
    }
    return communication;
}
Communication#mergeStateFrom
public synchronized State mergeStateFrom(final Communication otherComm) { 
   
        State retState = this.getState();
        if (otherComm == null) { 
   
            return retState;
        }

        if (this.state == State.FAILED || otherComm.getState() == State.FAILED
                || this.state == State.KILLED || otherComm.getState() == State.KILLED) { 
   
            retState = State.FAILED;
        } else if (this.state.isRunning() || otherComm.state.isRunning()) { 
   
            retState = State.RUNNING;
        }

        this.setState(retState);
        return retState;
}

步骤2


// StandaloneTGContainerCommunicator#report
public void report(Communication communication) { 
   
    super.getReporter().reportTGCommunication(super.taskGroupId, communication);
}

// ProcessInnerReporter
public void reportTGCommunication(Integer taskGroupId, Communication communication) { 
   
    LocalTGCommunicationManager.updateTaskGroupCommunication(taskGroupId, communication);
}

// LocalTGCommunicationManager#updateTaskGroupCommunication
public static void updateTaskGroupCommunication(final int taskGroupId,
                                                    final Communication communication) { 
   
        Validate.isTrue(taskGroupCommunicationMap.containsKey(
                taskGroupId), String.format("taskGroupCommunicationMap中没有注册taskGroupId[%d]的Communication," +
                "无法更新该taskGroup的信息", taskGroupId));
        taskGroupCommunicationMap.put(taskGroupId, communication);
}


四、TaskGroupContainer向JobContianer汇报时机

TaskGroupContainer向JobContainer汇报该TaskGroupContainer的执行情况的时机均在TaskGroupContainer#start中。

1、当前TaskGroup中有状态为FAILED或者KILLED的Task

如果一个Task只能执行一次(默认是1次,没有做重试)且该Task被标记为FAILED或者KILLED,马上将failedOrKilled这个变量标记为true并执行汇报逻辑。这种情况下除了汇报之后,还会抛出一个运行时异常,结束执行当前TaskGroupContainer的线程(TaskGroupContianer是在线程池中执行的)。


if (failedOrKilled) { 
   
    lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
            lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);

    throw DataXException.asDataXException(
        FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable());
}

2、上次失败的Task仍未结束

如果一个Task标记为FAILED或者KILLED,但是有重试逻辑就不会执行上面第1步的逻辑,而是会调用当前的Task对应TaskExecutor#shutdown,关闭当前的TaskExecutor。在调用TaskExecutor#shutdown一段时间发发现给TaskExecutor还没有关闭,触发下面逻辑,进行汇报的同时抛出异常。

if(now - failedTime > taskMaxWaitInMsec){ 
   
    markCommunicationFailed(taskId);
    reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
    throw DataXException.asDataXException(CommonErrorCode.WAIT_TIME_EXCEED, "task failover等待超时");
}

3、TaskGroupContainer任务列表为空,所有任务都是成功执行, 搜集状态为SUCCEEDED

这个没什么好说的,该TaskGroup中所有的任务执行成功,该Job执行成功。

4、如果当前时间已经超出汇报时间的interval,那么我们需要马上汇报

可以理解为心跳了

5、TaskGroupContainer所在的线程正常结束时汇报一次

这个真没什么好说的了


五、JobContainer收到汇报之后的处理

JobContainer的处理逻辑是在dataX所在JVM的主线程中,具体是在AbstractScheduler#schedule中。

  1. 每隔一段时间,合并所有TaskGoupContianer汇报的信息,具体合并的逻辑和TaskGoupContianer合并Task的汇报信息差不多;

  2. 正常结束就正常退出;

  3. 处理isJobKilling,StandAloneScheduler并没有提供kill接口,咱不管;

  4. 重点关注下FAILED的逻辑,直接关闭当前Scheduler的线程池并在主线程中抛出异常,整个dataX进程退出。

// AbstractScheduler#schedule
public void schedule(List<Configuration> configurations) { 
   
        ...
        ...
        Communication lastJobContainerCommunication = new Communication();

        long lastReportTimeStamp = System.currentTimeMillis();
        try { 
   
            while (true) { 
   
                Communication nowJobContainerCommunication = this.containerCommunicator.collect();
                nowJobContainerCommunication.setTimestamp(System.currentTimeMillis());
                LOG.debug(nowJobContainerCommunication.toString());

                //汇报周期
                long now = System.currentTimeMillis();
                if (now - lastReportTimeStamp > jobReportIntervalInMillSec) { 
   
                    Communication reportCommunication = CommunicationTool
                            .getReportCommunication(nowJobContainerCommunication, lastJobContainerCommunication, totalTasks);

                    this.containerCommunicator.report(reportCommunication);
                    lastReportTimeStamp = now;
                    lastJobContainerCommunication = nowJobContainerCommunication;
                }

                errorLimit.checkRecordLimit(nowJobContainerCommunication);

                if (nowJobContainerCommunication.getState() == State.SUCCEEDED) { 
   
                    LOG.info("Scheduler accomplished all tasks.");
                    break;
                }

                if (isJobKilling(this.getJobId())) { 
   
                    dealKillingStat(this.containerCommunicator, totalTasks);
                } else if (nowJobContainerCommunication.getState() == State.FAILED) { 
   
                    dealFailedStat(this.containerCommunicator, nowJobContainerCommunication.getThrowable());
                }

                Thread.sleep(jobSleepIntervalInMillSec);
            }
        } catch (InterruptedException e) { 
   
            // 以 failed 状态退出
            LOG.error("捕获到InterruptedException异常!", e);

            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR, e);
        }

    }
    
    // ProcessInnerScheduler#dealFailedStat
    public void dealFailedStat(AbstractContainerCommunicator frameworkCollector, Throwable throwable) { 
   
        this.taskGroupContainerExecutorService.shutdownNow();
        throw DataXException.asDataXException(
                FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, throwable);
    }


注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/145686.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • SpringBoot 自动配置原理[通俗易懂]

    SpringBoot 自动配置原理[通俗易懂]创建项目通过SpringInitialize创建SpringBoot项目而接下来要说的是关于配置文件的事情。关乎配置文件可以参考官方文档。对于配置文件来说到底在配置文件里面可以进行配置那些内容,自动配置的原理又是什么东西呢?自动配置原理在SpringBoot启动的时候加载主配置类,开启了自动配置的功能,通过@EnableAutoConfiguration注解开启自动配置的功能。@Im…

  • ❤️肝下25万字的《决战Linux到精通》笔记,你的Linux水平将从入门到入魔❤️【建议收藏】

    ❤️肝下25万字的《决战Linux到精通》笔记,你的Linux水平将从入门到入魔❤️【建议收藏】文章目录操作系统的发展史UnixMinixLinux操作系统的发展Minix没有火起来的原因Linux介绍Linux内核&发行版Linux内核版本Linux发行版本类Unix系统目录结构Linux目录用户目录命令行基本操作命令使用方法查看帮助文档helpman(manual)tab键自动补全history游览历史命令行中的ctrl组合键Linux命令权限管理列出目录的内容:ls显示inode的内容:stat文件访问权限修改文件权限:chmod修改文件所有者:chown修改文件所属组:chgrp文件.

  • Ajax清晰请求步骤与代码

    Ajax清晰请求步骤与代码异步请求ajax的使用在前后台传递数据,优化用户体验起着至关重要的角色,那么下面给大家简单罗列了一下ajax请求的步骤与代码。一、原生JS中的Ajax:1、使用ajax发送数据的步骤第一步:创建异步对象varxhr=newXMLHttpRequest();第二步:设置请求行open(请求方式,请求url)://get请求如果有参数就需要在…

  • 忆贵州三年的教书编程岁月:不弛于空想,不骛于虚声「建议收藏」

    忆贵州三年的教书编程岁月:不弛于空想,不骛于虚声「建议收藏」回首,2016年7月他离开北京回到了家乡贵州,成为了贵州财经大学的一名青年教师。转眼,2019年7月他迎来了人生的第三张通知书,即将辗转第三个城市,开始新的征途。教书三年,讲台前的每一次分享都值得回味,学生的每一句“老师好”,每一个问候和祝福,都留下了深刻的印象。

  • pycharm最新激活码2021【2021.7最新】

    (pycharm最新激活码2021)JetBrains旗下有多款编译器工具(如:IntelliJ、WebStorm、PyCharm等)在各编程领域几乎都占据了垄断地位。建立在开源IntelliJ平台之上,过去15年以来,JetBrains一直在不断发展和完善这个平台。这个平台可以针对您的开发工作流进行微调并且能够提供…

  • WebStorm 2021.1 使用 ESLint自动格式化代码[通俗易懂]

    WebStorm 2021.1 使用 ESLint自动格式化代码[通俗易懂]引言WebStorm不能像VSCode那样在保存的时候自动Fix-ESLint,不能自动格式化代码,需要安装一个插件安装ESLint插件进入设置快捷键win:Ctrl+Alt+Smac:command+,找到Plugins,搜索eslint安装后配置一下配置搜索eslint…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号