hadoop怎么分割写入的文件为多个块的,一个map对应一个split分片吗?split与block的关系

hadoop怎么分割写入的文件为多个块的,一个map对应一个split分片吗?split与block的关系hadoop怎么分割写入的文件为多个块的,一个map对应一个split分片吗?split与block的关系

大家好,又见面了,我是你们的朋友全栈君。

1,在介绍hadoop写文件的时候我们经常会说首先分割文件为多个块;那么是怎么分割的呢?

这里其实不要有过的纠结,这里的块是block,是hdfs中切块的大小,属于物理划分,默认64M,在hadoop-default.xml配置中有体现:

<property>  
  <name>dfs.block.size</name>  
  <value>67108864</value>  
  <description>The default block size for new files.</description>  
</property>  

当然如果文件没有64M也不会占据整块空间。

将文件分割成多个块后,形成一个数据队列,然后依次写入datanode列表。

再者,如果写入的是个文件夹,而且每个文件的都不大,这样在hdfs中是默认每个文件一个块的,即使没有64m,当然也可做优化处理,不过hbase更便利于处理把小文件合并到一个块中,这个我会在其他博文中介绍。

2,下面我们说说split,并与block的关系

首先,split是mapreduce中的概念,而block是hdfs中切块的大小。

如下:

//设置要处理的文本数据所存放的路径  
        FileInputFormat.setInputPaths(wordCountJob, "hdfs://ubuntu:9000/input/aa.txt");  
        FileOutputFormat.setOutputPath(wordCountJob, new Path("hdfs://ubuntu:9000/output/"));  

我们设置要处理的文件路径时都会用到fileInputFormat类, 不过我们更多看到的是inputFormat,其实fileInputFormat这个类的也是实现inputFomat接口,

下面我们接着看源码,说明为什么需要分片?

以hadoop自带的wordCount的源码为例:

for (int i = 0; i < otherArgs.length - 1; ++i) {  
  FileInputFormat.addInputPath(job, new Path(otherArgs[i]));  
}  
FileOutputFormat.setOutputPath(job,  
  new Path(otherArgs[otherArgs.length - 1]));  
System.exit(job.waitForCompletion(true) ? 0 : 1);  

我们看到使用的InputFormat是FileOutputFormat,任务执行调用了Job的waitForCompletion方法。waitForCompletion方法中真正提交job的代码如下:

public boolean waitForCompletion(boolean verbose  
                                 ) throws IOException, InterruptedException,  
                                          ClassNotFoundException {  
  if (state == JobState.DEFINE) {  
    submit();  
  }  
  // 省略本文不关心的代码  
  return isSuccessful();  
}  

这里的submit方法的实现如下:

public void submit()   
         throws IOException, InterruptedException, ClassNotFoundException {  
    // 省略本文不关心的代码</span>  
    final JobSubmitter submitter =   
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());  
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {  
      public JobStatus run() throws IOException, InterruptedException,   
      ClassNotFoundException {  
        return submitter.submitJobInternal(Job.this, cluster);  
      }  
    });  
    state = JobState.RUNNING;  
    LOG.info("The url to track the job: " + getTrackingURL());  
   }  

submit方法首先创建了JobSubmitter实例,然后异步调用了JobSubmitter的submitJobInternal方法。JobSubmitter的submitJobInternal方法有关划分任务的代码如下:

// Create the splits for the job  
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));  
int maps = writeSplits(job, submitJobDir);  
conf.setInt(MRJobConfig.NUM_MAPS, maps);  
LOG.info("number of splits:" + maps); 

writeSplits方法的实现如下:

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,  
    Path jobSubmitDir) throws IOException,  
    InterruptedException, ClassNotFoundException {  
  JobConf jConf = (JobConf)job.getConfiguration();  
  int maps;  
  if (jConf.getUseNewMapper()) {  
    maps = writeNewSplits(job, jobSubmitDir);  
  } else {  
    maps = writeOldSplits(jConf, jobSubmitDir);  
  }  
  return maps;  
}  

由于WordCount使用的是新的mapreduce API,所以最终会调用writeNewSplits方法。writeNewSplits的实现如下:

private <T extends InputSplit>  
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,  
    InterruptedException, ClassNotFoundException {  
  Configuration conf = job.getConfiguration();  
  InputFormat<?, ?> input =  
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);  
  
  List<InputSplit> splits = input.getSplits(job);  
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);  
  
  // sort the splits into order based on size, so that the biggest  
  // go first  
  Arrays.sort(array, new SplitComparator());  
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,   
      jobSubmitDir.getFileSystem(conf), array);  
  return array.length;  
}  

writeNewSplits方法中,划分任务数量最关键的代码即为InputFormat的getSplits方法(提示:大家可以直接通过此处的调用,查看不同InputFormat的划分任务实现)。根据前面的分析我们知道此时的InputFormat即为FileOutputFormat,其getSplits方法的实现如下:

public List<InputSplit> getSplits(JobContext job) throws IOException {  
  Stopwatch sw = new Stopwatch().start();  
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  
  long maxSize = getMaxSplitSize(job);  
  
  // generate splits  
  List<InputSplit> splits = new ArrayList<InputSplit>();  
  List<FileStatus> files = listStatus(job);  
  for (FileStatus file: files) {  
    Path path = file.getPath();  
    long length = file.getLen();  
    if (length != 0) {  
      BlockLocation[] blkLocations;  
      if (file instanceof LocatedFileStatus) {  
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();  
      } else {  
        FileSystem fs = path.getFileSystem(job.getConfiguration());  
        blkLocations = fs.getFileBlockLocations(file, 0, length);  
      }  
      if (isSplitable(job, path)) {  
        long blockSize = file.getBlockSize();  
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);  
  
        long bytesRemaining = length;  
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,  
                      blkLocations[blkIndex].getHosts(),  
                      blkLocations[blkIndex].getCachedHosts()));  
          bytesRemaining -= splitSize;  
        }  
  
        if (bytesRemaining != 0) {  
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,  
                     blkLocations[blkIndex].getHosts(),  
                     blkLocations[blkIndex].getCachedHosts()));  
        }  
      } else { // not splitable  
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),  
                    blkLocations[0].getCachedHosts()));  
      }  
    } else {   
      //Create empty hosts array for zero length files  
      splits.add(makeSplit(path, 0, length, new String[0]));  
    }  
  }  
  // Save the number of input files for metrics/loadgen  
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());  
  sw.stop();  
  if (LOG.isDebugEnabled()) {  
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()  
        + ", TimeTaken: " + sw.elapsedMillis());  
  }  
  return splits;  
}  

totalSize:是整个Map-Reduce job所有输入的总大小。

numSplits:来自job.getNumMapTasks(),即在job启动时用org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)设置的值,给M-R框架的Map数量的提示。

goalSize:是输入总大小与提示Map task数量的比值,即期望每个Mapper处理多少的数据,仅仅是期望,具体处理的数据数由下面的computeSplitSize决定。

minSplitSize:默认为1,可由子类复写函数protected void setMinSplitSize(long minSplitSize) 重新设置。一般情况下,都为1,特殊情况除外

minSize:取的1和mapred.min.split.size中较大的一个。

blockSize:HDFS的块大小,默认为64M,一般大的HDFS都设置成128M。

splitSize:就是最终每个Split的大小,那么Map的数量基本上就是totalSize/splitSize。

接下来看看computeSplitSize的逻辑:首先在goalSize(期望每个Mapper处理的数据量)和HDFS的block size中取较小的,然后与mapred.min.split.size相比取较大的

 

  一个片为一个splits,即一个map,只要搞清楚片的大小,就能计算出运行时的map数。而一个split的大小是由goalSize, minSize, blockSize这三个值决定的。computeSplitSize的逻辑是,先从goalSize和blockSize两个值中选出最小的那个(比如一般不设置map数,这时blockSize为当前文件的块size,而goalSize是文件大小除以用户设置的map数得到的,如果没设置的话,默认是1),在默认的大多数情况下,blockSize比较小。然后再取blockSize和minSize中最大的那个。而minSize如果不通过”mapred.min.split.size”设置的话(”mapred.min.split.size”默认为0),minSize为1,可理解为一个block块,这样得出的一个splits的size就是blockSize,即一个块一个map,有多少块就有多少map。

split的大小时默认和hdfs的block块大小一致,但是可以通过配置文件自己设置: 
其中有俩个配置文件(如下):

--minsize   默认大小为1
mapreduce.input.fileinputformat.split.minsize  

--maxsize   默认大小为Long.MAXValue 
mapreduce.input.fileinputformat.split.maxsize

举例:

比如说我问写入一个文件夹,里面有10个只有几k的文件,所以List<FileStatus> files = listStatus(job);方法返回的files列表的大小为10。在遍历files列表的过程中,会获取每个文件的blockSize,最终调用computeSplitSize方法计算每个输入文件应当划分的任务数。computeSplitSize方法的实现如下:

protected long computeSplitSize(long blockSize, long minSize,  
                                long maxSize) {  
  return Math.max(minSize, Math.min(maxSize, blockSize));  
}  

然后根据

  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  

很明显取split默认值,也就是一个块,那么10个就要分为10块,这也说明为什么处理小文件时,block的大小小于split的 大小。同时我们看到了一个split分配一个map任务。

这里我们可以总结下split大小与block的关系:

(1)block块的小于split分片的最小值,那split的值就是split分片的大小

(2)block块的小大介于split分片配置的最小值和最大值之间,block的大小就是split的大小。

(3)block块的大小大于split分片的最大值,split的大小就是split配置的最大值。但会增加map执行的并发度,但是会造成在节点之间拉取数据

也有公式可以计算split也就是map任务数,这里就不做讨论了。

一个map对应一个split分片吗?

经过上面的讨论,答案是显而易见的:

map个数:由任务切片spilt决定的,默认情况下一个split的大小就是block
由参与任务的文件个数决定的 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/106130.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 起步上路「建议收藏」

    起步上路「建议收藏」始于2016-09-11开篇,中间基于工作的内容,记录了几篇相关内容的博客,离上一次发博客(2017-07-24),也过了1年多的时间。这个期间,开始转战人工智能方向,深度学习算法的内容。几经磨难,算是踏入门槛,期间也记录了些许内容。期间也看了各路朋友相关的博客内容,其中不乏精彩之作,为初学者提供了很好的帮助。目前接触和比较熟悉的是目标检测相关的内容,大多基于caffe深度学习框架,博客内容不…

  • 如何打开rdb文件

    如何打开rdb文件

  • win732位系统怎么安装_windows7可以安装python 什么版本

    win732位系统怎么安装_windows7可以安装python 什么版本win732位系统如何安装pycharm?1.查找安装说明百度找到了PyCharm安装教程(Windows),地址是:https://www.runoob.com/w3cnote/pycharm-windows-install.html按照步骤选择了community社区版的pycharm进行下载安装安装过程中出现如下提示信息:提示信息显示安装pycharm2019.3.1版本…

  • mysql数据库学习笔记(一)

    mysql数据库学习笔记(一)

  • 普通索引与唯一索引的区别_唯一索引怎么设置

    普通索引与唯一索引的区别_唯一索引怎么设置所谓普通索引,就是在创建索引时,不附加任何限制条件(唯一、非空等限制)。该类型的索引可以创建在任何数据类型的字段上。所谓唯一索引,就是在创建索引时,限制索引的值必须是唯一的。通过该类型的索引可以更快速地查询某条记录。普通索引还是唯一索引?假设你在维护一个市民系统,每个人都有一个唯一的身份证号,而且业务代码已经保证了不会写入两个重复的身份证号。如果市民系统需要按照身份证号查姓名,就会…

  • Hadoop体系_集团架构

    Hadoop体系_集团架构目录2.1Hadoop简介2.1.1Hadoop由来2.1.2Hadoop发展历程2.1.3Hadoop生态系统2.2Hadoop的体系架构2.2.1分布式文件系统HDFS2.2.2分布式计算框架MapReduce2.2.3分布式资源调度系统YARN2.2.4三大发行版本2.1Hadoop简介自从大数据的概念被提出后,出现了很多相关技术,其中对大数据发展最有影响力的就是开源分布式计算平台Hadoop,它就像软件发展史上的Win…

    2022年10月17日

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号