这两天在网上看了个MapReduce的多文件输出的帖子: http://blog.csdn.net/inkfish。写的不错。
我试着自己实现了一下,同样分为三个文件。我的这三个文件跟原作者的稍有不同:其中有些类是我以前写好后直接拷贝过来的,所以略有差异。
My_LineRead.java
/**
 * Creates a writer that uses the class-level default separator.
 *
 * @param out stream the records are written to
 */
public My_LineRead(DataOutputStream out) {
this(out, colon); // delegate to the two-argument constructor with the default separator
}
/**
 * Creates a writer that separates key and value with the given string.
 *
 * @param out               stream the records are written to
 * @param keyValueSeparator text emitted between key and value
 * @throws IllegalArgumentException if the {@code utf8} charset name is unsupported
 */
public My_LineRead(DataOutputStream out, String keyValueSeparator) {
this.out = out;
try {
this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
} catch (UnsupportedEncodingException e) {
// Bug fix: keep the original exception as the cause instead of dropping it.
throw new IllegalArgumentException("can't find " + utf8 + " encoding", e);
}
}
/**
 * Releases the underlying output stream once the task is done writing.
 */
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
out.close();
}
/**
 * Writes one record as a text line: key, separator, value, newline.
 * A null or {@code NullWritable} key suppresses the key and separator;
 * a null or {@code NullWritable} value suppresses the value and newline.
 *
 * Bug fix: the original guards used {@code key == null && key instanceof
 * NullWritable}, which is always false (a null reference never matches
 * {@code instanceof}), so null keys/values fell through and threw
 * NullPointerException, and NullWritable placeholders were printed.
 * The intended condition is {@code ||}.
 */
@Override
public void write(K key, V value) throws IOException,
InterruptedException {
if (!(key == null || key instanceof NullWritable)) {
// Text exposes its backing byte array directly; avoid re-encoding.
if ((Object) key instanceof Text) {
Text to = (Text) key;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(key.toString().getBytes(utf8));
}
out.write(keyValueSeparator);
}
if (!(value == null || value instanceof NullWritable)) {
if ((Object) value instanceof Text) {
Text to = (Text) value;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(value.toString().getBytes(utf8));
}
out.write(newline);
}
}
}
MyMultipleOutputFormat.java //这个类,我添加了些注释便于理解
//MultiRecordWriter类
/**
 * RecordWriter that fans records out to multiple files, one per base name
 * produced by {@code generateFileNameForKeyValue}. Writers are created
 * lazily and cached so each file is opened exactly once per task.
 */
public class MultiRecordWriter extends RecordWriter<K, V> {
/** Cache: one RecordWriter per generated output file name. */
private HashMap<String, RecordWriter<K, V>> recordWriters = null;
private TaskAttemptContext job = null;
/** Task work directory all output files are created under. */
private Path workPath = null;

public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
super();
this.job = job;
this.workPath = workPath;
recordWriters = new HashMap<String, RecordWriter<K, V>>();
}

/**
 * Closes every cached writer (one per output file) and clears the cache.
 */
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
for (RecordWriter<K, V> writer : this.recordWriters.values()) {
writer.close(context);
}
this.recordWriters.clear();
}

/**
 * Routes the record to the writer for its generated file name,
 * creating and caching that writer on first use.
 */
@Override
public void write(K key, V value) throws IOException, InterruptedException {
String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
RecordWriter<K, V> rw = this.recordWriters.get(baseName);
if (rw == null) {
rw = getBaseRecordWriter(job, baseName);
this.recordWriters.put(baseName, rw);
}
rw.write(key, value);
}

// Output path: ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
// Honor the job's output-compression settings.
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator = ",";
RecordWriter<K, V> recordWriter = null;
if (isCompressed) {
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
GzipCodec.class);
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
Path file = new Path(workPath, baseName + codec.getDefaultExtension());
FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
recordWriter = new My_LineRead<K, V>(new DataOutputStream(codec
.createOutputStream(fileOut)), keyValueSeparator);
} else {
Path file = new Path(workPath, baseName);
FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
// Bug fix: the original called the one-argument constructor here, so
// uncompressed output used the default colon separator while compressed
// output used ",". Pass the separator explicitly so both branches agree.
recordWriter = new My_LineRead<K, V>(fileOut, keyValueSeparator);
}
return recordWriter;
}
}
}
最后就是测试类,WordCount_MulFileOut.java
/**
 * Chooses the output file for a record. This demo implementation routes
 * every record into a single fixed file.
 *
 * @return the base name of the output file for this key/value pair
 */
@Override
protected String generateFileNameForKeyValue(WritableComparable key,
Writable value, Configuration conf) {
return "other.txt";
}
}
/**
 * Configures and submits the word-count job that exercises the custom
 * multiple-file output format, then exits with the job's status.
 */
public static void main(String args[])throws Exception{
Configuration conf = new Configuration();
// Job.getInstance replaces the deprecated new Job(Configuration, String).
Job job = Job.getInstance(conf, "wordcount");
job.setJarByClass(WordCount_MulFileOut.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(MyMultiple.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(wordcountMapper.class);
job.setReducerClass(wordcountReduce.class);
// Sum-style reduce is safe to reuse as a combiner.
job.setCombinerClass(wordcountReduce.class);
// NOTE(review): paths are read from args[1]/args[2] (args[0] unused) —
// confirm this matches how the job is actually launched.
FileInputFormat.setInputPaths(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
// Propagate success/failure instead of ignoring the completion status.
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}