现在的位置: 首页 > 综合 > 正文

Hadoop中的GenericWritable

2018年06月05日 ⁄ 综合 ⁄ 共 3843字 ⁄ 字号 评论关闭

一:背景

某些业务的数据来源可能不同,且数据源中的分割方式也不同,导致在MapReduce编程时使用的格式化类会不同,为了包装不同的Map输出,Hadoop提供了GenericWritable类,允许我们同时操作多个不同的Map输出,输出到一个Reduce中进行处理。

技术实现:

我们对HDFS中两个不同的数据源进行处理,数据源如下(hello文件中的内容是通过制表符来分割的,hello2中的内容是通过逗号来分割的):

为了同时处理这两个文件,我们要使用不同的Map进行处理。源码如下:

public class MyGenericWritableTest {

	//定义输入路径
	private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/files";
	//定义输出路径
	private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
	
	public static void main(String[] args) {
		
		try {
			//创建配置信息
			Configuration conf = new Configuration();
			//创建文件系统
			FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
			//如果输出目录存在,我们就删除
			if (fileSystem.exists(new Path(OUT_PATH))){
				fileSystem.delete(new Path(OUT_PATH), true);
			}
			
			//创建任务
			Job job = new Job(conf, MyGenericWritableTest.class.getName());
			
			//1.1 设置输入目录和设置格式化的类(这是使用GenericWritable的关键)
			MultipleInputs.addInputPath(job, new Path(INPUT_PATH + "/hello"), TextInputFormat.class, MyGenericMapper1.class);
			MultipleInputs.addInputPath(job, new Path(INPUT_PATH + "/hello2"), TextInputFormat.class, MyGenericMapper2.class);
			
			//1.2 设置自定义Mapper类和map函数输出数据的key和value的类型
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(MyGenericWritable.class);
			
			//1.3 设置分区和reduce的数量(reduce的数量和分区的数量对应,因为分区的数量为一个所以reduce的数量也为一个)
			job.setPartitionerClass(HashPartitioner.class);
			job.setNumReduceTasks(1);
			
			//1.4 排序 分组
			//1.5 归约
			//2.1 Shuffle过程将map端输出的数据拷贝到reduce端
			
			//2.2 设置自定义的Reducer类和输出的key和value类型
			job.setReducerClass(MyGenericReducer.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(LongWritable.class);
			
			//2.3设置输出的格式化类和指定输出路径
			job.setOutputFormatClass(TextOutputFormat.class);
			FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
			
			//提交作业
			System.exit(job.waitForCompletion(true) ? 0 : 1);
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	
		
	}
	
	/**
	 * 自定义Mapper类用于处理hello1文件,该文件中的内容是以制表符进行分割的
	 * @author 廖钟民
	 * time : 2015年1月15日下午5:12:18
	 * @version
	 */
	public static class MyGenericMapper1 extends Mapper<LongWritable, Text, Text, MyGenericWritable>{
		
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, MyGenericWritable>.Context context) throws IOException,
				InterruptedException {
				//对value按制表符进行切分
				String[] splits = value.toString().split("\t");
				//遍历字符串数组通过context写出去
				for (String str : splits){
					context.write(new Text(str), new MyGenericWritable(new LongWritable(1)));
				}
		}
	}
	
	/**
	 * 自定义Mapper类用于处理hello2文件,该文件的输入格式化类是TextInputFormat,用逗号给开
	 * @author 廖钟民
	 * time : 2015年1月15日下午5:13:30
	 * @version
	 */
	public static class MyGenericMapper2 extends Mapper<LongWritable, Text, Text, MyGenericWritable>{
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, MyGenericWritable>.Context context) throws IOException,
				InterruptedException {
			//对value进行切割
			String[] splits = value.toString().split(",");
			
			//遍历字符串数组通过context写出去
			for (String str : splits){
				//为了模拟不同的map端输出,我们这里故意设置一个Text类型
				context.write(new Text(str), new MyGenericWritable(new Text("1")));
			}
			
		}
	}
	
	/**
	 * 自定义Reducer类,对不同Map端的输出进行汇总
	 * @author 廖钟民
	 * time : 2015年1月15日下午7:09:27
	 * @version
	 */
	public static class MyGenericReducer extends Reducer<Text, MyGenericWritable, Text, LongWritable>{
		@Override
		protected void reduce(Text key, Iterable<MyGenericWritable> values, Reducer<Text, MyGenericWritable, Text, LongWritable>.Context context)
				throws IOException, InterruptedException {
			//定义单词出现的总次数
			long count = 0L;
			//遍历集合进行叠加
			for (MyGenericWritable time : values){
				//获取MyGenericWritable对象
				Writable writable = time.get();
				//如果当前是LongWritable类型
				if (writable instanceof LongWritable){
					
					count += ((LongWritable) writable).get();
				}
				//如果当前是Text类型
				if (writable instanceof Text){
					count += Long.parseLong(((Text)writable).toString());
				}
			}
			//把最后计算所得的结果写出去
			context.write(key, new LongWritable(count));
		}
	}
}

/**
 * 继承GenericWritable进行自定义
 * @author 廖钟民
 * time : 2015年1月15日下午4:29:50
 * @version
 */
class MyGenericWritable extends GenericWritable{

	//无参构造函数
	public MyGenericWritable() {
		
	}
	
	//有参构造函数
	public MyGenericWritable(Text text) {
		super.set(text);
	}
	
	//有参构造函数
	public MyGenericWritable(LongWritable longWritable) {
		super.set(longWritable);
	}

	
	@Override
	protected Class<? extends Writable>[] getTypes() {
		
		return new Class[]{LongWritable.class,Text.class};
	}
	
}

输出结果:

抱歉!评论已关闭.