一:背景
有时候,我们不想再程序中显示的指定输入路径和输出路径,因为那样不太灵活,不利于扩展,Hadoop提供了将程序打成jar包发到集群上通过命令行参数指定输入输出路径的方式运行程序。
二:技术实现
(1):主类继承Configured类还要实现Tool接口。
(2):将我们以前写的设置各种参数的代码写在run()方法中(实现接口必须要实现run方法)。
(3):还有一句很关键的代码就是:job.setJarByClass(XXX.class);即以Jar包的形式运行。
我们以单词计数为例,有以下两种方法!!!
方法一:继承Configured类和实现Tool接口
public class WordCount extends Configured implements Tool { // 定义输入路径 private String INPUT_PATH = ""; // 定义输出路径 private String OUT_PATH = ""; public static void main(String[] args) { try { ToolRunner.run(new WordCount(), args); } catch (Exception e) { e.printStackTrace(); } } static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { // 注:value是hadoop的Text类型,调用toString可以转换成java的类型。 String[] splited = value.toString().split("\t"); // 迭代 for (String word : splited) { context.write(new Text(word), new LongWritable(1L)); } } } static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { long sum = 0L; // 迭代计算单词在文件中出现的总记录数 for (LongWritable v2 : v2s) { sum += v2.get(); } // 写到上下文中 context.write(k2, new LongWritable(sum)); } } public int run(String[] args) throws Exception { // 给路径赋值 INPUT_PATH = args[0]; OUT_PATH = args[1]; try { // 创建配置信息 Configuration conf = new Configuration(); // 添加配置文件(我们可以在编程的时候动态配置信息,而不需要手动去改变集群) //conf.addResource("classpath://hadoop/core-site.xml"); //conf.addResource("classpath://hadoop/hdfs-site.xml"); //conf.addResource("classpath://hadoop/mapred-site.xml"); // 如果输出目录存在,我们就进行删除 FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); if (fileSystem.exists(new Path(OUT_PATH))) { fileSystem.delete(new Path(OUT_PATH), true); } // 创建任务 Job job = new Job(conf, WordCount.class.getSimpleName()); // 通过命令行传参的形式必须走这一步(打包运行必须执行的关键) job.setJarByClass(WordCount.class); // 1.1 设置输入目录 FileInputFormat.setInputPaths(job, INPUT_PATH); // 指定对输入数据进行格式化处理的类(这个可以省略,默认值即可) job.setInputFormatClass(TextInputFormat.class); // 1.2 指定自定义的Mapper类 job.setMapperClass(MyMapper.class); // 指定map输出的<K,V>类型(如果<K3,V3>的类型和<K2,V2>的类型一致,那么可以省略) // job.setMapOutputKeyClass(Text.class); // job.setMapOutputValueClass(LongWritable.class); // 1.3 分区(可以省略,默认即可) job.setPartitionerClass(HashPartitioner.class); job.setNumReduceTasks(1);// 默认的分区是1个,所以任务也就是1个 // 1.4排序、分组 // 1.5 归约 // 2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同reduce节点。 // 2.2 指定自定义的reduce类 job.setReducerClass(MyReduce.class); // 指定<K,V>的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 2.3 指定输出的路径 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); // 指定输出的格式化类(这个可以省略,使用默认值即可) job.setOutputFormatClass(TextOutputFormat.class); // 把作业提交给JobTracker完成 job.waitForCompletion(true); } catch (Exception e) { e.printStackTrace(); } return 0; } }
上传到远程服务器,通过如下命令运行:
我们可以先用jar -tf XXX.jar查看jar中的结构,如下
然后用hadoop jar命令运行,如果我们是直接把整个项目打成一个jar包(因为很多时候我们会将类写在多个文件中,所以要打包整个项目),我们就要加上包名.主类名。如:
hadoop jar WordCount.jar
com.lixue.run.cmd.WordCount /hello /out
由于我们这个例子是直接将主类进行了打包,所以我们可以直接运行jar包就可以了,如下:
hadoop jar WordCount.jar hdfs://liaozhongmin5:9000/hello hdfs://liaozhongmin5:9000/out
命令格式为:hadoop jar XXX.jar 输入路径 输出路径(前面的hdfs://liaozhongmin5:9000可以省略)
方法二:也可以不继承Configured和实现Tool接口,代码如下:
public class WordCountTest { //定义输入路径 private static String IN_PATH = ""; //定义输出路径 private static String OUT_PATH = ""; public static void main(String[] args) { try { // 创建配置信息 Configuration conf = new Configuration(); //获取命令行的参数 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); //当参数违法时,中断程序 if (otherArgs.length != 2){ System.err.println("Usage:wordcount<in> <out>"); System.exit(1); } //给路径赋值 IN_PATH = otherArgs[0]; OUT_PATH = otherArgs[1]; // 创建文件系统 FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf); // 如果输出目录存在,我们就删除 if (fileSystem.exists(new Path(new URI(OUT_PATH)))) { fileSystem.delete(new Path(new URI(OUT_PATH)), true); } // 创建任务 Job job = new Job(conf, WordCountTest.class.getName()); //打成jar包运行,这句话是关键 job.setJarByClass(WordCountTest.class); //1.1 设置输入目录和设置输入数据格式化的类 FileInputFormat.setInputPaths(job, IN_PATH); job.setInputFormatClass(TextInputFormat.class); //1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个) job.setPartitionerClass(HashPartitioner.class); job.setNumReduceTasks(1); //1.4 排序 //1.5 归约 //2.1 Shuffle把数据从Map端拷贝到Reduce端。 //2.2 指定Reducer类和输出key和value的类型 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //2.3 指定输出的路径和设置输出的格式化类 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); job.setOutputFormatClass(TextOutputFormat.class); // 提交作业 退出 System.exit(job.waitForCompletion(true) ? 0 : 1); } catch (Exception e) { e.printStackTrace(); } } public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { // 定义一个LongWritable对象作为map输出的value类型 LongWritable oneTime = new LongWritable(1); // 定义一个Text对象作为map输出的key类型 Text word = new Text(); protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { // 对每一行记录采用制表符(\t)进行分割 String[] splits = value.toString().split("\t"); // 遍历字符串数组输出每一个单词 for (String str : splits) { // 设置word word.set(str); // 把结果写出去 context.write(word, oneTime); } } } public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { // 定义LongWritable对象最为Reduce输出的value类型 LongWritable result = new LongWritable(); protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { int sum = 0; // 遍历集合,计算每个单词出现的和 for (LongWritable s : values) { sum += s.get(); } // 设置result result.set(sum); // 把结果写出去 context.write(key, result); } } }
注:这种方式比较简单,运行的方式还是一样!
程序运行的日志如下:
我们通过通过Hadoop默认计数器分析MapReduce资源消耗来分析这个程序运行所消耗的资源
1.MapReduce任务的计算量即消耗CPU的时间=CPU time spent (ms)=1400
2.程序运行所消耗的物理内存=Physical memory (bytes) snapshot=184586240
3.程序运行所消耗的虚拟内存=Virtual memory (bytes) snapshot=756031488
4.JVM当前堆的大小=Total committed heap usage (bytes)=177016832
5.IO消耗=HDFS_BYTES_READ+HDFS_BYTES_WRITTEN*副本数+FILE_BYTES_READ+FILE_BYTES_WRITTEN=115+19*1+65+105096
6.网络流量消耗情况=HDFS_BYTES_READ+HDFS_BYTES_WRITTEN*副本数+Reduce shuffle bytes=115+19*1 + 65