现在的位置: 首页 > 综合 > 正文

自定义Partitioner

2018年05月20日 ⁄ 综合 ⁄ 共 5584字 ⁄ 字号 评论关闭

package com.ccse.hadoop.partitioner;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

import com.ccse.hadoop.mapreduce.KpiWritable;

/**
 * MapReduce driver that aggregates the four counters carried by
 * {@link KpiWritable} per key. The job explicitly uses the default
 * {@link HashPartitioner} and runs two reduce tasks.
 */
public class MobileApp {

	public static final String INPUT_PATH = "hdfs://chaoren1:9000/mobile/mobile.dat";
	public static final String OUTPUT_PATH = "hdfs://chaoren1:9000/mobileout";

	/**
	 * Configures the job, submits it and waits for it to finish.
	 * The process exit status is 0 when the job succeeds and 1 otherwise.
	 *
	 * @param args command-line arguments (unused)
	 * @throws Exception if the file system cannot be reached or the job cannot be submitted
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		// Recursively remove the output directory left by a previous run.
		FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
		fileSystem.delete(new Path(OUTPUT_PATH), true);

		Job job = new Job(conf, MobileApp.class.getSimpleName());
		job.setJarByClass(MobileApp.class);

		FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));

		job.setMapperClass(MyMapper.class);
		job.setPartitionerClass(HashPartitioner.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(KpiWritable.class);

		job.setReducerClass(MyReducer.class);
		job.setNumReduceTasks(2);    // run two reduce tasks
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(KpiWritable.class);

		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

		// Report the job outcome through the exit status instead of discarding it.
		boolean succeeded = job.waitForCompletion(true);
		System.exit(succeeded ? 0 : 1);
	}

	/**
	 * Splits each input line on tab characters, emits field 1 as the key and
	 * a {@link KpiWritable} built from fields 5 to 8 as the value.
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable> {
		// Single Text instance whose content is overwritten for every record.
		private final Text mapperKey = new Text();

		/**
		 * Emits one (field 1, KpiWritable) pair for the given line.
		 * A line with fewer than nine tab-separated fields, or with a
		 * non-numeric value in fields 5 to 8, makes this method throw an
		 * unchecked exception.
		 *
		 * @param key     input key supplied by the input format (unused)
		 * @param value   one tab-separated input line
		 * @param context sink for the emitted pair
		 */
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, KpiWritable>.Context context)
				throws IOException, InterruptedException {
			String[] kpis = value.toString().split("\t");
			mapperKey.set(kpis[1]);
			// Presumably the same argument order the reducer uses:
			// upPackNum, downPackNum, upPayLoad, downPayLoad.
			KpiWritable kpi = new KpiWritable(
					Long.parseLong(kpis[5]),
					Long.parseLong(kpis[6]),
					Long.parseLong(kpis[7]),
					Long.parseLong(kpis[8]));
			context.write(mapperKey, kpi);
		}

	}

	/**
	 * Sums the four counters of every {@link KpiWritable} that shares a key
	 * and writes a single totals record for that key.
	 */
	public static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable> {
		/**
		 * Writes one record holding the per-key totals.
		 *
		 * @param key     the grouping key
		 * @param values  all values emitted for {@code key}; nothing is written when this is null
		 * @param context sink for the totals record
		 */
		@Override
		protected void reduce(Text key, Iterable<KpiWritable> values,
				Reducer<Text, KpiWritable, Text, KpiWritable>.Context context)
				throws IOException, InterruptedException {
			if (values == null) {
				return;
			}
			long upPackNum = 0L;
			long downPackNum = 0L;
			long upPayLoad = 0L;
			long downPayLoad = 0L;
			// Obtain the iterator once; asking for a new one on every step only
			// terminates if the Iterable keeps returning the same instance.
			for (KpiWritable kpi : values) {
				upPackNum += kpi.getUpPackNum();
				downPackNum += kpi.getDownPackNum();
				upPayLoad += kpi.getUpPayLoad();
				downPayLoad += kpi.getDownPayLoad();
			}
			context.write(key, new KpiWritable(upPackNum, downPackNum, upPayLoad, downPayLoad));
		}
	}

}

上面的代码显式指定了默认的HashPartitioner,并把Reduce任务数设置为2,因此会产生2个输出文件,记录按key的哈希值分配到这两个文件中。

package com.ccse.hadoop.partitioner;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

import com.ccse.hadoop.mapreduce.KpiWritable;

/**
 * MapReduce driver that aggregates the four counters carried by
 * {@link KpiWritable} per key. A custom {@link MyPartitioner} routes keys
 * of length 11 and all other keys to different reduce tasks.
 */
public class MobileApp {

	public static final String INPUT_PATH = "hdfs://chaoren1:9000/mobile/mobile.dat";
	public static final String OUTPUT_PATH = "hdfs://chaoren1:9000/mobileout";

	/**
	 * Configures the job, submits it and waits for it to finish.
	 * The process exit status is 0 when the job succeeds and 1 otherwise.
	 *
	 * @param args command-line arguments (unused)
	 * @throws Exception if the file system cannot be reached or the job cannot be submitted
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		// Recursively remove the output directory left by a previous run.
		FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
		fileSystem.delete(new Path(OUTPUT_PATH), true);

		Job job = new Job(conf, MobileApp.class.getSimpleName());
		job.setJarByClass(MobileApp.class);

		FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));

		job.setMapperClass(MyMapper.class);
		job.setPartitionerClass(MyPartitioner.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(KpiWritable.class);

		job.setReducerClass(MyReducer.class);
		job.setNumReduceTasks(2);    // run two reduce tasks
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(KpiWritable.class);

		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

		// Report the job outcome through the exit status instead of discarding it.
		boolean succeeded = job.waitForCompletion(true);
		System.exit(succeeded ? 0 : 1);
	}

	/**
	 * Splits each input line on tab characters, emits field 1 as the key and
	 * a {@link KpiWritable} built from fields 5 to 8 as the value.
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable> {
		// Single Text instance whose content is overwritten for every record.
		private final Text mapperKey = new Text();

		/**
		 * Emits one (field 1, KpiWritable) pair for the given line.
		 * A line with fewer than nine tab-separated fields, or with a
		 * non-numeric value in fields 5 to 8, makes this method throw an
		 * unchecked exception.
		 *
		 * @param key     input key supplied by the input format (unused)
		 * @param value   one tab-separated input line
		 * @param context sink for the emitted pair
		 */
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, KpiWritable>.Context context)
				throws IOException, InterruptedException {
			String[] kpis = value.toString().split("\t");
			mapperKey.set(kpis[1]);
			// Presumably the same argument order the reducer uses:
			// upPackNum, downPackNum, upPayLoad, downPayLoad.
			KpiWritable kpi = new KpiWritable(
					Long.parseLong(kpis[5]),
					Long.parseLong(kpis[6]),
					Long.parseLong(kpis[7]),
					Long.parseLong(kpis[8]));
			context.write(mapperKey, kpi);
		}

	}

	/**
	 * Sums the four counters of every {@link KpiWritable} that shares a key
	 * and writes a single totals record for that key.
	 */
	public static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable> {
		/**
		 * Writes one record holding the per-key totals.
		 *
		 * @param key     the grouping key
		 * @param values  all values emitted for {@code key}; nothing is written when this is null
		 * @param context sink for the totals record
		 */
		@Override
		protected void reduce(Text key, Iterable<KpiWritable> values,
				Reducer<Text, KpiWritable, Text, KpiWritable>.Context context)
				throws IOException, InterruptedException {
			if (values == null) {
				return;
			}
			long upPackNum = 0L;
			long downPackNum = 0L;
			long upPayLoad = 0L;
			long downPayLoad = 0L;
			// Obtain the iterator once; asking for a new one on every step only
			// terminates if the Iterable keeps returning the same instance.
			for (KpiWritable kpi : values) {
				upPackNum += kpi.getUpPackNum();
				downPackNum += kpi.getDownPackNum();
				upPayLoad += kpi.getUpPayLoad();
				downPayLoad += kpi.getDownPayLoad();
			}
			context.write(key, new KpiWritable(upPackNum, downPackNum, upPayLoad, downPayLoad));
		}
	}

	/**
	 * Sends keys that are exactly 11 characters long (treated as mobile
	 * numbers) to one partition and every other key to another.
	 */
	public static class MyPartitioner extends Partitioner<Text, KpiWritable> {

		/** Key length that identifies a mobile number. */
		private static final int MOBILE_NUMBER_LENGTH = 11;

		/**
		 * Chooses the partition for a record from the length of its key.
		 *
		 * @param key           the map output key
		 * @param value         the map output value (unused)
		 * @param numPartitions number of reduce tasks configured for the job
		 * @return 0 for keys of length 11, otherwise 1, reduced modulo
		 *         {@code numPartitions} so the result is always a valid index
		 */
		@Override
		public int getPartition(Text key, KpiWritable value, int numPartitions) {
			final int length = key.toString().length();
			// A length of 11 means a mobile number; anything else is not one.
			final int bucket = (length == MOBILE_NUMBER_LENGTH) ? 0 : 1;
			// Stay within [0, numPartitions) even when only one reduce task runs.
			return bucket % numPartitions;
		}

	}

}

以上是使用自定义Partitioner之后的程序:key为手机号(长度为11)的记录输出到一个文件中,其余记录输出到另一个文件中。

【上篇】
【下篇】

抱歉!评论已关闭.