现在的位置: 首页 > 综合 > 正文

Map/reduce 输出格式化

2013年12月03日 ⁄ 综合 ⁄ 共 3061字 ⁄ 字号 评论关闭

在运行mapTask 或者reduceTask,输出的结果可能需要进行格式化才能满足我们的需求.

hadoop 提供了OutputFormat 供我们转换使用。org.apache.hadoop.mapreduce.lib.output.OutputFormat<KV>

//在Job中可以通过setOutputFormatClass 方法来设置格式化,SortedOutputFormat.class就是我们要编写的格式化类。

job.setOutputFormatClass(SortedOutputFormat.class);

我们自己定义的格式化类必须是继承OutputFormat这个类 具体看类图

OutputFormat是一个抽象类其中主要抽象方法:

一:checkOutPutSpecs:验证输出路径是否存在

二:GetOutputCommitter:获取一个OutPutCommitter对象主要负责:

1在job初始化的时候生成一些配置信息,临时输出文件夹等

2.在job完成的时候处理一些工作

3.配置task 临时文件

4.调度task任务的提交

5.提交task输出文件

6.取消task的文件的提交

三:getRecordWriter:这个方法返回的RecordWriter将告诉我们数据怎么输出到输出文件里

我们在编写输出格式化扩展类主要就是实现这个方法。

接着在看看org.apache.hadoop.mapred.RecordWriter<K,V>这个接口

close(Reporter reporter):关闭操作
write(K key,V value):实现如何写key/value

其实重点就是构建一个类来实现这个接口。

在org.apache.hadoop.mapreduce.RecordWriter<K,V>有这么个同名的类

可以直接new RecordWriter()

我们在写扩展类的时候只要扩展这个类,重写相关方法就可以了。

 /** 摘自 TextOutputFormat}中的LineRecordWriter。 */
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {

    private static final String utf8 = "UTF-8";

    private static final byte[] newline;
    static {
        try {
            newline = "\n".getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    protected DataOutputStream out;

    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out) {
        this(out, "\t");
    }

    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    @Override
    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(newline);
    }

    public synchronized void write(Integer num) throws IOException {
        if (num ==null) {
            writeObject(null);
        }else{
            writeObject(num);
        }
    }

    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}

 hadoop 提供的几种格式化方式

• KeyValueTextInputFormat: Key 是第一个tab键前的值,value是剩下的值,如果剩下没值则value就是空(经常用这个)
• TextInputFormant: key 是行号、value 是行内容 

• NLineInputFormat: Similar to KeyValueTextInputFormat, but the splits are based on N
lines of input rather than Y bytes of input.
• MultiFileInputFormat: An abstract class that lets the user implement an input format
that aggregates multiple files into one split.写成多个文件
• SequenceFIleInputFormat: The input file is a Hadoop sequence file, containing serialized
key/value pairs.

抱歉!评论已关闭.