package com.ccse.hadoop.partitioner;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

import com.ccse.hadoop.mapreduce.KpiWritable;

public class MobileApp {

    public static final String INPUT_PATH = "hdfs://chaoren1:9000/mobile/mobile.dat";
    public static final String OUTPUT_PATH = "hdfs://chaoren1:9000/mobileout";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Delete any previous output so the job can be rerun
        FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
        fileSystem.delete(new Path(OUTPUT_PATH), true);

        Job job = new Job(conf, MobileApp.class.getSimpleName());
        job.setJarByClass(MobileApp.class);
        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));

        job.setMapperClass(MyMapper.class);
        job.setPartitionerClass(HashPartitioner.class); // the default partitioner, set explicitly here
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(KpiWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(2); // run two reduce tasks
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(KpiWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        job.waitForCompletion(true);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable> {
        private Text mapperKey = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input lines are tab-separated; field 1 is the phone number,
            // fields 5-8 are the four traffic counters
            String[] kpis = value.toString().split("\t");
            mapperKey.set(kpis[1]);
            context.write(mapperKey, new KpiWritable(Long.parseLong(kpis[5]),
                    Long.parseLong(kpis[6]), Long.parseLong(kpis[7]), Long.parseLong(kpis[8])));
        }
    }

    public static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable> {
        @Override
        protected void reduce(Text key, Iterable<KpiWritable> values, Context context)
                throws IOException, InterruptedException {
            long upPackNum = 0L;
            long downPackNum = 0L;
            long upPayLoad = 0L;
            long downPayLoad = 0L;
            // Sum the four counters across all records for this key
            for (KpiWritable kpi : values) {
                upPackNum += kpi.getUpPackNum();
                downPackNum += kpi.getDownPackNum();
                upPayLoad += kpi.getUpPayLoad();
                downPayLoad += kpi.getDownPayLoad();
            }
            context.write(key, new KpiWritable(upPackNum, downPackNum, upPayLoad, downPayLoad));
        }
    }
}
The code above wires in the default HashPartitioner explicitly. Because the job runs with two reduce tasks, it produces two output files (part-r-00000 and part-r-00001), with keys spread across them by hash.
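For reference, Hadoop's HashPartitioner picks a reducer by hashing the key modulo the number of reduce tasks; its getPartition method is essentially the following:

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit so the modulo result is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

With two reduce tasks, each Text key therefore lands in partition 0 or 1 purely by its hash code, so each of the two output files contains an arbitrary mix of phone-number and non-phone-number keys.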
package com.ccse.hadoop.partitioner;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.ccse.hadoop.mapreduce.KpiWritable;

public class MobileApp {

    public static final String INPUT_PATH = "hdfs://chaoren1:9000/mobile/mobile.dat";
    public static final String OUTPUT_PATH = "hdfs://chaoren1:9000/mobileout";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Delete any previous output so the job can be rerun
        FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
        fileSystem.delete(new Path(OUTPUT_PATH), true);

        Job job = new Job(conf, MobileApp.class.getSimpleName());
        job.setJarByClass(MobileApp.class);
        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));

        job.setMapperClass(MyMapper.class);
        job.setPartitionerClass(MyPartitioner.class); // the custom partitioner replaces HashPartitioner
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(KpiWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(2); // run two reduce tasks, one per partition
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(KpiWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        job.waitForCompletion(true);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable> {
        private Text mapperKey = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input lines are tab-separated; field 1 is the phone number,
            // fields 5-8 are the four traffic counters
            String[] kpis = value.toString().split("\t");
            mapperKey.set(kpis[1]);
            context.write(mapperKey, new KpiWritable(Long.parseLong(kpis[5]),
                    Long.parseLong(kpis[6]), Long.parseLong(kpis[7]), Long.parseLong(kpis[8])));
        }
    }

    public static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable> {
        @Override
        protected void reduce(Text key, Iterable<KpiWritable> values, Context context)
                throws IOException, InterruptedException {
            long upPackNum = 0L;
            long downPackNum = 0L;
            long upPayLoad = 0L;
            long downPayLoad = 0L;
            // Sum the four counters across all records for this key
            for (KpiWritable kpi : values) {
                upPackNum += kpi.getUpPackNum();
                downPackNum += kpi.getDownPackNum();
                upPayLoad += kpi.getUpPayLoad();
                downPayLoad += kpi.getDownPayLoad();
            }
            context.write(key, new KpiWritable(upPackNum, downPackNum, upPayLoad, downPayLoad));
        }
    }

    public static class MyPartitioner extends Partitioner<Text, KpiWritable> {
        @Override
        public int getPartition(Text key, KpiWritable value, int numPartitions) {
            // Keys of length 11 are phone numbers and go to partition 0;
            // all other keys go to partition 1
            return key.toString().length() == 11 ? 0 : 1;
        }
    }
}
With the custom Partitioner in place, records whose key has length 11 (a mobile phone number in this data set) go to partition 0 and end up in one output file, while all other records go to partition 1 and end up in the other. Note that getPartition hard-codes the indices 0 and 1, so the job must be configured with two reduce tasks, as it is here.
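KpiWritable is imported from com.ccse.hadoop.mapreduce but its source is not shown above. A minimal sketch of such a value class, assuming it simply carries the four traffic counters serialized in a fixed order (the real class in this series may differ in detail), would be:

package com.ccse.hadoop.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Hypothetical sketch of KpiWritable: four long counters plus the
// serialization methods the Writable contract requires.
public class KpiWritable implements Writable {
    private long upPackNum;    // upstream packet count
    private long downPackNum;  // downstream packet count
    private long upPayLoad;    // upstream bytes
    private long downPayLoad;  // downstream bytes

    // No-arg constructor is required: Hadoop creates Writables reflectively
    public KpiWritable() {
    }

    public KpiWritable(long upPackNum, long downPackNum, long upPayLoad, long downPayLoad) {
        this.upPackNum = upPackNum;
        this.downPackNum = downPackNum;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialize the fields in a fixed order...
        out.writeLong(upPackNum);
        out.writeLong(downPackNum);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // ...and read them back in exactly the same order
        upPackNum = in.readLong();
        downPackNum = in.readLong();
        upPayLoad = in.readLong();
        downPayLoad = in.readLong();
    }

    public long getUpPackNum() { return upPackNum; }
    public long getDownPackNum() { return downPackNum; }
    public long getUpPayLoad() { return upPayLoad; }
    public long getDownPayLoad() { return downPayLoad; }

    @Override
    public String toString() {
        return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
    }
}

The no-argument constructor matters because Hadoop instantiates Writables reflectively during deserialization, and readFields must consume the fields in exactly the order write emitted them.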