现在的位置: 首页 > 综合 > 正文

MapReduce求平均值

2018年06月05日 ⁄ 综合 ⁄ 共 2587字 ⁄ 字号 评论关闭

一:背景

求平均数是MapReduce比较常见的算法,求平均数的算法也比较简单,一种思路是Map端读取数据,Reduce端汇总并且统计记录数,然后作商即可。

二:技术实现

#需求:现有成绩单如下,求出每个同学的平均成绩

小民	语文	80
小民	数学	98
小民	英语	89
小芳	语文	88
小芳	数学	99
小芳	英语	90

实现代码:

public class AverageTest {
		// 定义输入路径
		private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/average_file";
		// 定义输出路径
		private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";

		public static void main(String[] args) {

			try {
				// 创建配置信息
				Configuration conf = new Configuration();
				
				// 创建文件系统
				FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
				// 如果输出目录存在,我们就删除
				if (fileSystem.exists(new Path(OUT_PATH))) {
					fileSystem.delete(new Path(OUT_PATH), true);
				}

				// 创建任务
				Job job = new Job(conf, AverageTest.class.getName());

				//1.1	设置输入目录和设置输入数据格式化的类
				FileInputFormat.setInputPaths(job, INPUT_PATH);
				job.setInputFormatClass(TextInputFormat.class);

				//1.2	设置自定义Mapper类和设置map函数输出数据的key和value的类型
				job.setMapperClass(AverageMapper.class);
				job.setMapOutputKeyClass(Text.class);
				job.setMapOutputValueClass(Text.class);

				//1.3	设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
				job.setPartitionerClass(HashPartitioner.class);
				job.setNumReduceTasks(1);

				//1.4	排序
				//1.5	归约
				//2.1	Shuffle把数据从Map端拷贝到Reduce端。
				//2.2	指定Reducer类和输出key和value的类型
				job.setReducerClass(AverageReducer.class);
				job.setOutputKeyClass(Text.class);
				job.setOutputValueClass(FloatWritable.class);

				//2.3	指定输出的路径和设置输出的格式化类
				FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
				job.setOutputFormatClass(TextOutputFormat.class);


				// 提交作业 退出
				System.exit(job.waitForCompletion(true) ? 0 : 1);
			
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

	public static class AverageMapper extends Mapper<LongWritable, Text, Text, Text>{
		//设置输出的key和value
		private Text outKey = new Text();
		private Text outValue = new Text();
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
		
			//获取输入的行
			String line = value.toString();
			//取出无效记录
			if (line == null || line.equals("")){
				return ;
			}
			//对数据进行切分
			String[] splits = line.split("\t");
			
			//截取姓名和成绩
			String name = splits[0];
			String score = splits[2];
			//设置输出的Key和value
			outKey.set(name);
			outValue.set(score);
			//将结果写出去
			context.write(outKey, outValue);
			
		}
		
	}
	
	public static class AverageReducer extends Reducer<Text, Text, Text, FloatWritable>{
		//定义写出去的Key和value
		private Text name = new Text();
		private FloatWritable avg = new FloatWritable();
		@Override
		protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, FloatWritable>.Context context) throws IOException, InterruptedException {
			//定义科目数量
			int courseCount = 0;
			//定义中成绩
			int sum = 0;
			//定义平均分
			float average = 0;
			
			//遍历集合求总成绩
			for (Text val : value){
				sum += Integer.parseInt(val.toString());
				courseCount ++;
			}
			
			//求平均成绩
			average = sum / courseCount;
			
			//设置写出去的名字和成绩
			name.set(key);
			avg.set(average);
			
			//把结果写出去
			context.write(name, avg);
		}
	}
}

程序运行的结果:

抱歉!评论已关闭.