package com.ccse.hadoop.group;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.ccse.hadoop.sort.SortApp;

/**
 * Custom grouping job: rows that share the same value in the first column are
 * grouped together, and the reducer emits the maximum value of the second
 * column for each group.
 *
 * <p>Input is expected to be tab-separated lines of two long integers:
 * {@code first \t second}.</p>
 *
 * @author woshiccna
 */
public class GroupApp {

    /** HDFS path of the tab-separated input data. */
    public static final String INPUT_PATH = "hdfs://chaoren1:9000/sortinput";
    /** HDFS path of the job output; deleted before each run. */
    public static final String OUTPUT_PATH = "hdfs://chaoren1:9000/sortoutput";

    public static void main(String[] args) throws IOException, URISyntaxException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Remove any previous output so the job does not fail on an existing directory.
        FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
        fileSystem.delete(new Path(OUTPUT_PATH), true);

        // Job.getInstance is the non-deprecated replacement for the Job(conf, name) ctor.
        Job job = Job.getInstance(conf, GroupApp.class.getSimpleName());
        // Must reference THIS job's class (was SortApp.class, which would ship the wrong jar).
        job.setJarByClass(GroupApp.class);

        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(MyK2.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Group reduce input by the first column only (see MyGroupComparator).
        job.setGroupingComparatorClass(MyGroupComparator.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        // Propagate success/failure to the shell.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Parses each tab-separated line into a composite key {@link MyK2}(first, second)
     * and emits the second column as the value.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, MyK2, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, MyK2, LongWritable>.Context context)
                throws IOException, InterruptedException {
            if (value != null) {
                String[] splitted = value.toString().split("\t");
                long first = Long.parseLong(splitted[0]);
                long second = Long.parseLong(splitted[1]);
                context.write(new MyK2(first, second), new LongWritable(second));
            }
        }
    }

    /**
     * For each group (keys with equal first column, per the grouping comparator),
     * emits {@code (first, max(second))}.
     */
    public static class MyReducer extends Reducer<MyK2, LongWritable, LongWritable, LongWritable> {
        @Override
        protected void reduce(MyK2 key, Iterable<LongWritable> values,
                Reducer<MyK2, LongWritable, LongWritable, LongWritable>.Context context)
                throws IOException, InterruptedException {
            // Find the maximum of the second column within this group.
            long max = Long.MIN_VALUE;
            for (LongWritable writable : values) {
                if (writable.get() > max) {
                    max = writable.get();
                }
            }
            context.write(new LongWritable(key.first), new LongWritable(max));
        }
    }

    /**
     * Composite key holding both columns. Sorts by {@code first}, then by
     * {@code second}, so within each group the values arrive in sorted order.
     */
    public static class MyK2 implements WritableComparable<MyK2> {
        private long first;
        private long second;

        public MyK2() {
        }

        public MyK2(long first, long second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.first);
            out.writeLong(this.second);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readLong();
            this.second = in.readLong();
        }

        @Override
        public int compareTo(MyK2 to) {
            if (to == null) {
                return 0;
            }
            // Long.compare avoids the overflow bug of casting (a - b) to int,
            // which can flip the sign for large differences.
            int cmp = Long.compare(this.first, to.first); // order by first column
            if (cmp != 0) {
                return cmp;
            }
            return Long.compare(this.second, to.second);  // then by second column
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof MyK2)) {
                return false;
            }
            MyK2 other = (MyK2) obj;
            return this.first == other.first && this.second == other.second;
        }

        @Override
        public int hashCode() {
            return 31 * Long.hashCode(first) + Long.hashCode(second);
        }

        @Override
        public String toString() {
            return first + "\t" + second;
        }
    }

    /**
     * Grouping comparator: two keys belong to the same reduce group when their
     * {@code first} fields are equal, regardless of {@code second}.
     */
    public static class MyGroupComparator implements RawComparator<MyK2> {

        @Override
        public int compare(MyK2 arg0, MyK2 arg1) {
            // Must mirror the byte-level comparison (was "return 0", which would
            // collapse ALL keys into one group if this overload is ever used).
            return Long.compare(arg0.first, arg1.first);
        }

        /**
         * Byte-level comparison of only the first 8 bytes of each serialized key,
         * i.e. the {@code first} field written by {@link MyK2#write(DataOutput)}.
         *
         * @param b1 first byte array participating in the comparison
         * @param s1 start offset within the first array
         * @param l1 number of bytes of the first array to compare
         * @param b2 second byte array participating in the comparison
         * @param s2 start offset within the second array
         * @param l2 number of bytes of the second array to compare
         */
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
        }
    }
}