hadoop多文件格式输入「建议收藏」

hadoop多文件格式输入

大家好,又见面了,我是全栈君。

版本号:

CDH5.0.0 (hdfs:2.3。mapreduce:2.3,yarn:2.3)

hadoop多文件格式输入,一般能够使用MultipleInputs类指定不同的输入文件路径以及输入文件格式。

比方如今有例如以下的需求:

现有两份数据:

phone:

123,good number
124,common number
125,bad number

user:

zhangsan,123
lisi,124
wangwu,125

如今须要把user和phone依照phone number连接起来,得到以下的结果:

zhangsan,123,good number
lisi,124,common number
wangwu,125,bad number

那么就能够使用MultipleInputs来操作,这里把user和phone上传到hdfs文件夹中,各自是/multiple/user/user , /multiple/phone/phone。

设计的MultipleDriver例如以下:

package multiple.input;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
/**
 * input1(/multiple/user/user):
 * username,user_phone
 *  
 * input2(/multiple/phone/phone):
 *  user_phone,description 
 *  
 * output: username,user_phone,description
 * 
 * @author fansy
 *
 */
public class MultipleDriver extends Configured implements Tool{
//	private  Logger log = LoggerFactory.getLogger(MultipleDriver.class);
	
	private String input1=null;
	private String input2=null;
	private String output=null;
	private String delimiter=null;
	
	public static void main(String[] args) throws Exception {
		Configuration conf=new Configuration();
//		conf.set("fs.defaultFS", "hdfs://node33:8020");  
//        conf.set("mapreduce.framework.name", "yarn");  
//        conf.set("yarn.resourcemanager.address", "node33:8032"); 
        
		ToolRunner.run(conf, new MultipleDriver(), args);
	}

	@Override
	public int run(String[] arg0) throws Exception {
		configureArgs(arg0);
		checkArgs();
		
		Configuration conf= getConf();
		conf.set("delimiter", delimiter);
		 @SuppressWarnings("deprecation")
		Job job = new Job(conf, "merge user and phone information ");
        job.setJarByClass(MultipleDriver.class);

        job.setReducerClass(MultipleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlagStringDataType.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        
        job.setNumReduceTasks(1);
        MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, Multiple1Mapper.class);
        MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, Multiple2Mapper.class);
        FileOutputFormat.setOutputPath(job, new Path(output));
        
        int res = job.waitForCompletion(true) ? 0 : 1;
        return res;
	}
	

	/**
	 * check the args 
	 */
	private void checkArgs() {
		if(input1==null||"".equals(input1)){
			System.out.println("no user input...");
			printUsage();
			System.exit(-1);
		}
		if(input2==null||"".equals(input2)){
			System.out.println("no phone input...");
			printUsage();
			System.exit(-1);
		}
		if(output==null||"".equals(output)){
			System.out.println("no output...");
			printUsage();
			System.exit(-1);
		}
		if(delimiter==null||"".equals(delimiter)){
			System.out.println("no delimiter...");
			printUsage();
			System.exit(-1);
		}
	
	}

	/**
	 * configuration the args
	 * @param args
	 */
	private void configureArgs(String[] args) {
    	for(int i=0;i<args.length;i++){
    		if("-i1".equals(args[i])){
    			input1=args[++i];
    		}
    		if("-i2".equals(args[i])){
    			input2=args[++i];
    		}
    		
    		if("-o".equals(args[i])){
    			output=args[++i];
    		}
    		
    		if("-delimiter".equals(args[i])){
    			delimiter=args[++i];
    		}
    		
    	}
	}
	public static void printUsage(){
    	System.err.println("Usage:");
    	System.err.println("-i1 input \t user data path.");
    	System.err.println("-i2 input \t phone data path.");
    	System.err.println("-o output \t output data path.");
    	System.err.println("-delimiter  data delimiter , default is comma  .");
    }
}

这里指定两个mapper和一个reducer,两个mapper分别相应处理user和phone的数据,分别例如以下:

mapper1(处理user数据):

package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * input :
 * username,phone
 * 
 * output:
 * <key,value>  --> <[phone],[0,username]>
 * @author fansy
 *
 */
public class Multiple1Mapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{
	private  Logger log = LoggerFactory.getLogger(Multiple1Mapper.class);
	private String delimiter=null; // default is comma
	@Override
	public void setup(Context cxt){
		delimiter= cxt.getConfiguration().get("delimiter", ",");
		log.info("This is the begin of Multiple1Mapper");
	} 
	
	@Override
	public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
		String info= new String(value.getBytes(),"UTF-8");
		String[] values = info.split(delimiter);
		if(values.length!=2){
			return;
		}
		log.info("key-->"+values[1]+"=========value-->"+"[0,"+values[0]+"]");
		cxt.write(new Text(values[1]), new FlagStringDataType(0,values[0]));
	}
}

mapper2(处理phone数据):

package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * input :
 * phone,description
 * 
 * output:
 * <key,value>  --> <[phone],[1,description]>
 * @author fansy
 *
 */
public class Multiple2Mapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{
	private  Logger log = LoggerFactory.getLogger(Multiple2Mapper.class);
	private String delimiter=null; // default is comma
	@Override
	public void setup(Context cxt){
		delimiter= cxt.getConfiguration().get("delimiter", ",");
		log.info("This is the begin of Multiple2Mapper");
	} 
	
	@Override
	public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
		String[] values= value.toString().split(delimiter);
		if(values.length!=2){
			return;
		}
		log.info("key-->"+values[0]+"=========value-->"+"[1,"+values[1]+"]");
		cxt.write(new Text(values[0]), new FlagStringDataType(1,values[1]));
	}
}

这里的FlagStringDataType是自己定义的:

package multiple.input;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.primitives.Ints;

public class FlagStringDataType implements WritableComparable<FlagStringDataType> {
	private  Logger log = LoggerFactory.getLogger(FlagStringDataType.class);
  private String value;
  private int flag;
  public FlagStringDataType() {
  }

  public FlagStringDataType(int flag,String value) {
    this.value = value;
    this.flag=flag;
  }

  public String get() {
    return value;
  }

  public void set(String value) {
    this.value = value;
  }

  @Override
  public boolean equals(Object other) {
    return other != null && getClass().equals(other.getClass()) 
    		&& ((FlagStringDataType) other).get() == value
    		&&((FlagStringDataType) other).getFlag()==flag;
  }

  @Override
  public int hashCode() {
    return Ints.hashCode(flag)+value.hashCode();
  }

  @Override
  public int compareTo(FlagStringDataType other) {
	 
    if (flag >= other.flag) {
      if (flag > other.flag) {
        return 1;
      }
    } else {
      return -1;
    }
    return value.compareTo(other.value);
  }

  @Override
  public void write(DataOutput out) throws IOException {
	log.info("in write()::"+"flag:"+flag+",vlaue:"+value);
    out.writeInt(flag);
    out.writeUTF(value);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
	  log.info("in read()::"+"flag:"+flag+",vlaue:"+value);
	  flag=in.readInt();
	  value = in.readUTF();
	  log.info("in read()::"+"flag:"+flag+",vlaue:"+value);
  }

public int getFlag() {
	return flag;
}

public void setFlag(int flag) {
	this.flag = flag;
}

public String toString(){
	return flag+":"+value;
}

}

这个自己定义类,使用一个flag来指定是哪个数据。而value则相应是其值。

这样做的优点是在reduce端能够依据flag的值来推断其输出位置。这样的设计方式能够对多种输入的整合有非常大帮助,在mahout中也能够看到这样的设计。

reducer(汇总输出数据):

package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultipleReducer extends Reducer<Text,FlagStringDataType,Text,NullWritable>{
	private  Logger log = LoggerFactory.getLogger(MultipleReducer.class);
	private String delimiter=null; // default is comma
	@Override
	public void setup(Context cxt){
		delimiter= cxt.getConfiguration().get("delimiter", ",");
	} 
	@Override
	public void reduce(Text key, Iterable<FlagStringDataType> values,Context cxt) throws IOException,InterruptedException{
		log.info("================");
		log.info("         =======");
		log.info("              ==");
		String[] value= new String[3];
		value[2]=key.toString();
		for(FlagStringDataType v:values){
			int index= v.getFlag();
			log.info("index:"+index+"-->value:"+v.get());
			value[index]= v.get();
		}
		log.info("              ==");
		log.info("         =======");
		log.info("================");
		cxt.write(new Text(value[2]+delimiter+value[0]+delimiter+value[1]),NullWritable.get());
	}
}

这样设计的优点是,能够针对不同的输入数据採取不同的逻辑处理。并且不同的输入数据能够是序列文件的格式。

以下介绍一种方式和上面的比。略有不足。可是能够借鉴。

首先是Driver:

package multiple.input;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
/**
 * input1(/multiple/user/user):
 * username,user_phone
 *  
 * input2(/multiple/phone/phone):
 *  user_phone,description 
 *  
 * output: username,user_phone,description
 * 
 * @author fansy
 *
 */
public class MultipleDriver2 extends Configured implements Tool{
//	private  Logger log = LoggerFactory.getLogger(MultipleDriver.class);
	
	private String input1=null;
	private String input2=null;
	private String output=null;
	private String delimiter=null;
	
	public static void main(String[] args) throws Exception {
		Configuration conf=new Configuration();
//		conf.set("fs.defaultFS", "hdfs://node33:8020");  
//        conf.set("mapreduce.framework.name", "yarn");  
//        conf.set("yarn.resourcemanager.address", "node33:8032"); 
        
		ToolRunner.run(conf, new MultipleDriver2(), args);
	}

	@Override
	public int run(String[] arg0) throws Exception {
		configureArgs(arg0);
		checkArgs();
		
		Configuration conf= getConf();
		conf.set("delimiter", delimiter);
		 @SuppressWarnings("deprecation")
		Job job = new Job(conf, "merge user and phone information ");
        job.setJarByClass(MultipleDriver2.class);
        job.setMapperClass(MultipleMapper.class);
        job.setReducerClass(MultipleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlagStringDataType.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        
        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath(job, new Path(input1));
        FileInputFormat.addInputPath(job, new Path(input2));
        FileOutputFormat.setOutputPath(job, new Path(output));
        
        int res = job.waitForCompletion(true) ? 0 : 1;
        return res;
	}
	

	/**
	 * check the args 
	 */
	private void checkArgs() {
		if(input1==null||"".equals(input1)){
			System.out.println("no user input...");
			printUsage();
			System.exit(-1);
		}
		if(input2==null||"".equals(input2)){
			System.out.println("no phone input...");
			printUsage();
			System.exit(-1);
		}
		if(output==null||"".equals(output)){
			System.out.println("no output...");
			printUsage();
			System.exit(-1);
		}
		if(delimiter==null||"".equals(delimiter)){
			System.out.println("no delimiter...");
			printUsage();
			System.exit(-1);
		}
	
	}

	/**
	 * configuration the args
	 * @param args
	 */
	private void configureArgs(String[] args) {
    	for(int i=0;i<args.length;i++){
    		if("-i1".equals(args[i])){
    			input1=args[++i];
    		}
    		if("-i2".equals(args[i])){
    			input2=args[++i];
    		}
    		
    		if("-o".equals(args[i])){
    			output=args[++i];
    		}
    		
    		if("-delimiter".equals(args[i])){
    			delimiter=args[++i];
    		}
    		
    	}
	}
	public static void printUsage(){
    	System.err.println("Usage:");
    	System.err.println("-i1 input \t user data path.");
    	System.err.println("-i2 input \t phone data path.");
    	System.err.println("-o output \t output data path.");
    	System.err.println("-delimiter  data delimiter , default is comma  .");
    }
}

这里加入路径直接使用FileInputFormat加入输入路径,这样的话,针对不同的输入数据的不同业务逻辑能够在mapper中先推断眼下正在处理的是那个数据。然后依据其路径来进行相应的业务逻辑处理:

package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 * input1 :
 * username,phone
 * 
 * input2
 * phone,description
 * 
 * output:
 * <key,value>  --> <[phone],[0,username]>
 * <key,value>  --> <[phone],[1,description]>
 * @author fansy
 *
 */
public class MultipleMapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{
	
	private String delimiter=null; // default is comma
	private boolean flag=false;
	@Override
	public void setup(Context cxt){
		delimiter= cxt.getConfiguration().get("delimiter", ",");
		InputSplit input=cxt.getInputSplit();  
	    String filename=((FileSplit) input).getPath().getParent().getName();
	    if("user".equals(filename)){
	    	flag=true;
	    }
	} 
	
	@Override
	public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
		String[] values= value.toString().split(delimiter);
		if(values.length!=2){
			return;
		}
		if(flag){
			cxt.write(new Text(values[1]), new FlagStringDataType(0,values[0]));
		}else{
			cxt.write(new Text(values[0]), new FlagStringDataType(1,values[1]));
		}
	}
}

整体来说。这样的处理方式事实上是不如第一种的,在每一个map函数中都须要进行推断。比第一种多了非常多操作;同一时候。针对不同的序列文件,这样的方式处理不了(Key、value的类型不一样的情况下)。

所以针对多文件格式的输入,不妨使用第一种方式。

分享,成长,快乐

转载请注明blog地址:http://blog.csdn.net/fansy1990

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/108221.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 5G NR 逻辑信道、传输信道和物理信道

    5G NR 逻辑信道、传输信道和物理信道无线接口可分为三个协议层:物理层(L1)、数据链路层(L2)和网络层(L3)。L1:主要用于为高层业务提供传输的无线物理通道。L2:包括四个子层MAC(MediumAccessControl)媒体接入控制RLC(RadioLinkControl)无线链路控制PDCP(PacketDataConvergenceProtocol)分组数据汇聚协议SDAP(ServiceD…

  • unity drawcall怎么看_unity scrollview

    unity drawcall怎么看_unity scrollview在实际项目开发中,提起unity优化,肯定是有DrawCall的相关内容的,下面就讲解一下什么是DrawCall以及如何对DrawCall进行优化操作。一、什么是DrawCall?    在unity中,每次CPU准备数据并通知GPU的过程就称之为一个DrawCall。    具体过程就是:设置颜色–&gt;绘图方式–&gt;顶点坐标–&gt;绘制–&gt;结束…

  • Flask 的 jsonify 理解[通俗易懂]

    Flask 的 jsonify 理解[通俗易懂]文章目录python代码解决原因Content-Type的区别python代码#-*-coding:utf-8-*-fromflaskimportFlask,jsonifyapp=Flask(__name__)urls=[{‘id’:1,’title’:’python’,’descripti…

  • vue 父组件调用子组件的函数_vue父组件调用子组件属性

    vue 父组件调用子组件的函数_vue父组件调用子组件属性第一种方法直接在子组件中通过this.$parent.event来调用父组件的方法父组件<template><div><child></child></div></template><script>importchildfrom’./components/dam/…

  • 批处理文件for循环_批处理循环语句

    批处理文件for循环_批处理循环语句命令格式:for{%variable|%%variable}in(集合)docommand[options]%variable|%%variable:代表可替换参数。使用%variable通过命令提示符执行for命令。使用%%variable在批处理文件中执行for命令;这个变量可以是26个英文字母任意一个,也可以是其他;这些变量会区分大小写,%%x和%%X代表

    2022年10月12日
  • matlab误差条形图_excel柱状图添加标准误差线

    matlab误差条形图_excel柱状图添加标准误差线为准确快速评定线轮廓度误差,提出了一种基于分割逼近法与MATLAB相结合的用于计算平面线轮廓度误差的新方法,该方法符合最小条件原理;它根据平面线轮廓度误差的定义……细想一下,只做误差分析和数据处理好像内容过于单调。加之,要是做一个完完全全依赖于Matlab的程序,这样也不太好用。如果将这个程序放在一个独立的界面上……和模型参数和计算参数参数和计算参数误差用matla…

    2022年10月19日

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号