datax(10): 源码解读Communication(Datax通讯类)「建议收藏」

datax(10): 源码解读Communication(Datax通讯类)「建议收藏」前面看了datax的通讯机制,继续看源码—具体的通讯类Communication。根据datax的运行模式的区别,数据的收集会有些区别,这篇文章都是讲的在standalone模式下。一、communication概述DataX所有的统计信息都会保存到Communication类里面。Communication支持下列数据的统计计数器,比如读取的字节速度,写入成功的数据条数/***所有的数值key-value对**/privateMap<String.

大家好,又见面了,我是你们的朋友全栈君。

前面看了datax的通讯机制,继续看源码—具体的通讯类 Communication。根据datax的运行模式的区别, 数据的收集会有些区别,这篇文章都是讲的在standalone模式下。


一、communication概述

DataX所有的统计信息都会保存到Communication类里面。

Communication支持下列数据的统计

  1. 计数器,比如读取的字节速度,写入成功的数据条数
  2. 统计的时间点 字符串类型的消息
  3. 执行时的异常
  4. 执行的状态, 比如成功或失败

  /** * 所有的数值key-value对 * */
  private Map<String, Number> counter;

  /** * 运行状态 * */
  private State state;

  /** * 异常记录 * */
  private Throwable throwable;

  /** * 记录的timestamp * */
  private long timestamp;

  /** * task给job的信息 * */
  Map<String, List<String>> message;
  

communication继承关系
在这里插入图片描述

如果需要汇总多个Communication的数据,Communication提供了mergeFrom方法。根据不同的数据类型,对应着不同的操作计数器类型,相同的key的数值累加

  • 合并异常,当自身的异常为null,才合并别的异常

  • 合并状态,如果有任意一个的状态失败了,那么返回失败的状态。如果有任意一个的状态正在运行,那么返回正在运行的状态

  • 合并消息, 相同的key的消息添加到同一个列表


二、communication主要方法

在这里插入图片描述


三、Communication的管理类

对于每个task组都有一个单独的Communication,用来存储这个组的统计数据。对于这些Communication,在LocalTGCommunicationManager类实现了集中管理。接下来看看LocalTGCommunicationManager的原理。

LocalTGCommunicationManager有个重要的属性taskGroupCommunicationMap,它是一个Map,保存了每个task组的统计数据。


public final class LocalTGCommunicationManager { 

private static Map<Integer, Communication> taskGroupCommunicationMap = new ConcurrentHashMap<>();
/** * 根据tgId注册comm * 当task组在初始化的时候,都会向LocalTGCommunicationManager这里注册。// 这里只是简单保存到taskGroupCommunicationMap变量里 * @param taskGroupId * @param communication */
public static void registerTaskGroupCommunication(int taskGroupId, Communication communication) { 

taskGroupCommunicationMap.put(taskGroupId, communication);
}
/** * 获取(合并)tg里面所有的comm * * @return Communication */
public static Communication getJobCommunication() { 

Communication communication = new Communication();
communication.setState(State.SUCCEEDED);
for (Communication taskGroupCommunication : taskGroupCommunicationMap.values()) { 

communication.mergeFrom(taskGroupCommunication);
}
return communication;
}
/** * 采用获取taskGroupId后再获取对应communication的方式, * 防止map遍历时修改,同时也防止对map key-value对的修改 * * @return */
public static Set<Integer> getTaskGroupIdSet() { 

return taskGroupCommunicationMap.keySet();
}
public static Communication getTaskGroupCommunication(int taskGroupId) { 

Validate.isTrue(taskGroupId >= 0, "taskGroupId不能小于0");
return taskGroupCommunicationMap.get(taskGroupId);
}
/** * 根据tgId 将taskGroupCommunicationMap中没有的comm 插入 * @param taskGroupId * @param comm */
public static void updateTaskGroupCommunication(final int taskGroupId, final Communication comm) { 

Validate.isTrue(taskGroupCommunicationMap.containsKey(
taskGroupId), String.format("taskGroupCommunicationMap中没有注册taskGroupId[%d]的Communication," +
"无法更新该taskGroup的信息", taskGroupId));
taskGroupCommunicationMap.put(taskGroupId, comm);
}
public static void clear() { 

taskGroupCommunicationMap.clear();
}
public static Map<Integer, Communication> getTaskGroupCommunicationMap() { 

return taskGroupCommunicationMap;
}
}

四、谁会注册Communication

AbstractScheduler会根据切分后的任务,为每个task组注册一个Communication。registerCommunication接收task配置列表,里面每个配置都包含了task group id。

进行注册communication的类

  • AbstractScheduler的schedule方法里 registerCommunication
  • TaskGroupContainer的start方法里 registerCommunication
  • AbstractTGContainerCommunicator的registerCommunication方法
  • AbstractContainerCommunicator的registerCommunication方法
  • StandAloneJobContainerCommunicator的registerCommunication方法

在这里插入图片描述


五、更新communication统计数据

主要更新communication的类
在这里插入图片描述

每个任务执行都会对应着Channel,Channel当每处理一条数据时,都会更新对应Communication的统计信息。
例如下面的pull方法是Writer从Channel拉取数据,每次pull的时候,都会调用statPull函数,会更新写入数据条数和字节数的信息。


public abstract class Channel{ 

private Communication currentCommunication;
public Record pull() { 

Record record = this.doPull();
this.statPull(1L, record.getByteSize());
return record;
}
/** * statPull方法,并没有限速。因为数据的整个流程是Reader -》 Channle -》 Writer, Reader的push速度限制了, * Writer的pull速度也就没必要限速 * * @param recordSize * @param byteSize */
private void statPull(long recordSize, long byteSize) { 

currentCommunication.increaseCounter(CommunicationTool.WRITE_RECEIVED_RECORDS, recordSize);
currentCommunication.increaseCounter(CommunicationTool.WRITE_RECEIVED_BYTES, byteSize);
}

六、收集communication统计数据

  1. AbstractScheduler想统计汇总后的数据,需要调用AbstractContainerCommunicator的collect方法

  2. StandAloneJobContainerCommunicator继承AbstractContainerCommunicator,实现了collect方法,它会调用AbstractCollector的collectFromTaskGroup方法获取数据

  3. ProcessInnerCollector实现了AbstractCollector的collectFromTaskGroup方法,它会调用LocalTGCommunicationManager的getJobCommunication方法, getJobCommunication方法会统计所有task的数据,然后返回。

在这里插入图片描述


注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/145489.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • 简要说明continue命令和break命令的不同_continue的用法

    简要说明continue命令和break命令的不同_continue的用法break命令可以带一个参数,一个不带参数的break循环只能退出最内层的循环,而breakN可以退出N层循环。continue命令也可以带一个参数,一个不带参数的continue命令只去掉本次循环的剩余代码,而continueN将会把N层循环剩余的代码都去掉,但是循环的次数不变。#!/bin/shforiin”abcd”doech

  • Elasticsearch学习,请先看这一篇!

    题记:Elasticsearch研究有一段时间了,现特将Elasticsearch相关核心知识、原理从初学者认知、学习的角度,从以下9个方面进行详细梳理。欢迎讨论……0.带着问题上路——ES是如何产生的?(1)思考:大规模数据如何检索?如:当系统数据量上了10亿、100亿条的时候,我们在做系统架构的时候通常会从以下角度去考虑问题:1)用什么数据库好?(mysql、sybase、oracle、达

  • 《大秦帝国三:崛起》 剧情简介和最新进展第一季_大秦帝国之崛起剧情

    《大秦帝国三:崛起》 剧情简介和最新进展第一季_大秦帝国之崛起剧情大秦帝国三·裂变大秦帝国二·纵横大秦帝国三·崛起 大秦帝国三·崛起 剧情简介和最新进展规格:40集剧情简介:《大秦帝国三·崛起》,以《大秦帝国二·纵横》的结尾为开篇,承上启下,讲述战国中后期各国之间的征战。主要讲述的是秦昭襄王嬴稷期间战国的历史。秦昭襄王(公元前325年到公元前251年),在位56年。是秦国一统天下最直接的奠基人。当时

  • git push 报错处理 ! [rejected] master -> master (non-fast-forward)

    git push 报错处理 ! [rejected] master -> master (non-fast-forward)

  • JAVA面试题及答案整理(最新版)

    JAVA面试题及答案整理(最新版)这些Java技术栈整理成册(包括:VM,JAVA集合,JAVA多线程并发,JAVA基础,Spring原理,微服务,Netty与RPC,网络,日志,Zookeeper,Kafka,RabbitMQ,Hbase,MongoDB,Cassandra,设计模式,负载均衡,数据库,一致性哈希,JAVA算法,数据结构,加密算法,分布式缓存,Hadoop,Spark,Storm,YARN,机器学习,云计算),对你的面试大有帮助,让你offer到手,高薪也有!JVM 线程 JVM内存区域

  • C语言求最大公约数和最小公倍数(思路清晰+拓展)[通俗易懂]

    C语言求最大公约数和最小公倍数(思路清晰+拓展)[通俗易懂]最大公约数的求法首先了解它的一般求法(欧几里得算法):假设存在两个数A和B,假如A%B的结果不为0,那么A和B的最大公约数是B与A%B的最大公约数,一直往下计算,直到后者为0,此时的最大公约数为A’(注意不是A而是A’)。就比如上边的例子,当A%B==0的时候,最大公约数就是B了,这个A’就代表B。最大公约数的代码:(基于C++实现的函数)intgcd(inta,intb){ in…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号