现在的位置: 首页 > 综合 > 正文

自定义分组Group

2018年05月20日 ⁄ 综合 ⁄ 共 3565字 ⁄ 字号 评论关闭

package com.ccse.hadoop.group;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.ccse.hadoop.sort.SortApp;

/**
 * 自定义分组,当第一列相同时,获取第二列的最大值
 * @author woshiccna
 *
 */
public class GroupApp {

	public static final String INPUT_PATH = "hdfs://chaoren1:9000/sortinput";
	public static final String OUTPUT_PATH = "hdfs://chaoren1:9000/sortoutput";
	
	public static void main(String[] args) throws IOException, 
	     URISyntaxException, ClassNotFoundException, InterruptedException {

		Configuration conf = new Configuration();
		FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
		fileSystem.delete(new Path(OUTPUT_PATH), true);
		
		Job job = new Job(conf, SortApp.class.getSimpleName());
		job.setJarByClass(SortApp.class);
		
		FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
		
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(MyK2.class);
		job.setMapOutputValueClass(LongWritable.class);
		
		job.setGroupingComparatorClass(MyGroupComparator.class);
		
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(LongWritable.class);
		
		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
		
		job.waitForCompletion(true);
		
	}
	
	public static class MyMapper extends Mapper<LongWritable, Text, MyK2, LongWritable> {

		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, MyK2, LongWritable>.Context context)
				throws IOException, InterruptedException {
			if (value != null) {
				String[] splitted = value.toString().split("\t");
				LongWritable second = new LongWritable(Long.parseLong(splitted[1]));
				context.write(new MyK2(Long.parseLong(splitted[0]), Long.parseLong(splitted[1])), second);
			}
		}
		
	}
	
	public static class MyReducer extends Reducer<MyK2, LongWritable, LongWritable, LongWritable> {
		@Override
		protected void reduce(
				MyK2 key,
				Iterable<LongWritable> values,
				Reducer<MyK2, LongWritable, LongWritable, LongWritable>.Context context)
				throws IOException, InterruptedException {
			long max = Long.MIN_VALUE;
			for (LongWritable writable : values) {  //找出第二列的最大值
				if (writable.get() > max) {
					max = writable.get();
				}
			}
			context.write(new LongWritable(key.first), new LongWritable(max));
		}
	}
	
	public static class MyK2 implements WritableComparable<MyK2> {

		private long first;
		private long second;
		
		public MyK2() {}
		
		public MyK2(long first, long second) {
			this.first = first;
			this.second = second;
		}
		
		@Override
		public void write(DataOutput out) throws IOException {
			out.writeLong(this.first);
			out.writeLong(this.second);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			this.first = in.readLong();
			this.second = in.readLong();
		}

		@Override
		public int compareTo(MyK2 to) {
			if (to != null) {
				long dis = this.first - to.first;
				if (dis != 0) {  //首先根据first进行排序
					return (int)dis;
				} else {   //然后根据second进行排序
					return (int)(this.second - to.second);
				}
			}
			return 0;
		}
	}
	
	public static class MyGroupComparator implements RawComparator<MyK2> {

		@Override
		public int compare(MyK2 arg0, MyK2 arg1) {
			return 0;
		}

		/**
		 * b1	表示第1个参与比较的字节数组
		 * s1	表示第1个字节数组中开始比较的位置
		 * l1	表示第1个字节数组参与比较的字节长度
		 * b2	表示第2个参与比较的字节数组
		 * s2	表示第2个字节数组中开始比较的位置
		 * l2	表示第2个字节数组参与比较的字节长度
		 */
		@Override
		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
		}
		
	}
	
}

【上篇】
【下篇】

抱歉!评论已关闭.