
Writing a Custom FileInputFormat with the New Hadoop Map/Reduce API


While reading Hadoop in Action, I noticed that its code uses the old API, parts of which have already been marked as Deprecated.

So I tried writing an example that reproduces the book code's functionality using the new API.

The data format is as follows:

"CITING","CITED"
3858241,956203
3858241,1324234
3858241,3398406
3858241,3557384

...

The goal of the program is to invert the citation data: each (CITING, CITED) pair is emitted with the two values swapped, and the reducer then collects, for every CITED patent, a comma-separated list of the patents that cite it. A sketch of the expected output is shown below.
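For the sample rows above, the output would look roughly like this (TextOutputFormat writes a tab between key and value; if several patents cite the same patent, their CITING numbers are joined with commas):

956203	3858241
1324234	3858241
3398406	3858241
3557384	3858241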


The MapReduce program:

package com;  
  
import java.io.IOException;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.conf.Configured;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
import org.apache.hadoop.util.Tool;  
import org.apache.hadoop.util.ToolRunner;  
  
public class MyJob extends Configured implements Tool {  
      
    public static class MapClass extends Mapper<Text,Text,Text,Text> {

        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // The input key is CITING and the value is CITED; emit them swapped.
            context.write(value, key);
        }
    }
      
    public static class Reduce extends Reducer<Text,Text,Text,Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Join every CITING patent that references this CITED key into a CSV list.
            StringBuilder csv = new StringBuilder();
            for (Text value : values) {
                if (csv.length() > 0) csv.append(',');
                csv.append(value.toString());
            }
            context.write(key, new Text(csv.toString()));
        }
    }
      
    public static void main(String[] args) throws Exception {
        // ToolRunner parses the generic Hadoop options so we don't have to.
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }
  
    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration prepared by ToolRunner instead of a fresh one.
        Job job = new Job(getConf());
        job.setJarByClass(MyJob.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(MyInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Return the status to main() rather than calling System.exit() here,
        // which would bypass the ToolRunner contract.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}  
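A sketch of how the job might be launched; the jar name and the input/output paths below are placeholders:

hadoop jar myjob.jar com.MyJob input_dir output_dir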

The MyInputFormat class:

package com;

import java.io.IOException;
  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.io.compress.CompressionCodec;  
import org.apache.hadoop.io.compress.CompressionCodecFactory;  
import org.apache.hadoop.mapreduce.InputSplit;  
import org.apache.hadoop.mapreduce.JobContext;  
import org.apache.hadoop.mapreduce.RecordReader;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  
public class MyInputFormat extends FileInputFormat<Text,Text> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Compressed files (e.g. gzip) cannot be split at arbitrary offsets,
        // so only allow splitting when no compression codec matches the file.
        CompressionCodec codec =
            new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        return codec == null;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException,
            InterruptedException {
        return new MyRecordReader(context.getConfiguration());
    }
}
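As an aside, hadoop-0.23 ships a KeyValueTextInputFormat that does the same key/value splitting out of the box, so the custom format above is mainly a learning exercise. A minimal sketch of the built-in route, assuming the separator property name from 0.23's KeyValueLineRecordReader (verify it against your Hadoop version):

import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

// In MyJob.run(), instead of registering MyInputFormat:
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.getConfiguration().set(
        "mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");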

The MyRecordReader class (written by following KeyValueTextInputFormat from hadoop-0.23.0):

package com;  
  
import java.io.IOException;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.InputSplit;  
import org.apache.hadoop.mapreduce.RecordReader;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;  
  
public class MyRecordReader extends RecordReader<Text,Text> {  
    private final LineRecordReader lineRecordReader;  
    private byte separator = (byte) ',';  
      
    private Text innerValue;  
    private Text key;  
    private Text value;  
      
    public MyRecordReader(Configuration conf) {
        // Delegate the actual file reading to LineRecordReader; conf is kept
        // in the signature so the separator could later be made configurable.
        lineRecordReader = new LineRecordReader();
    }
      
    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineRecordReader.getProgress();
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        lineRecordReader.initialize(genericSplit, context);
    }
  
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        byte[] line = null;
        int lineLen = -1;

        // Read the next raw line; its key (the byte offset) is discarded.
        if (lineRecordReader.nextKeyValue()) {
            innerValue = lineRecordReader.getCurrentValue();
            line = innerValue.getBytes();
            lineLen = innerValue.getLength();
        } else {
            return false;
        }

        if (line == null)
            return false;
        if (key == null)
            key = new Text();
        if (value == null)
            value = new Text();

        // Split the line at the first separator into key and value.
        int pos = findSeparator(line, 0, lineLen, this.separator);
        setKeyValue(key, value, line, lineLen, pos);
        return true;
    }
      
    // Returns the index of the first occurrence of sep in utf[start..start+length), or -1.
    public int findSeparator(byte[] utf, int start, int length, byte sep) {
        for (int i = start; i < (start + length); ++i) {
            if (utf[i] == sep) {
                return i;
            }
        }
        return -1;
    }

    // If no separator was found, the whole line becomes the key and the value is empty.
    public void setKeyValue(Text key, Text value, byte[] line,
                            int lineLen, int pos) {
        if (pos == -1) {
            key.set(line, 0, lineLen);
            value.set("");
        } else {
            key.set(line, 0, pos);
            value.set(line, pos + 1, lineLen - pos - 1);
        }
    }
}  
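To sanity-check the reader without a cluster, a small local harness can drive it directly. This is a minimal sketch under a few assumptions: the class name and the sample file path are made up, and TaskAttemptContextImpl lives in org.apache.hadoop.mapreduce.task as of hadoop-0.23 (verify against your version):

package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

// Hypothetical harness: feeds one local file through MyRecordReader and
// prints the (key, value) pairs it produces.
public class MyRecordReaderDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path("file:///tmp/cite_sample.txt");  // placeholder path
        long len = file.getFileSystem(conf).getFileStatus(file).getLen();
        FileSplit split = new FileSplit(file, 0, len, new String[0]);

        MyRecordReader reader = new MyRecordReader(conf);
        reader.initialize(split, new TaskAttemptContextImpl(conf, new TaskAttemptID()));
        while (reader.nextKeyValue()) {
            System.out.println(reader.getCurrentKey() + "\t" + reader.getCurrentValue());
        }
        reader.close();
    }
}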
