datax(12):调度源码解读AbstractScheduler「建议收藏」

datax(12):调度源码解读AbstractScheduler「建议收藏」datax的jobContainer最终会通过调度周期性的执行,今天把它看完;一、基类AbstractScheduler概述类继承关系全部方法二、AbstractScheduler的主要属性和方法1、主要属性/***脏数据行数检查器,用于运行中随时检查脏数据是否超过限制(脏数据行数,或脏数据百分比)*/privateErrorRecordCheckererrorLimit;/***积累容器通讯器,来处理JobContainer、Tas.

大家好,又见面了,我是你们的朋友全栈君。

datax的jobContainer最终会通过调度周期性的执行,今天把它看完;


一、基类AbstractScheduler概述

类继承关系
在这里插入图片描述

全部方法
在这里插入图片描述


二、AbstractScheduler的主要属性和方法

1、主要属性

  /** * 脏数据行数检查器,用于运行中随时检查脏数据是否超过限制(脏数据行数,或脏数据百分比) */
  private ErrorRecordChecker errorLimit;

  /** * 积累容器通讯器,来处理JobContainer、TaskGroupContainer和Task的通讯 */
  private AbstractContainerCommunicator containerCommunicator;

2、主要方法

  /** * 默认调度执行方法 <br> * 1 传入多个调度配置,获取报告时间+休息时间+jobId(赋值给全局jobId),生成错误记录检查类 * 2 给全局jobId赋值,生成错误记录检查类,生成容器通讯类(反馈任务信息) * 3 根据入参计算task的数量,开始所有taskGroup * * @param cfg List<Configuration> */
  public void schedule(List<Configuration> cfg) { 
   
    xxx
}
  /** * 开始所有的taskGroup,只允许本包的类访问 * * @param configurations List<Configuration> */
  protected abstract void startAllTaskGroup(List<Configuration> configurations);
 

三、谁调用AbstractScheduler的schedule

从JobContainer.schedule调用AbstractScheduler.schedule


四、schedule和startAllTaskGroup方法解析

schedule方法主要在AbstractScheduler实现
运行时序图

在这里插入图片描述

  /** * 默认调度执行方法 <br> * 1 传入多个调度配置,获取报告时间+休息时间+jobId(赋值给全局jobId),生成错误记录检查类 * 2 给全局jobId赋值,生成错误记录检查类,生成容器通讯类(反馈任务信息) * 3 根据入参计算task的数量,开始所有taskGroup * * @param cfg List<Configuration> */
public void schedule(List<Configuration> cfg) { 

Validate.notNull(cfg, "scheduler配置不能为空");
int reportMillSec = cfg.get(0).getInt(DATAX_CORE_CONTAINER_JOB_REPORTINTERVAL, 30000);
int sleepMillSec = cfg.get(0).getInt(DATAX_CORE_CONTAINER_JOB_SLEEPINTERVAL, 10000);
this.jobId = cfg.get(0).getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
errorLimit = new ErrorRecordChecker(cfg.get(0));
//给 taskGroupContainer 的 Communication 注册
this.containerCommunicator.registerCommunication(cfg);
int taskCnt = calculateTaskCount(cfg);
startAllTaskGroup(cfg);
Communication lastComm = new Communication();
long lastReportTimeStamp = System.currentTimeMillis();
try { 

while (true) { 

/** * step 1: collect job stat * step 2: getReport info, then report it * step 3: errorLimit do check * step 4: dealSucceedStat(); * step 5: dealKillingStat(); * step 6: dealFailedStat(); * step 7: refresh last job stat, and then sleep for next while * * above steps, some ones should report info to DS * */
Communication nowComm = this.containerCommunicator.collect();
nowComm.setTimestamp(System.currentTimeMillis());
LOG.debug(nowComm.toString());
//汇报周期
long now = System.currentTimeMillis();
if (now - lastReportTimeStamp > reportMillSec) { 

Communication comm = CommunicationTool.getReportCommunication(nowComm, lastComm, taskCnt);
this.containerCommunicator.report(comm);
lastReportTimeStamp = now;
lastComm = nowComm;
}
errorLimit.checkRecordLimit(nowComm);
if (nowComm.getState() == State.SUCCEEDED) { 

LOG.info("Scheduler accomplished all tasks.");
break;
}
if (isJobKilling(this.getJobId())) { 

dealKillingStat(this.containerCommunicator, taskCnt);
} else if (nowComm.getState() == State.FAILED) { 

dealFailedStat(this.containerCommunicator, nowComm.getThrowable());
}
Thread.sleep(sleepMillSec);
}
} catch (InterruptedException e) { 

// 以 failed 状态退出
LOG.error("捕获到InterruptedException异常!", e);
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
}
}

startAllTaskGroup方法在ProcessInnerScheduler实现
运行时序图

在这里插入图片描述


/** * 1、创建线程池 <br/> * 2、变量传入的cfgs,生成tgRunner,然后线程池执行 <br/> * 3、线程池关闭 <br/> * * @param cfgs List<Configuration> */
@Override
public void startAllTaskGroup(List<Configuration> cfgs) { 

this.taskGroupContainerExecutorService = new ThreadPoolExecutor(cfgs.size(), cfgs.size(),
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
for (Configuration taskGroupCfg : cfgs) { 

TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupCfg);
this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
}
this.taskGroupContainerExecutorService.shutdown();
}

注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/145542.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • jquery validate验证方法

    jquery validate验证方法

  • linux清除隐藏的挖矿程序

    linux清除隐藏的挖矿程序1.找出cpu高的程序,top找不到的话,用下面命令ps-aux–sort=-pcpu|head-102.杀掉相关进程kill-9pid3.查看crontab是否有定时任务4.删除相关命令[root@dbserverlib]#lsattrlibiacpkmn.so.3—-i——–e–libiacpkmn.so.3[root@dbserverlib]#chattr-ilibiacpkmn.so.3[root@dbserver

  • SQLServer中的死锁的介绍

    SQLServer中的死锁的介绍

    2021年11月26日
  • af-s af-f_read fpdma queued

    af-s af-f_read fpdma queued1.PDAF驱动功能验证1.1pdaflog设置log设置,打开AF(8),State(2),Sensor(2)的logCT30P:/#cat/vendor/etc/camera/camxoverridesettings.txtoverrideLogLevels=0x3FlogWarningMask=0x08000202logInfoMask=0x08000202logVerboseMask=0x08000202enable3ADebugData=TRUEdumpSens

  • Python实现自动回复_python 微信机器人

    Python实现自动回复_python 微信机器人一简单介绍wxpy基于itchat,使用了Web微信的通讯协议,,通过大量接口优化提升了模块的易用性,并进行丰富的功能扩展。实现了微信登录、收发消息、搜索好友、数据统计等功能。总而言之,可用来实现各种微信个人号的自动化操作。(http://wxpy.readthedocs.io/zh/latest/bot.html)安装:wxpy支持Python3.4-3.6,以及2.7版本pip…

  • 建立数据库的方法有哪些_数据库应用原则

    建立数据库的方法有哪些_数据库应用原则1:需求分析好后,找实体,不要有所落下2:分析实体里所要涉及到的属性(比如学生,在这个数据库里我需要他的学号,但不需要他的籍贯)3:分析属性的类型,长度。要想的长远点,比如学生名字的长度,汉族一般几位就够了,可万一来个维族,那几位长度就不够用了。4:实体间的关系,要满足需求和现实5:写出关系模式6:优化关系模式(越满足后面的范式越好)7:建数据库。属性,约束等最好字母+单词(正确的),

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号