看了一部份资料,主要是《hadoop权威指南》中提到的分区与分组
书上是怎么介绍的我就不详细介绍了,http://blog.csdn.net/dajuezhao/article/details/5803994自己可以看下.主要是通过程序理解下..
我前也有篇文章是做两文件Join工作的.data.txt和info.txt
data.txt
201001 1003 abc 201002 1005 def 201003 1006 ghi 201004 1003 jkl 201005 1004 mno 201006 1005 pqr
info.txt
1003 kaka 1004 da 1005 jue 1006 zhao
期望输出结果:
1003 201001 abc kaka 1003 201004 jkl kaka 1004 201005 mno da 1005 201002 def jue 1005 201006 pqr jue 1006 201003 ghi zhao
代码部分:(这里分享下,群主给的代码,加上个人所做的试验)
public class JionQurey extends Configured implements Tool { public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text>{ @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String pathName = ((FileSplit)context.getInputSplit()).getPath().toString(); //根据文件名判断处理 if(pathName.contains("data.txt")){ String[] line =value.toString().split(" "); if(line.length < 3){ //data数据格式second不规范,字段小于3,抛弃数据 return ; }else{ // 数据格式规范,区分标识为1 TextPair tp = new TextPair(new Text(line[1]), new Text("1")); context.write(tp, new Text(line[0]+" "+line[2])); } } if(pathName.contains("info.txt")){ String[] line = value.toString().split(" "); if(line.length < 2){ // data数据格式不规范,字段小于2,抛弃数据 return ; }else{ // 数据格式规范,区分标识为0 TextPair tp = new TextPair(new Text(line[0]), new Text("0")); context.write(tp, new Text(line[1])); } } } } public static class Example_Join_01_Partitionner extends Partitioner<TextPair, Text>{ @Override public int getPartition(TextPair key, Text value, int numParition) { return Math.abs(key.getFirst().hashCode() * 127) % numParition; } } public static class Example_Join_01_Comparator extends WritableComparator{ public Example_Join_01_Comparator(){ super(TextPair.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { // TODO Auto-generated method stub TextPair t1 =(TextPair) a; TextPair t2 = (TextPair)b; return t1.getFirst().compareTo(t2.getFirst()); //只要是第一个字段相同的就分成为同一组 } } // 1、map之后的输出会进行一些分区的操作,代码贴出来: // // public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> { // @Override // public int getPartition(TextPair key, Text value, int numParititon) { // return Math.abs(key.getFirst().hashCode() * 127) % numParititon; // } // } public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> { @Override public void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Text pid = key.getFirst(); //其实这里已近排序好了第一个值就是info.txt的字段,后面的都是data.txt字段 String desc = values.iterator().next().toString(); while(values.iterator().hasNext()){ context.write(pid, new Text(values.iterator().next().toString()+" "+desc)); } } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new Configuration(), new JionQurey(), args); System.exit(exitCode); } public static class TextPair implements WritableComparable<TextPair>{ private Text first; private Text second; public Text getFirst() { return first; } public void setFirst(Text first) { this.first = first; } public Text getsecond() { return second; } public void setsecond(Text second) { this.second = second; } // public TextPair(Text first, Text second) { // this.first = first; // this.second = second; // } public void set(Text first, Text second) { this.first = first; this.second = second; } public TextPair() { set(new Text(), new Text()); } public TextPair(String first, String second) { set(new Text(first), new Text(second)); } public TextPair(Text first, Text second) { set(first, second); } @Override public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } @Override public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } @Override public int compareTo(TextPair tp) { int cmp = first.compareTo(tp.first); if(cmp != 0) return cmp; return second.compareTo(tp.second); } } @Override public int run(String[] args) throws Exception { for(int i = 0 ;i<args.length;i++ ){ System.out.println(args[i]); } Configuration conf = new Configuration(); GenericOptionsParser parser = new GenericOptionsParser(conf,args); String[] otherArgs = parser.getRemainingArgs(); if(args.length < 3){ System.out.println("please enter <in path1><in Path2> <out Path>"); System.exit(2); } FileSystem fs =FileSystem.get(URI.create(args[2]), conf); if(fs.exists(new Path(args[2]))){ fs.delete(new Path(args[2]),true); } Job job = new Job(conf ,"JionQurey"); // 设置运行的job job.setJarByClass(JionQurey.class); // 设置Map相关内容 job.setMapperClass(Example_Join_01_Mapper.class); // 设置Map的输出 job.setMapOutputKeyClass(TextPair.class); job.setMapOutputValueClass(Text.class); // 设置partition job.setPartitionerClass(Example_Join_01_Partitionner.class); // 在分区之后按照指定的条件分组 job.setGroupingComparatorClass(Example_Join_01_Comparator.class); // 设置reduce job.setReducerClass(Example_Join_01_Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 设置输入和输出的目录 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileInputFormat.addInputPath(job, new Path(otherArgs[1])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); // 执行,直到结束就退出 return job.waitForCompletion(true) ? 0 : 1; } }
实现的大致流程是:
C、Map执行完成之后,输出的中间结果如下: 1003,0 kaka 1004,0 da 1005,0 jue 1006,0 zhao 1003,1 201001 abc 1003,1 201004 jkl 1004,1 201005 mon 1005,1 201002 def 1005,1 201006 pqr 1006,1 201003 ghi 分区后: 同一区: 1003,0 kaka 1003,1 201001 abc 1003,1 201004 jkl 同一区: 1004,0 da 1004,1 201005 mon 同一区: 1005,0 jue 1005,1 201002 def 1005,1 201006 pqr 同一区: 1006,0 zhao 1006,1 201003 ghi 分组操作就是把在相同分区的数据按照指定的规则进行分组的操作,就以上来看, 是按照复合key的第一个字段做分组原则,达到忽略复合key的第二个字段值的目的,从而让数据能够迭代在一个reduce中。输出后结果如下: 分组后: 同一组: 1003,0 kaka 1003,0 201001 abc 1003,0 201004 jkl 同一组: 1004,0 da 1004,0 201005 mon 同一组: 1005,0 jue 1005,0 201002 def 1005,0 201006 pqr 同一组: 1006,0 zhao 1006,0 201003 ghi 看上去比我的所做的好多了,且不需要用这么多List来保存数据. 这样并不算完了,我们还没真正体验到Partitioner和WritableComparator到底实现了怎么一个功能. 于是我就先去到Partitioner,看看文件中出现的是什么.. 1003 201001 abc kaka 1003 201004 jkl kaka 1004 201005 mno da 1005 201002 def jue 1005 201006 pqr jue 1006 201003 ghi zhao 咦,这不就是我们想要的结果吗?那我们为什么还要分组呢? 我是这样理解的,我们在分组的代码中是通过: t1.getFirst().compareTo(t2.getFirst()); 它把Map中的结果作为一个分区,然后直接通过WritableComparator忽略了第二个字段,分组。 然后,我又把代码恢复,去掉WritableComparator 看到的结果想象,于是我又该了下,把Reduce直接做打印操作。 public void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Text pid = key.getFirst(); //其实这里已近设计好了第一个字段就是info.txt的字段,后面的都是data.txt字段 // String desc = values.iterator().next().toString(); while(values.iterator().hasNext()){ context.write(pid, new Text(values.iterator().next().toString()+" ")); } } 显示结果: 1003 kaka 1003 201001 abc 1003 201004 jkl 1004 da 1004 201005 mno 1005 jue 1005 201002 def 1005 201006 pqr 1006 zhao 1006 201003 ghi 肯定有人会认为,这样一个结果不就和上面的一样了么,把屏蔽的去掉就行了,其实不然,像1003是分了两组的. 继续修改下代码: public void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Text pid = key.getFirst(); //其实这里已近设计好了第一个字段就是info.txt的字段,后面的都是data.txt字段 // String desc = values.iterator().next().toString(); while(values.iterator().hasNext()){ context.write(pid, new Text(values.iterator().next().toString()+" ")); } context.write(new Text("Group"), new Text("+++++++++++++++++++++++++++++++")); } 输出的结果是: 1003 kaka Group +++++++++++++++++++++++++++++++ 1003 201001 abc 1003 201004 jkl Group +++++++++++++++++++++++++++++++ 1004 da Group +++++++++++++++++++++++++++++++ 1004 201005 mno Group +++++++++++++++++++++++++++++++ 1005 jue Group +++++++++++++++++++++++++++++++ 1005 201002 def 1005 201006 pqr Group +++++++++++++++++++++++++++++++ 1006 zhao Group +++++++++++++++++++++++++++++++ 1006 201003 ghi Group +++++++++++++++++++++++++++++++ 可以看出他们的并同一个Reduce中输出的结果...分成了不同的组. 做了这么多,还是由些疑虑,为什么只用分组就能实现了我们想要的结果了呢?难道真是做为了一个分区来进行分组的吗? 那我们平常不重写Partitioner和WritableComparator的时候,怎么能够更具key来分开呢?还望早日有高手解答我的不明之处.. 次日,同时又做了下hadoop利用Partitioner分类输出到不同的文件夹中的例子,我自是稍作修改,发现更加奇怪了,我这里只是显示一个文件夹, 而在别人写的资料上显示的却是多个文件夹,如果通过一个Partitioner要分为多个文件夹的话,那岂不是上面的例题要分为很多个文件夹么.. 于是我就带着问题去找导师,导师给我的解释是,分文件夹的是老版本的例题.Map中都是用output.collect();现在我直接继承的是Mapper,用的是新版本的 新版本不允许分为多个文件夹了....自能有一个文件夹。那WritableComparable是怎么一回事啊.直接告诉我主要功能是进行一种排序. 这些天一直不忘想这个问题,今天又得到点感悟...来解释下前面为什么会出现不要partitioner还是返回这样的结果: 首先,我们需要知道Map/Reduce的工作流程要有所清楚:Map ------> partitioner(分区) ------> comparato 来看下那个没有重写partitioner的代码.看到它的key传递的是个对象.在明确一下就是一个静态的对象. 着就表明了不管我是怎么new了个对象,他们的地址都是一样的.然后根据key来分区(默认),所以他们就会被分为同一个区. 至于接下来的分组操作就好理解了...