现在的位置: 首页 > 综合 > 正文

Hadoop自定义计数器Counter

2018年06月05日 ⁄ 综合 ⁄ 共 3412字 ⁄ 字号 评论关闭

一:背景

Hadoop计数器的主要价值在于可以让开发人员以全局的视角来审查程序的运行情况,及时作出错误诊断并进行相应的处理,Hadoop内置了很多计数器,这些计数器大致可以分为三组:MapReduce相关的计数器、文件系统相关的计数器以及作业调度相关的计数器。我们可以通过Eclipse控制台的输出或者是web页面http://master:50030进行查看。

二:技术实现

除了内置计数器,Hadoop还提供了自定义计数器的功能,自定义计数器经常适用于的场景是统计无效记录或者是统计敏感词。

定义一个计数器有两种形式

1.通过枚举类型进行定义:

	// 定义一个枚举,用于统计无效记录
		enum ERRORCounter {
			ERROR;
		}
context.getCounter(ERRORCounter.ERROR).increment(1);

2.动态声明计数器,不需要使用枚举

context.getCounter(String groupName,String counterName)

在程序中自定义一个计数器,运行程序,控制台输出的结果如下图:

注:通常情况下,我们自定义的计数器的名字会以:包名+类名+计数器名字的形式显示在控制台,有时候我们想显示的人性化一点即显示我们想要的计数器名称,解决方案是在该MapReduce程序同一个包下建立一个属性文件,文件的命名规则是:类_内部类_枚举.properties

属性文件的内容如下:

#CounterGroupName 用来声明groupName的控制台显示信息
CounterGroupName=敏感词计数器
#枚举类型加.name用来声明counterName的控制台显示信息
ERROR.name=非法记录

下面我们通过一个示例来深入体会,我这里是通过枚举的形式来自定义计数器的,程序结构如下:

注:因为我的计数器是定义在一个内部类中,所以属性文件的命名要注意规范哦,文件的内容在上面已经贴出来了。

代码如下:

public class ErrorCountMapReduce {
	
	// 定义输入路径
	private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/liao.txt";
	// 定义输出路径
	private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";

	public static void main(String[] args) {

		try {
			// 创建配置信息
			Configuration conf = new Configuration();
			// 添加配置文件(我们可以在编程的时候动态配置信息,而不需要手动去改变集群)
			/*
			 * conf.addResource("classpath://hadoop/core-site.xml"); 
			 * conf.addResource("classpath://hadoop/hdfs-site.xml");
			 * conf.addResource("classpath://hadoop/hdfs-site.xml");
			 */

			// 创建文件系统
			FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
			// 如果输出目录存在,我们就删除
			if (fileSystem.exists(new Path(OUT_PATH))) {
				fileSystem.delete(new Path(OUT_PATH), true);
			}

			// 创建任务
			Job job = new Job(conf, ErrorCountMapReduce.class.getName());

			//1.1 设置输入目录和设置输入数据格式化的类
			FileInputFormat.setInputPaths(job, INPUT_PATH);
			job.setInputFormatClass(TextInputFormat.class);

			//1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
			job.setMapperClass(MyMapper.class);
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(LongWritable.class);

			//1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
			job.setPartitionerClass(HashPartitioner.class);
			job.setNumReduceTasks(1);

			//1.4 排序、分组
			//1.5 归约
			//2.1 Shuffle把数据从Map端拷贝到Reduce端。
			//2.2 指定Reducer类和输出key和value的类型
			job.setReducerClass(MyReducer.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(LongWritable.class);

			//2.3 指定输出的路径和设置输出的格式化类
			FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
			job.setOutputFormatClass(TextOutputFormat.class);

			// 提交作业
			job.waitForCompletion(true);

			// 退出
			System.exit(0);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

		
		// 定义一个枚举,用于统计无效记录
		enum ERRORCounter {
			ERROR;
		}
		// 创建map函数输出的key的类型
		Text word = new Text();
		// 创建map函数输出的value的类型
		LongWritable oneTime = new LongWritable(1);

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException,
				InterruptedException {
			// 对每一行的文本内容进行切分
			String[] splits = value.toString().split("-");
			// 如果字符串数组中的第一个元素可以转成整形则表示该条记录有效,否则无效
			if (Integer.parseInt(splits[0]) == 1) {

				// 获取第2个元素
				word.set(splits[1]);

				// 把结果写出去
				context.write(word, oneTime);
			} else {
				//获取计数器,记录数加1
				context.getCounter(ERRORCounter.ERROR).increment(1);
			}
		}
	}

	public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

		// 定义LongWritable对象作为输出结果的value类型
		LongWritable result = new LongWritable();

		@Override
		protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException,
				InterruptedException {

			int sum = 0;

			for (LongWritable s : values) {

				sum += s.get();

			}
			// 给结果赋值
			result.set(sum);
			// 把结果写出去
			context.write(key, result);
		}
	}
}

程序运行的结果如下:

这就达到了我们想要的结果!

抱歉!评论已关闭.