
Running a Hadoop Job from the Command Line

February 17, 2013 · Cloud Computing

Many tutorials submit the job through Eclipse, which is fine for playing around. On an ubuntu-server cluster, though, that is not an option, so submitting the job to the JobTracker from the command line is the more practical approach.

For example, take these two input files.

file1.txt:

2012-3-1 a
2012-3-2 b
2012-3-3 c 
2012-3-4 d 
2012-3-5 a 
2012-3-6 b
2012-3-7 c
2012-3-3 c

file2.txt:

2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d 
2012-3-5 a 
2012-3-6 c
2012-3-7 d
2012-3-3 c 

The task: merge file1.txt and file2.txt and remove duplicate lines.
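Before going near the cluster, it helps to pin down what the output should look like. The sketch below (plain Java, no Hadoop; it assumes file1.txt and file2.txt are in the current directory) computes the same merged, deduplicated, sorted result the MapReduce job is expected to produce:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.TreeSet;

public class LocalDedup {
    public static void main(String[] args) throws IOException {
        // A TreeSet keeps exactly one copy of each line, in sorted order,
        // which mirrors what the shuffle and reduce do on the cluster.
        TreeSet<String> unique = new TreeSet<String>();
        for (String name : new String[]{"file1.txt", "file2.txt"}) {
            BufferedReader in = new BufferedReader(new FileReader(name));
            String line;
            while ((line = in.readLine()) != null) {
                unique.add(line.trim()); // trim the stray trailing spaces in the sample data
            }
            in.close();
        }
        for (String line : unique) {
            System.out.println(line);
        }
    }
}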

The MapReduce version, Dedup.java:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Dedup {

    // The map step copies each input line (the value) into the output key
    // and emits it directly; duplicate lines become duplicate keys.
    public static class Map extends Mapper<Object, Text, Text, Text> {
        private static final Text EMPTY = new Text("");

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, EMPTY);
        }
    }

    // The reduce step writes each distinct key once; the shuffle has already
    // grouped identical lines together, so this removes the duplicates.
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // This line is the key point: it points the client at the JobTracker,
        // so the job is submitted to the cluster instead of running locally.
        conf.set("mapred.job.tracker", "192.168.1.2:9001");

        // The input and output directories come from the command line.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Data Deduplication <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "Data Deduplication");
        job.setJarByClass(Dedup.class);

        // Set the Map, Combine and Reduce classes. Reduce doubles as the
        // combiner: emitting each distinct key once is safe to do early.
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        // Output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input and output directories
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
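A side note on the line marked as the key point: in Hadoop 0.20.x, mapred.job.tracker holds the JobTracker address, and the special value "local" switches to the in-process LocalJobRunner, which is handy for debugging before going to the cluster. A sketch of the substitution (these settings replace the cluster configuration in main; paths then resolve against the local filesystem):

// Replace the cluster setup in Dedup.main with:
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "local");  // run in-process with the LocalJobRunner
conf.set("fs.default.name", "file:///");  // read input and write output locally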

From the command line, create the input directory in HDFS and upload the two files:

hadoop/bin/hadoop fs -mkdir dedup_in
hadoop/bin/hadoop fs -put file1.txt dedup_in
hadoop/bin/hadoop fs -put file2.txt dedup_in
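The same preparation can also be done programmatically through the FileSystem API. Below is a sketch (the class name Upload is made up for illustration; it assumes the cluster's configuration files are on the classpath, so FileSystem.get resolves to HDFS rather than the local disk):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class Upload {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);   // resolves to HDFS via core-site.xml
        fs.mkdirs(new Path("dedup_in"));        // relative paths land under /user/<user>
        fs.copyFromLocalFile(new Path("file1.txt"), new Path("dedup_in/file1.txt"));
        fs.copyFromLocalFile(new Path("file2.txt"), new Path("dedup_in/file2.txt"));
        fs.close();
    }
}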


Next, compile against the Hadoop core jar, package the classes, and submit the job (note the directory names must match the ones created above):

mkdir classes
javac -classpath hadoop/hadoop-core-0.20.204.0.jar:hadoop/lib/commons-cli-1.2.jar -d classes/ Dedup.java
jar -cvf Dedup.jar -C classes/ .
hadoop/bin/hadoop jar Dedup.jar Dedup dedup_in dedup_out

Check the result:

root@hadoop1:~# hadoop/bin/hadoop fs -lsr dedup_out
-rw-r--r--   3 root supergroup          0 2013-09-17 20:15 /user/root/dedup_out/_SUCCESS
drwxr-xr-x   - root supergroup          0 2013-09-17 20:15 /user/root/dedup_out/_logs
drwxr-xr-x   - root supergroup          0 2013-09-17 20:15 /user/root/dedup_out/_logs/history
-rw-r--r--   3 root supergroup      12655 2013-09-17 20:15 /user/root/dedup_out/_logs/history/job_201309171911_0004_1379420132659_root_Data+Deduplication
-rw-r--r--   3 root supergroup      19781 2013-09-17 20:15 /user/root/dedup_out/_logs/history/job_201309171911_0004_conf.xml
-rw-r--r--   3 root supergroup        146 2013-09-17 20:15 /user/root/dedup_out/part-r-00000
root@hadoop1:~# hadoop/bin/hadoop fs -cat dedup_out/part-r-00000
2012-3-1 a	
2012-3-1 b	
2012-3-2 a	
2012-3-2 b	
2012-3-3 b	
2012-3-3 c	
2012-3-4 d	
2012-3-5 a	
2012-3-6 b	
2012-3-6 c	
2012-3-7 c	
2012-3-7 d	
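Notice that every output line ends with a tab: the job writes an empty Text as the value, and TextOutputFormat separates key and value with a tab. A variant (my sketch, not part of the original program) that emits the key alone replaces the value with NullWritable, which TextOutputFormat omits entirely:

// Requires: import org.apache.hadoop.io.NullWritable;
// With NullWritable there is no value, so TextOutputFormat writes the key
// alone, without the trailing tab.
public static class Reduce extends Reducer<Text, Text, Text, NullWritable> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

// In main, the map and reduce output types now differ, so declare both, and
// drop setCombinerClass(Reduce.class): a combiner's output types must match
// the map's output types, which this Reduce no longer does.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);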