hadoop怎么分割写入的文件为多个块的,一个map对应一个split分片吗?split与block的关系

hadoop怎么分割写入的文件为多个块的,一个map对应一个split分片吗?split与block的关系hadoop怎么分割写入的文件为多个块的,一个map对应一个split分片吗?split与block的关系

大家好,又见面了,我是你们的朋友全栈君。

1,在介绍hadoop写文件的时候我们经常会说首先分割文件为多个块;那么是怎么分割的呢?

这里其实不要有过的纠结,这里的块是block,是hdfs中切块的大小,属于物理划分,默认64M,在hadoop-default.xml配置中有体现:

<property>  
  <name>dfs.block.size</name>  
  <value>67108864</value>  
  <description>The default block size for new files.</description>  
</property>  

当然如果文件没有64M也不会占据整块空间。

将文件分割成多个块后,形成一个数据队列,然后依次写入datanode列表。

再者,如果写入的是个文件夹,而且每个文件的都不大,这样在hdfs中是默认每个文件一个块的,即使没有64m,当然也可做优化处理,不过hbase更便利于处理把小文件合并到一个块中,这个我会在其他博文中介绍。

2,下面我们说说split,并与block的关系

首先,split是mapreduce中的概念,而block是hdfs中切块的大小。

如下:

//设置要处理的文本数据所存放的路径  
        FileInputFormat.setInputPaths(wordCountJob, "hdfs://ubuntu:9000/input/aa.txt");  
        FileOutputFormat.setOutputPath(wordCountJob, new Path("hdfs://ubuntu:9000/output/"));  

我们设置要处理的文件路径时都会用到fileInputFormat类, 不过我们更多看到的是inputFormat,其实fileInputFormat这个类的也是实现inputFomat接口,

下面我们接着看源码,说明为什么需要分片?

以hadoop自带的wordCount的源码为例:

for (int i = 0; i < otherArgs.length - 1; ++i) {  
  FileInputFormat.addInputPath(job, new Path(otherArgs[i]));  
}  
FileOutputFormat.setOutputPath(job,  
  new Path(otherArgs[otherArgs.length - 1]));  
System.exit(job.waitForCompletion(true) ? 0 : 1);  

我们看到使用的InputFormat是FileOutputFormat,任务执行调用了Job的waitForCompletion方法。waitForCompletion方法中真正提交job的代码如下:

public boolean waitForCompletion(boolean verbose  
                                 ) throws IOException, InterruptedException,  
                                          ClassNotFoundException {  
  if (state == JobState.DEFINE) {  
    submit();  
  }  
  // 省略本文不关心的代码  
  return isSuccessful();  
}  

这里的submit方法的实现如下:

public void submit()   
         throws IOException, InterruptedException, ClassNotFoundException {  
    // 省略本文不关心的代码</span>  
    final JobSubmitter submitter =   
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());  
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {  
      public JobStatus run() throws IOException, InterruptedException,   
      ClassNotFoundException {  
        return submitter.submitJobInternal(Job.this, cluster);  
      }  
    });  
    state = JobState.RUNNING;  
    LOG.info("The url to track the job: " + getTrackingURL());  
   }  

submit方法首先创建了JobSubmitter实例,然后异步调用了JobSubmitter的submitJobInternal方法。JobSubmitter的submitJobInternal方法有关划分任务的代码如下:

// Create the splits for the job  
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));  
int maps = writeSplits(job, submitJobDir);  
conf.setInt(MRJobConfig.NUM_MAPS, maps);  
LOG.info("number of splits:" + maps); 

writeSplits方法的实现如下:

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,  
    Path jobSubmitDir) throws IOException,  
    InterruptedException, ClassNotFoundException {  
  JobConf jConf = (JobConf)job.getConfiguration();  
  int maps;  
  if (jConf.getUseNewMapper()) {  
    maps = writeNewSplits(job, jobSubmitDir);  
  } else {  
    maps = writeOldSplits(jConf, jobSubmitDir);  
  }  
  return maps;  
}  

由于WordCount使用的是新的mapreduce API,所以最终会调用writeNewSplits方法。writeNewSplits的实现如下:

private <T extends InputSplit>  
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,  
    InterruptedException, ClassNotFoundException {  
  Configuration conf = job.getConfiguration();  
  InputFormat<?, ?> input =  
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);  
  
  List<InputSplit> splits = input.getSplits(job);  
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);  
  
  // sort the splits into order based on size, so that the biggest  
  // go first  
  Arrays.sort(array, new SplitComparator());  
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,   
      jobSubmitDir.getFileSystem(conf), array);  
  return array.length;  
}  

writeNewSplits方法中,划分任务数量最关键的代码即为InputFormat的getSplits方法(提示:大家可以直接通过此处的调用,查看不同InputFormat的划分任务实现)。根据前面的分析我们知道此时的InputFormat即为FileOutputFormat,其getSplits方法的实现如下:

public List<InputSplit> getSplits(JobContext job) throws IOException {  
  Stopwatch sw = new Stopwatch().start();  
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  
  long maxSize = getMaxSplitSize(job);  
  
  // generate splits  
  List<InputSplit> splits = new ArrayList<InputSplit>();  
  List<FileStatus> files = listStatus(job);  
  for (FileStatus file: files) {  
    Path path = file.getPath();  
    long length = file.getLen();  
    if (length != 0) {  
      BlockLocation[] blkLocations;  
      if (file instanceof LocatedFileStatus) {  
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();  
      } else {  
        FileSystem fs = path.getFileSystem(job.getConfiguration());  
        blkLocations = fs.getFileBlockLocations(file, 0, length);  
      }  
      if (isSplitable(job, path)) {  
        long blockSize = file.getBlockSize();  
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);  
  
        long bytesRemaining = length;  
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,  
                      blkLocations[blkIndex].getHosts(),  
                      blkLocations[blkIndex].getCachedHosts()));  
          bytesRemaining -= splitSize;  
        }  
  
        if (bytesRemaining != 0) {  
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,  
                     blkLocations[blkIndex].getHosts(),  
                     blkLocations[blkIndex].getCachedHosts()));  
        }  
      } else { // not splitable  
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),  
                    blkLocations[0].getCachedHosts()));  
      }  
    } else {   
      //Create empty hosts array for zero length files  
      splits.add(makeSplit(path, 0, length, new String[0]));  
    }  
  }  
  // Save the number of input files for metrics/loadgen  
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());  
  sw.stop();  
  if (LOG.isDebugEnabled()) {  
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()  
        + ", TimeTaken: " + sw.elapsedMillis());  
  }  
  return splits;  
}  

totalSize:是整个Map-Reduce job所有输入的总大小。

numSplits:来自job.getNumMapTasks(),即在job启动时用org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)设置的值,给M-R框架的Map数量的提示。

goalSize:是输入总大小与提示Map task数量的比值,即期望每个Mapper处理多少的数据,仅仅是期望,具体处理的数据数由下面的computeSplitSize决定。

minSplitSize:默认为1,可由子类复写函数protected void setMinSplitSize(long minSplitSize) 重新设置。一般情况下,都为1,特殊情况除外

minSize:取的1和mapred.min.split.size中较大的一个。

blockSize:HDFS的块大小,默认为64M,一般大的HDFS都设置成128M。

splitSize:就是最终每个Split的大小,那么Map的数量基本上就是totalSize/splitSize。

接下来看看computeSplitSize的逻辑:首先在goalSize(期望每个Mapper处理的数据量)和HDFS的block size中取较小的,然后与mapred.min.split.size相比取较大的

 

  一个片为一个splits,即一个map,只要搞清楚片的大小,就能计算出运行时的map数。而一个split的大小是由goalSize, minSize, blockSize这三个值决定的。computeSplitSize的逻辑是,先从goalSize和blockSize两个值中选出最小的那个(比如一般不设置map数,这时blockSize为当前文件的块size,而goalSize是文件大小除以用户设置的map数得到的,如果没设置的话,默认是1),在默认的大多数情况下,blockSize比较小。然后再取blockSize和minSize中最大的那个。而minSize如果不通过”mapred.min.split.size”设置的话(”mapred.min.split.size”默认为0),minSize为1,可理解为一个block块,这样得出的一个splits的size就是blockSize,即一个块一个map,有多少块就有多少map。

split的大小时默认和hdfs的block块大小一致,但是可以通过配置文件自己设置: 
其中有俩个配置文件(如下):

--minsize   默认大小为1
mapreduce.input.fileinputformat.split.minsize  

--maxsize   默认大小为Long.MAXValue 
mapreduce.input.fileinputformat.split.maxsize

举例:

比如说我问写入一个文件夹,里面有10个只有几k的文件,所以List<FileStatus> files = listStatus(job);方法返回的files列表的大小为10。在遍历files列表的过程中,会获取每个文件的blockSize,最终调用computeSplitSize方法计算每个输入文件应当划分的任务数。computeSplitSize方法的实现如下:

protected long computeSplitSize(long blockSize, long minSize,  
                                long maxSize) {  
  return Math.max(minSize, Math.min(maxSize, blockSize));  
}  

然后根据

  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  

很明显取split默认值,也就是一个块,那么10个就要分为10块,这也说明为什么处理小文件时,block的大小小于split的 大小。同时我们看到了一个split分配一个map任务。

这里我们可以总结下split大小与block的关系:

(1)block块的小于split分片的最小值,那split的值就是split分片的大小

(2)block块的小大介于split分片配置的最小值和最大值之间,block的大小就是split的大小。

(3)block块的大小大于split分片的最大值,split的大小就是split配置的最大值。但会增加map执行的并发度,但是会造成在节点之间拉取数据

也有公式可以计算split也就是map任务数,这里就不做讨论了。

一个map对应一个split分片吗?

经过上面的讨论,答案是显而易见的:

map个数:由任务切片spilt决定的,默认情况下一个split的大小就是block
由参与任务的文件个数决定的 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/106130.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 一级倒立摆matlab仿真,一级倒立摆的Simulink仿真「建议收藏」

    一级倒立摆matlab仿真,一级倒立摆的Simulink仿真「建议收藏」一级倒立摆的Simulink仿真单级倒立摆稳定控制直线一级倒立摆系统在忽略了空气阻力及各种摩擦之后,可抽象成小车和匀质摆杆组成的系统,如图1所示。mg杆长为2u图1直线一级倒立摆系统图2控制系统结构假设小车质量M=0.5kg,匀质摆杆质量m=0.2kg,摆杆长度2l=0.6m,x(t)为小车的水平位移,θ为摆杆的角位移,。控制的目标是通过外力(t)使得摆直立向上2…

  • Etcd学习(二)集群搭建Clustering

    Etcd学习(二)集群搭建Clustering

  • ideaIU-2019.2.exe

    一、查看安装目录结构   bin: 容器,执行文件和启动参数等 help:快捷键文档和其他帮助文档 jbr: 含有java运行环境 lib:idea 依赖的类库 license:各个插件许可

  • Redis分布式锁的三种实现方式_分布式锁解决方案

    Redis分布式锁的三种实现方式_分布式锁解决方案总结写在前面:RLockrLock=redissonClient.getLock(“lbhTestLock”);使用tryLock无参方法时,redisson会自动添加一个定时任务,定时刷新锁的失效时间,如果unlock时失败,则会出现该锁一直不释放的情况。而当tryLock传释放时间时,则不会添加这个定时任务。测试如下:1、tryLock无参数@Testp…

    2022年10月15日
  • 软件测试工作基本流程[通俗易懂]

    软件测试工作基本流程[通俗易懂]最近在为面试新工作做准备,所以想想整理一下软件测试的基本工作流程,大致梳理一遍,这样也便于自己在面试过程中可以沉着的面对面试官的测试工作如何进行的问题。首先,作为测试人员需要学习并了解业务,分析需求点为什么测试人员要参加需求分析?也就是进行测试需求分析的目的是什么?第一、把用户需求转化为功能需求:1)对测试范围进度量2)对处理分支进行度量3)对需…

  • golang deepcopy_mongodb主从复制原理

    golang deepcopy_mongodb主从复制原理Go语言中所有赋值操作都是值传递,如果结构中不含指针,则直接赋值就是深度拷贝;如果结构中含有指针(包括自定义指针,以及切片,map等使用了指针的内置类型),则数据源和拷贝之间对应指针会共同指向同一块内存,这时深度拷贝需要特别处理。目前,有三种方法,一是用gob序列化成字节序列再反序列化生成克隆对象;二是先转换成json字节序列,再解析字节序列生成克隆对象;三是针对具体情况,定制化拷贝。前两种方法虽……

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号