现在的位置: 首页 > 综合 > 正文

Hadoop–MapReduce【表内容的关联】查找孙子和祖父,合并工厂地址表

2014年10月06日 ⁄ 综合 ⁄ 共 5299字 ⁄ 字号 评论关闭
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Vector;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;


public class Find2
{
	static class Find2Mapper extends Mapper<Object, Text, Text, Text>
	{//class
		public void map(Object key,Text value,Context context)
		throws IOException,InterruptedException{
			StringTokenizer itr=new StringTokenizer(value.toString());
			String son,father;
			while(itr.hasMoreTokens())
			{
				son=itr.nextToken();
				if(son=="child"){itr.nextToken();continue;}
				father=itr.nextToken();
				context.write(new Text(son), new Text("@"+father));
				context.write(new Text(father), new Text("*"+son));
			}
		}
	}
//	static class Find2Combine extends Reducer<Text, Text, Text, Text>
//	{
//		public void reduce(Text key,Iterable<Text>values,Context context)
//		throws IOException,InterruptedException{
//			int sum=0,cnt=0;
//			for(Text val:values)
//			{
//				String[] s1=val.toString().split(",");
//				sum+=Integer.parseInt(s1[0]);
//				cnt+=Integer.parseInt(s1[1]);
//			}
//			String s;
//			System.out.println("Combine"+(s=new String(sum+","+cnt)));
//			context.write(key,new Text(new String(sum+","+cnt)));
//		}
//	}
	static class Find2Reducer extends Reducer<Text, Text, Text, Text>
	{
		public void reduce(Text key,Iterable<Text>values,Context context)
		throws IOException,InterruptedException{
			
			Vector<String>fa,so;
			fa=new Vector();
			so=new Vector();
			String s1;
			for(Text tmp:values){
				s1=tmp.toString();
				if(s1.charAt(0)=='@')fa.add(s1.substring(1));
				else so.add(s1.substring(1));
			}
			int i,j,n=fa.size(),m=so.size();
			for(i=0;i<m;++i)
			{
				for(j=0;j<n;++j)
				{
					context.write(new Text(so.get(i)), new Text(fa.get(j)));
				}
			}
			fa.clear();
			so.clear();
		}
	}
	public static void main(String args[])throws Exception
	{
		Configuration conf=new Configuration();
		if(args.length!=2)
		{
			System.out.print("Usage: Find2 <in> <out>");
			System.exit(2);
		}
		Job job=new Job(conf,"Find2");
		job.setJarByClass(Find2.class);
		job.setMapperClass(Find2Mapper.class);
	 //job.setCombinerClass(Find2Combine.class);
		job.setReducerClass(Find2Reducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job,new Path(args[0]));
		FileOutputFormat.setOutputPath(job,new Path(args[1]));
		System.exit(job.waitForCompletion(true)?0:1);
	}
}
/**
输入:
child parent
tom tomfather
tomfather tomfatherfather
...
输出:
tom tomfatherfather
...
 **/
//==========有两张表,一张工厂名表,内容为每一行有工厂名和地址id,另一张为工厂地址表,内容为地址id和地址名,要求连接这两张表,输出每个工厂对应的地址【思想同上】==============
import java.io.IOException;
import java.util.Vector;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class FactoryAddressCombine {
	static class MyMaper extends Mapper<Object, Text, IntWritable, Text>{
		public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
			String line=value.toString();
			
			if(line.contains("factoryname")||line.contains("addressId"))return;//首行不处理
			int i=0,n=line.length(),j;
			char ch;
			ch=line.charAt(i++);
			if(ch>='0'&&ch<='9'){
				//Address
				while(i<n&&line.charAt(i)!=' ')++i;
				j=i;
				while(j<n&&line.charAt(j)==' ')++j;
				System.out.println("Mapper1"+line.substring(0,i)+" "+line.substring(j));
				context.write(new IntWritable(Integer.parseInt(line.substring(0,i))),new Text("@"+line.substring(j)));
			}
			else
			{
				//Factory
				for(;i<n;++i)
					if(line.charAt(i)>='0'&&line.charAt(i)<='9')
						break;
				j=i;
				for(;j>=0&&line.charAt(j)==' ';j--);
				
				System.out.println("Mapper2"+line.substring(i)+" "+line.substring(0,j));
				context.write(new IntWritable(Integer.parseInt(line.substring(i))), new Text("#"+line.substring(0,j)));
			}
		}
	}
	static class MyReducer extends Reducer<IntWritable,Text, Text, Text>{
		public void reduce(IntWritable key,Iterable<Text>values,Context context)throws IOException,InterruptedException{
			//地名唯一
			Vector<String>fa,ad;
			fa=new Vector();
			ad=new Vector();
			String tmp;
			for(Text val:values){
				tmp=val.toString();
				if(tmp.charAt(0)=='#')fa.add(tmp.substring(1));
				else ad.add(tmp.substring(1));
			}
			int i,j,n=fa.size(),m=ad.size();
			for(i=0;i<n;++i)
			{
				for(j=0;j<m;++j)
				{
					System.out.println("Reducer "+fa.get(i)+" "+ad.get(j));
					context.write(new Text(fa.get(i)), new Text(ad.get(j)));
				}
			}
			fa.clear();
			ad.clear();
		}
	}
	public  static void main(String[]args)throws Exception{
		Configuration conf=new Configuration();
		Job job=new Job(conf,"factory_address");
		job.setJarByClass(FactoryAddressCombine.class);
		job.setMapperClass(MyMaper.class);
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true)?0:1);
	}
}

/***

地址表

addressId addressname
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian

工厂表

factoryname address
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Bank of Beijing 1

输出

Beijing Red Star Beijing
Beijing Rising Beijing
Bank of Beijing Beijing
Guangzhou Honda Guangzhou
Guangzhou Development Bank Guangzhou
Shenzhen Thunder Shenzhen
Tencent Shenzhen

***/

抱歉!评论已关闭.