现在的位置: 首页 > 综合 > 正文

MapReduce求最大值

2018年06月05日 ⁄ 综合 ⁄ 共 2996字 ⁄ 字号 评论关闭

一:背景

求最值是MapReduce的常见算法,应用也很广泛,比如说求出某大型销售网站各个站点销售量最大的商品,人口最多的城市等等,MapReduce求最大值的关键是要实现cleanUp()方法。

二:技术实现

#需求 有两个文件max和max2,现要求合并两个并找出最大值。

#max文件数据如下:

10
29
50
39
88
99
29
100

389

#max2文件数据如下:

10
20
39
90
33
299

99
390
900
999
22

实现代码如下:

public class MaxTest {
	// 定义输入路径
	private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/max_file/*";
	// 定义输出路径
	private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";

	public static void main(String[] args) {

		try {
			// 创建配置信息
			Configuration conf = new Configuration();

			// 创建文件系统
			FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
			// 如果输出目录存在,我们就删除
			if (fileSystem.exists(new Path(OUT_PATH))) {
				fileSystem.delete(new Path(OUT_PATH), true);
			}

			// 创建任务
			Job job = new Job(conf, MaxTest.class.getName());

			//1.1 设置输入目录和设置输入数据格式化的类
			FileInputFormat.setInputPaths(job, INPUT_PATH);
			job.setInputFormatClass(TextInputFormat.class);

			//1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
			job.setMapperClass(MaxMapper.class);
			job.setMapOutputKeyClass(LongWritable.class);
			job.setMapOutputValueClass(NullWritable.class);

			//1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
			job.setPartitionerClass(HashPartitioner.class);
			job.setNumReduceTasks(1);

			//1.4 排序
			//1.5 归约
			//2.1 Shuffle把数据从Map端拷贝到Reduce端。
			//2.2 指定Reducer类和输出key和value的类型
			job.setReducerClass(MaxReducer.class);
			job.setOutputKeyClass(LongWritable.class);
			job.setOutputValueClass(NullWritable.class);

			//2.3 指定输出的路径和设置输出的格式化类
			FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
			job.setOutputFormatClass(TextOutputFormat.class);

			// 提交作业 退出
			System.exit(job.waitForCompletion(true) ? 0 : 1);

		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static class MaxMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
		// 定义一个Long类型的最小值作为临时变量
		private Long max = Long.MIN_VALUE;
		// 定义输出去的value
		private LongWritable maxValue = new LongWritable();

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, NullWritable>.Context context) throws IOException,
				InterruptedException {
			// 获取输入的行
			String line = value.toString();
			// 抛弃无效记录
			if (line == null || line.equals("")) {
				return;
			}
			// 把line转换为数值
			long temp = Long.parseLong(line);

			// 比较大小
			if (temp > max) {
				// 把val赋值给tempMax
				max = temp;
			}

		}

		/**
		 * cleanUp()是指map函数执行完成之后就会调用,刚好满足我们的要求 因为map()函数执行完成之后我们单个任务的的最大值也就产生了
		 */
		@Override
		protected void cleanup(Mapper<LongWritable, Text, LongWritable, NullWritable>.Context context) throws IOException, InterruptedException {
			// 把最后的处理结果写出去
			maxValue.set(max);
			context.write(maxValue, NullWritable.get());
		}
	}

	/**
	 * 汇总多个任务产生的最大值,再次比较
	 */
	public static class MaxReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
		// 定义一个参考的临时变量
		private Long max = Long.MIN_VALUE;
		// 定义输出的key
		private LongWritable maxValue = new LongWritable();

		protected void reduce(LongWritable key, Iterable<NullWritable> value, Reducer<LongWritable, NullWritable, LongWritable, NullWritable>.Context context)
				throws IOException, InterruptedException {
			if (key.get() > max) {
				max = key.get();
			}

		}

		/**
		 * reduce任务完成后写出去
		 */
		protected void cleanup(Reducer<LongWritable, NullWritable, LongWritable, NullWritable>.Context context) throws IOException, InterruptedException {
			// 设置最大值
			maxValue.set(max);
			context.write(maxValue, NullWritable.get());
		}
	}
}

程序运行结果:

【上篇】
【下篇】

抱歉!评论已关闭.