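Many teams now use DataX as an ETL tool. DataX's readers play the role of E (Extract) and its writers play the role of L (Load). So does DataX also have a T (Transform)? The answer is yes.
I. Transformer overview
Purpose: in production data transfer, a reader and a writer are usually all you need. Sometimes, however, the data has to be processed in between, for example encrypted or decrypted, split, or concatenated. That is what transformers are for.
Family tree
DataX's transformers have two top-level ancestors: the simple Transformer and the ComplexTransformer. As the code later shows, a simple transformer can be wrapped into a complex one.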
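II. Simple type: Transformer
It has only 1 field and 1 method (getters and setters aside). The field transformerName is each transformer's unique identifier (DataX's built-in transformers are named with a dx_ prefix, and third-party ones must not use it). The method evaluate is abstract and is implemented by subclasses: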
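```java
/**
 * The uniqueness of transformerName is checked inside DataX, or by the plugin center on submission.
 */
private String transformerName;

/**
 * Implements the actual processing logic.
 *
 * @param record Record, the row; the UDF processes the record and updates it accordingly
 * @param paras  Object, the transformer function's parameters
 */
abstract public Record evaluate(Record record, Object... paras);
```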
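To make the "one name plus one abstract evaluate" contract concrete, here is a minimal, self-contained sketch. It does not use DataX's real Record and Column classes; SimpleRecord (a plain list of values) and UpperCaseTransformer are hypothetical stand-ins invented for illustration. As in the built-in transformers, paras[0] is assumed to carry the column index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Demo entry point: runs the hypothetical UpperCaseTransformer on one record.
class SimpleTransformerDemo {
    public static void main(String[] args) {
        SimpleRecord record = new SimpleRecord(Arrays.asList("1", "alice"));
        Transformer tf = new UpperCaseTransformer();
        SimpleRecord out = tf.evaluate(record, 1);
        System.out.println(tf.getTransformerName() + " -> " + out.getColumns()); // my_upper -> [1, ALICE]
    }
}

// Hypothetical stand-in for DataX's Record: an ordered list of column values.
class SimpleRecord {
    private final List<Object> columns;

    SimpleRecord(List<?> columns) { this.columns = new ArrayList<>(columns); }

    Object get(int i) { return columns.get(i); }

    void set(int i, Object v) { columns.set(i, v); }

    int getColumnNumber() { return columns.size(); }

    List<Object> getColumns() { return columns; }
}

// Same shape as DataX's Transformer: a name plus one abstract evaluate method.
abstract class Transformer {
    private String transformerName;

    public String getTransformerName() { return transformerName; }

    public void setTransformerName(String name) { this.transformerName = name; }

    public abstract SimpleRecord evaluate(SimpleRecord record, Object... paras);
}

// Hypothetical third-party transformer (so no dx_ prefix): upper-cases column paras[0].
class UpperCaseTransformer extends Transformer {
    UpperCaseTransformer() { setTransformerName("my_upper"); }

    @Override
    public SimpleRecord evaluate(SimpleRecord record, Object... paras) {
        int columnIndex = (Integer) paras[0];
        Object value = record.get(columnIndex);
        if (value != null) {
            record.set(columnIndex, value.toString().toUpperCase());
        }
        // Like a real transformer, update the record in place and return it.
        return record;
    }
}
```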
1. Family tree: DataX currently ships 5 native (local) transformers: replace, substr (substring), Groovy, filter, and pad.
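As an illustration of what a transformer such as substr does to a single column value, here is a simplified substring helper. It is written for this explanation and is not the DataX source; it assumes the parameters are a start index and a length, clamps the end to the string length, and rejects a start index past the end of the value.

```java
// Simplified illustration of the substring idea behind dx_substr (not the DataX source).
class SubstrSketch {
    /**
     * Returns up to `length` characters starting at `startIndex`, clamped to the end of the string.
     * A null value passes through unchanged; an out-of-range start index is rejected.
     */
    static String substr(String value, int startIndex, int length) {
        if (value == null) {
            return null;
        }
        if (startIndex < 0 || length < 0 || startIndex > value.length()) {
            throw new IllegalArgumentException("bad startIndex/length: " + startIndex + "/" + length);
        }
        int end = Math.min(value.length(), startIndex + length);
        return value.substring(startIndex, end);
    }

    public static void main(String[] args) {
        System.out.println(substr("20230815", 0, 4));  // 2023
        System.out.println(substr("20230815", 4, 10)); // 0815 (end clamped to the string length)
    }
}
```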
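III. Complex type: ComplexTransformer
Compared with the simple Transformer it has the same field, but its abstract method takes one extra parameter, tContext, which carries the transformer's runtime configuration: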
```java
/**
 * The uniqueness of transformerName is checked inside DataX, or by the plugin center on submission.
 */
private String transformerName;

public String getTransformerName() {
    return transformerName;
}

public void setTransformerName(String transformerName) {
    this.transformerName = transformerName;
}

/**
 * @param record   the row; the UDF processes the record and updates it accordingly
 * @param tContext the transformer's runtime configuration
 * @param paras    the transformer function's parameters
 */
abstract public Record evaluate(Record record, Map<String, Object> tContext, Object... paras);
```
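A minimal sketch of how a complex transformer might read its runtime configuration from tContext. The record is modeled as a plain List&lt;Object&gt;, and both PrefixTransformer and the "prefix" key are hypothetical names chosen for illustration; DataX does not define them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ComplexTransformerDemo {
    public static void main(String[] args) {
        Map<String, Object> tContext = new HashMap<>();
        tContext.put("prefix", "ID-"); // hypothetical runtime setting
        List<Object> record = new ArrayList<>(Arrays.asList(42, "bob"));
        new PrefixTransformer().evaluate(record, tContext, 0);
        System.out.println(record); // [ID-42, bob]
    }
}

// Same shape as ComplexTransformer: evaluate also receives a runtime-configuration map.
abstract class ComplexTransformer {
    private String transformerName;

    public String getTransformerName() { return transformerName; }

    public void setTransformerName(String name) { this.transformerName = name; }

    public abstract List<Object> evaluate(List<Object> record, Map<String, Object> tContext, Object... paras);
}

// Hypothetical transformer: prepends tContext["prefix"] to the column at paras[0].
class PrefixTransformer extends ComplexTransformer {
    PrefixTransformer() { setTransformerName("my_prefix"); }

    @Override
    public List<Object> evaluate(List<Object> record, Map<String, Object> tContext, Object... paras) {
        int columnIndex = (Integer) paras[0];
        Object prefix = tContext == null ? null : tContext.get("prefix");
        if (prefix != null) {
            record.set(columnIndex, prefix.toString() + record.get(columnIndex));
        }
        // Without a configured prefix the record is returned unchanged.
        return record;
    }
}
```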
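1. Family tree: it has essentially only one subclass, ComplexTransformerProxy.
2. The subclass ComplexTransformerProxy
This is where it gets interesting. The class has just one constructor and one overridden method. The constructor wraps a simple transformer so it can be used as a complex one, and the overridden evaluate simply delegates to the simple transformer's evaluate, ignoring tContext. I'm not sure whether this is because Alibaba only open-sourced part of the code...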
```java
/**
 * no comments.
 * Created by liqiang on 16/3/8.
 */
public class ComplexTransformerProxy extends ComplexTransformer {
    private Transformer realTransformer;

    /**
     * Wraps a simple transformer so that it can be used as a complex one.
     *
     * @param transformer the simple Transformer to wrap
     */
    public ComplexTransformerProxy(Transformer transformer) {
        setTransformerName(transformer.getTransformerName());
        this.realTransformer = transformer;
    }

    /**
     * @param record   the row; the UDF processes the record and updates it accordingly
     * @param tContext the transformer's runtime configuration (not used by the proxy)
     * @param paras    the transformer function's parameters
     * @return the processed record
     */
    @Override
    public Record evaluate(Record record, Map<String, Object> tContext, Object... paras) {
        return this.realTransformer.evaluate(record, paras);
    }

    public Transformer getRealTransformer() {
        return realTransformer;
    }
}
```
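ComplexTransformerProxy is a classic adapter: it copies the wrapped transformer's name and forwards evaluate, dropping tContext. The self-contained sketch below reproduces that pattern with hypothetical stand-in classes (SimpleTf, ComplexTf, ComplexTfProxy, TrimTf) and a List&lt;Object&gt; in place of DataX's Record, so the forwarding can be run and observed in isolation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ProxyDemo {
    public static void main(String[] args) {
        ComplexTf wrapped = new ComplexTfProxy(new TrimTf());
        Map<String, Object> tContext = new HashMap<>();
        tContext.put("ignored", true);
        List<Object> record = new ArrayList<>(Arrays.asList("  x  ", 7));
        wrapped.evaluate(record, tContext, 0); // tContext is accepted but never read
        System.out.println(wrapped.getTransformerName() + " " + record); // my_trim [x, 7]
    }
}

// Hypothetical stand-in for the simple Transformer.
abstract class SimpleTf {
    private String transformerName;
    public String getTransformerName() { return transformerName; }
    public void setTransformerName(String name) { transformerName = name; }
    public abstract List<Object> evaluate(List<Object> record, Object... paras);
}

// Hypothetical stand-in for ComplexTransformer.
abstract class ComplexTf {
    private String transformerName;
    public String getTransformerName() { return transformerName; }
    public void setTransformerName(String name) { transformerName = name; }
    public abstract List<Object> evaluate(List<Object> record, Map<String, Object> tContext, Object... paras);
}

// Same idea as ComplexTransformerProxy: copy the name, then delegate and drop tContext.
class ComplexTfProxy extends ComplexTf {
    private final SimpleTf realTransformer;

    ComplexTfProxy(SimpleTf realTransformer) {
        setTransformerName(realTransformer.getTransformerName());
        this.realTransformer = realTransformer;
    }

    @Override
    public List<Object> evaluate(List<Object> record, Map<String, Object> tContext, Object... paras) {
        return realTransformer.evaluate(record, paras);
    }
}

// Hypothetical simple transformer: trims the string column at paras[0].
class TrimTf extends SimpleTf {
    TrimTf() { setTransformerName("my_trim"); }

    @Override
    public List<Object> evaluate(List<Object> record, Object... paras) {
        int i = (Integer) paras[0];
        Object v = record.get(i);
        if (v != null) {
            record.set(i, v.toString().trim());
        }
        return record;
    }
}
```

Because the registry stores every transformer as a complex one, this adapter lets the execution code call a single evaluate signature regardless of which base class a plugin extends.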
IV. Helper classes around transformers
- TransformerInfo: an entity class that records the information about one transformer
```java
/**
 * Basic information about the function
 */
private ComplexTransformer transformer;
private ClassLoader classLoader;
/**
 * Whether this is a native (built-in) transformer
 */
private boolean isNative;
```
- TransformerRegistry: the class that registers transformers
Its core method is loadTransformerFromLocalStorage, which loads transformers from local storage.
```java
public class TransformerRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(TransformerRegistry.class);
    private static Map<String, TransformerInfo> registedTransformer = new HashMap<>();

    static {
        /*
         * Add the native transformers.
         * Transformers from local storage and from the server are loaded lazily.
         * DataX registers 5 functions by default: substring, pad, replace, filter,
         * and Groovy code snippets (covered in detail later).
         */
        registTransformer(new SubstrTransformer());
        registTransformer(new PadTransformer());
        registTransformer(new ReplaceTransformer());
        registTransformer(new FilterTransformer());
        registTransformer(new GroovyTransformer());
    }

    public static void loadTransformerFromLocalStorage() {
        // Load every transformer found in local storage
        loadTransformerFromLocalStorage(null);
    }

    /**
     * Loads transformers from local storage (mainly by reading each transformer's transformer.json).
     *
     * @param transformers List<String> names of the transformer files to load; null loads all of them
     */
    public static void loadTransformerFromLocalStorage(List<String> transformers) {
        String[] files = new File(DATAX_STORAGE_TRANSFORMER_HOME).list();
        if (null == files) {
            return;
        }
        for (final String transformerFile : files) {
            try {
                if (transformers == null || transformers.contains(transformerFile)) {
                    loadTransformer(transformerFile);
                }
            } catch (Exception e) {
                LOG.error(format("skip transformer(%s) loadTransformer has Exception(%s)",
                        transformerFile, e.getMessage()), e);
            }
        }
    }

    /**
     * Loads a transformer by its file name:<br>
     * 1. locate the tf's transformer.json from its name<br>
     * 2. load the JSON into a Configuration<br>
     * 3. load the tf's jar<br>
     * 4. register the tf in the map<br>
     *
     * @param tfFile String, the transformer's file name
     */
    public static void loadTransformer(String tfFile) {
        String tfPath = DATAX_STORAGE_TRANSFORMER_HOME + File.separator + tfFile;
        Configuration tfCfg;
        try {
            tfCfg = loadTransFormerConfig(tfPath);
        } catch (Exception e) {
            String errMsg = format("skip transformer(%s),load transformer.json error,path = %s, ", tfFile,
                    tfPath);
            LOG.error(errMsg, e);
            return;
        }
        String className = tfCfg.getString("class");
        if (StringUtils.isEmpty(className)) {
            LOG.error(
                    format("skip transformer(%s),class not config, path = %s, config = %s", tfFile, tfPath,
                            tfCfg.beautify()));
            return;
        }
        String funName = tfCfg.getString("name");
        if (!tfFile.equals(funName)) {
            LOG.warn(format(
                    "transformer(%s) name not match transformer.json config name[%s], will ignore json's name, path = %s, config = %s",
                    tfFile, funName, tfPath, tfCfg.beautify()));
        }
        JarLoader jarLoader = new JarLoader(new String[]{tfPath});
        try {
            Class<?> transformerClass = jarLoader.loadClass(className);
            Object transformer = transformerClass.newInstance();
            // Decide whether the tf is a complex or a simple transformer
            if (ComplexTransformer.class.isAssignableFrom(transformer.getClass())) {
                ((ComplexTransformer) transformer).setTransformerName(tfFile);
                registComplexTransformer((ComplexTransformer) transformer, jarLoader, false);
            } else if (Transformer.class.isAssignableFrom(transformer.getClass())) {
                ((Transformer) transformer).setTransformerName(tfFile);
                registTransformer((Transformer) transformer, jarLoader, false);
            } else {
                LOG.error(format("load Transformer class(%s) error, path = %s", className, tfPath));
            }
        } catch (Exception e) {
            // Skip the faulty function
            LOG.error(format("skip transformer(%s),load Transformer class error, path = %s ", tfFile,
                    tfPath), e);
        }
    }

    /**
     * Loads transformer.json from the transformer's directory.
     *
     * @param transformerPath String
     * @return Configuration
     */
    private static Configuration loadTransFormerConfig(String transformerPath) {
        return Configuration.from(new File(transformerPath + File.separator + "transformer.json"));
    }

    public static TransformerInfo getTransformer(String transformerName) {
        TransformerInfo result = registedTransformer.get(transformerName);
        //if (result == null) {
        //todo: try loading it from disk again
        //}
        return result;
    }

    public static synchronized void registTransformer(Transformer transformer) {
        registTransformer(transformer, null, true);
    }

    public static synchronized void registTransformer(Transformer transformer,
            ClassLoader classLoader, boolean isNative) {
        checkName(transformer.getTransformerName(), isNative);
        if (registedTransformer.containsKey(transformer.getTransformerName())) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR,
                    " name=" + transformer.getTransformerName());
        }
        ComplexTransformerProxy complexTransformer = new ComplexTransformerProxy(transformer);
        TransformerInfo info = buildTransformerInfo(complexTransformer, isNative, classLoader);
        registedTransformer.put(transformer.getTransformerName(), info);
    }

    public static synchronized void registComplexTransformer(ComplexTransformer complexTransformer,
            ClassLoader classLoader, boolean isNative) {
        checkName(complexTransformer.getTransformerName(), isNative);
        if (registedTransformer.containsKey(complexTransformer.getTransformerName())) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR,
                    " name=" + complexTransformer.getTransformerName());
        }
        TransformerInfo info = buildTransformerInfo(complexTransformer, isNative, classLoader);
        registedTransformer.put(complexTransformer.getTransformerName(), info);
    }

    /**
     * This method has some issues:<br>
     * 1. the return type is void, so the check result is not used anywhere<br>
     * 2. the check for native functions is not very rigorous<br>
     *
     * @param functionName
     * @param isNative
     */
    private static void checkName(String functionName, boolean isNative) {
        boolean checkResult = true;
        // Only DataX's native transformers may have names starting with dx_
        if (isNative) {
            if (!functionName.startsWith("dx_")) {
                checkResult = false;
            }
        } else {
            if (functionName.startsWith("dx_")) {
                checkResult = false;
            }
        }
        if (!checkResult) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_NAME_ERROR,
                    " name=" + functionName + ": isNative=" + isNative);
        }
    }

    private static TransformerInfo buildTransformerInfo(ComplexTransformer complexTransformer,
            boolean isNative, ClassLoader classLoader) {
        TransformerInfo transformerInfo = new TransformerInfo();
        transformerInfo.setClassLoader(classLoader);
        transformerInfo.setIsNative(isNative);
        transformerInfo.setTransformer(complexTransformer);
        return transformerInfo;
    }

    public static List<String> getAllSuportTransformer() {
        return new ArrayList<String>(registedTransformer.keySet());
    }
}
```
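The two registration rules enforced above (the dx_ prefix must match isNative, and a name can be registered only once) can be exercised without DataX. The sketch below is a hypothetical, stripped-down registry (MiniRegistry is not a DataX class) that stores only name → isNative and uses plain JDK exceptions in place of DataXException. Its checkName collapses the original if/else into one equivalent comparison.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class RegistryDemo {
    public static void main(String[] args) {
        MiniRegistry.register("dx_substr", true);
        MiniRegistry.register("my_upper", false);
        try {
            MiniRegistry.register("dx_evil", false); // third-party names may not use dx_
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // illegal name=dx_evil: isNative=false
        }
        System.out.println(MiniRegistry.names()); // [dx_substr, my_upper]
    }
}

// Hypothetical registry following the same two rules as TransformerRegistry.
final class MiniRegistry {
    private static final Map<String, Boolean> REGISTERED = new HashMap<>(); // name -> isNative

    private MiniRegistry() {
    }

    static synchronized void register(String name, boolean isNative) {
        checkName(name, isNative);
        if (REGISTERED.containsKey(name)) {
            throw new IllegalStateException("duplicate transformer, name=" + name);
        }
        REGISTERED.put(name, isNative);
    }

    // Valid exactly when "starts with dx_" agrees with "is native".
    private static void checkName(String name, boolean isNative) {
        if (name.startsWith("dx_") != isNative) {
            throw new IllegalArgumentException("illegal name=" + name + ": isNative=" + isNative);
        }
    }

    static synchronized Set<String> names() {
        return new TreeSet<>(REGISTERED.keySet()); // sorted copy for stable output
    }
}
```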
- TransformerExecutionParas: the parameter object used when executing a transformer; it mainly consists of the following fields and their getters and setters
```java
/**
 * The following are the function's parameters
 */
private Integer columnIndex;
private String[] paras;
private Map<String, Object> tContext;
private String code;
private List<String> extraPackage;
```
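- TransformerExecution: the transformer execution class. Its main method is genFinalParas, which derives the final argument array (finalParas, which doTransformer later reads via getFinalParas()) from the TransformerExecutionParas.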
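The source of genFinalParas is not shown here, so the following is only a sketch under an assumption: for dx_groovy the final array carries the code and extraPackage (Groovy functions take only code, per TransformerUtil), and for every other function it is the columnIndex followed by the string paras. FinalParasDemo is a hypothetical class; check TransformerExecution for the actual rules.

```java
import java.util.Arrays;
import java.util.List;

class FinalParasDemo {
    // Assumed layout, for illustration only:
    //   dx_groovy -> [code, extraPackage]
    //   others    -> [columnIndex, paras[0], paras[1], ...]
    static Object[] genFinalParas(String name, Integer columnIndex, String[] paras,
                                  String code, List<String> extraPackage) {
        if ("dx_groovy".equals(name)) {
            return new Object[]{code, extraPackage};
        }
        int n = paras == null ? 0 : paras.length;
        Object[] finalParas = new Object[n + 1];
        // columnIndex is mandatory for non-Groovy functions (TransformerUtil rejects a missing one).
        finalParas[0] = columnIndex;
        if (n > 0) {
            System.arraycopy(paras, 0, finalParas, 1, n);
        }
        return finalParas;
    }

    public static void main(String[] args) {
        Object[] p = genFinalParas("dx_substr", 1, new String[]{"0", "4"}, null, null);
        System.out.println(Arrays.toString(p)); // [1, 0, 4]
    }
}
```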
- TransformerExchanger: the transformer exchanger. Its main method, doTransformer, lets RecordExchanger transform each record directly:
```java
public Record doTransformer(Record record) {
    if (transformerExecs == null || transformerExecs.size() == 0) {
        return record;
    }
    Record result = record;
    long diffExaustedTime = 0;
    String errorMsg = null;
    boolean failed = false;
    for (TransformerExecution transformerInfoExec : transformerExecs) {
        long startTs = System.nanoTime();
        if (transformerInfoExec.getClassLoader() != null) {
            classLoaderSwapper.setCurrentThreadClassLoader(transformerInfoExec.getClassLoader());
        }
        /*
         * Lazily check the validity of the transformer parameters and throw directly
         * instead of treating the record as dirty data. Plugins do not need to validate
         * these parameters themselves, but plugin-specific parameters (such as the number
         * of parameters) are checked inside the plugin.
         */
        if (!transformerInfoExec.isChecked()) {
            if (transformerInfoExec.getColumnIndex() != null
                    && transformerInfoExec.getColumnIndex() >= record.getColumnNumber()) {
                throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
                        String.format("columnIndex[%s] out of bound[%s]. name=%s",
                                transformerInfoExec.getColumnIndex(), record.getColumnNumber(),
                                transformerInfoExec.getTransformerName()));
            }
            transformerInfoExec.setIsChecked(true);
        }
        try {
            result = transformerInfoExec.getTransformer()
                    .evaluate(result, transformerInfoExec.gettContext(),
                            transformerInfoExec.getFinalParas());
        } catch (Exception e) {
            errorMsg = String
                    .format("transformer(%s) has Exception(%s)", transformerInfoExec.getTransformerName(),
                            e.getMessage());
            failed = true;
            //LOG.error(errorMsg, e);
            // transformerInfoExec.addFailedRecords(1);
            // Dirty data is not passed to the remaining transformers; it is handled as
            // dirty data and this record is filtered out.
            break;
        } finally {
            if (transformerInfoExec.getClassLoader() != null) {
                classLoaderSwapper.restoreCurrentThreadClassLoader();
            }
        }
        if (result == null) {
            /*
             * This null must not be passed on to the writer; it has to be consumed here.
             */
            totalFilterRecords++;
            //transformerInfoExec.addFilterRecords(1);
            break;
        }
        long diff = System.nanoTime() - startTs;
        //transformerInfoExec.addExaustedTime(diff);
        diffExaustedTime += diff;
        //transformerInfoExec.addSuccessRecords(1);
    }
    totalExaustedTime += diffExaustedTime;
    if (failed) {
        totalFailedRecords++;
        this.pluginCollector.collectDirtyRecord(record, errorMsg);
        return null;
    } else {
        totalSuccessRecords++;
        return result;
    }
}
```
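The control flow of doTransformer (run the chain in order, stop on a null result as a filtered record, stop on an exception and keep the original record as dirty data) can be reduced to the sketch below. MiniExchanger is hypothetical: it uses java.util.function.UnaryOperator steps and a List&lt;Object&gt; record instead of TransformerExecution and Record, and it omits class-loader swapping, lazy parameter checks and timing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

class ChainDemo {
    public static void main(String[] args) {
        List<UnaryOperator<List<Object>>> chain = new ArrayList<>();
        // Step 1: trim column 0.
        chain.add(r -> { r.set(0, r.get(0).toString().trim()); return r; });
        // Step 2: filter; returning null drops the record.
        chain.add(r -> r.get(0).toString().isEmpty() ? null : r);
        // Step 3: parse column 1 as an int; throws NumberFormatException on bad input.
        chain.add(r -> { r.set(1, Integer.parseInt(r.get(1).toString())); return r; });

        MiniExchanger ex = new MiniExchanger(chain);
        System.out.println(ex.doTransformer(new ArrayList<>(Arrays.asList(" a ", "1")))); // [a, 1]
        System.out.println(ex.doTransformer(new ArrayList<>(Arrays.asList("   ", "2")))); // null (filtered)
        System.out.println(ex.doTransformer(new ArrayList<>(Arrays.asList("b", "x"))));   // null (dirty)
        System.out.println(ex.success + " " + ex.filtered + " " + ex.failed);            // 1 1 1
    }
}

// Hypothetical, simplified counterpart of TransformerExchanger#doTransformer.
class MiniExchanger {
    private final List<UnaryOperator<List<Object>>> chain;
    long success;
    long filtered;
    long failed;
    final List<String> dirtyRecords = new ArrayList<>();

    MiniExchanger(List<UnaryOperator<List<Object>>> chain) {
        this.chain = chain;
    }

    List<Object> doTransformer(List<Object> record) {
        List<Object> result = record;
        for (UnaryOperator<List<Object>> step : chain) {
            try {
                result = step.apply(result);
            } catch (RuntimeException e) {
                // Stop the chain and keep the record as dirty data instead of passing it on.
                failed++;
                dirtyRecords.add(record + " -> " + e.getMessage());
                return null;
            }
            if (result == null) {
                // A filtered record must never reach the writer.
                filtered++;
                return null;
            }
        }
        success++;
        return result;
    }
}
```

One difference is deliberate: this sketch does not count a filtered record as a success. In the loop above, a filtered record breaks out with failed still false, so it also reaches totalSuccessRecords++ before null is returned.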
- TransformerUtil: a utility class whose main method, buildTransformerInfo, turns the task configuration into a list of executable TransformerExecution objects
```java
/**
 * Builds the transformers from the task configuration.
 *
 * @param taskCfg Configuration
 * @return List<TransformerExecution>
 */
public static List<TransformerExecution> buildTransformerInfo(Configuration taskCfg) {
    List<Configuration> tfConfigs = taskCfg.getListConfiguration(CoreConstant.JOB_TRANSFORMER);
    if (tfConfigs == null || tfConfigs.size() == 0) {
        return null;
    }
    List<TransformerExecution> result = new ArrayList<>();
    List<String> funNames = new ArrayList<>();
    for (Configuration cfg : tfConfigs) {
        String functionName = cfg.getString("name");
        if (StringUtils.isEmpty(functionName)) {
            throw DataXException
                    .asDataXException(TRANSFORMER_CONFIGURATION_ERROR, "config=" + cfg.toJSON());
        }
        if (functionName.equals("dx_groovy") && funNames.contains("dx_groovy")) {
            throw DataXException.asDataXException(TRANSFORMER_CONFIGURATION_ERROR,
                    "dx_groovy can be invoke once only.");
        }
        funNames.add(functionName);
    }
    // Lazily load third-party functions, and only the ones that are actually needed
    LOG.info(String.format(" user config transformers [%s], loading...", funNames));
    TransformerRegistry.loadTransformerFromLocalStorage(funNames);
    int i = 0;
    for (Configuration cfg : tfConfigs) {
        String funName = cfg.getString("name");
        TransformerInfo transformerInfo = TransformerRegistry.getTransformer(funName);
        if (transformerInfo == null) {
            throw DataXException.asDataXException(TRANSFORMER_NOTFOUND_ERROR, "name=" + funName);
        }
        // Each concrete UDF gets its own paras object
        TransformerExecutionParas transformerExecutionParas = new TransformerExecutionParas();
        // Groovy functions take only code
        if (!funName.equals("dx_groovy") && !funName.equals("dx_fackGroovy")) {
            Integer colIndex = cfg.getInt(CoreConstant.TRANSFORMER_PARAMETER_COLUMNINDEX);
            if (colIndex == null) {
                throw DataXException.asDataXException(TRANSFORMER_ILLEGAL_PARAMETER,
                        "columnIndex must be set by UDF:name=" + funName);
            }
            transformerExecutionParas.setColumnIndex(colIndex);
            List<String> paras = cfg.getList(CoreConstant.TRANSFORMER_PARAMETER_PARAS, String.class);
            if (paras != null && paras.size() > 0) {
                transformerExecutionParas.setParas(paras.toArray(new String[0]));
            }
        } else {
            String code = cfg.getString(CoreConstant.TRANSFORMER_PARAMETER_CODE);
            if (StringUtils.isEmpty(code)) {
                throw DataXException.asDataXException(TRANSFORMER_ILLEGAL_PARAMETER,
                        "groovy code must be set by UDF:name=" + funName);
            }
            transformerExecutionParas.setCode(code);
            List<String> extraPackage = cfg.getList(TRANSFORMER_PARAMETER_EXTRAPACKAGE, String.class);
            if (extraPackage != null && extraPackage.size() > 0) {
                transformerExecutionParas.setExtraPackage(extraPackage);
            }
        }
        transformerExecutionParas.settContext(cfg.getMap(TRANSFORMER_PARAMETER_CONTEXT));
        TransformerExecution transformerExecution = new TransformerExecution(transformerInfo,
                transformerExecutionParas);
        transformerExecution.genFinalParas();
        result.add(transformerExecution);
        i++;
        LOG.info(String.format(" %s of transformer init success. name=%s, isNative=%s parameter = %s",
                i, transformerInfo.getTransformer().getTransformerName(),
                transformerInfo.isNative(), cfg.getConfiguration("parameter")));
    }
    return result;
}
```
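The validation rules buried in buildTransformerInfo are easy to miss: every entry needs a name, dx_groovy may appear only once, non-Groovy functions need a columnIndex, and Groovy functions need non-empty code. The sketch below checks just those rules over a hypothetical flattened config class (TfConfigCheck.TfConfig is invented for illustration); it skips loading, registry lookup and TransformerExecution construction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TfConfigCheck {
    /** Hypothetical flattened view of one entry in the job's transformer list. */
    static final class TfConfig {
        final String name;
        final Integer columnIndex;
        final String code;

        TfConfig(String name, Integer columnIndex, String code) {
            this.name = name;
            this.columnIndex = columnIndex;
            this.code = code;
        }
    }

    // Mirrors the check in buildTransformerInfo: these names take code instead of columnIndex.
    static boolean takesCode(String name) {
        return name.equals("dx_groovy") || name.equals("dx_fackGroovy");
    }

    /** Returns the configured function names, or throws on the first rule violation. */
    static List<String> validate(List<TfConfig> configs) {
        List<String> names = new ArrayList<>();
        for (TfConfig cfg : configs) {
            if (cfg.name == null || cfg.name.isEmpty()) {
                throw new IllegalArgumentException("transformer name is required");
            }
            if (cfg.name.equals("dx_groovy") && names.contains("dx_groovy")) {
                throw new IllegalArgumentException("dx_groovy can be invoked only once");
            }
            names.add(cfg.name);
        }
        for (TfConfig cfg : configs) {
            if (takesCode(cfg.name)) {
                if (cfg.code == null || cfg.code.isEmpty()) {
                    throw new IllegalArgumentException("groovy code must be set, name=" + cfg.name);
                }
            } else if (cfg.columnIndex == null) {
                throw new IllegalArgumentException("columnIndex must be set, name=" + cfg.name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<TfConfig> ok = Arrays.asList(
                new TfConfig("dx_substr", 1, null),
                new TfConfig("dx_groovy", null, "return record;"));
        System.out.println(validate(ok)); // [dx_substr, dx_groovy]
        try {
            validate(Arrays.asList(new TfConfig("dx_pad", null, null)));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // columnIndex must be set, name=dx_pad
        }
    }
}
```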
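- TransformerErrorCode: the error codes defined for transformers; nothing special to say here.
Notes:
- The source was modified slightly, mainly to fix 1. issues reported by the Alibaba Java coding guidelines scan and 2. clean-code problems.
- All the code has been uploaded to GitHub (master and dev branches) and is free to use.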
Publisher: 全栈程序员-用户IM. Please credit the source when reposting: https://javaforall.cn/145320.html Original link: https://javaforall.cn