现在的位置: 首页 > 综合 > 正文

MapReduce实现等值连接,左外连接,右外连接,全外连接

2018年06月05日 ⁄ 综合 ⁄ 共 5884字 ⁄ 字号 评论关闭

#测试数据:

# more user.txt(用户ID,用户名)

1	lavimer
2	liaozhongmin
3	liaozemin

#more post.txt(用户ID,帖子ID,标题)

1	1	java
1	2	c
2	3	hadoop
4	4	hive
5	5	hbase
5	6	pig
5	7	flume

#等值连接结果如下:

1	lavimer	1	1	java
1	lavimer	1	2	c
2	liaozhongmin	2	3	hadoop

#左外连接结果如下:

1	lavimer	1	1	java
1	lavimer	1	2	c
2	liaozhongmin	2	3	hadoop
3	liaozemin	NULL


#右外连接结果如下:

1	lavimer	1	1	java
1	lavimer	1	2	c
2	liaozhongmin	2	3	hadoop
NULL	4	4	hive
NULL	5	5	hbase
NULL	5	6	pig
NULL	5	7	flume

#全外连接结果如下:

1	lavimer	1	1	java
1	lavimer	1	2	c
2	liaozhongmin	2	3	hadoop
3	liaozemin	NULL
NULL	4	4	hive
NULL	5	5	hbase
NULL	5	6	pig
NULL	5	7	flume

实现代码如下:

/**
 * 
 * @author 廖钟民
 * time : 2015年1月30日下午1:23:36
 * @version
 */
public class UserPostJoin {
	// 定义输入路径
	private static final String INPUT_PATH1 = "hdfs://liaozhongmin:9000/user_post_join/user.txt";
	private static final String INPUT_PATH2 = "hdfs://liaozhongmin:9000/user_post_join/post.txt";
	// 定义输出路径
	private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";

	public static void main(String[] args) {

		try {
			// 创建配置信息
			Configuration conf = new Configuration();

			// 创建文件系统
			FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
			// 如果输出目录存在,我们就删除
			if (fileSystem.exists(new Path(OUT_PATH))) {
				fileSystem.delete(new Path(OUT_PATH), true);
			}

			// 创建任务
			Job job = new Job(conf, UserPostJoin.class.getName());

			// 设置连接类型
			job.getConfiguration().set("joinType", "allOuter");
			// 设置多路径输入
			MultipleInputs.addInputPath(job, new Path(INPUT_PATH1), TextInputFormat.class, UserMapper.class);
			MultipleInputs.addInputPath(job, new Path(INPUT_PATH2), TextInputFormat.class, PostMapper.class);

			//1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(UserPost.class);

			//1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
			job.setPartitionerClass(HashPartitioner.class);
			job.setNumReduceTasks(1);

			//1.4 排序
			//1.5 归约
			//2.1 Shuffle把数据从Map端拷贝到Reduce端。
			//2.2 指定Reducer类和输出key和value的类型
			job.setReducerClass(UserPostReducer.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);

			//2.3 指定输出的路径和设置输出的格式化类
			FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
			job.setOutputFormatClass(TextOutputFormat.class);

			// 提交作业 退出
			System.exit(job.waitForCompletion(true) ? 0 : 1);

		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 自定义Mapper类用于处理来自user.txt文件的数据
	 * @author 廖钟民
	 * time : 2015年1月30日下午1:22:12
	 * @version
	 */
	public static class UserMapper extends Mapper<LongWritable, Text, Text, UserPost> {
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, UserPost>.Context context) throws IOException, InterruptedException {
			// 对字符串进行切分
			String[] arr = value.toString().split("\t");
			// 创建UserId
			Text userId = new Text(arr[0]);
			// 把结果写出去
			context.write(userId, new UserPost("U", value.toString()));
		}
	}
	/**
	 * 自定义Mapper类用于处理来自post.txt文件的数据
	 * @author 廖钟民
	 * time : 2015年1月30日下午1:22:16
	 * @version
	 */
	public static class PostMapper extends Mapper<LongWritable, Text, Text, UserPost> {
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, UserPost>.Context context) throws IOException, InterruptedException {
			// 对数据进行切分
			String[] arr = value.toString().split("\t");
			// 创建用户ID
			Text userId = new Text(arr[0]);
			context.write(userId, new UserPost("P", value.toString()));

		}
	}
	/**
	 * 自定义Reducer类用于处理不同Mapper类的输出
	 * @author 廖钟民
	 * time : 2015年1月30日下午1:23:05
	 * @version
	 */
	public static class UserPostReducer extends Reducer<Text, UserPost, Text, Text> {
		// 定义List集合用于存放用户
		private List<Text> users = new ArrayList<Text>();
		private List<Text> posts = new ArrayList<Text>();

		// 定义连接类型
		private String joinType;

		@Override
		protected void setup(Reducer<Text, UserPost, Text, Text>.Context context) throws IOException, InterruptedException {

			this.joinType = context.getConfiguration().get("joinType");

			System.out.println(joinType);

		}
		/**
		 * 经过Shuffle后数据会分组,每一组数据都会调用一次reduce()函数
		 *第一组数据:
		 *1	lavimer
		 *1	1	java
		 *1	2	c
		 *
		 *第二组数据:
		 *2	3	hadoop
		 *2	liaozhongmin
		 *
		 *第三组数据:
		 *3	liaozemin
		 *
		 *第四组数据:
		 *4	4	hive
		 *
		 *第五组数据:
		 *5	5	hbase
		 *5	6	pig
		 *5	7	flume
		 *
		 *每一组数据都会调用一次reduce()函数,我们以第一组数据为例进行讲解:
		 *
		 *进入reduce函数后,<1,lavimer>会被添加到users集合中 
		 *<1	1	java>和<1	2	c>会被添加到posts集合中
		 *
		 *然后是判断当前操作是什么类型的连接,我们以等值连接为例:
		 *遍历两个集合得到的数据为:
		 *【1	lavimer	   1         1    java】
		 *【1	lavimer	   1		 2    c】
		 *
		 *这是第一组数据的执行轨迹,其他依次类推就可以得到相关的操作
		 */
		protected void reduce(Text key, Iterable<UserPost> values, Reducer<Text, UserPost, Text, Text>.Context context) throws IOException,
				InterruptedException {
			// 清空集合
			users.clear();
			posts.clear();

			// 迭代values集合把当前穿进来的某个组中的数据分别添加到对应的集合中
			for (UserPost val : values) {
				System.out.println("实际值:" + key + "===>" + values.toString());
				if (val.getType().equals("U")) {
					users.add(new Text(val.getData()));
				} else {
					posts.add(new Text(val.getData()));
				}
			}

			// 根据joinType关键字做对应的连接操作
			if (joinType.equals("innerJoin")) {// 内连接
				if (users.size() > 0 && posts.size() > 0) {

					for (Text user : users) {
						for (Text post : posts) {
							context.write(new Text(user), new Text(post));
						}
					}
				}
			} else if (joinType.equals("leftOuter")) {// 左外连接

				for (Text user : users) {
					if (posts.size() > 0) {
						for (Text post : posts) {
							context.write(new Text(user), new Text(post));
						}
					} else {
						context.write(new Text(user), createEmptyPost());
					}
				}

			} else if (joinType.equals("rightOuter")) {// 右外连接
				for (Text post : posts) {
					if (users.size() > 0) {
						for (Text user : users) {
							context.write(new Text(user), new Text(post));
						}
					} else {
						context.write(createEmptyUser(), post);
					}
				}
			} else if (joinType.equals("allOuter")) {// 全外连接
				if (users.size() > 0) {
					for (Text user : users) {
						if (posts.size() > 0) {
							for (Text post : posts) {
								context.write(new Text(user), new Text(post));
							}
						} else {
							context.write(new Text(user), createEmptyPost());
						}
					}
				} else {
					for (Text post : posts) {
						if (users.size() > 0) {
							for (Text user : users) {
								context.write(new Text(user), new Text(post));
							}
						} else {
							context.write(createEmptyUser(), post);
						}
					}
				}
			}

		}

		/**
		 * 用户为空时用制表符代替
		 * 
		 * @return
		 */
		private Text createEmptyUser() {
			return new Text("NULL");
		}

		/**
		 * 帖子为空时用制表符代替
		 * 
		 * @return
		 */
		private Text createEmptyPost() {
			return new Text("NULL");
		}
	}
}
/**
 * 自定义实体类封装两个表的数据
 * @author 廖钟民
 * time : 2015年1月30日下午1:23:50
 * @version
 */
class UserPost implements Writable {

	// 类型(U表示用户,P表示帖子)
	private String type;
	private String data;

	public UserPost() {
	}

	public UserPost(String type, String data) {
		this.type = type;
		this.data = data;
	}

	public String getType() {
		return type;
	}

	public void setType(String type) {
		this.type = type;
	}

	public String getData() {
		return data;
	}

	public void setData(String data) {
		this.data = data;
	}

	public void write(DataOutput out) throws IOException {
		out.writeUTF(this.type);
		out.writeUTF(this.data);

	}

	public void readFields(DataInput in) throws IOException {
		this.type = in.readUTF();
		this.data = in.readUTF();
	}

}

抱歉!评论已关闭.