datax (7): Reading the JobContainer Source



The previous post walked through Engine, one of whose steps decides whether the container to start is a job container or a taskGroup container. This post takes a close look at JobContainer.


1. AbstractContainer

DataX has two container classes, TaskGroupContainer and JobContainer, and both extend the abstract class AbstractContainer.

[figure: the container class hierarchy]

It holds two fields and one abstract start() method, reconstructed below:

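The original figure is not reproduced here, so the skeleton below is a reconstruction matching the two fields and the abstract start() the text describes; treat it as a sketch rather than the verbatim upstream source:

public abstract class AbstractContainer {

  // the job's (or taskGroup's) full configuration
  protected Configuration configuration;

  // communicator the container uses to collect() and report() runtime state
  private AbstractContainerCommunicator containerCommunicator;

  public AbstractContainer(Configuration configuration) {
    Validate.notNull(configuration, "Configuration can not be null.");
    this.configuration = configuration;
  }

  public Configuration getConfiguration() {
    return configuration;
  }

  public AbstractContainerCommunicator getContainerCommunicator() {
    return containerCommunicator;
  }

  public void setContainerCommunicator(AbstractContainerCommunicator containerCommunicator) {
    this.containerCommunicator = containerCommunicator;
  }

  // JobContainer and TaskGroupContainer each implement their own start()
  public abstract void start();
}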


2. JobContainer Overview


JobContainer: a job instance runs inside the jobContainer. It is the master of all tasks, responsible for initialization, splitting, scheduling, running, cleanup, monitoring, and reporting, but it performs none of the actual data transfer itself.

1. For a job-type container, the job's preHandle(), init(), prepare(), split(), schedule(), post(), and postHandle() methods run in that order.

2. init() initializes the reader and writer plugins from the configuration. This involves hot-loading the plugin jars and calling each plugin's init(), and it also sets the reader's and writer's configuration.

3. prepare() runs the reader and writer plugins' own prepare() methods. Each plugin has its own JarLoader, implemented by extending URLClassLoader.

4. split() adjusts the channel count via adjustChannelNumber() and then performs the finest-grained split of the reader and the writer. Note that the writer's split must follow the reader's split so that both produce the same number of pieces, satisfying the 1:1 channel model.

5. The channel count is derived from the byte and record rate limits; computing it is the first step inside split().

6. During split(), the reader plugin splits itself according to the suggested channel count (though some readers may ignore the hint), and the writer plugin then splits 1:1 against the reader's result.

7. mergeReaderAndWriterTaskConfigs() inside split() ties together the reader, writer, and transformer configurations, generates the per-task configurations, and rewrites job.content.

8. schedule() distributes the task configurations produced by split() into taskGroup objects: dividing the task count by the number of tasks a single taskGroup supports yields the number of taskGroups (a small arithmetic sketch follows this list).

9. Internally, schedule() delegates to AbstractScheduler.schedule(), which goes on to call startAllTaskGroup() to create all the TaskGroupContainers that organize the tasks; a TaskGroupContainerRunner is responsible for running each TaskGroupContainer and executing its assigned tasks.

10. taskGroupContainerExecutorService starts a fixed-size thread pool to run the TaskGroupContainerRunner objects. TaskGroupContainerRunner.run() calls taskGroupContainer.start(), which creates one TaskExecutor per channel and starts each task via taskExecutor.doStart().
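Before diving into the source, here is the arithmetic behind point 8 as a standalone sketch. The numbers are made up; the sketch only mirrors what schedule() and JobAssignUtil.assignFairly() compute, it is not DataX code:

public class TaskGroupSizingSketch {
  public static void main(String[] args) {
    int needChannelNumber = 33;   // computed from the speed limits (see adjustChannelNumber later)
    int taskNumber = 100;         // size of job.content after split()
    // schedule() never uses more channels than there are tasks
    int channels = Math.min(needChannelNumber, taskNumber);                  // -> 33
    int channelsPerTaskGroup = 5; // core.container.taskGroup.channel, default 5
    // the fair assignment needs ceil(channels / channelsPerTaskGroup) groups
    int taskGroupNumber = (channels + channelsPerTaskGroup - 1) / channelsPerTaskGroup; // -> 7
    System.out.println(taskGroupNumber + " taskGroups for " + taskNumber + " tasks");
  }
}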


3. Main Methods

Methods whose names begin with do can be read as the steps that invoke the concrete plugin implementations.
[figure: JobContainer's main methods]


4. Runtime Sequence Diagram

The main entry point is the start() method.

[figure: JobContainer runtime sequence, entered through start()]


5. Source Walkthrough

package com.alibaba.datax.core.job;

// imports and static imports (CoreConstant.*, FrameworkErrorCode.*, CommunicationTool.*, ...)
// are omitted in this excerpt

/**
 * Created by jingxing on 14-8-24.
 * <p/>
 * A job instance runs inside the jobContainer. It is the master of all tasks, responsible for
 * initialization, splitting, scheduling, running, cleanup, monitoring and reporting,
 * but it performs none of the actual data transfer itself.
 */
public class JobContainer extends AbstractContainer {

  private static final Logger LOG = LoggerFactory.getLogger(JobContainer.class);

  private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

  private ClassLoaderSwapper loaderSwap = ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper();

  private long jobId;

  private String readerPluginName;

  private String writerPluginName;

  /**
   * job-side instances of the reader and writer plugins
   */
  private Reader.Job jobReader;

  private Writer.Job jobWriter;

  private Configuration userConf;

  private long startTimeStamp;

  private long endTimeStamp;

  private long startTransferTimeStamp;

  private long endTransferTimeStamp;

  private int needChannelNumber;

  private int totalStage = 1;

  private ErrorRecordChecker errorLimit;

  public JobContainer(Configuration cfg) {
    super(cfg);
    errorLimit = new ErrorRecordChecker(cfg);
  }

  /**
   * Everything jobContainer is responsible for happens inside start(): init, prepare, split,
   * schedule and post, plus destroy and statistics.
   */
  @Override
  public void start() {
    LOG.info("DataX jobContainer starts job.");
    boolean hasException = false;
    // whether this is a dry run
    boolean isDryRun = false;
    try {
      this.startTimeStamp = System.currentTimeMillis();
      isDryRun = configuration.getBool(DATAX_JOB_SETTING_DRYRUN, false);
      if (isDryRun) {
        LOG.info("jobContainer starts to do preCheck ...");
        // even a dry run needs checking, to make sure the parameters are valid
        this.preCheck();
      } else {
        // in open-source DataX, userConf is never used afterwards
        userConf = configuration.clone();
        LOG.debug("jobContainer starts to do preHandle ...");
        this.preHandle();
        LOG.debug("jobContainer starts to do init ...");
        this.init();
        LOG.info("jobContainer starts to do prepare ...");
        this.prepare();
        LOG.info("jobContainer starts to do split ...");
        this.totalStage = this.split();
        LOG.info("jobContainer starts to do schedule ...");
        this.schedule();
        LOG.debug("jobContainer starts to do post ...");
        this.post();
        LOG.debug("jobContainer starts to do postHandle ...");
        this.postHandle();
        LOG.info("DataX jobId [{}] completed successfully.", this.jobId);
        this.invokeHooks();
      }
    } catch (Throwable e) {
      LOG.error("Exception when job run", e);
      hasException = true;
      if (e instanceof OutOfMemoryError) {
        this.destroy();
        System.gc();
      }
      if (super.getContainerCommunicator() == null) {
        /**
         * the containerCollector is initialized inside schedule(), so if an exception is thrown
         * before schedule() runs, the containerCollector must be initialized here
         */
        AbstractContainerCommunicator tempContainerCollector;
        // standalone
        tempContainerCollector = new StandAloneJobContainerCommunicator(configuration);
        super.setContainerCommunicator(tempContainerCollector);
      }
      Communication comm = super.getContainerCommunicator().collect();
      // the pre-report state does not need to be set manually
      // communication.setState(State.FAILED);
      comm.setThrowable(e);
      comm.setTimestamp(this.endTimeStamp);

      Communication tempComm = new Communication();
      tempComm.setTimestamp(this.startTransferTimeStamp);

      Communication report = CommunicationTool.getReportCommunication(comm, tempComm, totalStage);
      super.getContainerCommunicator().report(report);
      throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
    } finally {
      if (!isDryRun) {
        this.destroy();
        this.endTimeStamp = System.currentTimeMillis();
        if (!hasException) {
          // finally, log the average CPU usage and the GC statistics
          VMInfo vmInfo = VMInfo.getVmInfo();
          if (vmInfo != null) {
            vmInfo.getDelta(false);
            LOG.info(vmInfo.totalString());
          }

          LOG.info(PerfTrace.getInstance().summarizeNoException());
          this.logStatistics();
        }
      }
    }
  }
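One detail worth calling out from the branch above: a dry run only executes preCheck() and never moves data. A minimal illustration of the switch, assuming the constant DATAX_JOB_SETTING_DRYRUN resolves to the usual job.setting.dryRun path (the inlined string below is that assumption, not verified source):

  // job json: { "job": { "setting": { "dryRun": true }, "content": [ ... ] } }
  boolean isDryRun = configuration.getBool("job.setting.dryRun", false); // assumed path
  // when true, start() runs preCheck() and returns without transferring anything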

  /**
   * Pre-check; once done, the checked values are assigned to the JobContainer.
   */
  private void preCheck() {
    this.preCheckInit();
    this.adjustChannelNumber();
    // if the call above failed to set channelNum, give it a default of 1
    needChannelNumber = needChannelNumber <= 0 ? 1 : needChannelNumber;
    this.preCheckReader();
    this.preCheckWriter();
    LOG.info("PreCheck passed");
  }

  /**
   * Pre-check initialization <br/>
   * 1 read jobId from cfg (if below 0, reset it to 0 and write it back into cfg) <br/>
   * 2 name the current thread job-jobId <br/>
   * 3 build a plugin collector from the container's communicator <br/>
   * 4 check + initialize the plugins and assign them to reader and writer
   */
  private void preCheckInit() {
    // read jobId from cfg; if it is below 0, set jobId to 0
    jobId = configuration.getLong(DATAX_CORE_CONTAINER_JOB_ID, -1);
    if (this.jobId < 0) {
      LOG.info("Set jobId = 0");
      this.jobId = 0;
      this.configuration.set(DATAX_CORE_CONTAINER_JOB_ID, this.jobId);
    }

    Thread.currentThread().setName("job-" + this.jobId);

    /**
     * 1 getContainerCommunicator() returns the container's communicator
     * 2 build a job-level plugin collector from that communicator
     * 3 use the collector to init a jobReader and a jobWriter
     */
    JobPluginCollector pluginCollector = new DefaultJobPluginCollector(getContainerCommunicator());
    this.jobReader = this.preCheckReaderInit(pluginCollector);
    this.jobWriter = this.preCheckWriterInit(pluginCollector);
  }

  /**
   * Check + initialize the reader
   *
   * @param jobPluginCollector JobPluginCollector
   * @return Reader.Job
   */
  private Reader.Job preCheckReaderInit(JobPluginCollector jobPluginCollector) {
    readerPluginName = configuration.getString(DATAX_JOB_CONTENT_READER_NAME);

    JarLoader jarLoader = LoadUtil.getJarLoader(PluginType.READER, readerPluginName);
    loaderSwap.setCurrentThreadClassLoader(jarLoader);

    Reader.Job reader = (Reader.Job) LoadUtil.loadJobPlugin(PluginType.READER, readerPluginName);
    configuration.set(DATAX_JOB_CONTENT_READER_PARAMETER + ".dryRun", true);
    Configuration cfg = configuration.getConfiguration(DATAX_JOB_CONTENT_READER_PARAMETER);
    reader.setPluginJobConf(cfg);
    reader.setPeerPluginJobConf(cfg);
    reader.setJobPluginCollector(jobPluginCollector);

    loaderSwap.restoreCurrentThreadClassLoader();
    return reader;
  }


  private Writer.Job preCheckWriterInit(JobPluginCollector jobPluginCollector) {
    writerPluginName = configuration.getString(DATAX_JOB_CONTENT_WRITER_NAME);
    JarLoader jarLoader = LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName);
    loaderSwap.setCurrentThreadClassLoader(jarLoader);

    Writer.Job jobWriter = (Writer.Job) LoadUtil.loadJobPlugin(PluginType.WRITER, writerPluginName);

    configuration.set(DATAX_JOB_CONTENT_WRITER_PARAMETER + ".dryRun", true);

    // set the writer's jobConfig
    jobWriter.setPluginJobConf(configuration.getConfiguration(
        DATAX_JOB_CONTENT_WRITER_PARAMETER));
    // hand the writer its peer's (the reader's) config
    jobWriter.setPeerPluginJobConf(this.configuration.getConfiguration(
        DATAX_JOB_CONTENT_READER_PARAMETER));

    jobWriter.setPeerPluginName(this.readerPluginName);
    jobWriter.setJobPluginCollector(jobPluginCollector);

    loaderSwap.restoreCurrentThreadClassLoader();
    return jobWriter;
  }

  /**
   * Pre-check the reader <br/> 1 first swap the current thread's classLoader to the reader's <br/>
   * 2 run the reader's check <br/> 3 restore the original classLoader
   */
  private void preCheckReader() {
    JarLoader jarLoader = LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName);
    loaderSwap.setCurrentThreadClassLoader(jarLoader);
    LOG.info(String.format("DataX Reader.Job [%s] do preCheck work .", this.readerPluginName));
    this.jobReader.preCheck();
    loaderSwap.restoreCurrentThreadClassLoader();
  }

  /**
   * Same idea as preCheckReader above
   */
  private void preCheckWriter() {
    loaderSwap.setCurrentThreadClassLoader(
        LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));
    LOG.info(String.format("DataX Writer.Job [%s] do preCheck work .", this.writerPluginName));
    this.jobWriter.preCheck();
    loaderSwap.restoreCurrentThreadClassLoader();
  }
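Every plugin call in this class is bracketed by the same swap/restore pair on loaderSwap. Below is that pattern condensed into a reusable helper; the helper itself is hypothetical, while ClassLoaderSwapper, LoadUtil and JarLoader are the classes used above. Note that the real methods do not use try/finally; the sketch adds it so the classloader is restored even when the plugin call throws:

  // Hypothetical helper illustrating the swap/restore pattern used throughout JobContainer.
  private <T> T runWithPluginClassLoader(PluginType type, String pluginName,
      java.util.function.Supplier<T> pluginCall) {
    JarLoader jarLoader = LoadUtil.getJarLoader(type, pluginName);
    loaderSwap.setCurrentThreadClassLoader(jarLoader); // swap in the plugin's own classloader
    try {
      return pluginCall.get();                         // init()/prepare()/split()/post()/...
    } finally {
      loaderSwap.restoreCurrentThreadClassLoader();    // always restore the original classloader
    }
  }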

  /**
   * Initialize the reader and writer <br>
   * 1 get (or reset) the jobId <br>
   * 2 name the current thread job-jobId <br>
   * 3 build a job-level plugin collector from the container's communicator <br>
   * 4 use the collector to initialize jobReader and jobWriter <br>
   */
  private void init() {
    this.jobId = this.configuration.getLong(DATAX_CORE_CONTAINER_JOB_ID, -1);
    if (this.jobId < 0) {
      LOG.info("Set jobId = 0");
      this.jobId = 0;
      this.configuration.set(DATAX_CORE_CONTAINER_JOB_ID, this.jobId);
    }

    Thread.currentThread().setName("job-" + this.jobId);

    JobPluginCollector pluginCollector = new DefaultJobPluginCollector(getContainerCommunicator());
    // the Reader must come first and the Writer second, because some of jobWriter's
    // parameters are derived from jobReader
    this.jobReader = this.initJobReader(pluginCollector);
    this.jobWriter = this.initJobWriter(pluginCollector);
  }

  /**
   * Prepare jobReader and jobWriter
   */
  private void prepare() {
    this.prepareJobReader();
    this.prepareJobWriter();
  }

  /**
   * Pre-handling <br/>
   * 1 read the handler plugin-type string from cfg (reader, transformer, writer, handler)<br>
   * 2 load the plugin by type + name (with a classLoaderSwap in between)<br>
   * 3 give the plugin a job plugin collector, and let the plugin run its preHandler<br>
   */
  private void preHandle() {
    String pluginType = this.configuration.getString(DATAX_JOB_PREHANDLER_PLUGINTYPE);
    if (StringUtils.isEmpty(pluginType)) {
      return;
    }
    PluginType handlerPluginType;
    try {
      handlerPluginType = PluginType.valueOf(pluginType.toUpperCase());
    } catch (IllegalArgumentException e) {
      throw DataXException.asDataXException(CONFIG_ERROR,
          String.format("Job preHandler's pluginType(%s) set error, reason(%s)",
              pluginType, e.getMessage()));
    }

    String pluginName = this.configuration.getString(DATAX_JOB_PREHANDLER_PLUGINNAME);
    JarLoader jarLoader = LoadUtil.getJarLoader(handlerPluginType, pluginName);
    loaderSwap.setCurrentThreadClassLoader(jarLoader);

    AbstractJobPlugin handler = LoadUtil.loadJobPlugin(handlerPluginType, pluginName);
    JobPluginCollector jobPlugin = new DefaultJobPluginCollector(getContainerCommunicator());
    handler.setJobPluginCollector(jobPlugin);

    // TODO: configuration safety must be guaranteed in the future; handler.preHandler is not implemented yet
    handler.preHandler(configuration);
    loaderSwap.restoreCurrentThreadClassLoader();

    LOG.info("After PreHandler: \n" + Engine.filterJobConfiguration(configuration) + "\n");
  }

  /**
   * Same idea as preHandle
   */
  private void postHandle() {
    String type = this.configuration.getString(DATAX_JOB_POSTHANDLER_PLUGINTYPE);
    if (StringUtils.isEmpty(type)) {
      return;
    }
    PluginType pluginType;
    try {
      pluginType = PluginType.valueOf(type.toUpperCase());
    } catch (IllegalArgumentException e) {
      throw DataXException.asDataXException(CONFIG_ERROR,
          String.format("Job postHandler's pluginType(%s) set error, reason(%s)",
              type.toUpperCase(), e.getMessage()));
    }

    String name = this.configuration.getString(DATAX_JOB_POSTHANDLER_PLUGINNAME);
    JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, name);
    loaderSwap.setCurrentThreadClassLoader(jarLoader);

    AbstractJobPlugin handler = LoadUtil.loadJobPlugin(pluginType, name);

    JobPluginCollector pluginCollector = new DefaultJobPluginCollector(getContainerCommunicator());
    handler.setJobPluginCollector(pluginCollector);

    handler.postHandler(configuration);
    loaderSwap.restoreCurrentThreadClassLoader();
  }


  /**
   * Performs the finest-grained split of reader and writer. Note that the writer's split must
   * follow the reader's split so that both produce the same number of pieces, satisfying the 1:1
   * channel model; that is what allows the reader and writer configs to be merged together here.
   * The merged result is then shuffled so that ordering does not create a long tail on either
   * the read or the write side. <br/>
   * 1 dynamically adjust and fetch the channel count <br/>
   * 2 split the reader by the channel count from step 1, producing the reader cfg list <br/>
   * 3 split the writer by the cfg count from step 2, producing the writer cfg list <br/>
   * 4 fetch the transformer cfg list <br/>
   * 5 merge the cfgs from steps 2, 3 and 4 <br/>
   */
  private int split() {
    this.adjustChannelNumber();
    needChannelNumber = needChannelNumber <= 0 ? 1 : needChannelNumber;

    List<Configuration> readerTaskCfgs = this.doReaderSplit(this.needChannelNumber);
    int taskNumber = readerTaskCfgs.size();
    List<Configuration> writerTaskCfs = this.doWriterSplit(taskNumber);

    List<Configuration> transformers = configuration
        .getListConfiguration(DATAX_JOB_CONTENT_TRANSFORMER);

    LOG.debug("transformer configuration: " + JSON.toJSONString(transformers));
    // input: the reader's and writer's parameter lists; output: the list of elements under job.content
    List<Configuration> contentCfgs = mergeReaderAndWriterTaskConfigs(readerTaskCfgs, writerTaskCfs,
        transformers);

    LOG.debug("contentConfig configuration: " + JSON.toJSONString(contentCfgs));
    this.configuration.set(DATAX_JOB_CONTENT, contentCfgs);
    return contentCfgs.size();
  }

  /**
   * Adjusts the channel count based on the byte and record limits <br>
   * 1 if there is a global (job) byte limit, a per-channel byte limit must also be set;
   *   from the two, compute the needed channelByByte count <br>
   * 2 if there is a global (job) record limit, a per-channel record limit must also be set;
   *   from the two, compute the needed channelByRecord count <br>
   * 3 take the smaller of 1 and 2 as the job's channelNumber; if that produced a value,
   *   this method is done <br>
   * 4 if step 3 produced nothing, check whether the user configured channelNum directly in cfg,
   *   and if so, use that value for this job <br>
   */
  private void adjustChannelNumber() {
    int needChannelNumByByte = Integer.MAX_VALUE;

    boolean hasByteLimit = (configuration.getInt(DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
    if (hasByteLimit) {
      long jobByteSpeed = configuration.getInt(DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);
      // with byte-rate limiting, the per-channel byte cap must be set, otherwise fail!
      Long channelByteSpeed = configuration.getLong(DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
      if (channelByteSpeed == null || channelByteSpeed <= 0) {
        throw DataXException.asDataXException(
            CONFIG_ERROR, "When a global bps limit is set, the per-channel bps value must be present and positive");
      }
      needChannelNumByByte = (int) (jobByteSpeed / channelByteSpeed);
      needChannelNumByByte = needChannelNumByByte > 0 ? needChannelNumByByte : 1;
      LOG.info("Job set Max-Byte-Speed to " + jobByteSpeed + " bytes.");
    }

    int needChannelNumByRecord = Integer.MAX_VALUE;
    boolean hasRecordLimit = (configuration.getInt(DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
    if (hasRecordLimit) {
      long jobRecordSpeed = configuration.getInt(DATAX_JOB_SETTING_SPEED_RECORD, 100000);
      Long channelRecordSpeed = configuration.getLong(DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
      if (channelRecordSpeed == null || channelRecordSpeed <= 0) {
        throw DataXException.asDataXException(CONFIG_ERROR,
            "When a global tps limit is set, the per-channel tps value must be present and positive");
      }
      needChannelNumByRecord = (int) (jobRecordSpeed / channelRecordSpeed);
      needChannelNumByRecord = needChannelNumByRecord > 0 ? needChannelNumByRecord : 1;
      LOG.info("Job set Max-Record-Speed to " + jobRecordSpeed + " records.");
    }

    // the global needChannelNumber is the smaller of needChannelNumByByte and needChannelNumByRecord
    needChannelNumber = Math.min(needChannelNumByByte, needChannelNumByRecord);

    // if the byte or record limits produced a needChannelNumber, we are done
    if (this.needChannelNumber < Integer.MAX_VALUE) {
      return;
    }

    boolean hasChannelLimit = (configuration.getInt(DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
    if (hasChannelLimit) {
      needChannelNumber = this.configuration.getInt(DATAX_JOB_SETTING_SPEED_CHANNEL);
      LOG.info("Job set Channel-Number to " + this.needChannelNumber + " channels.");
      return;
    }
    throw DataXException.asDataXException(CONFIG_ERROR, "A job speed setting (byte, record, or channel) is required");
  }
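A worked example of the arithmetic above, with made-up limits. This is a standalone sketch of the computation only, not DataX code:

public class ChannelSizingSketch {
  public static void main(String[] args) {
    long jobByteSpeed = 10L * 1024 * 1024; // job.setting.speed.byte: global cap of 10 MB/s
    long channelByteSpeed = 1024 * 1024;   // core.transport.channel.speed.byte: 1 MB/s per channel
    int byByte = Math.max(1, (int) (jobByteSpeed / channelByteSpeed));       // -> 10

    long jobRecordSpeed = 100_000;         // job.setting.speed.record: global cap of 100k rec/s
    long channelRecordSpeed = 20_000;      // core.transport.channel.speed.record: 20k rec/s per channel
    int byRecord = Math.max(1, (int) (jobRecordSpeed / channelRecordSpeed)); // -> 5

    // needChannelNumber takes the smaller (stricter) of the two values
    System.out.println("needChannelNumber = " + Math.min(byByte, byRecord)); // -> 5
  }
}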

  /**
   * schedule() first folds the reader/writer split results from the previous step into concrete
   * taskGroupContainers; different execute modes then invoke different scheduling strategies to
   * get every task running <br>
   * 1 take the task count from job.content, compare it with the channel count, and give the
   *   smaller value to PerfTrace <br>
   * 2 read the per-taskGroup channel count from the config (default 5) and generate the taskGroup
   *   config list with the fair-assignment strategy <br>
   * 3 set the execute mode (standalone in the open-source code) and call initStandaloneScheduler
   *   with the global config <br>
   * 4 record the transfer start timestamp, schedule, then record the transfer end timestamp <br>
   */
  private void schedule() {
    // the global speed and the per-channel speed here are in B/s
    int taskNumber = configuration.getList(DATAX_JOB_CONTENT).size();
    needChannelNumber = Math.min(needChannelNumber, taskNumber);
    PerfTrace.getInstance().setChannelNumber(needChannelNumber);

    // the configuration determines which tasks each taskGroup runs
    // channels per taskGroup
    int channelsPerTaskGroup = configuration.getInt(DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
    List<Configuration> taskGroupCfgs = JobAssignUtil
        .assignFairly(configuration, needChannelNumber, channelsPerTaskGroup);
    LOG.info("Scheduler starts [{}] taskGroups.", taskGroupCfgs.size());

    ExecuteMode executeMode = null;
    AbstractScheduler scheduler;
    try {
      executeMode = ExecuteMode.STANDALONE;
      scheduler = initStandaloneScheduler(this.configuration);

      // set executeMode
      for (Configuration taskGroupConfig : taskGroupCfgs) {
        taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
      }
      // the if below never fires here: executeMode is hard-coded to STANDALONE in open-source DataX
      if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) {
        if (this.jobId <= 0) {
          throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
              "In [ local | distribute ] mode, jobId must be set and its value must be > 0 .");
        }
      }

      LOG.info("Running by {} Mode.", executeMode);
      this.startTransferTimeStamp = System.currentTimeMillis();
      scheduler.schedule(taskGroupCfgs);
      this.endTransferTimeStamp = System.currentTimeMillis();
    } catch (Exception e) {
      LOG.error("Scheduler failed when running in mode [{}].", executeMode);
      this.endTransferTimeStamp = System.currentTimeMillis();
      throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
    }

    // check the run result: whether the dirty-record count or percentage exceeded the limits
    this.checkLimit();
  }


  private AbstractScheduler initStandaloneScheduler(Configuration cfg) {
    AbstractContainerCommunicator containerComm = new StandAloneJobContainerCommunicator(cfg);
    super.setContainerCommunicator(containerComm);
    return new StandAloneScheduler(containerComm);
  }

  /**
   * Run jobReader's and jobWriter's post work
   */
  private void post() {
    this.postJobWriter();
    this.postJobReader();
  }

  private void destroy() {
    if (this.jobWriter != null) {
      this.jobWriter.destroy();
      this.jobWriter = null;
    }
    if (this.jobReader != null) {
      this.jobReader.destroy();
      this.jobReader = null;
    }
  }

  private void logStatistics() {
    long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
    long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
    if (0L == transferCosts) {
      transferCosts = 1L;
    }

    if (super.getContainerCommunicator() == null) {
      return;
    }

    Communication comm = super.getContainerCommunicator().collect();
    comm.setTimestamp(this.endTimeStamp);

    Communication tempComm = new Communication();
    tempComm.setTimestamp(this.startTransferTimeStamp);

    Communication reportComm = CommunicationTool
        .getReportCommunication(comm, tempComm, this.totalStage);

    // byte rate
    long byteSpeedPerSecond = comm.getLongCounter(READ_SUCCEED_BYTES) / transferCosts;
    long recordSpeedPerSecond = comm.getLongCounter(READ_SUCCEED_RECORDS) / transferCosts;
    reportComm.setLongCounter(BYTE_SPEED, byteSpeedPerSecond);
    reportComm.setLongCounter(RECORD_SPEED, recordSpeedPerSecond);

    super.getContainerCommunicator().report(reportComm);

    LOG.info(String.format(
        "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"
            + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
            + "%-26s: %19s\n",
        "Job start time", dateFormat.format(startTimeStamp),
        "Job end time", dateFormat.format(endTimeStamp),

        "Total elapsed time", totalCosts + "s",
        "Average traffic", StrUtil.stringify(byteSpeedPerSecond) + "/s",
        "Record write speed", recordSpeedPerSecond + "rec/s",
        "Total records read", CommunicationTool.getTotalReadRecords(comm),
        "Total read/write failures", CommunicationTool.getTotalErrorRecords(comm)
    ));

    if (comm.getLongCounter(TRANSFORMER_SUCCEED_RECORDS) > 0
        || comm.getLongCounter(TRANSFORMER_FAILED_RECORDS) > 0
        || comm.getLongCounter(TRANSFORMER_FILTER_RECORDS) > 0) {
      LOG.info(String.format(
          "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",
          "Transformer succeeded records", comm.getLongCounter(TRANSFORMER_SUCCEED_RECORDS),
          "Transformer failed records", comm.getLongCounter(TRANSFORMER_FAILED_RECORDS),
          "Transformer filtered records", comm.getLongCounter(TRANSFORMER_FILTER_RECORDS)
      ));
    }
  }


  /**
   * Initialize the reader job and return the Reader.Job
   *
   * @param jobPluginCollector JobPluginCollector
   * @return Reader.Job
   */
  private Reader.Job initJobReader(JobPluginCollector jobPluginCollector) {
    readerPluginName = configuration.getString(DATAX_JOB_CONTENT_READER_NAME);
    JarLoader jarLoader = LoadUtil.getJarLoader(PluginType.READER, readerPluginName);
    loaderSwap.setCurrentThreadClassLoader(jarLoader);

    Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(PluginType.READER, readerPluginName);

    // set the reader's jobConfig
    jobReader.setPluginJobConf(configuration.getConfiguration(DATAX_JOB_CONTENT_READER_PARAMETER));
    // hand the reader its peer's (the writer's) config
    Configuration writerPara = configuration.getConfiguration(DATAX_JOB_CONTENT_WRITER_PARAMETER);
    jobReader.setPeerPluginJobConf(writerPara);

    jobReader.setJobPluginCollector(jobPluginCollector);
    jobReader.init();

    loaderSwap.restoreCurrentThreadClassLoader();
    return jobReader;
  }

  /**
   * Initialize the writer job and return the Writer.Job
   *
   * @return Writer.Job
   */
  private Writer.Job initJobWriter(
      JobPluginCollector jobPluginCollector) {
    this.writerPluginName = this.configuration.getString(DATAX_JOB_CONTENT_WRITER_NAME);
    loaderSwap.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
        PluginType.WRITER, this.writerPluginName));

    Writer.Job jobWriter = (Writer.Job) LoadUtil.loadJobPlugin(
        PluginType.WRITER, this.writerPluginName);

    // set the writer's jobConfig
    jobWriter.setPluginJobConf(this.configuration.getConfiguration(
        DATAX_JOB_CONTENT_WRITER_PARAMETER));

    // hand the writer its peer's (the reader's) config
    jobWriter.setPeerPluginJobConf(this.configuration.getConfiguration(
        DATAX_JOB_CONTENT_READER_PARAMETER));

    jobWriter.setPeerPluginName(this.readerPluginName);
    jobWriter.setJobPluginCollector(jobPluginCollector);
    jobWriter.init();
    loaderSwap.restoreCurrentThreadClassLoader();

    return jobWriter;
  }

  /**
   * Prepare jobReader <br>
   * 1 get the jarLoader by plugin type (reader) and plugin name <br>
   * 2 temporarily swap the current thread's classLoader via loaderSwap <br>
   * 3 run the concrete jobReader's (e.g. the MySQL reader's) prepare <br>
   */
  private void prepareJobReader() {
    JarLoader jarLoader = LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName);
    loaderSwap.setCurrentThreadClassLoader(jarLoader);
    LOG.info("DataX Reader.Job [" + this.readerPluginName + "] do prepare work .");
    this.jobReader.prepare();
    loaderSwap.restoreCurrentThreadClassLoader();
  }

  /**
   * Same idea as prepareJobReader above
   */
  private void prepareJobWriter() {
    JarLoader jarLoader = LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName);
    loaderSwap.setCurrentThreadClassLoader(jarLoader);
    LOG.info("DataX Writer.Job [" + this.writerPluginName + "] do prepare work .");
    this.jobWriter.prepare();
    loaderSwap.restoreCurrentThreadClassLoader();
  }

  /**
   * Split the reader into the advised number of pieces <br>
   * 1 get the jarLoader by type and name, stashing the old classLoader in loaderSwap <br>
   * 2 split the concrete reader (e.g. the MySQL reader) by the advised count, producing a cfg list <br>
   * 3 restore the original classLoader via loaderSwap <br>
   *
   * @param adviceNumber int
   * @return List<Configuration>
   */
  // TODO: what if the source holds no data at all
  private List<Configuration> doReaderSplit(int adviceNumber) {
    JarLoader jarLoader = LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName);
    loaderSwap.setCurrentThreadClassLoader(jarLoader);
    List<Configuration> readerSlicess = this.jobReader.split(adviceNumber);
    if (readerSlicess == null || readerSlicess.size() <= 0) {
      throw DataXException.asDataXException(PLUGIN_SPLIT_ERROR, "the number of reader-split tasks cannot be <= 0");
    }
    LOG.info("DataX Reader.Job [{}] splits to [{}] tasks.", readerPluginName, readerSlicess.size());
    loaderSwap.restoreCurrentThreadClassLoader();
    return readerSlicess;
  }

  /**
   * Same idea as doReaderSplit above
   *
   * @param readerTaskNumber int
   * @return List<Configuration>
   */
  private List<Configuration> doWriterSplit(int readerTaskNumber) {
    JarLoader jarLoader = LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName);
    loaderSwap.setCurrentThreadClassLoader(jarLoader);

    List<Configuration> writerSlicess = this.jobWriter.split(readerTaskNumber);
    if (writerSlicess == null || writerSlicess.size() <= 0) {
      throw DataXException.asDataXException(PLUGIN_SPLIT_ERROR, "the number of writer-split tasks cannot be <= 0");
    }
    LOG.info("DataX Writer.Job [{}] splits to [{}] tasks.", writerPluginName, writerSlicess.size());
    loaderSwap.restoreCurrentThreadClassLoader();

    return writerSlicess;
  }

  /**
   * Merge the reader and writer configs in order; the order here must not be disturbed!
   * Input is reader-level and writer-level config; output is the config of a complete task.
   *
   * @param readerTasksConfigs List<Configuration>
   * @param writerTasksConfigs List<Configuration>
   * @return List<Configuration>
   */
  private List<Configuration> mergeReaderAndWriterTaskConfigs(
      List<Configuration> readerTasksConfigs,
      List<Configuration> writerTasksConfigs) {
    return mergeReaderAndWriterTaskConfigs(readerTasksConfigs, writerTasksConfigs, null);
  }

  /**
   * Merge the reader, writer and transformer task configs
   *
   * @param readerTasksConfigs List<Configuration>
   * @param writerTasksConfigs List<Configuration>
   * @param transformerConfigs List<Configuration>
   * @return List<Configuration>
   */
  private List<Configuration> mergeReaderAndWriterTaskConfigs(
      List<Configuration> readerTasksConfigs,
      List<Configuration> writerTasksConfigs,
      List<Configuration> transformerConfigs) {
    if (readerTasksConfigs.size() != writerTasksConfigs.size()) {
      throw DataXException.asDataXException(PLUGIN_SPLIT_ERROR,
          String.format("the reader split into [%d] tasks but the writer split into [%d]; the counts must match.",
              readerTasksConfigs.size(), writerTasksConfigs.size())
      );
    }

    List<Configuration> contentConfigs = new ArrayList<>();
    for (int i = 0; i < readerTasksConfigs.size(); i++) {
      Configuration taskConfig = Configuration.newDefault();
      taskConfig.set(CoreConstant.JOB_READER_NAME, this.readerPluginName);
      taskConfig.set(CoreConstant.JOB_READER_PARAMETER, readerTasksConfigs.get(i));
      taskConfig.set(CoreConstant.JOB_WRITER_NAME, this.writerPluginName);
      taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER, writerTasksConfigs.get(i));

      if (transformerConfigs != null && transformerConfigs.size() > 0) {
        taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
      }

      taskConfig.set(CoreConstant.TASK_ID, i);
      contentConfigs.add(taskConfig);
    }
    return contentConfigs;
  }
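For reference, one element of the job.content list produced by this merge looks roughly as follows. The snippet is illustrative only: the plugin names and parameters are made up, and it assumes the usual CoreConstant paths (reader.name, reader.parameter, writer.name, writer.parameter, taskId) plus DataX's Configuration.from(String) JSON factory:

    // Hypothetical reconstruction of one merged task entry (names and params made up).
    Configuration taskConfig = Configuration.newDefault();
    taskConfig.set("reader.name", "mysqlreader");
    taskConfig.set("reader.parameter", Configuration.from("{\"column\":[\"*\"]}"));
    taskConfig.set("writer.name", "mysqlwriter");
    taskConfig.set("writer.parameter", Configuration.from("{\"column\":[\"*\"]}"));
    taskConfig.set("taskId", 0);
    // yields, roughly:
    // {"reader":{"name":"mysqlreader","parameter":{"column":["*"]}},
    //  "writer":{"name":"mysqlwriter","parameter":{"column":["*"]}},"taskId":0}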

  /**
   * This one is more involved; the integration happens in two steps: 1. tasks onto channels,
   * 2. channels into taskGroups. Taken together, it packs the tasks into taskGroups while hitting
   * the computed channel count exactly, never starting extra channels.
   * <p/>
   * example:
   * <p/>
   * Premise: the split produced 1024 table shards; the user asked for a total rate of 1000M/s at
   * 3M/s per channel, with each taskGroup running 7 channels.
   * <p/>
   * Computation: total channels = 1000M/s / 3M/s = 333. Spreading the tasks evenly, 308 channels
   * carry 3 tasks each and 25 channels carry 4 tasks each. taskGroups needed = 333 / 7 = 47
   * remainder 4, i.e. 48 taskGroups: 47 of them run 7 channels each and 1 runs the remaining
   * 4 channels.
   * <p/>
   * Handling: the taskGroup holding only 4 channels is dealt with first, with taskGroupId 0,
   * filling its 4 channels at the average of 3 tasks apiece; the remaining tasks are then dealt
   * out like playing cards, round-robin, into the taskGroups that hold the average channel count.
   * <p/>
   * TODO delete it
   *
   * @param averTaskPerChannel   int
   * @param channelCnt           int
   * @param channelsPerTaskGroup int
   * @return the complete, independent configuration of each taskGroup
   */
  @SuppressWarnings("serial")
  private List<Configuration> distributeTasksToTaskGroup(int averTaskPerChannel, int channelCnt,
      int channelsPerTaskGroup) {
    String msg = "averTaskPerChannel, channelNumber and channelsPerTaskGroup must all be positive";
    Validate.isTrue(averTaskPerChannel > 0 && channelCnt > 0 && channelsPerTaskGroup > 0, msg);
    List<Configuration> taskConfigs = configuration.getListConfiguration(DATAX_JOB_CONTENT);
    int taskGroupNumber = channelCnt / channelsPerTaskGroup;
    int leftChannelNumber = channelCnt % channelsPerTaskGroup;
    if (leftChannelNumber > 0) {
      taskGroupNumber += 1;
    }

    // if there is only one taskGroup, tag it and return right away
    if (taskGroupNumber == 1) {
      final Configuration taskGroupCfg = this.configuration.clone();
      // Configuration.clone() does not carry the content list over, so set it explicitly
      taskGroupCfg.set(DATAX_JOB_CONTENT, configuration.getListConfiguration(DATAX_JOB_CONTENT));
      taskGroupCfg.set(DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, channelCnt);
      taskGroupCfg.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, 0);
      return Collections.singletonList(taskGroupCfg);
    }

    List<Configuration> taskGroupConfigs = new ArrayList<>();
    // clear the content configuration of every taskGroup
    for (int i = 0; i < taskGroupNumber; i++) {
      Configuration taskGroupCfg = this.configuration.clone();
      List<Configuration> taskGroupJobContent = taskGroupCfg
          .getListConfiguration(DATAX_JOB_CONTENT);
      taskGroupJobContent.clear();
      taskGroupCfg.set(DATAX_JOB_CONTENT, taskGroupJobContent);
      taskGroupConfigs.add(taskGroupCfg);
    }

    int taskConfigIndex = 0;
    int channelIndex = 0;
    int taskGroupConfigIndex = 0;

    // first handle the taskGroup whose channel count is not the average
    if (leftChannelNumber > 0) {
      Configuration taskGroupCfg = taskGroupConfigs.get(taskGroupConfigIndex);
      for (; channelIndex < leftChannelNumber; channelIndex++) {
        for (int i = 0; i < averTaskPerChannel; i++) {
          List<Configuration> taskGroupJobContent = taskGroupCfg
              .getListConfiguration(DATAX_JOB_CONTENT);
          taskGroupJobContent.add(taskConfigs.get(taskConfigIndex++));
          taskGroupCfg.set(DATAX_JOB_CONTENT, taskGroupJobContent);
        }
      }
      taskGroupCfg.set(DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, leftChannelNumber);
      taskGroupCfg.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, taskGroupConfigIndex++);
    }

    // the remaining tasks are dealt round-robin, then tagged with channel count and taskGroupId
    int equalDivisionStartIndex = taskGroupConfigIndex;
    while (taskConfigIndex < taskConfigs.size() && equalDivisionStartIndex < taskGroupConfigs
        .size()) {
      for (taskGroupConfigIndex = equalDivisionStartIndex; taskGroupConfigIndex < taskGroupConfigs
          .size() && taskConfigIndex < taskConfigs.size(); taskGroupConfigIndex++) {
        Configuration taskGroupConfig = taskGroupConfigs.get(taskGroupConfigIndex);
        List<Configuration> taskGroupJobContent = taskGroupConfig
            .getListConfiguration(DATAX_JOB_CONTENT);
        taskGroupJobContent.add(taskConfigs.get(taskConfigIndex++));
        taskGroupConfig.set(
            DATAX_JOB_CONTENT, taskGroupJobContent);
      }
    }

    for (taskGroupConfigIndex = equalDivisionStartIndex;
        taskGroupConfigIndex < taskGroupConfigs.size(); ) {
      Configuration taskGroupConfig = taskGroupConfigs.get(taskGroupConfigIndex);
      taskGroupConfig.set(DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL,
          channelsPerTaskGroup);
      taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID,
          taskGroupConfigIndex++);
    }

    return taskGroupConfigs;
  }

  /**
   * Run the jobReader's post work <br>
   * 1 swap the jarLoader <br>
   * 2 run the concrete jobReader's post (e.g. the MySQL reader's) <br>
   */
  private void postJobReader() {
    JarLoader jarLoader = LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName);
    loaderSwap.setCurrentThreadClassLoader(jarLoader);
    LOG.info("DataX Reader.Job [{}] do post work.", this.readerPluginName);
    this.jobReader.post();
    loaderSwap.restoreCurrentThreadClassLoader();
  }

  private void postJobWriter() {
    JarLoader jarLoader = LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName);
    loaderSwap.setCurrentThreadClassLoader(jarLoader);
    LOG.info("DataX Writer.Job [{}] do post work.", this.writerPluginName);
    this.jobWriter.post();
    loaderSwap.restoreCurrentThreadClassLoader();
  }

  /**
   * Checks whether the final result exceeds the error-record threshold: a threshold below 1 is
   * a percentage threshold, while 1 or above is an absolute record-count threshold.
   */
  private void checkLimit() {
    Communication communication = super.getContainerCommunicator().collect();
    errorLimit.checkRecordLimit(communication);
    errorLimit.checkPercentageLimit(communication);
  }
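The two limits come straight from the job configuration. A typical errorLimit snippet that this check enforces is sketched below; the values are made up, and while job.setting.errorLimit.record and job.setting.errorLimit.percentage are the commonly documented DataX paths, treat the exact keys as an assumption here:

    // job json:
    //   "setting": {
    //     "errorLimit": {
    //       "record": 10,       // fail once more than 10 dirty records accumulate ...
    //       "percentage": 0.02  // ... or once dirty records exceed 2% of the total
    //     }
    //   }
    Configuration cfg = Configuration.from(
        "{\"job\":{\"setting\":{\"errorLimit\":{\"record\":10,\"percentage\":0.02}}}}");
    ErrorRecordChecker checker = new ErrorRecordChecker(cfg); // same construction as in the constructor above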

  /**
   * Invoke external hooks; useful for monitoring, analytics and the like
   */
  private void invokeHooks() {
    Communication comm = super.getContainerCommunicator().collect();
    // the hook directory
    String dir = DATAX_HOME + "/hook";
    HookInvoker invoker = new HookInvoker(dir, configuration, comm.getCounter());
    invoker.invokeAll();
  }
}


Notes:

  1. The source above is lightly modified from upstream: fixes flagged by the Alibaba coding-guideline scanner, plus general clean-up.

  2. All of the code has been pushed to GitHub (master and dev branches) and is free to use.
