datax(22):任务分配规则

datax(22):任务分配规则前面学习了一些源码和datax的执行,其中有一个重要的流程任务切分。今天梳理下;一、概述Datax根首先据配置文件,确定好channel的并发数目。然后将整个job分成一个个小的task,然后划分成组。从JobContainer的start()方法开始,进入split()方法,split方法里执行后续所有的切分;二、总体流程切分任务channel数目的确定reader的切分Writer的切分合并配置分配任务三、切分任务JobContainer的split负责将整个jo.

大家好,又见面了,我是你们的朋友全栈君。

前面学习了一些源码和datax的执行,其中有一个重要的流程任务切分。今天梳理下;


一、概述

Datax根首先据配置文件,确定好channel的并发数目。然后将整个job分成一个个小的task,然后划分成组。从JobContainer的start()方法开始,进入split()方法,split方法里执行后续所有的切分;


二、总体流程

  1. 切分任务
  2. channel数目的确定
  3. reader的切分
  4. Writer的切分
  5. 合并配置 分
  6. 配任务

三、切分任务

JobContainer 的split负责将整个job切分成多个task,生成task配置的列表。

  /** * 执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果, 达到切分后数目相等, * 才能满足1:1的通道模型,所以这里可以将reader和writer的配置整合到一起。然后,为避免顺序给读写端带来长尾影响, * 将整合的结果shuffler掉 <br/> * 1 动态调整并获取channel数量 <br/> * 2 根据1的channel数量 切割reader 得到reader的cfg列表 <br/> * 3 根据2的 cfg数量切割writer,得到writer cfg 列表 <br/> * 4 获取transform的cfg 列表 <br/> * 5 合并234的cfg <br/> */
  private int split() { 
   
    this.adjustChannelNumber();
    needChannelNumber = needChannelNumber <= 0 ? 1 : needChannelNumber;

    List<Configuration> readerTaskCfgs = this.doReaderSplit(this.needChannelNumber);
    int taskNumber = readerTaskCfgs.size();
    List<Configuration> writerTaskCfs = this.doWriterSplit(taskNumber);

    List<Configuration> trsfms = configuration.getListConfiguration(DATAX_JOB_CONTENT_TRANSFORMER);

    LOG.debug("transformer configuration: " + JSON.toJSONString(trsfms));
    //输入是reader和writer的parameter list,输出是content下面元素的list
    List<Configuration> contentCfgs = mergeReaderAndWriterTaskConfigs(readerTaskCfgs, writerTaskCfs,
        trsfms);

    LOG.debug("contentConfig configuration: " + JSON.toJSONString(contentCfgs));
    this.configuration.set(DATAX_JOB_CONTENT, contentCfgs);
    return contentCfgs.size();
  }

执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,达到切分后数目相等,才能满足1:1的通道模型.


四、channel数目的确定

datax先从core.json 和 job.json 里获取用户指定的channel,然后再内部根据实际情况进行调整channel数量;

1、用户指定channel数:

{ 
   
    "core": { 
   
       "transport" : { 
   
          "channel": { 
   
             "speed": { 
   
                "record": 100,
                "byte": 100
             }
          }
       }
    },
    "job": { 
   
      "setting": { 
   
        "speed": { 
   
          "record": 500,
          "byte": 1000,
          "channel" : 1
        }
      }
    }
}

job里的是全局配置, core里的channel是单个channel的限制。首先计算按照字节数限速,channel的数目应该为 500 / 100 = 5,然后按照记录数限速, channel的数目应该为 1000 / 100 = 10, 最后返回两者的最小值 5。虽然指定了channel为1, 但只有在没有限速的条件下,才会使用。

2、程序根据实际情况调整channel数

所有调整channel的代码在JobContainer的adjustChannelNumber方法

  /** * 根据byteNum和RecordNum调整channel数量 <br> * 1 是否有全局(job) byte限制,如果有,则必须要有channel的byte设置,最后计算出 需要的channelByByte数量 <br> * 2 是否有全局(job) record限制,如果有,则必须要有channel的record设置,最后计算出 需要的channelByRecord数量 <br> * 3 取1和2的最小值设置到job的channelNumber,如果可以设置,则该方法任务完成,退出 <br> * 4 如果3 未能设置,则从cfg中判断用户是否自己设置了channelNum,如果用户设置了,将用户设置的给本job channel <br> */
private void adjustChannelNumber() { 

int needChannelNumByByte = Integer.MAX_VALUE;
boolean hasByteLimit = (configuration.getInt(DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
if (hasByteLimit) { 

long jobByteSpeed = configuration.getInt(DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);
// 在byte流控情况下,单个Channel流量最大值必须设置,否则报错!
Long channelByteSpeed = configuration.getLong(DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
if (channelByteSpeed == null || channelByteSpeed <= 0) { 

throw DataXException.asDataXException(
CONFIG_ERROR, "在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");
}
needChannelNumByByte = (int) (jobByteSpeed / channelByteSpeed);
needChannelNumByByte = needChannelNumByByte > 0 ? needChannelNumByByte : 1;
LOG.info("Job set Max-Byte-Speed to " + jobByteSpeed + " bytes.");
}
int needChannelNumByRecord = Integer.MAX_VALUE;
boolean hasRecordLimit = (configuration.getInt(DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
if (hasRecordLimit) { 

long jobRecordSpeed = configuration.getInt(DATAX_JOB_SETTING_SPEED_RECORD, 100000);
Long channelRecordSpeed = configuration.getLong(DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
if (channelRecordSpeed == null || channelRecordSpeed <= 0) { 

throw DataXException.asDataXException(CONFIG_ERROR,
"在有总tps限速条件下,单个channel的tps值不能为空,也不能为非正数");
}
needChannelNumByRecord = (int) (jobRecordSpeed / channelRecordSpeed);
needChannelNumByRecord = needChannelNumByRecord > 0 ? needChannelNumByRecord : 1;
LOG.info("Job set Max-Record-Speed to " + jobRecordSpeed + " records.");
}
// 全局的 needChannelNumber 按照needChannelNumByByte 和needChannelNumByRecord 取较小值
needChannelNumber = Math.min(needChannelNumByByte, needChannelNumByRecord);
// 如果从byte或record上设置了needChannelNumber则退出
if (this.needChannelNumber < Integer.MAX_VALUE) { 

return;
}
boolean hasChannelLimit = (configuration.getInt(DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
if (hasChannelLimit) { 

needChannelNumber = this.configuration.getInt(DATAX_JOB_SETTING_SPEED_CHANNEL);
LOG.info("Job set Channel-Number to " + this.needChannelNumber + " channels.");
return;
}
throw DataXException.asDataXException(CONFIG_ERROR, "Job运行速度必须设置");
}

五、reader的切分

doReaderSplit方法, 调用Reader.Job的split方法,返回Reader.Task的Configuration列表

/** * adviceNumber, 建议的数目 */
private List<Configuration> doReaderSplit(int adviceNumber) { 

// 切换ClassLoader
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.READER, this.readerPluginName));
// 调用Job.Reader的split切分
List<Configuration> readerSlicesConfigs =
this.jobReader.split(adviceNumber);
if (readerSlicesConfigs == null || readerSlicesConfigs.size() <= 0) { 

throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
"reader切分的task数目不能小于等于0");
}
LOG.info("DataX Reader.Job [{}] splits to [{}] tasks.",
this.readerPluginName, readerSlicesConfigs.size());
classLoaderSwapper.restoreCurrentThreadClassLoader();
return readerSlicesConfigs;
}
public List<Configuration> split(int adviceNumber) { 

LOG.info("split() begin...");
List<Configuration> readerSplitConfigs = new ArrayList<Configuration>();
// warn:每个slice拖且仅拖一个文件,
// int splitNumber = adviceNumber;
int splitNumber = this.sourceFiles.size();
if (0 == splitNumber) { 

// throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION,
// String.format("未能找到待读取的文件,请确认您的配置项path: %s", this.readerOriginConfig.getString(Key.PATH)));
String message = String.format("未能找到待读取的文件,请确认您的配置项path: %s", this.readerOriginConfig.getString(Key.PATH));
LOG.info(message);
return new ArrayList<Configuration>();
}
List<List<String>> splitedSourceFiles = this.splitSourceFiles(new ArrayList<String>(this.sourceFiles), splitNumber);
for (List<String> files : splitedSourceFiles) { 

Configuration splitedConfig = this.readerOriginConfig.clone();
splitedConfig.set(Constant.SOURCE_FILES, files);
readerSplitConfigs.add(splitedConfig);
}
return readerSplitConfigs;
}
private <T> List<List<T>> splitSourceFiles(final List<T> sourceList, int adviceNumber) { 

List<List<T>> splitedList = new ArrayList<List<T>>();
int averageLength = sourceList.size() / adviceNumber;
averageLength = averageLength == 0 ? 1 : averageLength;
for (int begin = 0, end = 0; begin < sourceList.size(); begin = end) { 

end = begin + averageLength;
if (end > sourceList.size()) { 

end = sourceList.size();
}
splitedList.add(sourceList.subList(begin, end));
}
return splitedList;
}

这里的reader是hdfs reader 原文件数,reader task数等于文件数。


六、Writer的切分数

doWriterSplit方法, 调用Writer.JOb的split方法,返回Writer.Task的Configuration列表

private List<Configuration> doWriterSplit(int readerTaskNumber) { 

// 切换ClassLoader
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.writerPluginName));
// 调用Job.Reader的split切分
List<Configuration> writerSlicesConfigs = this.jobWriter
.split(readerTaskNumber);
if (writerSlicesConfigs == null || writerSlicesConfigs.size() <= 0) { 

throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
"writer切分的task不能小于等于0");
}
LOG.info("DataX Writer.Job [{}] splits to [{}] tasks.",
this.writerPluginName, writerSlicesConfigs.size());
classLoaderSwapper.restoreCurrentThreadClassLoader();
return writerSlicesConfigs;
}

为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错!
比如rediswriter

 public List<Configuration> split(int mandatoryNumber) { 

List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
for (int i = 0; i < mandatoryNumber; i++) { 

configurations.add(getPluginJobConf());
}
return configurations;
}

七、合并配置

合并reader,writer,transformer配置列表。并将任务列表,保存在配置job.content的值里。

private List<Configuration> mergeReaderAndWriterTaskConfigs(
List<Configuration> readerTasksConfigs,
List<Configuration> writerTasksConfigs,
List<Configuration> transformerConfigs) { 

// reader切分的任务数目必须等于writer切分的任务数目
if (readerTasksConfigs.size() != writerTasksConfigs.size()) { 

throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
String.format("reader切分的task数目[%d]不等于writer切分的task数目[%d].",
readerTasksConfigs.size(), writerTasksConfigs.size())
);
}
List<Configuration> contentConfigs = new ArrayList<Configuration>();
for (int i = 0; i < readerTasksConfigs.size(); i++) { 

Configuration taskConfig = Configuration.newDefault();
// 保存reader相关配置
taskConfig.set(CoreConstant.JOB_READER_NAME,
this.readerPluginName);
taskConfig.set(CoreConstant.JOB_READER_PARAMETER,
readerTasksConfigs.get(i));
// 保存writer相关配置
taskConfig.set(CoreConstant.JOB_WRITER_NAME,
this.writerPluginName);
taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER,
writerTasksConfigs.get(i));
// 保存transformer相关配置
if(transformerConfigs!=null && transformerConfigs.size()>0){ 

taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
}
taskConfig.set(CoreConstant.TASK_ID, i);
contentConfigs.add(taskConfig);
}
return contentConfigs;
}

分配任务
分配算法

首先根据指定的channel数目和每个Taskgroup的拥有channel数目,计算出Taskgroup的数目
根据每个任务的reader.parameter.loadBalanceResourceMark将任务分组
根据每个任务writer.parameter.loadBalanceResourceMark来讲任务分组
根据上面两个任务分组的组数,挑选出大的那个组
轮询上面步骤的任务组,依次轮询的向各个TaskGroup添加一个,直到所有任务都被分配完
这里举个实例:
目前有7个task,channel有20个,每个Taskgroup拥有5个channel。
首先计算出Taskgroup的数目, 20 / 5 = 4 。(实际不会有这种情况,channel数不会超过task数)

根据reader.parameter.loadBalanceResourceMark,将任务分组如下:

{ 

"database_a" : [task_id_1, task_id_2],
"database_b" : [task_id_3, task_id_4, task_id_5],
"database_c" : [task_id_6, task_id_7]
}

根据writer.parameter.loadBalanceResourceMark,将任务分组如下:

{ 

"database_dst_d" : [task_id_1, task_id_2],
"database_dst_e" : [task_id_3, task_id_4, task_id_5, task_id_6, task_id_7]
}

因为readerResourceMarkAndTaskIdMap有三个组,而writerResourceMarkAndTaskIdMap只有两个组。从中选出组数最多的,所以这里按照readerResourceMarkAndTaskIdMap将任务分配。

执行过程是,轮询database_a, database_b, database_c,取出第一个。循环上一步

1. 取出task_id_1 放入 taskGroup_1
2. 取出task_id_3 放入 taskGroup_2
3. 取出task_id_6 放入 taskGroup_3
4. 取出task_id_2 放入 taskGroup_4
5. 取出task_id_4 放入 taskGroup_1
6. ………

最后返回的结果为

{ 

"taskGroup_1": [task_id_1, task_id_4],
"taskGroup_2": [task_id_3, task_id_7],
"taskGroup_3": [task_id_6, task_id_5],
"taskGroup_4": [task_id_2]
}

代码解释
任务的分配是由JobAssignUtil类负责。使用者调用assignFairly方法,传入参数,返回TaskGroup配置列表

public final class JobAssignUtil { 

/** * configuration 配置 * channelNumber, channel总数 * channelsPerTaskGroup, 每个TaskGroup拥有的channel数目 */
public static List<Configuration> assignFairly(Configuration configuration, int channelNumber, int channelsPerTaskGroup) { 

List<Configuration> contentConfig = configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
// 计算TaskGroup的数目
int taskGroupNumber = (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);
......
// 任务分组
LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap = parseAndGetResourceMarkAndTaskIdMap(contentConfig);
// 调用doAssign方法,分配任务
List<Configuration> taskGroupConfig = doAssign(resourceMarkAndTaskIdMap, configuration, taskGroupNumber);
// 调整 每个 taskGroup 对应的 Channel 个数(属于优化范畴)
adjustChannelNumPerTaskGroup(taskGroupConfig, channelNumber);
return taskGroupConfig;
}
}

任务分组
按照task配置的reader.parameter.loadBalanceResourceMark和writer.parameter.loadBalanceResourceMark,分别对任务进行分组,选择分组数最高的那组,作为任务分组的源。

/** * contentConfig参数,task的配置列表 */
private static LinkedHashMap<String, List<Integer>> parseAndGetResourceMarkAndTaskIdMap(List<Configuration> contentConfig) { 

// reader的任务分组,key为分组的名称,value是taskId的列表
LinkedHashMap<String, List<Integer>> readerResourceMarkAndTaskIdMap = new LinkedHashMap<String, List<Integer>>();
// writer的任务分组,key为分组的名称,value是taskId的列表
LinkedHashMap<String, List<Integer>> 
writerResourceMarkAndTaskIdMap = new LinkedHashMap<String, List<Integer>>();
for (Configuration aTaskConfig : contentConfig) { 

int taskId = aTaskConfig.getInt(CoreConstant.TASK_ID);
// 取出reader.parameter.loadBalanceResourceMark的值,作为分组名
String readerResourceMark = aTaskConfig.getString(CoreConstant.JOB_READER_PARAMETER + "." + CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
if (readerResourceMarkAndTaskIdMap.get(readerResourceMark) == null) { 

readerResourceMarkAndTaskIdMap.put(readerResourceMark, new LinkedList<Integer>());
}
// 把 readerResourceMark 加到 readerResourceMarkAndTaskIdMap 中
readerResourceMarkAndTaskIdMap.get(readerResourceMark).add(taskId);
// 取出writer.parameter.loadBalanceResourceMark的值,作为分组名
String writerResourceMark = aTaskConfig.getString(CoreConstant.JOB_WRITER_PARAMETER + "." + CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
if (writerResourceMarkAndTaskIdMap.get(writerResourceMark) == null) { 

writerResourceMarkAndTaskIdMap.put(writerResourceMark, new LinkedList<Integer>());
}
// 把 writerResourceMark 加到 writerResourceMarkAndTaskIdMap 中
writerResourceMarkAndTaskIdMap.get(writerResourceMark).add(taskId);
}
// 选出reader和writer其中最大的
if (readerResourceMarkAndTaskIdMap.size() >= writerResourceMarkAndTaskIdMap.size()) { 

// 采用 reader 对资源做的标记进行 shuffle
return readerResourceMarkAndTaskIdMap;
} else { 

// 采用 writer 对资源做的标记进行 shuffle
return writerResourceMarkAndTaskIdMap;
}
}

分配任务
将上一部任务的分组,划分到每个taskGroup里

private static List<Configuration> doAssign(LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap, Configuration jobConfiguration, int taskGroupNumber) { 

List<Configuration> contentConfig = jobConfiguration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
Configuration taskGroupTemplate = jobConfiguration.clone();
taskGroupTemplate.remove(CoreConstant.DATAX_JOB_CONTENT);
List<Configuration> result = new LinkedList<Configuration>();
// 初始化taskGroupConfigList
List<List<Configuration>> taskGroupConfigList = new ArrayList<List<Configuration>>(taskGroupNumber);
for (int i = 0; i < taskGroupNumber; i++) { 

taskGroupConfigList.add(new LinkedList<Configuration>());
}
// 取得resourceMarkAndTaskIdMap的值的最大个数
int mapValueMaxLength = -1;
List<String> resourceMarks = new ArrayList<String>();
for (Map.Entry<String, List<Integer>> entry : resourceMarkAndTaskIdMap.entrySet()) { 

resourceMarks.add(entry.getKey());
if (entry.getValue().size() > mapValueMaxLength) { 

mapValueMaxLength = entry.getValue().size();
}
}
int taskGroupIndex = 0;
// 执行mapValueMaxLength次数,每一次轮询一遍resourceMarkAndTaskIdMap
for (int i = 0; i < mapValueMaxLength; i++) { 

// 轮询resourceMarkAndTaskIdMap
for (String resourceMark : resourceMarks) { 

if (resourceMarkAndTaskIdMap.get(resourceMark).size() > 0) { 

// 取出第一个
int taskId = resourceMarkAndTaskIdMap.get(resourceMark).get(0);
// 轮询的向taskGroupConfigList插入值
taskGroupConfigList.get(taskGroupIndex % taskGroupNumber).add(contentConfig.get(taskId));
// taskGroupIndex自增
taskGroupIndex++;
// 删除第一个
resourceMarkAndTaskIdMap.get(resourceMark).remove(0);
}
}
}
Configuration tempTaskGroupConfig;
for (int i = 0; i < taskGroupNumber; i++) { 

tempTaskGroupConfig = taskGroupTemplate.clone();
// 设置TaskGroup的配置
tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i));
tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i);
result.add(tempTaskGroupConfig);
}
// 返回结果
return result;
}

为组分配channel
上面已经把任务划分成多个组,为了每个组能够均匀的分配channel,还需要调整。算法原理是,当channel总的数目,不能整除TaskGroup的数目时。多的余数个channel,从中挑选出余数个TaskGroup,每个多分配一个。

比如现在有13个channel,然后taskgroup确有5个。那么首先每个组先分 13 / 5 = 2 个。那么还剩下多的3个chanel,分配给前面个taskgroup。

private static void adjustChannelNumPerTaskGroup(List<Configuration> taskGroupConfig, int channelNumber) { 

int taskGroupNumber = taskGroupConfig.size();
int avgChannelsPerTaskGroup = channelNumber / taskGroupNumber;
int remainderChannelCount = channelNumber % taskGroupNumber;
// 表示有 remainderChannelCount 个 taskGroup,其对应 Channel 个数应该为:avgChannelsPerTaskGroup + 1;
// (taskGroupNumber - remainderChannelCount)个 taskGroup,其对应 Channel 个数应该为:avgChannelsPerTaskGroup
int i = 0;
for (; i < remainderChannelCount; i++) { 

taskGroupConfig.get(i).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup + 1);
}
for (int j = 0; j < taskGroupNumber - remainderChannelCount; j++) { 

taskGroupConfig.get(i + j).set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, avgChannelsPerTaskGroup);
}
}

注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/145215.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • viewstate解密

    viewstate解密看完之后,觉得能不用viewstate就不用,再者像这样viewstate[“a”]=”b”;这种简单的赋值是没有什么关系的,它生成的树是很小的,altas一定是用js修改了viewstate的,但方法肯定是加密再加密的,效率也应该很低. ViewState是.Net中提出的状态保存的一种新途径(实际上也是老瓶装新酒);我们知道,传统的Web程序保存状态的方式有这样几种: 1、Appli

  • JSP的四种作用域与九大内置对象

    JSP的四种作用域与九大内置对象JSP的四种作用域与九大内置对象

  • c++ STL_鱼c

    c++ STL_鱼c学校并未教授C++,当初接触的C++的STL,也是皮毛而已。结合对Java的集合框架等内容的认识,回顾这部分内容,收获很大。文章目录概述STL六大组件简介三大组件介绍1.容器2.算法3.迭代器常用容器1.string容器string容器基本概念string容器常用操作2.vector容器vector容器基本概念vector迭代器vector的数据结构vector常用API操作…

    2022年10月22日
  • android view事件分发机制_android事件分发流程图

    android view事件分发机制_android事件分发流程图PS一句:最终还是选择CSDN来整理发表这几年的知识点,该文章平行迁移到CSDN。因为CSDN也支持MarkDown语法了,牛逼啊!【工匠若水http://blog.csdn.net/yanbober】Notice:阅读完该篇之后如果想继续深入阅读Android触摸屏事件派发机制详解与源码分析下一篇请点击《Android触摸屏事件派发机制详解与源码分析二(ViewGroup篇)》查看。1背景最近

  • linux复制文件夹及赋予权限

    1.cp命令命令:cpdir1/a.docdir2表示将dir1下的a.doc文件复制到dir2目录下cp-rdir1dir2表示将dir1及其dir1下所包含的文件复制到dir2下cp-rdir1/.dir2表示将dir1下的文件复制到dir2,不包括dir1目录说明:cp参数-i:询问,如果目标文件已经存在,则会询问是否覆盖;2.scp命令例如:scpid_rsa.pubrouter_17@IP:/home/router_17/.ssh/authori…

  • densenet详解_resnet详解

    densenet详解_resnet详解本文主要介绍近几年效果比较好的DenseNet网络并代码实现。

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号