现在的位置: 首页 > 综合 > 正文

reduce端join操作

2018年05月20日 ⁄ 综合 ⁄ 共 3138字 ⁄ 字号 评论关闭

实例:输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址表,包含地址名列和地址编号列。要求从输入数据中找出工厂名和地址名的对应关系,输出工厂名-地址名表。

数据样本

  图:factory

   图:address

  图:输出结果

package com.ccse.hadoop.join;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Reduce-side join of two tables read from the same input directory.
 *
 * <p>Input is two files: a factory table ("factoryname addressID", name first,
 * numeric address id last) and an address table ("addressID addressname",
 * numeric id first, name last). The job joins them on the address id and
 * writes "factoryname addressname" pairs.
 *
 * @author woshiccna
 */
public class MultiTableJoinApp {

	public static final String INPUT_PATH = "hdfs://chaoren1:9000/reducejoininput";
	public static final String OUTPUT_PATH = "hdfs://chaoren1:9000/reducejoinoutput";
	// Emits the header row exactly once.
	// NOTE(review): a static counter is only correct when the job runs with a
	// single reducer task in a single JVM; with multiple reducer tasks each
	// one would write its own header line.
	public static int time = 0;

	public static void main(String[] args) throws IOException, URISyntaxException,
	    ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		// Remove any previous output directory so the job can be re-run.
		FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
		fileSystem.delete(new Path(OUTPUT_PATH), true);

		Job job = new Job(conf, MultiTableJoinApp.class.getSimpleName());
		job.setJarByClass(MultiTableJoinApp.class);

		FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));

		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
		job.waitForCompletion(true);
	}

	/**
	 * Tags each input line with its table of origin.
	 *
	 * <p>Emits (addressID, "1+" + factoryname) for factory-table lines and
	 * (addressID, "2+" + addressname) for address-table lines, so the reducer
	 * receives both sides of the join under the same key.
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			if (value == null) {
				return;
			}
			String line = value.toString();
			// Skip blank lines and the header line of either table.
			if (line.isEmpty() || line.contains("factoryname") || line.contains("addressID")) {
				return;
			}
			// Find the first digit: that is where the numeric address id starts.
			// BUGFIX: the original condition (>= '9' || <= '0') treated the
			// digits '0' and '9' themselves as non-digits, and an all-letter
			// line ran past the end of the string.
			int i = 0;
			while (i < line.length() && (line.charAt(i) > '9' || line.charAt(i) < '0')) {
				i++;
			}
			if (i == line.length()) {
				return;  // no digit at all: malformed line, ignore it
			}
			if (line.charAt(0) > '9' || line.charAt(0) < '0') {
				// Factory table: "factoryname addressID" — the id is the trailing
				// field; the factory name is everything before the last space.
				int j = i - 1;
				while (j >= 0 && line.charAt(j) != ' ') {
					j--;
				}
				if (j < 0) {
					return;  // no separator before the id: malformed line
				}
				context.write(new Text(line.substring(i)), new Text("1+" + line.substring(0, j)));
			} else {
				// Address table: "addressID addressname" — the id is the leading
				// digit run; the address name follows the separating space.
				// BUGFIX: the original keyed on the first digit only (breaking
				// multi-digit ids and mismatching the factory-side key) and
				// kept the separating space in the address name.
				int j = i;
				while (j < line.length() && line.charAt(j) != ' ') {
					j++;
				}
				if (j >= line.length()) {
					return;  // no address name after the id: malformed line
				}
				context.write(new Text(line.substring(0, j)), new Text("2+" + line.substring(j + 1)));
			}
		}
	}

	/**
	 * Joins the two tagged value sets that share an address id and writes the
	 * cartesian product of factory names and address names.
	 */
	public static class MyReducer extends Reducer<Text, Text, Text, Text> {
		@Override
		protected void reduce(Text key, Iterable<Text> values,
				Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			// Emit the header row before the first joined record.
			if (time == 0) {
				context.write(new Text("factoryname"), new Text("addressname"));
				time++;
			}
			if (values == null) {
				return;
			}
			// BUGFIX: the original used fixed String[10] buffers, which threw
			// ArrayIndexOutOfBoundsException for any key with more than ten rows.
			List<String> factories = new ArrayList<String>();
			List<String> addresses = new ArrayList<String>();
			for (Text value : values) {
				if (value == null) {
					continue;
				}
				String line = value.toString();
				if (line.startsWith("1+")) {         // factory-table record
					factories.add(line.substring(2));
				} else if (line.startsWith("2+")) {  // address-table record
					addresses.add(line.substring(2));
				}
			}
			// Cartesian product: every factory at this address id is paired
			// with every address name (normally exactly one).
			for (String factory : factories) {
				for (String address : addresses) {
					context.write(new Text(factory), new Text(address));
				}
			}
		}
	}
}
【上篇】
【下篇】

抱歉!评论已关闭.