MapReduce 编程不可怕,一篇文章搞定它

MapReduce 编程不可怕,一篇文章搞定它前言本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系正文需求:WordCount,大数据领域的HelloWorld。Mapperpackagecom.shockang.study.bigdata.mapreduce;importjava.io.IOException;importorg.apache.hadoop.io.IntWr

大家好,又见面了,我是你们的朋友全栈君。

前言

本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系

正文

需求: Word Count,大数据领域的 Hello World。

Mapper

package com.shockang.study.bigdata.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { 
   
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException { 
   
        String[] words = value.toString().split(" ");
        for (String word : words) { 
   
            // 每个单词出现1次,作为中间结果输出
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

Reducer

package com.shockang.study.bigdata.mapreduce;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 
   
    /* key: hello value: List(1, 1, ...) */
    protected void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException { 
   
        int sum = 0;

        for (IntWritable count : values) { 
   
            sum = sum + count.get();
        }
        context.write(key, new IntWritable(sum));
    };
}

Main

package com.shockang.study.bigdata.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCountMain { 

public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException { 

if (args.length != 2) { 

System.out.println("please input Path!");
System.exit(0);
}
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, WordCountMain.class.getSimpleName());
// 打jar包
job.setJarByClass(WordCountMain.class);
// 通过job设置输入/输出格式,默认的就是 TextInputFormat/TextOutputFormat
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 设置输入/输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 设置处理Map/Reduce阶段的类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行
//如果不一样,需要分别设置map, reduce的输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置最终输出key/value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 提交作业
job.waitForCompletion(true);
}
}

Combiner

package com.shockang.study.bigdata.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCountMainWithCombiner { 

public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException { 

if (args.length != 2) { 

System.out.println("please input Path!");
System.exit(0);
}
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, WordCountMainWithCombiner.class.getSimpleName());
// 打jar包
job.setJarByClass(WordCountMainWithCombiner.class);
// 通过job设置输入/输出格式
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 设置输入/输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 设置处理Map/Reduce阶段的类
job.setMapperClass(WordCountMapper.class);
job.setCombinerClass(WordCountReducer.class);
job.setReducerClass(WordCountReducer.class);
//如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行
// 如果不一样,需要分别设置map, reduce的输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置最终输出key/value的类型m
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 提交作业
job.waitForCompletion(true);
}
}

二次排序

package com.shockang.study.bigdata.mapreduce;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/** * 现有需求,需要自定义key类型,并自定义key的排序规则,如按照人的salary降序排序,若相同,则再按age升序排序 */
public class Person implements WritableComparable<Person> { 

private String name;
private int age;
private int salary;
public Person() { 

}
public Person(String name, int age, int salary) { 

this.name = name;
this.age = age;
this.salary = salary;
}
public String getName() { 

return name;
}
public void setName(String name) { 

this.name = name;
}
public int getAge() { 

return age;
}
public void setAge(int age) { 

this.age = age;
}
public int getSalary() { 

return salary;
}
public void setSalary(int salary) { 

this.salary = salary;
}
@Override
public String toString() { 

return this.salary + " " + this.age + " " + this.name;
}
public int compareTo(Person o) { 

//先比较salary,高的排序在前;若相同,age小的在前
int compareResult1 = this.salary - o.salary;
if (compareResult1 != 0) { 

return -compareResult1;
} else { 

return this.age - o.age;
}
}
public void write(DataOutput dataOutput) throws IOException { 

//序列化,将NewKey转化成使用流传输的二进制
dataOutput.writeUTF(name);
dataOutput.writeInt(age);
dataOutput.writeInt(salary);
}
public void readFields(DataInput dataInput) throws IOException { 

//使用in读字段的顺序,要与write方法中写的顺序保持一致
this.name = dataInput.readUTF();
this.age = dataInput.readInt();
this.salary = dataInput.readInt();
}
}

自定义分区

package com.shockang.study.bigdata.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import java.util.HashMap;
public class CustomPartitioner extends Partitioner<Text, IntWritable> { 

public static HashMap<String, Integer> dict = new HashMap<>();
static { 

dict.put("Dear", 0);
dict.put("Bear", 1);
dict.put("River", 2);
dict.put("Car", 3);
}
public int getPartition(Text text, IntWritable intWritable, int i) { 

return dict.get(text.toString());
}
}

数据倾斜处理

当遇到数据倾斜的时候,我们可以在 Reducer 中日志记录哪些超过阈值的 key

package com.shockang.study.bigdata.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
public class WordCountReducerWithDataSkew extends Reducer<Text, IntWritable, Text, IntWritable> { 

public static final String MAX_VALUES = "skew.maxvalues";
private int maxValueThreshold;
@Override
protected void setup(Context context) throws IOException, InterruptedException { 

Configuration conf = context.getConfiguration();
maxValueThreshold = Integer.parseInt(conf.get(MAX_VALUES));
}
/* key: hello value: List(1, 1, ...) */
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException { 

int i = 0;
for (IntWritable value : values) { 

System.out.println(value);
i++;
}
if (++i > maxValueThreshold) { 

System.out.println("Received " + i + " values for key " + key);
}
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/147546.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 7.PyCharm基本使用与常规设置

    7.PyCharm基本使用与常规设置文章目录0.新建Python项目0.1步骤0.2演示1.主题设置1.1步骤1.2演示2.字体大小调整2.1步骤2.2演示3.添加多个解释器3.1步骤3.2演示3.3版本切换0.新建Python项目0.1步骤第一次安装,需要创建一个项目。如果能进入到开发界面请略过。1.NewProject2.选择路径3.选择本地环境–>选择电脑安装的解释器4.取消生成main.py脚本5.create6.进入到开发界面–>close关闭推荐提示0.2演示1.主题设置1

  • SecondCopy 2000 简单说明

    SecondCopy 2000 简单说明

  • T10接口_服务端接口和前端接口

    T10接口_服务端接口和前端接口本文适用鼎捷软件T100系列1.azzi700注册接口程序号,接口服务名2.设计器code进行签出,下载(空框架)3.设计数据接收的结构,以及开发函数进行数据处理4.程序上传,无提示则表示成功5.打开http://erp_ip/wstopprd/ws/r/awsp920,如果接口地址返回isok则接口是通过的,还可以使用工具postman或者soapui进行调用测试6.检查日志,T100的接口日志存放于$TEMPDIR或者$TEMPLOG,日志的命名规则是按天的,每天调用的所

    2022年10月20日
  • 函数的极限定义

    函数的极限定义函数的极限情况情况1:自变量x任意地接近于有限值x0,记作x->x0时,函数f(x)的变化情况;情况2:自变量x的绝对值|x|无限取向正无穷的时,函数f(x)的变化情况;然后明白下去心邻域:以x0这一点为中心的任何开区间——称为点x0的邻域。用符号表达为:U(x0)如果去掉x0这个点,那么就是去心邻域,用符号表达为:U’(x0)定义:|f(x)-A|<smallvalue,x无限趋向于x0这里的:smallvalue可以任意小,要多小有多小。A是一个常数。那么此时必

  • 一小时人生怎么选服务器(服务器一般多少瓦)

    上周末扑克之星的服务器发生了罕见的崩溃,几千名玩家掉线被洗盲注,而少数几个国家并没有影响,此事让扑克之星的用户纷纷谴责他们的服务提供商。祝贺罗马尼亚、捷克、保加利亚的玩家,你们从这次服务器崩溃事件中获利了。—DramaticDegen(@TJDarroch)2018年8月12日在你中止比赛之前,你按照被洗盲一个小时的筹码量分配奖金,你真的不能那么做。我们需要知道比赛中其他玩家获得的奖金。—…

  • 用js来实现那些数据结构01(数组篇01-数组的增删)

    在开始正式的内容之前,不得不说说js中的数据类型和数据结构,以及一些比较容易让人混淆的概念。那么为什么要从数组说起?数组在js中是最常见的内存数据结构,数组数据结构在js中拥有很多的方法,很多初学者记

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号