Hadoop序列化中的Writable接口(附部分源码)

Hadoop序列化中的Writable接口(附部分源码)

序列化是将结构化对象为字节流以便与通过网络进行传输或者写入持久存储。反序列化指的是将字节流转为一系列结构化对象的过程。

序化在分布式数据处理的两大领域经常出现:进程间通信和永久存储

hadoop中,节点直接的进程间通信是用远程过程调用(RPC)实现的。RPC协议将消息序列化成二进制流后发送到运城节点,远程节点接着将二进制流反序列化为原始的消息。

在Hadoop中,Writable接口定义了两个方法:一个用于将其状态写入二进制格式的DataOutput流,另一个用于从二进制格式的DataInput流读取其态。

packageorg.apache.hadoop.io; importjava.io.DataOutput; importjava.io.DataInput; importjava.io.IOException; public interface Writable { void write(DataOutput out)throws IOException; void readFields(DataInput in)throws IOException;

write和readFields分别实现了把对象序列化和反序列化的功能

让我们来看一个特别的Writable,看看可以对它进行哪些操作。我们要使用IntWritable,这是一个Java的int对象的封装。可以使用set()函数来创建和设置它的值:

IntWritable writable =new IntWritable(); writable.set(163);

类似地,我们也可以使用构造函数:

IntWritable writable =newIntWritable(163);

为了检查IntWritable的序列化形式,我们写一个小的辅助方法,它把一个java.io.ByteArrayOutputStream封装到java.io.DataOutputStream中(java.io.DataOutput的一个实现),以此来捕获序列化的数据流中的字节:

public static byte[] serialize(Writable writable)throws IOException { ByteArrayOutputStream out =new ByteArrayOutputStream(); DataOutputStream dataOut =new DataOutputStream(out); writable.write(dataOut); dataOut.close(); returnout.toByteArray(); }

整数用四个字节写入(我们使用JUnit 4断言):

byte[] bytes = serialize(writable); assertThat(bytes.length, is(4));

字节使用大端顺序写入(所以,最重要的字节写在数据流的开始处,这是由java.io.DataOutput接口规定的),我们可以使用Hadoop的StringUtils方法看到它们的十六进制表示:

assertThat(StringUtils.byteToHexString(bytes), is("000000a3"));

让我们再来试试反序列化。我们创建一个帮助方法来从一个字节数组读取一个Writable对象:

public static byte[] deserialize(Writable writable,byte[] bytes) throws IOException { ByteArrayInputStream in =new ByteArrayInputStream(bytes); DataInputStream dataIn =new DataInputStream(in); writable.readFields(dataIn); dataIn.close(); return bytes; }

我们构造一个新的、缺值的IntWritable,然后调用deserialize()方法来读取刚写入的输出流。然后发现它的值(使用get方法检索得到)还是原来的值163:

IntWritable newWritable =new IntWritable(); deserialize(newWritable, bytes); assertThat(newWritable.get(), is(163));

WritableComparable 和comparator

IntWritable实现了WritableComparable接口,后者是Writable和java.lang.Comparable接口的子接口。

packageorg.apache.hadoop.io; public interface WritableComparable<t> extends Writable, Comparable<t> { }

类型的比较对MapReduce而言至关重要的,键和键之间的比较是在排序阶段完成。Hadoop提供的一个优化方法是从Java Comparator的RawComparator扩展

packageorg.apache.hadoop.io; importjava.util.Comparator; public interface RawComparator<t> extends Comparator<t> { public int compare(byte[] b1,ints1,intl1,byte[] b2,ints2,intl2); }
 package java.util; public interface Comparator<T> {     int compare(T o1, T o2);     boolean equals(Object obj); }

这个接口允许执行者比较从流中读取的未被反序列化为对象的记录,从而省去了创建对象的所有开销。例如,IntWritables的comparator使用原始的compare()方法从每个字节数组的指定开始位置(S1和S2)和长度(L1和L2)读取整数b1和b2然后直接进行比较。

WritableComparator是RawComparator对WritableComparable类的一个通用实现。它提供两个主要功能。首先,它提供了一个默认的对原始compare()函数的调用,对从数据流对要比较的对象进行反序列化,然后调用对象的compare()方法其次,它充当的是RawComparator实例的一个工厂方法(Writable方法已经注册)。例如,为获得IntWritable的comparator,我们只需使用:

RawComparator<intwritable> comparator = WritableComparator.get(IntWritable.class);

WritableComparator get方法源码:

private static HashMap<Class, WritableComparator> comparators = new HashMap<Class, WritableComparator>(); // registry /** Get a comparator for a {@link WritableComparable} implementation. */ public static synchronized WritableComparator get(Class<? extends WritableComparable> c) { WritableComparator comparator = comparators.get(c); if (comparator == null) comparator = new WritableComparator(c, true); return comparator; }

comparator可以用来比较两个IntWritable:

IntWritable w1 =newIntWritable(163); IntWritable w2 =newIntWritable(67); assertThat(comparator.compare(w1, w2), greaterThan(0));

或者它们的序列化描述: 

byte[] b1 = serialize(w1); byte[] b2 = serialize(w2); assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length), greaterThan(0));

WritableComparator的compare()方法的源码:

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { buffer.reset(b1, s1, l1); // parse key1 key1.readFields(buffer); buffer.reset(b2, s2, l2); // parse key2 key2.readFields(buffer); } catch (IOException e) { throw new RuntimeException(e); } return compare(key1, key2); // compare them } @SuppressWarnings("unchecked") public int compare(WritableComparable a, WritableComparable b) { return a.compareTo(b); }

参考:《hadoop权威指南》

转载于:https://my.oschina.net/winHerson/blog/130145

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/110135.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 这样规范写代码,同事直呼“666”

    点击上方“全栈程序员社区”,星标公众号 重磅干货,第一时间送达 来源:cnblogs.com/taojietaoge/p/11575376.html 一、MyBatis 不要为了多…

  • 最短路径Dijkstra算法原理及Matlab实现「建议收藏」

    最短路径Dijkstra算法原理及Matlab实现「建议收藏」图论的基础知识不再阐述。最短路径算法主要有二Dijkstra算法Floyd算法Dijkstra算法研究的是从初始点到其他每一结点的最短路径而Floyd算法研究的是任意两结点之间的最短路径以下图为例,首先介绍Dijstra的原理红字为各结点的编号,蓝字为各结点之间的距离首先定义几个变量结点个数n;二维矩阵M(nxn),距离矩阵,连通的结点间即为距离,不…

  • 如果我说熟悉SpringBoot 面试官会怎么问?

    如果我说熟悉SpringBoot 面试官会怎么问?SpringBoot因简化了Spring框架使用难度,极大地提高了Java企业级应用开发的效率,成为企业考核人才的重要标准之一。但随着现今互联网行业快速发展、企业业务不断深入,相应地对SpringBoot技术要求也愈来愈高。春节时期有一位打算金三银四面试的读者私信问我:如果我说熟悉SpringBoot面试官会怎么问?​可能不少朋友跟他一样,不清楚当下企业真实生产环境下对SpringBoot有哪些具体要求,需要掌握到什么程度。为此,结合这些年的面试经历及各大厂的职位要求,给

  • linux系统下codeblocks控制台打印中文乱码[通俗易懂]

    linux系统下codeblocks控制台打印中文乱码[通俗易懂]linux系统下codeblocks控制台打印中文乱码在linux下安装codeblocks后,打印中文出现如下问题:#include<stdio.h>#include<stdlib.h>intmain(){printf(“你好,世界!\n”);return0;}解决办法1、将Settings-&…

  • 最有效的最新防360拦截方法大全![通俗易懂]

    最有效的最新防360拦截方法大全![通俗易懂]首先声明,现在对于360拦截,没有任何一种方法是绝对有效的。因为存在举报,同一网站举报次数达到5次以上就会有360公司的员工接入人工审核,人工接入的话,再好的技术都是百搭,所以我这里的技术可以说绝对拦得住“机器审核”,具体“拦截时间未知”(因为不知道您的竞争对手什么时候会给您搞点小动作)我总结的方法一共有5种,都做过测试,分析出优缺点。具体如下:方法一:用框架调用主页,也就

    2022年10月23日
  • pycharm安装pyinstaller[通俗易懂]

    pycharm安装pyinstaller[通俗易懂]我是刚刚学习python的小白,我第一次安装也出错,pip下载经常出错,这个是因为网络问题,下载出错的找个网络好一点的地方就可以了,在网上找了各种教程,也下载了别人提供的安装包,结果发现,始终不行,后来我才发现,原来是版本问题,我的安装包是pyinstaller3.4,python版本是3.8,后来我在pycharm里面下载了pyinstaller,步骤如下pycharm安装pyinstaller首先打开pycharm中的setting在这里插入图片描述然后按图打开projectinterpret

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号