一:背景
某些业务的数据来源可能不同,且数据源中的分割方式也不同,导致在MapReduce编程时使用的格式化类会不同,为了包装不同的Map输出,Hadoop提供了GenericWritable类,允许我们同时操作多个不同的Map输出,输出到一个Reduce中进行处理。
技术实现:
我们对HDFS中两个不同的数据源进行处理,数据源如下(hello文件中的内容是通过制表符来分割的,hello2中的内容是通过逗号来分割的):
为了同时处理这两个文件,我们要使用不同的Map进行处理。源码如下:
public class MyGenericWritableTest { //定义输入路径 private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/files"; //定义输出路径 private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out"; public static void main(String[] args) { try { //创建配置信息 Configuration conf = new Configuration(); //创建文件系统 FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf); //如果输出目录存在,我们就删除 if (fileSystem.exists(new Path(OUT_PATH))){ fileSystem.delete(new Path(OUT_PATH), true); } //创建任务 Job job = new Job(conf, MyGenericWritableTest.class.getName()); //1.1 设置输入目录和设置格式化的类(这是使用GenericWritable的关键) MultipleInputs.addInputPath(job, new Path(INPUT_PATH + "/hello"), TextInputFormat.class, MyGenericMapper1.class); MultipleInputs.addInputPath(job, new Path(INPUT_PATH + "/hello2"), TextInputFormat.class, MyGenericMapper2.class); //1.2 设置自定义Mapper类和map函数输出数据的key和value的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(MyGenericWritable.class); //1.3 设置分区和reduce的数量(reduce的数量和分区的数量对应,因为分区的数量为一个所以reduce的数量也为一个) job.setPartitionerClass(HashPartitioner.class); job.setNumReduceTasks(1); //1.4 排序 分组 //1.5 归约 //2.1 Shuffle过程将map端输出的数据拷贝到reduce端 //2.2 设置自定义的Reducer类和输出的key和value类型 job.setReducerClass(MyGenericReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //2.3设置输出的格式化类和指定输出路径 job.setOutputFormatClass(TextOutputFormat.class); FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); //提交作业 System.exit(job.waitForCompletion(true) ? 0 : 1); } catch (Exception e) { e.printStackTrace(); } } /** * 自定义Mapper类用于处理hello1文件,该文件中的内容是以制表符进行分割的 * @author 廖钟民 * time : 2015年1月15日下午5:12:18 * @version */ public static class MyGenericMapper1 extends Mapper<LongWritable, Text, Text, MyGenericWritable>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, MyGenericWritable>.Context context) throws IOException, InterruptedException { //对value按制表符进行切分 String[] splits = value.toString().split("\t"); //遍历字符串数组通过context写出去 for (String str : splits){ context.write(new Text(str), new MyGenericWritable(new LongWritable(1))); } } } /** * 自定义Mapper类用于处理hello2文件,该文件的输入格式化类是TextInputFormat,用逗号给开 * @author 廖钟民 * time : 2015年1月15日下午5:13:30 * @version */ public static class MyGenericMapper2 extends Mapper<LongWritable, Text, Text, MyGenericWritable>{ protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, MyGenericWritable>.Context context) throws IOException, InterruptedException { //对value进行切割 String[] splits = value.toString().split(","); //遍历字符串数组通过context写出去 for (String str : splits){ //为了模拟不同的map端输出,我们这里故意设置一个Text类型 context.write(new Text(str), new MyGenericWritable(new Text("1"))); } } } /** * 自定义Reducer类,对不同Map端的输出进行汇总 * @author 廖钟民 * time : 2015年1月15日下午7:09:27 * @version */ public static class MyGenericReducer extends Reducer<Text, MyGenericWritable, Text, LongWritable>{ @Override protected void reduce(Text key, Iterable<MyGenericWritable> values, Reducer<Text, MyGenericWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { //定义单词出现的总次数 long count = 0L; //遍历集合进行叠加 for (MyGenericWritable time : values){ //获取MyGenericWritable对象 Writable writable = time.get(); //如果当前是LongWritable类型 if (writable instanceof LongWritable){ count += ((LongWritable) writable).get(); } //如果当前是Text类型 if (writable instanceof Text){ count += Long.parseLong(((Text)writable).toString()); } } //把最后计算所得的结果写出去 context.write(key, new LongWritable(count)); } } } /** * 继承GenericWritable进行自定义 * @author 廖钟民 * time : 2015年1月15日下午4:29:50 * @version */ class MyGenericWritable extends GenericWritable{ //无参构造函数 public MyGenericWritable() { } //有参构造函数 public MyGenericWritable(Text text) { super.set(text); } //有参构造函数 public MyGenericWritable(LongWritable longWritable) { super.set(longWritable); } @Override protected Class<? extends Writable>[] getTypes() { return new Class[]{LongWritable.class,Text.class}; } }
输出结果: