大家好,又见面了,我是你们的朋友全栈君。
datax的jobContainer最终会通过调度周期性的执行,今天把它看完;
一、基类AbstractScheduler概述
类继承关系
全部方法
二、AbstractScheduler的主要属性和方法
1、主要属性
/** * 脏数据行数检查器,用于运行中随时检查脏数据是否超过限制(脏数据行数,或脏数据百分比) */
private ErrorRecordChecker errorLimit;
/** * 积累容器通讯器,来处理JobContainer、TaskGroupContainer和Task的通讯 */
private AbstractContainerCommunicator containerCommunicator;
2、主要方法
/** * 默认调度执行方法 <br> * 1 传入多个调度配置,获取报告时间+休息时间+jobId(赋值给全局jobId),生成错误记录检查类 * 2 给全局jobId赋值,生成错误记录检查类,生成容器通讯类(反馈任务信息) * 3 根据入参计算task的数量,开始所有taskGroup * * @param cfg List<Configuration> */
public void schedule(List<Configuration> cfg) {
xxx
}
/** * 开始所有的taskGroup,只允许本包的类访问 * * @param configurations List<Configuration> */
protected abstract void startAllTaskGroup(List<Configuration> configurations);
三、谁调用AbstractScheduler的schedule
从JobContainer.schedule调用AbstractScheduler.schedule
四、schedule和startAllTaskGroup方法解析
schedule方法主要在AbstractScheduler实现
运行时序图
/** * 默认调度执行方法 <br> * 1 传入多个调度配置,获取报告时间+休息时间+jobId(赋值给全局jobId),生成错误记录检查类 * 2 给全局jobId赋值,生成错误记录检查类,生成容器通讯类(反馈任务信息) * 3 根据入参计算task的数量,开始所有taskGroup * * @param cfg List<Configuration> */
public void schedule(List<Configuration> cfg) {
Validate.notNull(cfg, "scheduler配置不能为空");
int reportMillSec = cfg.get(0).getInt(DATAX_CORE_CONTAINER_JOB_REPORTINTERVAL, 30000);
int sleepMillSec = cfg.get(0).getInt(DATAX_CORE_CONTAINER_JOB_SLEEPINTERVAL, 10000);
this.jobId = cfg.get(0).getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
errorLimit = new ErrorRecordChecker(cfg.get(0));
//给 taskGroupContainer 的 Communication 注册
this.containerCommunicator.registerCommunication(cfg);
int taskCnt = calculateTaskCount(cfg);
startAllTaskGroup(cfg);
Communication lastComm = new Communication();
long lastReportTimeStamp = System.currentTimeMillis();
try {
while (true) {
/** * step 1: collect job stat * step 2: getReport info, then report it * step 3: errorLimit do check * step 4: dealSucceedStat(); * step 5: dealKillingStat(); * step 6: dealFailedStat(); * step 7: refresh last job stat, and then sleep for next while * * above steps, some ones should report info to DS * */
Communication nowComm = this.containerCommunicator.collect();
nowComm.setTimestamp(System.currentTimeMillis());
LOG.debug(nowComm.toString());
//汇报周期
long now = System.currentTimeMillis();
if (now - lastReportTimeStamp > reportMillSec) {
Communication comm = CommunicationTool.getReportCommunication(nowComm, lastComm, taskCnt);
this.containerCommunicator.report(comm);
lastReportTimeStamp = now;
lastComm = nowComm;
}
errorLimit.checkRecordLimit(nowComm);
if (nowComm.getState() == State.SUCCEEDED) {
LOG.info("Scheduler accomplished all tasks.");
break;
}
if (isJobKilling(this.getJobId())) {
dealKillingStat(this.containerCommunicator, taskCnt);
} else if (nowComm.getState() == State.FAILED) {
dealFailedStat(this.containerCommunicator, nowComm.getThrowable());
}
Thread.sleep(sleepMillSec);
}
} catch (InterruptedException e) {
// 以 failed 状态退出
LOG.error("捕获到InterruptedException异常!", e);
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
}
}
startAllTaskGroup方法在ProcessInnerScheduler实现
运行时序图
/** * 1、创建线程池 <br/> * 2、变量传入的cfgs,生成tgRunner,然后线程池执行 <br/> * 3、线程池关闭 <br/> * * @param cfgs List<Configuration> */
@Override
public void startAllTaskGroup(List<Configuration> cfgs) {
this.taskGroupContainerExecutorService = new ThreadPoolExecutor(cfgs.size(), cfgs.size(),
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
for (Configuration taskGroupCfg : cfgs) {
TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupCfg);
this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
}
this.taskGroupContainerExecutorService.shutdown();
}
注:
-
对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;
-
所有代码都已经上传到github(master分支和dev),可以免费白嫖
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/145542.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...