Hadoop序列化中的Writable接口(附部分源码)

Hadoop序列化中的Writable接口(附部分源码)

序列化是将结构化对象为字节流以便与通过网络进行传输或者写入持久存储。反序列化指的是将字节流转为一系列结构化对象的过程。

序化在分布式数据处理的两大领域经常出现:进程间通信和永久存储

hadoop中,节点直接的进程间通信是用远程过程调用(RPC)实现的。RPC协议将消息序列化成二进制流后发送到运城节点,远程节点接着将二进制流反序列化为原始的消息。

在Hadoop中,Writable接口定义了两个方法:一个用于将其状态写入二进制格式的DataOutput流,另一个用于从二进制格式的DataInput流读取其态。

packageorg.apache.hadoop.io; importjava.io.DataOutput; importjava.io.DataInput; importjava.io.IOException; public interface Writable { void write(DataOutput out)throws IOException; void readFields(DataInput in)throws IOException;

write和readFields分别实现了把对象序列化和反序列化的功能

让我们来看一个特别的Writable,看看可以对它进行哪些操作。我们要使用IntWritable,这是一个Java的int对象的封装。可以使用set()函数来创建和设置它的值:

IntWritable writable =new IntWritable(); writable.set(163);

类似地,我们也可以使用构造函数:

IntWritable writable =newIntWritable(163);

为了检查IntWritable的序列化形式,我们写一个小的辅助方法,它把一个java.io.ByteArrayOutputStream封装到java.io.DataOutputStream中(java.io.DataOutput的一个实现),以此来捕获序列化的数据流中的字节:

public static byte[] serialize(Writable writable)throws IOException { ByteArrayOutputStream out =new ByteArrayOutputStream(); DataOutputStream dataOut =new DataOutputStream(out); writable.write(dataOut); dataOut.close(); returnout.toByteArray(); }

整数用四个字节写入(我们使用JUnit 4断言):

byte[] bytes = serialize(writable); assertThat(bytes.length, is(4));

字节使用大端顺序写入(所以,最重要的字节写在数据流的开始处,这是由java.io.DataOutput接口规定的),我们可以使用Hadoop的StringUtils方法看到它们的十六进制表示:

assertThat(StringUtils.byteToHexString(bytes), is("000000a3"));

让我们再来试试反序列化。我们创建一个帮助方法来从一个字节数组读取一个Writable对象:

public static byte[] deserialize(Writable writable,byte[] bytes) throws IOException { ByteArrayInputStream in =new ByteArrayInputStream(bytes); DataInputStream dataIn =new DataInputStream(in); writable.readFields(dataIn); dataIn.close(); return bytes; }

我们构造一个新的、缺值的IntWritable,然后调用deserialize()方法来读取刚写入的输出流。然后发现它的值(使用get方法检索得到)还是原来的值163:

IntWritable newWritable =new IntWritable(); deserialize(newWritable, bytes); assertThat(newWritable.get(), is(163));

WritableComparable 和comparator

IntWritable实现了WritableComparable接口,后者是Writable和java.lang.Comparable接口的子接口。

packageorg.apache.hadoop.io; public interface WritableComparable<t> extends Writable, Comparable<t> { }

类型的比较对MapReduce而言至关重要的,键和键之间的比较是在排序阶段完成。Hadoop提供的一个优化方法是从Java Comparator的RawComparator扩展

packageorg.apache.hadoop.io; importjava.util.Comparator; public interface RawComparator<t> extends Comparator<t> { public int compare(byte[] b1,ints1,intl1,byte[] b2,ints2,intl2); }
 package java.util; public interface Comparator<T> {     int compare(T o1, T o2);     boolean equals(Object obj); }

这个接口允许执行者比较从流中读取的未被反序列化为对象的记录,从而省去了创建对象的所有开销。例如,IntWritables的comparator使用原始的compare()方法从每个字节数组的指定开始位置(S1和S2)和长度(L1和L2)读取整数b1和b2然后直接进行比较。

WritableComparator是RawComparator对WritableComparable类的一个通用实现。它提供两个主要功能。首先,它提供了一个默认的对原始compare()函数的调用,对从数据流对要比较的对象进行反序列化,然后调用对象的compare()方法其次,它充当的是RawComparator实例的一个工厂方法(Writable方法已经注册)。例如,为获得IntWritable的comparator,我们只需使用:

RawComparator<intwritable> comparator = WritableComparator.get(IntWritable.class);

WritableComparator get方法源码:

private static HashMap<Class, WritableComparator> comparators = new HashMap<Class, WritableComparator>(); // registry /** Get a comparator for a {@link WritableComparable} implementation. */ public static synchronized WritableComparator get(Class<? extends WritableComparable> c) { WritableComparator comparator = comparators.get(c); if (comparator == null) comparator = new WritableComparator(c, true); return comparator; }

comparator可以用来比较两个IntWritable:

IntWritable w1 =newIntWritable(163); IntWritable w2 =newIntWritable(67); assertThat(comparator.compare(w1, w2), greaterThan(0));

或者它们的序列化描述: 

byte[] b1 = serialize(w1); byte[] b2 = serialize(w2); assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length), greaterThan(0));

WritableComparator的compare()方法的源码:

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { buffer.reset(b1, s1, l1); // parse key1 key1.readFields(buffer); buffer.reset(b2, s2, l2); // parse key2 key2.readFields(buffer); } catch (IOException e) { throw new RuntimeException(e); } return compare(key1, key2); // compare them } @SuppressWarnings("unchecked") public int compare(WritableComparable a, WritableComparable b) { return a.compareTo(b); }

参考:《hadoop权威指南》

转载于:https://my.oschina.net/winHerson/blog/130145

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/110135.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • 单片机应用基础知识_51单片机基础知识总结

    单片机应用基础知识_51单片机基础知识总结单片机——硬件基础知识宗旨:技术的学习是有限的,分享的精神是无限的。1、单片机内部资源STC89C52:8KFLASH、512字节RAM、32个IO口、3个定时器、1个UART、8个中断源(1)Flash(硬盘)——程序存储空间——擦写10万次,断电数据不丢失,读写速度慢(2)RAM(内存)——数据存储空间——断电数据丢失

  • MySQL删除表提示Cannot truncate a table referenced in a foreign key constraint解决办法

    MySQL删除表提示Cannot truncate a table referenced in a foreign key constraint解决办法背景因为测试过程中,几套环境都是用的同一个库,数据有点冲突,需要删库。执行truncatetablexxx时提示:[Err]1701-Cannottruncateatablereferencedinaforeignkeyconstraint….解决办法删除之前先执行删除外键约束SETforeign_key_checks=0删除完之后再执行启动外

  • 《天下强汉》3、强汉骄子,双星闪耀——卫青霍去病的华美人生[通俗易懂]

    《天下强汉》3、强汉骄子,双星闪耀——卫青霍去病的华美人生[通俗易懂]【档案】  姓名:卫青,字仲卿;霍去病,号嫖姚  生卒:卫青,约公元前157年—公元前106年;霍去病,公元前141年—公元前117年  性别:男  籍贯:河东平阳人(今山西临汾市西南)  家庭出身:私生子、奴隶(卫青);外戚(霍去病)  学历:无师自通的军事天才  相貌:疏眉朗目,高大俊美,英武矫健  秘密武器:武刚车  经典战役:龙城之战,雁门之战,河南之战,突袭右贤王之战,漠南之战,河西之战…

  • 目标检測的图像特征提取之(一)HOG特征

    目标检測的图像特征提取之(一)HOG特征

    2021年11月29日
  • 虚拟机扩容磁盘后扩容分区_如何将磁盘主分区设置为活动分区

    虚拟机扩容磁盘后扩容分区_如何将磁盘主分区设置为活动分区当服务器数据太多的时候,硬盘不足的时候就得考虑扩容,为了不影响业务的正常运行,一般云服务器的本地磁盘都是不支持分区的,因为业务数据通常是不能中断和移动的,无论是增加硬盘或在原有磁盘增加分区的方式扩容,势必会存在卸载、挂载、移动等操作。所以对于云硬盘,如果我们要把他作为数据盘。即使能分区,也最好不要分区,以免以后扩容麻烦。以下用虚拟机看一下对未分区磁盘扩容的效果[root@k8s-node01…

  • 编译原理 实验3 递归下降语法分析程序设计

    编译原理 实验3 递归下降语法分析程序设计实验目的】练习构造递归下降语法分析程序的方法,熟悉上下文无关文法的使用,加深对课堂教学的理解;提高语法分析方法的实践能力【实验要求】利用某一高级程序设计语言构造语法分析程序【具体要求】对于给定的文法G[E]E-&gt;TE’E’-&gt;+TE’|εT-&gt;F…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号