现在的位置: 首页 > 综合 > 正文

MapReduce程序打成jar包在远程服务器运行

2018年06月05日 ⁄ 综合 ⁄ 共 6113字 ⁄ 字号 评论关闭

一:背景

有时候,我们不想再程序中显示的指定输入路径和输出路径,因为那样不太灵活,不利于扩展,Hadoop提供了将程序打成jar包发到集群上通过命令行参数指定输入输出路径的方式运行程序。

二:技术实现

(1):主类继承Configured类还要实现Tool接口。

(2):将我们以前写的设置各种参数的代码写在run()方法中(实现接口必须要实现run方法)。

(3):还有一句很关键的代码就是:job.setJarByClass(XXX.class);即以Jar包的形式运行。

我们以单词计数为例,有以下两种方法!!!


方法一:继承Configured类和实现Tool接口

public class WordCount extends Configured implements Tool {

	// 定义输入路径
	private String INPUT_PATH = "";
	// 定义输出路径
	private String OUT_PATH = "";

	public static void main(String[] args) {

		try {
			ToolRunner.run(new WordCount(), args);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException,
				InterruptedException {

			// 注:value是hadoop的Text类型,调用toString可以转换成java的类型。
			String[] splited = value.toString().split("\t");

			// 迭代
			for (String word : splited) {
				context.write(new Text(word), new LongWritable(1L));
			}
		}
	}

	static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable> {

		@Override
		protected void reduce(Text k2, Iterable<LongWritable> v2s, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException,
				InterruptedException {

			long sum = 0L;
			// 迭代计算单词在文件中出现的总记录数
			for (LongWritable v2 : v2s) {
				sum += v2.get();
			}

			// 写到上下文中
			context.write(k2, new LongWritable(sum));
		}
	}

	public int run(String[] args) throws Exception {

		// 给路径赋值
		INPUT_PATH = args[0];
		OUT_PATH = args[1];

		try {
			// 创建配置信息
			Configuration conf = new Configuration();
			// 添加配置文件(我们可以在编程的时候动态配置信息,而不需要手动去改变集群)

			//conf.addResource("classpath://hadoop/core-site.xml");
			//conf.addResource("classpath://hadoop/hdfs-site.xml");
			//conf.addResource("classpath://hadoop/mapred-site.xml");

			// 如果输出目录存在,我们就进行删除
			FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
			if (fileSystem.exists(new Path(OUT_PATH))) {
				fileSystem.delete(new Path(OUT_PATH), true);
			}

			// 创建任务
			Job job = new Job(conf, WordCount.class.getSimpleName());
			// 通过命令行传参的形式必须走这一步(打包运行必须执行的关键)
			job.setJarByClass(WordCount.class);

			// 1.1 设置输入目录
			FileInputFormat.setInputPaths(job, INPUT_PATH);
			// 指定对输入数据进行格式化处理的类(这个可以省略,默认值即可)
			job.setInputFormatClass(TextInputFormat.class);

			// 1.2 指定自定义的Mapper类
			job.setMapperClass(MyMapper.class);
			// 指定map输出的<K,V>类型(如果<K3,V3>的类型和<K2,V2>的类型一致,那么可以省略)
			// job.setMapOutputKeyClass(Text.class);
			// job.setMapOutputValueClass(LongWritable.class);

			// 1.3 分区(可以省略,默认即可)
			job.setPartitionerClass(HashPartitioner.class);
			job.setNumReduceTasks(1);// 默认的分区是1个,所以任务也就是1个

			// 1.4排序、分组

			// 1.5 归约

			// 2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同reduce节点。

			// 2.2 指定自定义的reduce类
			job.setReducerClass(MyReduce.class);
			// 指定<K,V>的类型
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(LongWritable.class);

			// 2.3 指定输出的路径
			FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
			// 指定输出的格式化类(这个可以省略,使用默认值即可)
			job.setOutputFormatClass(TextOutputFormat.class);

			// 把作业提交给JobTracker完成
			job.waitForCompletion(true);

		} catch (Exception e) {
			e.printStackTrace();
		}

		return 0;
	}
}

上传到远程服务器,通过如下命令运行:

我们可以先用jar -tf XXX.jar查看jar中的结构,如下


然后用hadoop jar命令运行,如果我们是直接把整个项目打成一个jar包(因为很多时候我们会将类写在多个文件中,所以要打包整个项目),我们就要加上包名.主类名。如:

hadoop jar WordCount.jar
com.lixue.run.cmd.WordCount
 /hello  /out 

由于我们这个例子是直接将主类进行了打包,所以我们可以直接运行jar包就可以了,如下:

hadoop jar WordCount.jar hdfs://liaozhongmin5:9000/hello hdfs://liaozhongmin5:9000/out

命令格式为:hadoop jar XXX.jar 输入路径 输出路径(前面的hdfs://liaozhongmin5:9000可以省略)


方法二:也可以不继承Configured和实现Tool接口,代码如下:

public class WordCountTest {
	
	//定义输入路径
	private static String IN_PATH = "";
	//定义输出路径
	private static String OUT_PATH = "";

	public static void main(String[] args) {

		try {
			// 创建配置信息
			Configuration conf = new Configuration();
			//获取命令行的参数
			String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
			//当参数违法时,中断程序
			if (otherArgs.length != 2){
				System.err.println("Usage:wordcount<in> <out>");
				System.exit(1);
			}
			
			//给路径赋值
			IN_PATH = otherArgs[0];
			OUT_PATH = otherArgs[1];
			// 创建文件系统
			FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
			// 如果输出目录存在,我们就删除
			if (fileSystem.exists(new Path(new URI(OUT_PATH)))) {
				fileSystem.delete(new Path(new URI(OUT_PATH)), true);
			}

			// 创建任务
			Job job = new Job(conf, WordCountTest.class.getName());
			//打成jar包运行,这句话是关键
			job.setJarByClass(WordCountTest.class);
			//1.1	设置输入目录和设置输入数据格式化的类
			FileInputFormat.setInputPaths(job, IN_PATH);
			job.setInputFormatClass(TextInputFormat.class);

			//1.2	设置自定义Mapper类和设置map函数输出数据的key和value的类型
			job.setMapperClass(MyMapper.class);
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(LongWritable.class);

			//1.3	设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
			job.setPartitionerClass(HashPartitioner.class);
			job.setNumReduceTasks(1);

			//1.4	排序
			//1.5	归约
			//2.1	Shuffle把数据从Map端拷贝到Reduce端。
			//2.2	指定Reducer类和输出key和value的类型
			job.setReducerClass(MyReducer.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(LongWritable.class);

			//2.3	指定输出的路径和设置输出的格式化类
			FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
			job.setOutputFormatClass(TextOutputFormat.class);


			// 提交作业 退出
			System.exit(job.waitForCompletion(true) ? 0 : 1);
		
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

		// 定义一个LongWritable对象作为map输出的value类型
		LongWritable oneTime = new LongWritable(1);
		// 定义一个Text对象作为map输出的key类型
		Text word = new Text();

		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException,
				InterruptedException {

			// 对每一行记录采用制表符(\t)进行分割
			String[] splits = value.toString().split("\t");

			// 遍历字符串数组输出每一个单词
			for (String str : splits) {

				// 设置word
				word.set(str);
				// 把结果写出去
				context.write(word, oneTime);
			}
		}
	}

	public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

		// 定义LongWritable对象最为Reduce输出的value类型
		LongWritable result = new LongWritable();

		protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException,
				InterruptedException {

			int sum = 0;

			// 遍历集合,计算每个单词出现的和
			for (LongWritable s : values) {

				sum += s.get();
			}
			// 设置result
			result.set(sum);
			// 把结果写出去
			context.write(key, result);
		}
	}
}

注:这种方式比较简单,运行的方式还是一样!

程序运行的日志如下:

我们通过通过Hadoop默认计数器分析MapReduce资源消耗来分析这个程序运行所消耗的资源

1.MapReduce任务的计算量即消耗CPU的时间=CPU time spent (ms)=1400

2.程序运行所消耗的物理内存=Physical memory (bytes) snapshot=184586240

3.程序运行所消耗的虚拟内存=Virtual memory (bytes) snapshot=756031488

4.JVM当前堆的大小=Total committed heap usage (bytes)=177016832

5.IO消耗=HDFS_BYTES_READ+HDFS_BYTES_WRITTEN*副本数+FILE_BYTES_READ+FILE_BYTES_WRITTEN=115+19*1+65+105096

6.网络流量消耗情况=HDFS_BYTES_READ+HDFS_BYTES_WRITTEN*副本数+Reduce shuffle bytes=115+19*1 + 65

抱歉!评论已关闭.