现在的位置: 首页 > 综合 > 正文

MapReduce排序之 二次排序

2018年06月05日 ⁄ 综合 ⁄ 共 4908字 ⁄ 字号 评论关闭

一:背景

Hadoop中虽然有自动排序和分组,由于自带的排序是按照Key进行排序的,有些时候,我们希望同时对Key和Value进行排序。自带的排序功能就无法满足我们了,还好Hadoop提供了一些组件可以让开发人员进行二次排序。

二:技术实现

我们先来看案例需求

#需求1: 首先按照第一列数字升序排列,当第一列数字相同时,第二列数字也升序排列(列之间用制表符\t隔开)

3	3
3	2
3	1
2	2
2	1
1	1

MapReduce计算之后的结果应该是:

1	1
2	1
2	2
3	1
3	2
3	3

#需求2:第一列不相等时,第一列按降序排列,当第一列相等时,第二列按升序排列

3	3
3	2
3	1
2	2
2	1
1	1

MapReduce计算之后的结果应该是:

3	1
3	2
3	3
2	1
2	2
1	1

下面是实现代码,实现两种需求的关键是compareTo()方法的实现不同:

public class SecondSortTest {

	// 定义输入路径
		private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/data";
		// 定义输出路径
		private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";

		public static void main(String[] args) {

			try {
				// 创建配置信息
				Configuration conf = new Configuration();
				
				/**********************************************/
				//对Map端输出进行压缩
				//conf.setBoolean("mapred.compress.map.output", true);
				//设置map端输出使用的压缩类
				//conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
				//对reduce端输出进行压缩
				//conf.setBoolean("mapred.output.compress", true);
				//设置reduce端输出使用的压缩类
				//conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
				// 添加配置文件(我们可以在编程的时候动态配置信息,而不需要手动去改变集群)
				/*
				 * conf.addResource("classpath://hadoop/core-site.xml"); 
				 * conf.addResource("classpath://hadoop/hdfs-site.xml");
				 * conf.addResource("classpath://hadoop/hdfs-site.xml");
				 */

				// 创建文件系统
				FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
				// 如果输出目录存在,我们就删除
				if (fileSystem.exists(new Path(OUT_PATH))) {
					fileSystem.delete(new Path(OUT_PATH), true);
				}

				// 创建任务
				Job job = new Job(conf, SecondSortTest.class.getName());

				//1.1	设置输入目录和设置输入数据格式化的类
				FileInputFormat.setInputPaths(job, INPUT_PATH);
				job.setInputFormatClass(TextInputFormat.class);

				//1.2	设置自定义Mapper类和设置map函数输出数据的key和value的类型
				job.setMapperClass(MySecondSortMapper.class);
				job.setMapOutputKeyClass(CombineKey.class);
				job.setMapOutputValueClass(LongWritable.class);

				//1.3	设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
				job.setPartitionerClass(HashPartitioner.class);
				job.setNumReduceTasks(1);

				//1.4	排序、分组
				//1.5	归约
				//2.1	Shuffle把数据从Map端拷贝到Reduce端。
				//2.2	指定Reducer类和输出key和value的类型
				job.setReducerClass(MySecondSortReducer.class);
				job.setOutputKeyClass(LongWritable.class);
				job.setOutputValueClass(LongWritable.class);

				//2.3	指定输出的路径和设置输出的格式化类
				FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
				job.setOutputFormatClass(TextOutputFormat.class);


				// 提交作业 退出
				System.exit(job.waitForCompletion(true) ? 0 : 1);
			
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	
	public static class MySecondSortMapper extends Mapper<LongWritable, Text, CombineKey, LongWritable>{
		
		//定义联合的key
		private CombineKey combineKey = new CombineKey();
		
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CombineKey, LongWritable>.Context context) throws IOException,
				InterruptedException {
			//对输入的value进行切分
			String[] splits = value.toString().split("\t");
			//设置联合的key
			combineKey.setComKey(Long.parseLong(splits[0]));
			combineKey.setComVal(Long.parseLong(splits[1]));
			
			//通过context写出去
			context.write(combineKey, new LongWritable(Long.parseLong(splits[1])));
		}
	}
	
	
	public static class MySecondSortReducer extends Reducer<CombineKey, LongWritable, LongWritable, LongWritable>{
		@Override
		protected void reduce(CombineKey combineKey, Iterable<LongWritable> values, Reducer<CombineKey, LongWritable, LongWritable, LongWritable>.Context context)
				throws IOException, InterruptedException {
			//因为输入的CombineKey已经排好序了,所有我们只要获取其中的两个成员变量写出去就可以了。values在这个例子中没有什么作用
			context.write(new LongWritable(combineKey.getComKey()), new LongWritable(combineKey.getComVal()));
		}
	}

}

/**
 * 重新组合成一个key,实现二次排序
 * @author 廖*民
 * time : 2015年1月18日下午7:27:52
 * @version
 */
class CombineKey implements WritableComparable<CombineKey>{

	public long comKey;
	public long comVal;
	
	//必须提供无参构造函数,否则hadoop反射机制会出错
	public CombineKey() {
		
	}
	//有参构造函数
	public CombineKey(long comKey, long comVal) {
		this.comKey = comKey;
		this.comVal = comVal;
	}

	
	
	public long getComKey() {
		return comKey;
	}
	public void setComKey(long comKey) {
		this.comKey = comKey;
	}
	public long getComVal() {
		return comVal;
	}
	public void setComVal(long comVal) {
		this.comVal = comVal;
	}
	
	public void write(DataOutput out) throws IOException {
		out.writeLong(comKey);
		out.writeLong(comVal);
	}

	public void readFields(DataInput in) throws IOException {
		this.comKey = in.readLong();
		this.comVal = in.readLong();
	}

	/**
	 * 这个方法一定要实现
	 * java里面排序默认是小的放在前面,即返回负数的放在前面,这样就是所谓的升序排列
	 * 我们在下面的方法中直接返回一个差值,也就相当于会升序排列。
	 * 如果我们要实现降序排列,那么我们就可以返回一个正数
	 */
	/*public int compareTo(CombineKey o) {
		//第一列不相同时按升序排列,当第一列相同时第二列按升序排列
		long minus = this.comKey - o.comKey;
		//如果第一个值不相等时,我们就先对第一列进行排序
		if (minus != 0){
			return (int) minus;
		}
		//如果第一列相等时,我们就对第二列进行排序
		return (int) (this.comVal - o.comVal);
	}*/
	
	/**
	 * 为了实现第一列不同时按降序排序,第一列相同时第二列按升序排列
	 * 第一列:降序,当第一列相同时,第二列:升序
	 * 为了实现降序,
	 */
	public int compareTo(CombineKey o) {
		//如果a-b<0即,a小于b,按这样 的思路应该是升序排列,我们可以返回一个相反数使其降序
		long tmp = this.comKey - o.comKey;
		//如果第一个值不相等时,我们就先对第一列进行排序
		if (tmp != 0){
			return (int) (-tmp);
		}
		//如果第一列相等时,我们就对第二列进行升序排列
		return (int) (this.comVal - o.comVal);
	}
	
	
	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + (int) (comKey ^ (comKey >>> 32));
		return result;
	}
	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		CombineKey other = (CombineKey) obj;
		if (comKey != other.comKey)
			return false;
		return true;
	}
	
}

程序运行结果就不贴了,和预想的一样!

【上篇】
【下篇】

抱歉!评论已关闭.