现在的位置: 首页 > 综合 > 正文

Hadoop MapReduce:计算每个人成绩的平均数,用 Combiner 加速

2014年10月06日 ⁄ 综合 ⁄ 共 2293字 ⁄ 字号 评论关闭
import java.io.IOException;
import java.util.StringTokenizer;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;


public class Score
{
	static class ScoreMapper extends Mapper<Object, Text, Text, Text>
	{//class
		public void map(Object key,Text value,Context context)
		throws IOException,InterruptedException{
			StringTokenizer itr=new StringTokenizer(value.toString());
			String s;
			while(itr.hasMoreTokens())
			{
				System.out.println("Map "+(s=itr.nextToken()));
				context.write(new Text(s),new Text(itr.nextToken()+",1"));
			}
		}
	}
	static class ScoreCombine extends Reducer<Text, Text, Text, Text>
	{
		public void reduce(Text key,Iterable<Text>values,Context context)
		throws IOException,InterruptedException{
			int sum=0,cnt=0;
			for(Text val:values)
			{
				String[] s1=val.toString().split(",");
				sum+=Integer.parseInt(s1[0]);
				cnt+=Integer.parseInt(s1[1]);
			}
			String s;
			System.out.println("Combine"+(s=new String(sum+","+cnt)));
			context.write(key,new Text(new String(sum+","+cnt)));
		}
	}
	static class ScoreReducer extends Reducer<Text, Text, Text, DoubleWritable>
	{
		public void reduce(Text key,Iterable<Text>values,Context context)
		throws IOException,InterruptedException{
			int sum=0,cnt=0;
			for(Text val:values)
			{
				String[]s=val.toString().split(",");
				sum+=Integer.parseInt(s[0]);
				cnt+=Integer.parseInt(s[1]);
			}
			String s;
			System.out.println("reduce"+(s=new String(key+","+(sum*1.0/cnt))));
			context.write(key,new DoubleWritable(sum*1.0/cnt));
		}
	}
	public static void main(String args[])throws Exception
	{
		Configuration conf=new Configuration();
		if(args.length!=2)
		{
			System.out.print("Usage: Score <in> <out>");
			System.exit(2);
		}
		Job job=new Job(conf,"Score");
		job.setJarByClass(Score.class);
		job.setMapperClass(ScoreMapper.class);
	 job.setCombinerClass(ScoreCombine.class);
		job.setReducerClass(ScoreReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job,new Path(args[0]));
		FileOutputFormat.setOutputPath(job,new Path(args[1]));
		System.exit(job.waitForCompletion(true)?0:1);
	}
}
/**输入文件格式:
aa 100
bb 34
...
 **/

/**输出:
aa 60.5
bb 17.5
cc 51.666666666666664
dd 18.0
hh 34.0
tt 81.5
 **/






抱歉!评论已关闭.