MapReduce InputFormat之FileInputFormat

一:简单认识InputFormat类InputFormat主要用于描述输入数据的格式,提供了以下两个功能:1)、数据切分,按照某个策略将输入数据且分成若干个split,以便确定MapTask的个数即Mapper的个数,在MapReduce框架中,一个split就意味着需要一个MapTask;2)为Mapper提供输入数据,即给定一个spli…

大家好,又见面了,我是你们的朋友全栈君。

一:简单认识InputFormat类

InputFormat主要用于描述输入数据的格式,提供了以下两个功能: 

        1)、数据切分,按照某个策略将输入数据且分成若干个split,以便确定Map Task的个数即Mapper的个数,在MapReduce框架中,一个split就意味着需要一个Map Task; 

 

        2)为Mapper提供输入数据,即给定一个split,(使用其中的RecordReader对象)将之解析为一个个的key/value键值对。

 

下面我们先来看以下1.0版本中的老的InputFormat接口:

Java代码 

  1. public interface InputFormat<K,V>{  
  2.      
  3.    //获取所有的split分片     
  4.    public InputSplit[] getSplits(JobConf job,int numSplits) throws IOException;   
  5.     
  6.    //获取读取split的RecordReader对象,实际上是由RecordReader对象将  
  7.    //split解析成一个个的key/value对儿  
  8.    public RecordReader<K,V> getRecordReader(InputSplit split,  
  9.                                JobConf job,  
  10.                                Reporter reporter) throws IOException;   
  11. }  

InputSplit 
        getSplit(…)方法主要用于切分数据,它会尝试浙江输入数据且分成numSplits个InputSplit的栓皮栎split分片。InputSplit主要有以下特点: 
        1)、逻辑分片,之前我们已经学习过split和block的对应关系和区别,split只是在逻辑上对数据分片,并不会在磁盘上讲数据切分成split物理分片,实际上数据在HDFS上还是以block为基本单位来存储数据的。InputSplit只记录了Mapper要处理的数据的元数据信息,如起始位置、长度和所在的节点; 

 

 

 

MapReduce InputFormat之FileInputFormat

  2)、可序列化,在Hadoop中,序列化主要起两个作用,进程间通信和数据持久化存储。在这里,InputSplit主要用于进程间的通信。 
         在作业被提交到JobTracker之前,Client会先调用作业InputSplit中的getSplit()方法,并将得到的分片信息序列化到文件中,这样,在作业在JobTracker端初始化时,便可并解析出所有split分片,创建对象应的Map Task。 
         InputSplit也是一个interface,具体返回什么样的implement,这是由具体的InputFormat来决定的。InputSplit也只有两个接口函数:

Java代码 

  1. public interface InputSplit extends Writable {  
  2.   
  3.   /** 
  4.    * 获取split分片的长度 
  5.    *  
  6.    * @return the number of bytes in the input split. 
  7.    * @throws IOException 
  8.    */  
  9.   long getLength() throws IOException;  
  10.     
  11.   /** 
  12.    * 获取存放这个Split的Location信息(也就是这个Split在HDFS上存放的机器。它可能有 
  13.    * 多个replication,存在于多台机器上 
  14.    *  
  15.    * @return list of hostnames where data of the <code>InputSplit</code> is 
  16.    *         located as an array of <code>String</code>s. 
  17.    * @throws IOException 
  18.    */  
  19.   String[] getLocations() throws IOException;  
  20. }  

 在需要读取一个Split的时候,其对应的InputSplit会被传递到InputFormat的第二个接口函数getRecordReader,然后被用于初始化一个RecordReader,以便解析输入数据,描述Split的重要信息都被隐藏了,只有具体的InputFormat自己知道,InputFormat只需要保证getSplits返回的InputSplit和getRecordReader所关心的InputSplit是同样的implement就行了,这给InputFormat的实现提供了巨大的灵活性。 
         在MapReduce框架中最常用的FileInputFormat为例,其内部使用的就是FileSplit来描述InputSplit。我们来看一下FileSplit的一些定义信息:

Java代码  

  1. /** A section of an input file.  Returned by {@link 
  2.  * InputFormat#getSplits(JobConf, int)} and passed to 
  3.  * {@link InputFormat#getRecordReader(InputSplit,JobConf,Reporter)}.  
  4.  */  
  5. public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit   
  6.                        implements InputSplit {  
  7.   // Split所在的文件  
  8.   private Path file;  
  9.   // Split的起始位置  
  10.   private long start;  
  11.   // Split的长度  
  12.   private long length;  
  13.   // Split所在的机器名称  
  14.   private String[] hosts;  
  15.     
  16.   FileSplit() {}  
  17.   
  18.   /** Constructs a split. 
  19.    * @deprecated 
  20.    * @param file the file name 
  21.    * @param start the position of the first byte in the file to process 
  22.    * @param length the number of bytes in the file to process 
  23.    */  
  24.   @Deprecated  
  25.   public FileSplit(Path file, long start, long length, JobConf conf) {  
  26.     this(file, start, length, (String[])null);  
  27.   }  
  28.   
  29.   /** Constructs a split with host information 
  30.    * 
  31.    * @param file the file name 
  32.    * @param start the position of the first byte in the file to process 
  33.    * @param length the number of bytes in the file to process 
  34.    * @param hosts the list of hosts containing the block, possibly null 
  35.    */  
  36.   public FileSplit(Path file, long start, long length, String[] hosts) {  
  37.     this.file = file;  
  38.     this.start = start;  
  39.     this.length = length;  
  40.     this.hosts = hosts;  
  41.   }  
  42.   
  43.   /** The file containing this split’s data. */  
  44.   public Path getPath() { return file; }  
  45.     
  46.   /** The position of the first byte in the file to process. */  
  47.   public long getStart() { return start; }  
  48.     
  49.   /** The number of bytes in the file to process. */  
  50.   public long getLength() { return length; }  
  51.   
  52.   public String toString() { return file + “:” + start + “+” + length; }  
  53.   
  54.     
  55.   // Writable methods  
  56.     
  57.   
  58.   public void write(DataOutput out) throws IOException {  
  59.     UTF8.writeString(out, file.toString());  
  60.     out.writeLong(start);  
  61.     out.writeLong(length);  
  62.   }  
  63.   public void readFields(DataInput in) throws IOException {  
  64.     file = new Path(UTF8.readString(in));  
  65.     start = in.readLong();  
  66.     length = in.readLong();  
  67.     hosts = null;  
  68.   }  
  69.   
  70.   public String[] getLocations() throws IOException {  
  71.     if (this.hosts == null) {  
  72.       return new String[]{};  
  73.     } else {  
  74.       return this.hosts;  
  75.     }  
  76.   }  
  77.     
  78. }  

         从上面的代码中我们可以看到,FileSplit就是InputSplit接口的一个实现。InputFormat使用的RecordReader将从FileSplit中获取信息,解析FileSplit对象从而获得需要的数据的起始位置、长度和节点位置。 

 

 

 

  RecordReader 
         对于getRecordReader(…)方法,它返回一个RecordReader对象,该对象可以讲输入的split分片解析成一个个的key/value对儿。在Map Task的执行过程中,会不停的调用RecordReader对象的方法,迭代获取key/value并交给map()方法处理:

Java代码 

  1. //调用InputFormat的getRecordReader()获取RecordReader<K,V>对象,  
  2. //并由RecordReader对象解析其中的input(split)…  
  3. K1 key = input.createKey();  
  4. V1 value = input.createValue();  
  5. while(input.next(key,value)){
    //从input读取下一个key/value对  
  6.     //调用用户编写的map()方法  
  7. }  
  8. input.close();  

         RecordReader主要有两个功能: 
         ●定位记录的边界:由于FileInputFormat是按照数据量对文件进行切分,因而有可能会将一条完整的记录切成2部分,分别属于两个split分片,为了解决跨InputSplit分片读取数据的问题,RecordReader规定每个分片的第一条不完整的记录划给前一个分片处理。 
         ●解析key/value:定位一条新的记录,将记录分解成key和value两部分供Mapper处理。 

 

 

 

InputFormat 
         MapReduce自带了一些InputFormat的实现类: 

MapReduce InputFormat之FileInputFormat

 下面我们看几个有代表性的InputFormat: 
         FileInputFormat 
         FileInputFormat是一个抽象类,它最重要的功能是为各种InputFormat提供统一的getSplits()方法,该方法最核心的是文件切分算法和Host选择算法:

Java代码 

  1. /** Splits files returned by {@link #listStatus(JobConf)} when 
  2.    * they’re too big.*/   
  3. @SuppressWarnings(“deprecation”)  
  4. public InputSplit[] getSplits(JobConf job, int numSplits)  
  5.     throws IOException {  
  6.     FileStatus[] files = listStatus(job);  
  7.       
  8.     // Save the number of input files in the job-conf  
  9.     job.setLong(NUM_INPUT_FILES, files.length);  
  10.     long totalSize = 0;                           // compute total size  
  11.     for (FileStatus file: files) {                // check we have valid files  
  12.       if (file.isDir()) {  
  13.         throw new IOException(“Not a file: “+ file.getPath());  
  14.       }  
  15.       totalSize += file.getLen();  
  16.     }  
  17.       
  18.     long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);  
  19.     long minSize = Math.max(job.getLong(“mapred.min.split.size”1),  
  20.                             minSplitSize);  
  21.   
  22.     // 定义要生成的splits(FileSplit)的集合  
  23.     ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);  
  24.     NetworkTopology clusterMap = new NetworkTopology();  
  25.     for (FileStatus file: files) {  
  26.       Path path = file.getPath();  
  27.       FileSystem fs = path.getFileSystem(job);  
  28.       long length = file.getLen();  
  29.       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);  
  30.       if ((length != 0) && isSplitable(fs, path)) {   
  31.         long blockSize = file.getBlockSize();  
  32.         //获取最终的split分片的大小,该值很可能和blockSize不相等  
  33.         long splitSize = computeSplitSize(goalSize, minSize, blockSize);  
  34.   
  35.         long bytesRemaining = length;  
  36.         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  
  37.           //获取split分片所在的host的节点信息  
  38.           String[] splitHosts = getSplitHosts(blkLocations,   
  39.               length-bytesRemaining, splitSize, clusterMap);  
  40.           //最终生成所有分片  
  41.           splits.add(new FileSplit(path, length-bytesRemaining, splitSize,   
  42.               splitHosts));  
  43.           bytesRemaining -= splitSize;  
  44.         }  
  45.           
  46.         if (bytesRemaining != 0) {  
  47.           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,   
  48.                      blkLocations[blkLocations.length-1].getHosts()));  
  49.         }  
  50.       } else if (length != 0) {  
  51.         //获取split分片所在的host的节点信息  
  52.         String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);  
  53.         //最终生成所有分片  
  54.         splits.add(new FileSplit(path, 0, length, splitHosts));  
  55.       } else {   
  56.         //Create empty hosts array for zero length files  
  57.         //最终生成所有分片  
  58.         splits.add(new FileSplit(path, 0, length, new String[0]));  
  59.       }  
  60.     }  
  61.     LOG.debug(“Total # of splits: “ + splits.size());  
  62.     return splits.toArray(new FileSplit[splits.size()]);  
  63. }  

          1)、文件切分算法 
          文件切分算法主要用于确定InputSplit的个数以及每个InputSplit对应的数据段,FileInputSplit以文件为单位切分生成InputSplit。有三个属性值来确定InputSplit的个数: 
          ●goalSize:该值由totalSize/numSplits来确定InputSplit的长度,它是根据用户的期望的InputSplit个数计算出来的;numSplits为用户设定的Map Task的个数,默认为1。 
          ●minSize:由配置参数mapred.min.split.size决定的InputFormat的最小长度,默认为1。 
          ●blockSize:HDFS中的文件存储块block的大小,默认为64MB。 
          这三个参数决定一个InputFormat分片的最终的长度,计算方法如下: 
                      splitSize = max{minSize,min{goalSize,blockSize}} 
计算出了分片的长度后,也就确定了InputFormat的数目。 

          2)、host选择算法 
          InputFormat的切分方案确定后,接下来就是要确定每一个InputSplit的元数据信息。InputSplit元数据通常包括四部分,<file,start,length,hosts>其意义为: 
          ●file标识InputSplit分片所在的文件; 
          ●InputSplit分片在文件中的的起始位置; 
          ●InputSplit分片的长度; 
          ●分片所在的host节点的列表。 
          InputSplit的host列表的算作策略直接影响到运行作业的本地性。我们知道,由于大文件存储在HDFS上的block可能会遍布整个Hadoop集群,而一个InputSplit分片的划分算法可能会导致一个split分片对应多个不在同一个节点上的blocks,这就会使得在Map Task执行过程中会涉及到读其他节点上的属于该Task的block中的数据,从而不能实现数据本地性,而造成更多的网络传输开销。 
          一个InputSplit分片对应的blocks可能位于多个数据节点地上,但是基于任务调度的效率,通常情况下,不会把一个分片涉及的所有的节点信息都加到其host列表中,而是选择包含该分片的数据总量的最大的前几个节点,作为任务调度时判断是否具有本地性的主要凭证。 
         FileInputFormat使用了一个启发式的host选择算法:首先按照rack机架包含的数据量对rack排序,然后再在rack内部按照每个node节点包含的数据量对node排序,最后选取前N个(N为block的副本数)node的host作为InputSplit分片的host列表。当任务地调度Task作业时,只要将Task调度给host列表上的节点,就可以认为该Task满足了本地性。 
         从上面的信息我们可以知道,当InputSplit分片的大小大于block的大小时,Map Task并不能完全满足数据的本地性,总有一本分的数据要通过网络从远程节点上读数据,故为了提高Map Task的数据本地性,减少网络传输的开销,应尽量是InputFormat的大小和HDFS的block块大小相同。 

          TextInputFormat 
          默认情况下,MapReduce使用的是TextInputFormat来读分片并将记录数据解析成一个个的key/value对,其中key为该行在整个文件(注意而不是在一个block)中的偏移量,而行的内容即为value。 
          CombineFileInputFormat 
          CombineFileInputFormat的作用是把许多文件合并作为一个map的输入,它的主要思路是把输入目录下的大文件分成多个map的输入, 并合并小文件, 做为一个map的输入。适合在处理多个小文件的场景。 
          SequenceFileInputFormat 
          SequenceFileInputFormat是一个顺序的二进制的FileInputFormat,内部以key/value的格式保存数据,通常会结合LZO或Snappy压缩算法来读取或保存可分片的数据文件。


搜索与推荐Wiki

扫一扫 关注微信公众号!号主 专注于搜索和推荐系统,尝试使用算法去更好的服务于用户,包括但不局限于机器学习,深度学习,强化学习,自然语言理解,知识图谱,还不定时分享技术,资料,思考等文章!


                             【技术服务】,详情点击查看:https://mp.weixin.qq.com/s/PtX9ukKRBmazAWARprGIAg 


外包服务

MapReduce InputFormat之FileInputFormat

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/126121.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • wpf-AvalonDock基础-安装和更换主题

    wpf-AvalonDock基础-安装和更换主题最近对wpf的多窗口排列问题深感头疼,算尺寸、位置太麻烦了(也可能是我菜鸡的缘故),最后决定用AvalonDock,排列很漂亮。本篇主要是安装和更换主题,后续会更一篇项目中常用的技巧。再吐槽一下,AvalonDock的中文资料同质化太严重!!!很多需要自己测试了才能用好(我的环境是win10+vs2019)喜欢的话为我的辛苦点个赞吧!嘤嘤嘤安装Avalondock是一个支持mvvm的框架,可以快速开发出类似visualstudio的多窗口app。去https://archive.codepl

  • echarts旭日图数据重构处理

    echarts旭日图数据重构处理网上对于旭日图的数据结构处理资料很少,所以自己记录一下。首先看旭日图需要的数据结构://旭日图{name:’淘宝’,children:[{name:’女装’,children:[{name:’上衣’,value:22},{name:’裙子’,value:12},

  • 如何解决虚拟机连不上网「建议收藏」

    如何解决虚拟机连不上网「建议收藏」通常情况下,电脑关机或重启后需要重新连网,但是,虚拟机下的乌班图通常需要重新连网,很多时候找不到之前连接的网络,如果是宽带连接,首先查看虚拟机的设置,将网络适配器改成Net模式(必要时需要重置,然后重启虚拟机),如果还没有出现要连接的以太网,那么就要查看一下主机的服务中的虚拟机是否已经全部开启,如果没有开启,就要将所有和虚拟机有关的服务启动。…

  • phpstorm2021.12激活【最新永久激活】

    (phpstorm2021.12激活)这是一篇idea技术相关文章,由全栈君为大家提供,主要知识点是关于2021JetBrains全家桶永久激活码的内容IntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,下面是详细链接哦~https://javaforall.cn/100143.html0BXA05X8YC-eyJsa…

  • java.lang.Class类详解

    java.lang.Class类详解1.Class类与类的关系 Java程序运行时,系统一直对所有的对象进行所谓的运行时类型标识。这项信息纪录了每个对象所属的类。虚拟机通常使用运行时类型信息选准正确方法去执行,用来保存这些类型信息的类是Class类。Class类封装一个对象和接口运行时的状态,当装载类时,Class类型的对象自动创建。说白了,Class类对象就是封装了一个类的类型信息,可以通过该对象操作其对应的类,即发射机制。

  • Hadoop-2.2.0中国文献——MapReduce 下一代 —配置单节点集群

    Hadoop-2.2.0中国文献——MapReduce 下一代 —配置单节点集群

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号