现在的位置: 首页 > 综合 > 正文

谈Map/Reduce对分区与分组的理解

2014年09月05日 ⁄ 综合 ⁄ 共 7757字 ⁄ 字号 评论关闭

看了一部份资料,主要是《hadoop权威指南》中提到的分区与分组

书上是怎么介绍的我就不详细介绍了,http://blog.csdn.net/dajuezhao/article/details/5803994自己可以看下.主要是通过程序理解下..

我前也有篇文章是做两文件Join工作的.data.txt和info.txt

data.txt

201001 1003 abc
201002 1005 def
201003 1006 ghi
201004 1003 jkl
201005 1004 mno
201006 1005 pqr

info.txt

1003 kaka
1004 da
1005 jue
1006 zhao

期望输出结果:

1003	201001	abc kaka
1003	201004	jkl kaka
1004	201005	mno da
1005	201002	def jue
1005	201006	pqr jue
1006	201003	ghi zhao


代码部分:(这里分享下,群主给的代码,加上个人所做的试验)

public class JionQurey extends Configured implements Tool {

	
	public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text>{

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String pathName = ((FileSplit)context.getInputSplit()).getPath().toString();
			//根据文件名判断处理
			if(pathName.contains("data.txt")){
				String[] line =value.toString().split(" ");
				if(line.length < 3){
					//data数据格式second不规范,字段小于3,抛弃数据
					return ;
				}else{
					// 数据格式规范,区分标识为1
					TextPair tp = new TextPair(new Text(line[1]), new Text("1"));
					context.write(tp, new Text(line[0]+" "+line[2]));
				}
			}
			if(pathName.contains("info.txt")){
				String[] line = value.toString().split(" ");
				if(line.length < 2){
					// data数据格式不规范,字段小于2,抛弃数据
					return ;
				}else{
					// 数据格式规范,区分标识为0
					TextPair tp = new TextPair(new Text(line[0]), new Text("0"));
					context.write(tp, new Text(line[1]));
				}
			}
		}
	}
	
	public static class Example_Join_01_Partitionner extends Partitioner<TextPair, Text>{

		@Override
		public int getPartition(TextPair key, Text value, int numParition) {
			return Math.abs(key.getFirst().hashCode() * 127) % numParition;
		}
		
	}
	
	public static class Example_Join_01_Comparator extends WritableComparator{
		
		public Example_Join_01_Comparator(){
			super(TextPair.class,true);
		}

		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			// TODO Auto-generated method stub
			TextPair t1 =(TextPair) a;
			TextPair t2 = (TextPair)b;
			return t1.getFirst().compareTo(t2.getFirst());   //只要是第一个字段相同的就分成为同一组
		}
	}
//	1、map之后的输出会进行一些分区的操作,代码贴出来: 
//
//	public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> {
//		@Override
//		public int getPartition(TextPair key, Text value, int numParititon) {
//			return Math.abs(key.getFirst().hashCode() * 127) % numParititon;
//		}
//	}
	public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> {

		@Override
		public void reduce(TextPair key, Iterable<Text> values,
				Context context)
				throws IOException, InterruptedException {
			Text pid = key.getFirst();
			//其实这里已近排序好了第一个值就是info.txt的字段,后面的都是data.txt字段
			String desc = values.iterator().next().toString();
			while(values.iterator().hasNext()){
				context.write(pid, new Text(values.iterator().next().toString()+" "+desc));
			}
		}
	}
	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new Configuration(), new JionQurey(), args);
		System.exit(exitCode);
	}
	
	public static class TextPair implements WritableComparable<TextPair>{
		private Text first;
		private Text second;
		
		public Text getFirst() {
			return first;
		}
		public void setFirst(Text first) {
			this.first = first;
		}
		public Text getsecond() {
			return second;
		}
		public void setsecond(Text second) {
			this.second = second;
		}
		
//		public TextPair(Text first, Text second) {
//			this.first = first;
//			this.second = second;
//		}
		public void set(Text first, Text second) {
			this.first = first;
			this.second = second;
		}
		public TextPair() {
			set(new Text(), new Text());
		}

		public TextPair(String first, String second) {
			set(new Text(first), new Text(second));
		}

		public TextPair(Text first, Text second) {
			set(first, second);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			first.readFields(in);
			second.readFields(in);
		}

		@Override
		public void write(DataOutput out) throws IOException {
			first.write(out);
			second.write(out);
		}

		@Override
		public int compareTo(TextPair tp) {
			int cmp = first.compareTo(tp.first);
			if(cmp != 0) return cmp;
			return second.compareTo(tp.second);
		}
		
	}

	@Override
	public int run(String[] args) throws Exception {
		for(int i = 0 ;i<args.length;i++ ){
			System.out.println(args[i]);
		}
		Configuration conf = new Configuration();
		GenericOptionsParser parser = new GenericOptionsParser(conf,args);
		String[] otherArgs = parser.getRemainingArgs();
		if(args.length < 3){
			System.out.println("please enter <in path1><in Path2> <out Path>");
			System.exit(2);
		}
		FileSystem fs =FileSystem.get(URI.create(args[2]), conf);
		if(fs.exists(new Path(args[2]))){
			fs.delete(new Path(args[2]),true);
		}
		
		Job job = new Job(conf ,"JionQurey");
		// 设置运行的job
		job.setJarByClass(JionQurey.class);
		// 设置Map相关内容
		job.setMapperClass(Example_Join_01_Mapper.class);
		// 设置Map的输出
		job.setMapOutputKeyClass(TextPair.class);
		job.setMapOutputValueClass(Text.class);
		// 设置partition
		job.setPartitionerClass(Example_Join_01_Partitionner.class);
		// 在分区之后按照指定的条件分组
		job.setGroupingComparatorClass(Example_Join_01_Comparator.class);
		// 设置reduce
		job.setReducerClass(Example_Join_01_Reduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		// 设置输入和输出的目录
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
		// 执行,直到结束就退出
		return job.waitForCompletion(true) ? 0 : 1;
	}

}

实现的大致流程是:

C、Map执行完成之后,输出的中间结果如下:

1003,0	kaka
1004,0	da
1005,0	jue
1006,0	zhao
1003,1	201001	abc
1003,1	201004	jkl
1004,1	201005	mon
1005,1	201002	def
1005,1	201006	pqr
1006,1	201003	ghi
 

分区后:

同一区:
1003,0	kaka
1003,1	201001	abc
1003,1	201004	jkl

同一区:
1004,0	da
1004,1	201005	mon

同一区:
1005,0	jue
1005,1	201002	def
1005,1	201006	pqr

同一区:
1006,0	zhao
1006,1	201003	ghi

分组操作就是把在相同分区的数据按照指定的规则进行分组的操作,就以上来看,

是按照复合key的第一个字段做分组原则,达到忽略复合key的第二个字段值的目的,从而让数据能够迭代在一个reduce中。输出后结果如下:
分组后:

同一组:
1003,0	kaka
1003,0	201001	abc
1003,0	201004	jkl

同一组:
1004,0	da
1004,0	201005	mon

同一组:
1005,0	jue
1005,0	201002	def
1005,0	201006	pqr

同一组:
1006,0	zhao
1006,0	201003	ghi

看上去比我的所做的好多了,且不需要用这么多List来保存数据.

这样并不算完了,我们还没真正体验到Partitioner和WritableComparator到底实现了怎么一个功能.

于是我就先去到Partitioner,看看文件中出现的是什么..

1003	201001 abc kaka
1003	201004 jkl kaka
1004	201005 mno da
1005	201002 def jue
1005	201006 pqr jue
1006	201003 ghi zhao

咦,这不就是我们想要的结果吗?那我们为什么还要分组呢?

我是这样理解的,我们在分组的代码中是通过:

t1.getFirst().compareTo(t2.getFirst());

它把Map中的结果作为一个分区,然后直接通过WritableComparator忽略了第二个字段,分组。

然后,我又把代码恢复,去掉WritableComparator

看到的结果想象,于是我又该了下,把Reduce直接做打印操作。

public void reduce(TextPair key, Iterable<Text> values,
				Context context)
				throws IOException, InterruptedException {
			Text pid = key.getFirst();
			//其实这里已近设计好了第一个字段就是info.txt的字段,后面的都是data.txt字段
//			String desc = values.iterator().next().toString();
			while(values.iterator().hasNext()){
				context.write(pid, new Text(values.iterator().next().toString()+" "));
			}
		}

显示结果:

1003	kaka 
1003	201001 abc 
1003	201004 jkl 
1004	da 
1004	201005 mno 
1005	jue 
1005	201002 def 
1005	201006 pqr 
1006	zhao 
1006	201003 ghi 

肯定有人会认为,这样一个结果不就和上面的一样了么,把屏蔽的去掉就行了,其实不然,像1003是分了两组的.

继续修改下代码:

public void reduce(TextPair key, Iterable<Text> values,
				Context context)
				throws IOException, InterruptedException {
			Text pid = key.getFirst();
			//其实这里已近设计好了第一个字段就是info.txt的字段,后面的都是data.txt字段
//			String desc = values.iterator().next().toString();
			while(values.iterator().hasNext()){
				context.write(pid, new Text(values.iterator().next().toString()+" "));
			}
			context.write(new Text("Group"), new Text("+++++++++++++++++++++++++++++++"));
		}

输出的结果是:

1003	kaka 
Group	+++++++++++++++++++++++++++++++
1003	201001 abc 
1003	201004 jkl 
Group	+++++++++++++++++++++++++++++++
1004	da 
Group	+++++++++++++++++++++++++++++++
1004	201005 mno 
Group	+++++++++++++++++++++++++++++++
1005	jue 
Group	+++++++++++++++++++++++++++++++
1005	201002 def 
1005	201006 pqr 
Group	+++++++++++++++++++++++++++++++
1006	zhao 
Group	+++++++++++++++++++++++++++++++
1006	201003 ghi 
Group	+++++++++++++++++++++++++++++++

可以看出他们的并同一个Reduce中输出的结果...分成了不同的组.

做了这么多,还是由些疑虑,为什么只用分组就能实现了我们想要的结果了呢?难道真是做为了一个分区来进行分组的吗?

那我们平常不重写Partitioner和WritableComparator的时候,怎么能够更具key来分开呢?还望早日有高手解答我的不明之处..

次日,同时又做了下hadoop利用Partitioner分类输出到不同的文件夹中的例子,我自是稍作修改,发现更加奇怪了,我这里只是显示一个文件夹,

               而在别人写的资料上显示的却是多个文件夹,如果通过一个Partitioner要分为多个文件夹的话,那岂不是上面的例题要分为很多个文件夹么..

于是我就带着问题去找导师,导师给我的解释是,分文件夹的是老版本的例题.Map中都是用output.collect();现在我直接继承的是Mapper,用的是新版本的

新版本不允许分为多个文件夹了....自能有一个文件夹。那WritableComparable是怎么一回事啊.直接告诉我主要功能是进行一种排序.

这些天一直不忘想这个问题,今天又得到点感悟...来解释下前面为什么会出现不要partitioner还是返回这样的结果:

首先,我们需要知道Map/Reduce的工作流程要有所清楚:Map ------> partitioner(分区) ------> comparato

来看下那个没有重写partitioner的代码.看到它的key传递的是个对象.在明确一下就是一个静态的对象.

着就表明了不管我是怎么new了个对象,他们的地址都是一样的.然后根据key来分区(默认),所以他们就会被分为同一个区.

至于接下来的分组操作就好理解了...

抱歉!评论已关闭.