实例:输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址表,包含地址名列和地址编号列。要求从输入数据中找出工厂名和地址名的对应关系,输出“工厂名-地址名”表。
数据样本
图:factory
图:address
图:输出结果
package com.ccse.hadoop.join;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Reduce-side join of a factory table and an address table.
 *
 * <p>Both tables are read from the same input directory. A factory row has the
 * form {@code <factory name> <address id>} and an address row has the form
 * {@code <address id> <address name>}. The job emits one
 * {@code factoryname / addressname} pair for every factory and address that
 * share an address id.
 *
 * @author woshiccna
 */
public class MultiTableJoinApp {

    public static final String INPUT_PATH = "hdfs://chaoren1:9000/reducejoininput";
    public static final String OUTPUT_PATH = "hdfs://chaoren1:9000/reducejoinoutput";

    /**
     * Former "header already written" flag.
     *
     * @deprecated no longer read or written by this class; the reducer keeps a
     *     per-instance flag instead, so state cannot carry over between reducers
     *     or jobs that share a JVM. Retained only so existing references compile.
     */
    @Deprecated
    public static int time = 0;

    /** Prefix marking a map output value that came from the factory (left) table. */
    private static final String FACTORY_TAG = "1+";

    /** Prefix marking a map output value that came from the address (right) table. */
    private static final String ADDRESS_TAG = "2+";

    /** Counter group under which skipped input lines are reported. */
    private static final String COUNTER_GROUP = "MultiTableJoinApp";

    /**
     * Configures and runs the join job, replacing any previous output directory.
     *
     * @param args ignored; input and output locations come from {@link #INPUT_PATH}
     *     and {@link #OUTPUT_PATH}
     * @throws IOException if the file system or the job submission fails
     * @throws URISyntaxException if {@link #OUTPUT_PATH} is not a valid URI
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException if the thread is interrupted while waiting for the job
     */
    public static void main(String[] args)
            throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        // Clear output left by a previous run so the job can write to OUTPUT_PATH again.
        FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
        fileSystem.delete(new Path(OUTPUT_PATH), true);

        Job job = new Job(conf, MultiTableJoinApp.class.getSimpleName());
        job.setJarByClass(MultiTableJoinApp.class);

        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        // Report a failed job to the caller instead of silently returning success.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }

    /**
     * Tags every input row with its table of origin and keys it by address id.
     *
     * <p>The join column becomes the output key; the remaining column, prefixed
     * with {@code "1+"} (factory) or {@code "2+"} (address), becomes the value.
     *
     * @author woshiccna
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        /** Address row: numeric id, whitespace, then the address name. */
        private static final Pattern ADDRESS_ROW = Pattern.compile("(\\d+)\\s+(.+)");

        /** Factory row: factory name (may contain spaces), whitespace, then a numeric id. */
        private static final Pattern FACTORY_ROW = Pattern.compile("(.+?)\\s+(\\d+)");

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            if (value == null) {
                return;
            }
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }
            // Skip the column-header row of either table.
            if (line.contains("factoryname") || line.contains("addressID")) {
                return;
            }

            String[] joinRecord = toJoinRecord(line);
            if (joinRecord == null) {
                // Neither layout matched: count the line rather than failing the task.
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }
            context.write(new Text(joinRecord[0]), new Text(joinRecord[1]));
        }

        /**
         * Splits one data row into its join key and tagged payload.
         *
         * <p>A row whose first token is all digits is treated as an address row;
         * otherwise a row whose last token is all digits is treated as a factory
         * row. A factory whose name itself starts with a standalone number cannot
         * be told apart from an address row by content alone.
         *
         * @param line a non-header input row
         * @return a two-element array {@code {addressId, taggedName}}, or
         *     {@code null} if the row matches neither layout
         */
        static String[] toJoinRecord(String line) {
            String row = line.trim();

            Matcher address = ADDRESS_ROW.matcher(row);
            if (address.matches()) {
                return new String[] {address.group(1), ADDRESS_TAG + address.group(2)};
            }

            Matcher factory = FACTORY_ROW.matcher(row);
            if (factory.matches()) {
                return new String[] {factory.group(2), FACTORY_TAG + factory.group(1)};
            }
            return null;
        }
    }

    /**
     * Joins the factory and address names that share an address id.
     *
     * <p>The first call to {@code reduce} on an instance also writes a
     * {@code factoryname / addressname} header record.
     */
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {

        /** Whether this reducer instance has already written the header record. */
        private boolean headerWritten;

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            if (!headerWritten) {
                context.write(new Text("factoryname"), new Text("addressname"));
                headerWritten = true;
            }
            if (values == null) {
                return;
            }

            // Growable lists: the number of rows sharing one address id is not bounded.
            List<String> factories = new ArrayList<String>();
            List<String> addresses = new ArrayList<String>();
            for (Text value : values) {
                if (value == null) {
                    continue;
                }
                String tagged = value.toString();
                if (tagged.startsWith(FACTORY_TAG)) {
                    factories.add(tagged.substring(FACTORY_TAG.length()));
                } else if (tagged.startsWith(ADDRESS_TAG)) {
                    addresses.add(tagged.substring(ADDRESS_TAG.length()));
                }
            }

            // Cross product of both sides for this address id.
            for (String factory : factories) {
                for (String address : addresses) {
                    context.write(new Text(factory), new Text(address));
                }
            }
        }
    }
}