datax(6):启动步骤解析

datax(6):启动步骤解析通过前面datax(2):通过idea搭建源码阅读+调试环境已经知道了idea下阅读源码的步骤,现在看下DataX启动步骤解析一,启动java类(主入口)/***Engine是DataX入口类,该类负责初始化Job或者Task的运行容器,并运行插件的Job或者Task逻辑*/com.alibaba.datax.core.Engine二,启动的步骤1、解析配置,包括job.json、core.json、plugin.json三个配置2、设置jobId到config.

大家好,又见面了,我是你们的朋友全栈君。

通过前面 datax(2): 通过idea搭建源码阅读+调试环境 已经知道了idea下阅读源码的步骤,现在看下 DataX启动步骤解析


一、启动java类(主入口)

/**
 * Engine是DataX入口类,该类负责初始化Job或者Task的运行容器,并运行插件的Job或者Task逻辑
 */
com.alibaba.datax.core.Engine

二、启动的步骤


1、解析用户输入的参数: job(datax的json),jobId(默认-1,),running_mode

2、配置额外的参数,打印vm的信息,打印过滤的配置信息(过滤敏感字符),校验配置

3、配置conf传入Engine.start(),启动程序

4、绑定字段信息,初始化插件加载器

5、判断任务类型(taskGroup还是job),生成不同的container(JobContainer或TaskGroupContainer)

6、打开各种追踪器,报告器,用于任务的运行状况收集

7、container.start() 对应的容器开始运行任务

Datax的执行过程
在这里插入图片描述

过程详细说明如下:

  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

简单总结过程如下:

一个DataX Job会切分成多个Task,每个Task会按TaskGroup进行分组,一个Task内部会有一组Reader->Channel->Writer。Channel是连接Reader和Writer的数据交换通道,所有的数据都会经由Channel进行传输


三、启动时序图

在这里插入图片描述


四、主要方法

在这里插入图片描述


五、代码详细

package com.alibaba.datax.core;
**
* Engine是DataX入口类,该类负责初始化Job或者Task的运行容器,并运行插件的Job或者Task逻辑
*/
public class Engine { 

private static final Logger LOG = LoggerFactory.getLogger(Engine.class);
private static String RUNTIME_MODE;
/**
* 真正开始执行任务的地方 check job model (job/task) first
*
* @param allConf Configuration
*/
public void start(Configuration allConf) { 

// 绑定column转换信息
ColumnCast.bind(allConf);
//初始化PluginLoader,可以获取各种插件配置
LoadUtil.bind(allConf);
boolean isJob = !("taskGroup"
.equalsIgnoreCase(allConf.getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
//JobContainer会在schedule后再行进行设置和调整值
int channelNumber = 0;
AbstractContainer container;
long instanceId;
int taskGroupId = -1;
if (isJob) { 

allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
container = new JobContainer(allConf);
instanceId = allConf.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);
} else { 

container = new TaskGroupContainer(allConf);
instanceId = allConf.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
taskGroupId = allConf.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
channelNumber = allConf.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
}
//缺省打开perfTrace
boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true);
boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true);
//standalone 模式的 datax shell任务不进行汇报
if (instanceId == -1) { 

perfReportEnable = false;
}
int priority = 0;
try { 

priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
} catch (NumberFormatException e) { 

LOG.warn("priority set to 0, because NumberFormatException, the value is: {}",
System.getProperty("PROIORY"));
}
Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
//初始化PerfTrace
PerfTrace perfTrace = PerfTrace
.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
perfTrace.setJobInfo(jobInfoConfig, perfReportEnable, channelNumber);
container.start();
}
/**
* 过滤job配置信息
*
* @param configuration Configuration
* @return String
*/
public static String filterJobConfiguration(final Configuration configuration) { 

Configuration jobConfWithSetting = configuration.getConfiguration("job").clone();
Configuration jobContent = jobConfWithSetting.getConfiguration("content");
jobConfWithSetting.set("content", filterSensitiveConfiguration(jobContent));
return jobConfWithSetting.beautify();
}
/**
* 屏蔽敏感信息
*
* @param conf Configuration
* @return Configuration
*/
public static Configuration filterSensitiveConfiguration(Configuration conf) { 

Set<String> keys = conf.getKeys();
for (final String key : keys) { 

boolean isSensitive =
endsWithIgnoreCase(key, "password") || endsWithIgnoreCase(key, "accessKey");
if (isSensitive && conf.get(key) instanceof String) { 

conf.set(key, conf.getString(key).replaceAll(".", "*"));
}
}
return conf;
}
/**
* @param args String[]
* @throws Throwable
*/
public static void entry(final String[] args) throws Throwable { 

Options options = new Options();
options.addOption("job", true, "Job config.");
options.addOption("jobid", true, "Job unique id.");
options.addOption("mode", true, "Job runtime mode.");
BasicParser parser = new BasicParser();
CommandLine cl = parser.parse(options, args);
String jobPath = cl.getOptionValue("job");
// 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1
String jobIdString = cl.getOptionValue("jobid");
RUNTIME_MODE = cl.getOptionValue("mode");
Configuration conf = ConfigParser.parse(jobPath);
long jobId;
String defaultJobId = "-1";
if (!defaultJobId.equals(jobIdString)) { 

//  如果jobId相同,会怎样?
jobId = Long.parseLong(jobIdString);
} else { 

// 如果用户没有指定jobId,或jobId==1,执行后面逻辑
// only for dsc & ds & datax 3 update
String dscJobUrlPatternStr = "/instance/(\\d{1,})/config.xml";
String dsJobUrlPatternStr = "/inner/job/(\\d{1,})/config";
String dsTaskGroupUrlPatternStr = "/inner/job/(\\d{1,})/taskGroup/";
List<String> patterns = Arrays
.asList(dscJobUrlPatternStr, dsJobUrlPatternStr, dsTaskGroupUrlPatternStr);
jobId = parseJobIdFromUrl(patterns, jobPath);
}
boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
if (!isStandAloneMode && jobId == -1) { 

// 如果不是 standalone 模式,那么 jobId 一定不能为-1
throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
"非 standalone 模式必须在 URL 中提供有效的 jobId.");
}
conf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
//打印vmInfo
VMInfo vmInfo = VMInfo.getVmInfo();
if (vmInfo != null) { 

LOG.info(vmInfo.toString());
}
LOG.info("\n" + filterJobConfiguration(conf) + "\n");
LOG.debug(conf.toJSON());
ConfigurationValidate.doValidate(conf);
Engine engine = new Engine();
engine.start(conf);
}
/**
* -1 表示未能解析到 jobId
* <p>
* only for dsc & ds & datax 3 update
*/
private static long parseJobIdFromUrl(List<String> patternStringList, String url) { 

long result = -1;
for (String patternString : patternStringList) { 

result = doParseJobIdFromUrl(patternString, url);
if (result != -1) { 

return result;
}
}
return result;
}
private static long doParseJobIdFromUrl(String patternString, String url) { 

Pattern pattern = Pattern.compile(patternString);
Matcher matcher = pattern.matcher(url);
if (matcher.find()) { 

return Long.parseLong(matcher.group(1));
}
return -1;
}
public static void main(String[] args) { 

int exitCode = 0;
try { 

Engine.entry(args);
} catch (Throwable e) { 

exitCode = 1;
LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n {}", ExceptionTracker.trace(e));
if (e instanceof DataXException) { 

DataXException tempException = (DataXException) e;
ErrorCode errorCode = tempException.getErrorCode();
if (errorCode instanceof FrameworkErrorCode) { 

FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode;
exitCode = tempErrorCode.toExitValue();
}
}
System.exit(exitCode);
}
System.exit(exitCode);
}
}

注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/145463.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • Python常用数组操作函数

    Python常用数组操作函数Python常用数组操作函数1.添加数组元素:列表:list=[‘sadsad’,1,‘哈哈’,‘是否’]append():向末尾添加一个内容如:list.append(‘我是最后一个’)输出:[‘sadsad’,1,‘哈哈’,‘是否’,‘我是最后一个’]extend():向末尾添加多个内容如:list.extend([‘大毛’,‘二毛’])输出:[‘sadsad’,1,…

  • Java Object转JSONObject[通俗易懂]

    Java Object转JSONObject[通俗易懂]JSONObjectjson=(JSONObject)JSONObject.toJSON(list.get(0));

  • OpenCV人脸识别的原理 .

    OpenCV人脸识别的原理 .在之前讲到的人脸测试后,提取出人脸来,并且保存下来,以供训练或识别是用,提取人脸的代码如下:voidGetImageRect(IplImage*orgImage,CvRectrectInImage,IplImage*imgRect,doublescale){ //从图像orgImage中提取一块(rectInImage)子图像imgRect IplImage*res

  • 5分钟商学院之个人篇–时间管理和学习能力

    1.时间管理1.1时间成本时间成本,就是如果把这个时间用于做别的事情,可以获得的收益懂得计算时间成本,可以帮助我们做出理性决策,比如一件事是自己做合适还是花钱请别人做合适1.2GTD(G

    2021年12月30日
  • tar打包绝对路径文件[通俗易懂]

    tar打包绝对路径文件[通俗易懂]当使用tar打包绝对路径文件时会警告:tar:Removingleading`/’frommembernames[user_00@CoalaaHK1~]$tar-zcvftest2.tar.gz/home/user_00/wade/testtar:Removingleading`/’frommembernames/home/user_00/wade/test/…

  • 腾讯面试题目汇总[通俗易懂]

    腾讯面试题目汇总面试官提问1:自我介绍及项目经历关于这道题,每个人的项目经历都不太一样,所以各位朋友根据自己的实际情况来介绍吧,在这里就不多介绍了。面试官提问2:看你项目介绍中大量使用了Redis,那能不能介绍下Redis的主从同步机制呢?关于这道题,因为我在之前的文章也分析过Redis主从同步的机制,所以我从完整重同步和部分重同步两个阶段去分析的,结果也得到了面试官的认可。详细的完整重同步和部分重同步机制原理是什么样的,在这里就不展开介绍了,附上链接朋友们自行查…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号