现在的位置: 首页 > 综合 > 正文

hadoop文件输出控制,多路径输出到不同文件

2013年02月21日 ⁄ 综合 ⁄ 共 9046字 ⁄ 字号 评论关闭

hadoop的map和reduce的输出路径是通过两个函数设定:

FileInputFormat.setInputPaths(job, args[0]);
FileOutputFormat.setOutputPath(job, new Path(args[1]));

然后,用context类型对象,通过write(key,value)完成输出。

现在我需要实现将key和value写到一个文件后,有一部分输出,需要输出到指定的另一个文件中。将输出写到另一个文件,就不能用context了。

查了一下hadoop实现输出到多个文件(指定输出到多个路径)方式有两种MultipleOutputFormat和MultipleOutputs。

MultipleOutputFormat我同事有介绍,http://blog.csdn.net/shuhuai007/article/details/8968289,老代码,我没用。我找到了一篇关于两者的比较,嫌烦可以不看,

http://www.cnblogs.com/liangzh/archive/2012/05/22/2512264.html

1,旧API中有 org.apache.hadoop.mapred.lib.MultipleOutputFormat和org.apache.hadoop.mapred.lib.MultipleOutputs

MultipleOutputFormat allowing to write the output data to different output files.

MultipleOutputs creates multiple OutputCollectors. Each OutputCollector can have its own OutputFormat and types for the key/value pair. Your MapReduce program will decide what to output to each OutputCollector.

2,新API中 org.apache.hadoop.mapreduce.lib.output.MultipleOutputs

整合了上面旧API两个的功能,没有了MultipleOutputFormat。

  The MultipleOutputs class simplifies writing output data to multiple outputs

  Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own             OutputFormat, with its own key class and with its own value class.

  Case two: to write data to different files provided by user

下面这段话来自Hadoop:The.Definitive.Guide(3rd,Early.Release)P251

  “In the old MapReduce API there are two classes for producing multiple outputs: MultipleOutputFormat and MultipleOutputs. In a nutshell, MultipleOutputs is more fully featured, but MultipleOutputFormat has more control over the output directory structure
and file naming. MultipleOutputs in the new API combines the best features of the two multiple output classes in the old API.”

这就也就是说新的API中,已经将两者合二为一了,合并为MultipleOutputs类。新api是在mapreduce包中。

 

我使用的MultipleOutputs,介绍如下

The MultipleOutputs class simplifies writing output data to multiple outputs

Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its ownOutputFormat, with its own key class and with its own value class.

Case two: to write data to different files provided by user

其写函数为write,有三种函数重载

1.write(String namedOutput,Text key,IntWritable value) throwsIOException,InterruptedException

Write key and value to the namedOutput. Output path is a unique file generated for the namedOutput. For example, {namedOutput}-(m|r)-{part-number}

Parameters:

namedOutput the named output name

key the key

value the value

2.write(Text key,IntWritable value,String baseOutputPath) throwsIOException,InterruptedException

Write key value to an output file name. Gets the record writer from job's output format. Job's output format should be a FileOutputFormat.

Parameters:

key the key
value the value
baseOutputPath base-output path to write the record to. Note: Framework will generate unique filename for the baseOutputPath

3..write(String namedOutput,Text key,Object value,String
baseOutputPath) throwsIOException,InterruptedException

Write key and value to baseOutputPath using the namedOutput.

Parameters:
namedOutput the named output name
key the key
value the value
baseOutputPath base-output path to write the record to. Note: Framework will generate unique filename for the baseOutputPath

 

这三种,不同参数实现不同功能。我喜欢第二种write(Text key,IntWritable value,String
baseOutputPath),第三个参数是直接指定的输出路径。在代码中直接实例化对象,然后直接写到指定对象中。

比如在reduce中(map也可以)直接加入如下代码

MultipleOutputs<Text, IntWritable> mo = new MultipleOutputs<Text, IntWritable>(context);
            mo.write(key, result, key.toString());//test.jar is success
            //mo.write(key, result, "all");//unsuccess
            //mo.write(key.toString(), key, result);//unsuccess
            //mo.write("error", key, result);//unsuccess
            //mo.write("/user/ttemp", key, result);//unsuccess
            mo.close();

输出为默认的context输出路径,文件名为key.toString()。你可以直接指定路径如mo.write(key, result, "/user/you/out/"+key.toString());

但容易出错,因为baseOutputPath指定的路径不能相同,当已经存在了该文件,则会报错,说file exits)。比如,//mo.write(key, result, "all");//unsuccess,就是错误的。all文件生成一次后,就会报错。每次写你都要用不同的文件名,推荐//Random random = new Random();      //long test = random.nextInt(100); 来生成文件名。

这种写法不能追加到一个文件中,每次都有新文件,我用时间戳System.currentTimeMillis(),把云给写的差点挂掉。

所以,可以将所有内容写入一个文件。

将所有指定的输出内容写到一个文件中,用第一种,给出一个非常好的例子http://my.oschina.net/wangjiankui/blog/49521

注意在map或是reduce过程中都可以使用,但其使用方法特殊,

1)要定义成类成员private MultipleOutputs<Text, Text> mo;(map类或是reduce类都行)

2)要重写两个类方法,初始化和关闭MultipleOutputs对象。

 @Override
    protected void setup(Context context) throws IOException,
    InterruptedException {
        mo = new MultipleOutputs<Text,Text>(context);
        super.setup(context);
    }
 @Override
    protected void cleanup(Context context) throws IOException,InterruptedException{
        mo.close();
        super.cleanup(context);
    }
3)然后在对应的map函数或是reduce函数,添加写功能语句

mo.write("filename",error, content);

注意,filename是文件名,也就是具体指定的输出文件。但现在hadoop还不知道该文件(不承认未经注册的文件,必须先在主函数中注册),所以不能写入。先要在主函数中用MultipleOutputs.addNamedOutput()将对应的filename文件注册一下,告诉hadoop可以写入的合法文件都有哪些。

如下

    job.setInputFormatClass(LzoTianqiSawLogProtobufB64LineInputFormat.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
     job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(ProtobufTianqiSawLogWritable.class);
    FileInputFormat.setInputPaths(job, args[0]);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    MultipleOutputs.addNamedOutput(job,"filename", TextOutputFormat.class, Text.class, Text.class);
    这最后一句,就是注册写入文件的,名字为filename。前面是hadoop运行配置。运行后,在context指定的输出路径下找到filename-0000 类似的结果文件。指定的输出内容全部写入filename一个文件中。而输出路径为context指定的输出路径。

当然,注册的合法输出文件可以有多个,比如:

MultipleOutputs.addNamedOutput(job,"filename1", TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job,"filenam2", TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job,"filename3", TextOutputFormat.class, Text.class, Text.class);

代码中写哪个文件名,就会写入到哪个文件。如mo.write("filename3",error, content);就会写到文件filename3中。

 

如果既要将输出内容写到一个文件中,同时又想自己指定一个输出路径,就用第三种。不需要将讲了。对么?

下面,以wordcount为例给个源码,

public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
   
   
    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
       
       
        private MultipleOutputs<Text, IntWritable> mo;
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
           
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
           
            //MapFile.Writer to=new MapFile.Writer
           
            context.write(key, result);//写到了一个文件里面
            String name =context.getConfiguration().get("mapred.output.dir");

            //mo= new MultipleOutputs<Text, IntWritable>(context);//context和MultipleOutputs是独立的,都进行了写功能,互不干扰
            //MultipleOutputs的第二个write写到多个文件,不能覆盖
            Text kw= new Text("this a test!sum is:"+name+":");
            IntWritable content= new IntWritable(sum);
           
            mo.write("test",kw, content);
           
            //mo.write(key, result, "error"+key.toString());//success
            //mo.write(key, result, "all");//testall.jar 有问题,因为all-r-00000生成一次后,不能覆盖
            //mo.write(key, result, null);//wrong!no file to write
            //mo.write(key, result, "/user/test");//unsuccess
            //mo.write(null, key, result, key.toString());
            mo.close();
        }
        @Override
        protected void setup(Context context) throws IOException,
        InterruptedException {
            mo = new MultipleOutputs<Text, IntWritable>(context);
            super.setup(context);
        }
        @Override
        protected void cleanup(Context context) throws IOException,InterruptedException{
            mo.close();
            super.cleanup(context);
        }
    }
   
    public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
   
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
     if (otherArgs.length != 2) {
     System.err.println("Usage: wordcount <in> <out>");
     System.exit(2);
     }
    
    
     Job job = new Job(conf, "word count");
     job.setJarByClass(reduce_output.class);
     job.setMapperClass(TokenizerMapper.class);
     //job.setCombinerClass(IntSumReducer.class);
     job.setReducerClass(IntSumReducer.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(IntWritable.class);
//     job.setOutputFormatClass(testOutputFormat.class)
    
     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
     MultipleOutputs.addNamedOutput(job, "test", TextOutputFormat.class, Text.class, IntWritable.class);//看这里

     MultipleOutputs.addNamedOutput(job, "test2", TextOutputFormat.class, Text.class, IntWritable.class);//可以多写几个,用哪个,哪个写

MultipleOutputs.addNamedOutput(job, "tes3t", TextOutputFormat.class, Text.class, IntWritable.class);//
     System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
   

 

 

抱歉!评论已关闭.