import java.io.IOException; import java.util.StringTokenizer; import java.util.Vector; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.io.Text; public class Find2 { static class Find2Mapper extends Mapper<Object, Text, Text, Text> {//class public void map(Object key,Text value,Context context) throws IOException,InterruptedException{ StringTokenizer itr=new StringTokenizer(value.toString()); String son,father; while(itr.hasMoreTokens()) { son=itr.nextToken(); if(son=="child"){itr.nextToken();continue;} father=itr.nextToken(); context.write(new Text(son), new Text("@"+father)); context.write(new Text(father), new Text("*"+son)); } } } // static class Find2Combine extends Reducer<Text, Text, Text, Text> // { // public void reduce(Text key,Iterable<Text>values,Context context) // throws IOException,InterruptedException{ // int sum=0,cnt=0; // for(Text val:values) // { // String[] s1=val.toString().split(","); // sum+=Integer.parseInt(s1[0]); // cnt+=Integer.parseInt(s1[1]); // } // String s; // System.out.println("Combine"+(s=new String(sum+","+cnt))); // context.write(key,new Text(new String(sum+","+cnt))); // } // } static class Find2Reducer extends Reducer<Text, Text, Text, Text> { public void reduce(Text key,Iterable<Text>values,Context context) throws IOException,InterruptedException{ Vector<String>fa,so; fa=new Vector(); so=new Vector(); String s1; for(Text tmp:values){ s1=tmp.toString(); if(s1.charAt(0)=='@')fa.add(s1.substring(1)); else so.add(s1.substring(1)); } int i,j,n=fa.size(),m=so.size(); for(i=0;i<m;++i) { for(j=0;j<n;++j) { context.write(new Text(so.get(i)), new Text(fa.get(j))); } } fa.clear(); 
so.clear(); } } public static void main(String args[])throws Exception { Configuration conf=new Configuration(); if(args.length!=2) { System.out.print("Usage: Find2 <in> <out>"); System.exit(2); } Job job=new Job(conf,"Find2"); job.setJarByClass(Find2.class); job.setMapperClass(Find2Mapper.class); //job.setCombinerClass(Find2Combine.class); job.setReducerClass(Find2Reducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); System.exit(job.waitForCompletion(true)?0:1); } } /** 输入:
child parent
tom tomfather
tomfather tomfatherfather
...
Output:
tom tomfatherfather
... **/
//==========有两张表,一张工厂名表,内容为每一行有工厂名和地址id,另一张为工厂地址表,内容为地址id和地址表,要求连接这两张表,输出每个工厂对应的地址【思想同上】============== import java.io.IOException; import java.util.Vector; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FactoryAddressCombine { static class MyMaper extends Mapper<Object, Text, IntWritable, Text>{ public void map(Object key,Text value,Context context)throws IOException,InterruptedException{ String line=value.toString(); if(line.contains("factoryname")||line.contains("addressId"))return;//首行不处理 int i=0,n=line.length(),j; char ch; ch=line.charAt(i++); if(ch>='0'&&ch<='9'){ //Address while(i<n&&line.charAt(i)!=' ')++i; j=i; while(j<n&&line.charAt(j)==' ')++j; System.out.println("Mapper1"+line.substring(0,i)+" "+line.substring(j)); context.write(new IntWritable(Integer.parseInt(line.substring(0,i))),new Text("@"+line.substring(j))); } else { //Factory for(;i<n;++i) if(line.charAt(i)>='0'&&line.charAt(i)<='9') break; j=i; for(;j>=0&&line.charAt(j)==' ';j--); System.out.println("Mapper2"+line.substring(i)+" "+line.substring(0,j)); context.write(new IntWritable(Integer.parseInt(line.substring(i))), new Text("#"+line.substring(0,j))); } } } static class MyReducer extends Reducer<IntWritable,Text, Text, Text>{ public void reduce(IntWritable key,Iterable<Text>values,Context context)throws IOException,InterruptedException{ //地名唯一 Vector<String>fa,ad; fa=new Vector(); ad=new Vector(); String tmp; for(Text val:values){ tmp=val.toString(); if(tmp.charAt(0)=='#')fa.add(tmp.substring(1)); else ad.add(tmp.substring(1)); } int i,j,n=fa.size(),m=ad.size(); for(i=0;i<n;++i) { for(j=0;j<m;++j) { 
System.out.println("Reducer "+fa.get(i)+" "+ad.get(j)); context.write(new Text(fa.get(i)), new Text(ad.get(j))); } } fa.clear(); ad.clear(); } } public static void main(String[]args)throws Exception{ Configuration conf=new Configuration(); Job job=new Job(conf,"factory_address"); job.setJarByClass(FactoryAddressCombine.class); job.setMapperClass(MyMaper.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true)?0:1); } }
/***
Address table
addressId addressname
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian
Factory table
factoryname address
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Bank of Beijing 1
Output
Beijing Red Star Beijing
Beijing Rising Beijing
Bank of Beijing Beijing
Guangzhou Honda Guangzhou
Guangzhou Development Bank Guangzhou
Shenzhen Thunder Shenzhen
Tencent Shenzhen
***/