While reading Hadoop in Action I noticed that its sample code uses the old MapReduce API, parts of which are already marked as Deprecated.
So I tried writing an example that uses the new API to do the same job as that code.
The data format is as follows:
"CITING","CITED"
3858241,956203
3858241,1324234
3858241,3398406
3858241,3557384
...
The goal of the program is to output every record with its CITING and CITED values swapped.
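For the sample rows above, the expected output (using the default tab separator of TextOutputFormat, which the driver below configures) is one line per cited patent; with a single reducer the lines come out sorted by the Text key, i.e. as byte strings rather than as numbers:

1324234	3858241
3398406	3858241
3557384	3858241
956203	3858241

If a patent were cited by several patents, its line would list all of the citing patents separated by commas. The header line "CITING","CITED" goes through the same path, so it also appears swapped in the output.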
The MapReduce program:
package com;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool {

    // Mapper: swap key and value, so the output key is CITED and the output value is CITING
    public static class MapClass extends Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, key);
        }
    }

    // Reducer: concatenate all CITING values for the same CITED key into one comma-separated string
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder csv = new StringBuilder();
            for (Text value : values) {
                if (csv.length() > 0) {
                    csv.append(",");
                }
                csv.append(value.toString());
            }
            context.write(key, new Text(csv.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner takes care of the generic command-line options and configuration details
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration supplied by ToolRunner instead of creating an empty one
        Job job = new Job(getConf());
        job.setJarByClass(MyJob.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(MyInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Return the job status to ToolRunner instead of calling System.exit() here
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
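As a side note: hadoop-0.23.0 also ships a new-API KeyValueTextInputFormat, which the MyRecordReader below is modeled on. If your version lets you configure its separator, the custom input format could in principle be replaced by something like the sketch below. This is only an assumption on my part; the KEY_VALUE_SEPERATOR constant (note the spelling) comes from KeyValueLineRecordReader and should be checked against your Hadoop version. The KeyValueAlternative class itself is just an illustration, not part of the program above.

package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

// Hypothetical helper: configures the built-in KeyValueTextInputFormat to split
// each line at the first ',' instead of the default tab.
public class KeyValueAlternative {
    public static Job createJob(Configuration conf) throws Exception {
        // Assumed to be supported in this Hadoop version; verify the constant before relying on it
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, ",");
        Job job = new Job(conf);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        return job;
    }
}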
The MyInputFormat class:
package com;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class MyInputFormat extends FileInputFormat<Text, Text> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Treat any compressed input file as non-splittable
        CompressionCodec codec =
            new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        return codec == null;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new MyRecordReader(context.getConfiguration());
    }
}
The MyRecordReader class (written with reference to KeyValueTextInputFormat from hadoop-0.23.0):
package com;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class MyRecordReader extends RecordReader<Text, Text> {

    private final LineRecordReader lineRecordReader;
    private byte separator = (byte) ',';
    private Text innerValue;
    private Text key;
    private Text value;

    public MyRecordReader(Configuration conf) {
        lineRecordReader = new LineRecordReader();
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        lineRecordReader.initialize(genericSplit, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        byte[] line = null;
        int lineLen = -1;

        // Read the next line; LineRecordReader's value holds the whole line,
        // its key (the byte offset) is ignored
        if (lineRecordReader.nextKeyValue()) {
            innerValue = lineRecordReader.getCurrentValue();
            line = innerValue.getBytes();
            lineLen = innerValue.getLength();
        } else {
            return false;
        }
        if (line == null) {
            return false;
        }
        if (key == null) {
            key = new Text();
        }
        if (value == null) {
            value = new Text();
        }

        // Split the line into key and value at the first separator (',')
        int pos = findSeparator(line, 0, lineLen, this.separator);
        setKeyValue(key, value, line, lineLen, pos);
        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineRecordReader.getProgress();
    }

    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }

    // Return the index of the first occurrence of sep in utf[start, start+length), or -1
    public int findSeparator(byte[] utf, int start, int length, byte sep) {
        for (int i = start; i < (start + length); ++i) {
            if (utf[i] == sep) {
                return i;
            }
        }
        return -1;
    }

    // If no separator is found, the whole line becomes the key and the value stays empty
    public void setKeyValue(Text key, Text value, byte[] line, int lineLen, int pos) {
        if (pos == -1) {
            key.set(line, 0, lineLen);
            value.set("");
        } else {
            key.set(line, 0, pos);
            value.set(line, pos + 1, lineLen - pos - 1);
        }
    }
}
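Since findSeparator and setKeyValue are public, the parsing logic can be checked on its own. The small main class below is just an illustration I added, not part of the job; the constructor ignores its Configuration argument, so passing null is enough here.

package com;

import org.apache.hadoop.io.Text;

// Standalone check of the key/value split performed by MyRecordReader (illustrative only)
public class ParseCheck {
    public static void main(String[] args) {
        MyRecordReader reader = new MyRecordReader(null); // conf is unused by the constructor
        byte[] line = "3858241,956203".getBytes();
        Text key = new Text();
        Text value = new Text();

        int pos = reader.findSeparator(line, 0, line.length, (byte) ',');
        reader.setKeyValue(key, value, line, line.length, pos);

        // Prints: key=3858241, value=956203 (the mapper then swaps these two)
        System.out.println("key=" + key + ", value=" + value);
    }
}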