现在的位置: 首页 > 综合 > 正文

Hadoop WritableComparable接口收集的知识

2014年07月27日 ⁄ 综合 ⁄ 共 3796字 ⁄ 字号 评论关闭

Writable接口大家可能都知道,它是一个实现了序列化协议的序列化对象。在Hadoop中定义一个结构化对象都要实现Writable接口,使得该结构化对象可以序列化为字节流,字节流也可以反序列化为结构化对象。那WritableComparable接口是可序列化并且可比较的接口。MapReduce中所有的key值类型都必须实现这个接口,既然是可序列化的那就必须得实现readFiels()和write()这两个序列化和反序列化函数,既然也是可比较的那就必须得实现compareTo()函数,该函数即是比较和排序规则的实现。这样MR中的key值就既能可序列化又是可比较的。下面几符图是API中对WritableComparable接口的解释及其方法,还有一个实现了该接口的对象的列子:

public interface WritableComparable<T>
     extends 
     Writable, 
     Comparable<T>
    

A
Writable
which is alsoComparable.

WritableComparables can be compared to each other, typically via
Comparator
s. Any type which is to be used as a key in the
Hadoop
Map-Reduce framework should implement this interface.

Example:

public class MyWritableComparable implements WritableComparable

 { // Some data private int counter; private long timestamp;

public void write(DataOutput out) throws IOException {

out.writeInt(counter);

out.writeLong(timestamp); }

public void readFields(DataInput in) throws IOException {

counter = in.readInt(); timestamp = in.readLong(); }

public int compareTo(MyWritableComparable w) {

int thisValue = this.value; int thatValue = ((IntWritable)o).value;

return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1));

}

 

Hadoop中,Writable接口定义了两个方法:一个用于将其状态写入二进制格式的DataOutput流,另一个用于从二进制格式的DataInput流读取其态。

package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
public interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

让我们来看一个特别的Writable,看看可以对它进行哪些操作。我们要使用IntWritable,这是一个Java的int对象的封装。可以使用set()函数来创建和设置它的值:

IntWritable writable = new IntWritable();
writable.set(163);

类似地,我们也可以使用构造函数:

IntWritable writable = new IntWritable(163);

为了检查IntWritable的序列化形式,我们写一个小的辅助方法,它把一个java.io.ByteArrayOutputStream封装到java.io.DataOutputStream中(java.io.DataOutput的一个实现),以此来捕获序列化的数据流中的字节:

public static byte[] serialize(Writable writable) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream dataOut = new DataOutputStream(out);
    writable.write(dataOut);
    dataOut.close();
    return out.toByteArray();
}

整数用四个字节写入(我们使用JUnit 4断言):

byte[] bytes = serialize(writable);
assertThat(bytes.length, is(4));

字节使用大端顺序写入(所以,最重要的字节写在数据流的开始处,这是由java.io.DataOutput接口规定的),我们可以使用Hadoop的StringUtils方法看到它们的十六进制表示:

assertThat(StringUtils.byteToHexString(bytes), is("000000a3"));

让我们再来试试反序列化。我们创建一个帮助方法来从一个字节数组读取一个Writable对象:

public static byte[] deserialize(Writable writable, byte[] bytes)
throws IOException {
    ByteArrayInputStream in = new ByteArrayInputStream(bytes);
    DataInputStream dataIn = new DataInputStream(in);
    writable.readFields(dataIn);
    dataIn.close();
    return bytes;
}

我们构造一个新的、缺值的IntWritable,然后调用deserialize()方法来读取刚写入的输出流。然后发现它的值(使用get方法检索得到)还是原来的值163:

IntWritable newWritable = new IntWritable();
deserialize(newWritable, bytes);
assertThat(newWritable.get(), is(163));

WritableComparable 和comparators

IntWritable实现了WritableComparable接口,后者是Writable和java.lang.Comparable接口的子接口。

package org.apache.hadoop.io;
public interface WritableComparable extends Writable, Comparable {
}

类型的比较对MapReduce而言至关重要的,键和键之间的比较是在排序阶段完成。Hadoop提供的一个优化方法是从Java Comparator的RawComparator扩展:

package org.apache.hadoop.io;
import java.util.Comparator;
public interface RawComparator extends Comparator {
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}

这个接口允许执行者比较从流中读取的未被反序列化为对象的记录,从而省去了创建对象的所有开销。例如,IntWritables的comparator使用原始的compare()方法从每个字节数组的指定开始位置(S1和S2)和长度(L1和L2)读取整数b1和b2然后直接进行比较。

WritableComparator是RawComparator对WritableComparable类的一个通用实现。它提供两个主要功能。首先,它提供了一个默认的对原始compare()函数的调用,对从数据流对要比较的对象进行反序列化,然后调用对象的compare()方法。其次,它充当的是RawComparator实例的一个工厂方法(Writable方法已经注册)。例如,为获得IntWritable的comparator,我们只需使用:

RawComparator comparator = WritableComparator.get(IntWritable.class);

comparator可以用来比较两个IntWritable:

IntWritable w1 = new IntWritable(163);
IntWritable w2 = new IntWritable(67);
assertThat(comparator.compare(w1, w2), greaterThan(0));

或者它们的序列化描述:

byte[] b1 = serialize(w1);
byte[] b2 = serialize(w2);
assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length), greaterThan(0));

 

 

 

【上篇】
【下篇】

抱歉!评论已关闭.