
DistributedCache implementation under Hadoop 1.0.0


Side data distribution:

Side data is the extra read-only data a job needs in order to process its main dataset. The challenge is to make this side data available to every map and reduce task conveniently and efficiently.

We implement this with the DistributedCache API.
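In outline the API has two sides: the job submitter registers an HDFS file on the job configuration, and each task later asks the framework for the local copies it has already pulled onto the task node. A minimal sketch of just those two calls, using the old mapred API discussed first below (paths and method names here are illustrative, not from the original post):

import java.io.IOException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class DistributedCacheSketch {

    // Submitter side: record the HDFS file /side/data.txt in the job configuration.
    public static void registerSideData(JobConf conf) {
        DistributedCache.addCacheFile(new Path("/side/data.txt").toUri(), conf);
    }

    // Task side (e.g. inside Mapper.configure): ask for the local paths of the cached copies.
    public static Path[] locateSideData(JobConf conf) throws IOException {
        return DistributedCache.getLocalCacheFiles(conf);
    }
}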

1) At first the 1.0.0-style API simply would not work: execution reached patternsFiles = DistributedCache.getLocalCacheFiles(job); and patternsFiles stayed null. Googling suggested that the old-version API handles this well, so I tried the old-style version:
 

import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class testDistributeCache extends Configured implements Tool {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

        public void configure(JobConf job) {
            Path[] patternsFiles;
            boolean test = job.getBoolean("wordcount.skip.patterns", false);
            try {
                // Ask the framework for the local copies of the cached files.
                patternsFiles = DistributedCache.getLocalCacheFiles(job);
                int test4 = 0; // breakpoint anchor for debugging
            } catch (IOException ioe) {
                System.err.println("Caught exception while getting cached files: "
                        + StringUtils.stringifyException(ioe));
            }
        }

        @Override
        public void map(LongWritable arg0, Text arg1,
                OutputCollector<Text, IntWritable> arg2, Reporter arg3) throws IOException {
            arg2.collect(arg1, new IntWritable(1));
        }
    }

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf();

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class); // must match the mapper's output value type

        conf.setMapperClass(Map.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // Register the HDFS file /testDisFile with the distributed cache.
        DistributedCache.addCacheFile(new Path("/testDisFile").toUri(), conf);
        conf.setBoolean("wordcount.skip.patterns", true);

        FileInputFormat.setInputPaths(conf, new Path("/testDisIn"));
        FileOutputFormat.setOutputPath(conf, new Path("/testDisOut"));

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new testDistributeCache(), args);
        System.exit(res);
    }
}

Test run:

12/04/11 08:40:32 INFO filecache.TrackerDistributedCacheManager: Creating testDisFile in /tmp/hadoop-mjiang/mapred/local/archive/-3764210432234018452_-1245135191_482447420/localhost-work-9116784916847441351 with rwxr-xr-x

12/04/11 08:40:32 INFO filecache.TrackerDistributedCacheManager: Cached /testDisFile as /tmp/hadoop-mjiang/mapred/local/archive/-3764210432234018452_-1245135191_482447420/localhost/testDisFile

While stepping through in the debugger, the line

patternsFiles = DistributedCache.getLocalCacheFiles(job);

shows that patternsFiles does get a value, so the problem must lie with the conf object.

2)

---The Hadoop Map-Reduce framework spawns one map task for each InputSplit generated by the InputFormat for the job. Mapper implementations can access the Configuration for the job via Context.getConfiguration().

The framework first calls setup(org.apache.hadoop.mapreduce.Mapper.Context), followed by map(Object, Object, Context) for each key/value pair in the InputSplit. Finally cleanup(Context) is called.

---
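In plain terms: per-task initialization belongs in setup() and per-task teardown in cleanup(), with the Configuration reached through the Context. A minimal skeleton of that lifecycle, using the same DistributedCache lookup as the working code further below (class and field names here are illustrative, and closing the reader in cleanup() is an addition the original code omits):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LifecycleMapper extends Mapper<LongWritable, Text, Text, Text> {

    private BufferedReader sideDataReader;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Called once per task, before any map() call: open the locally cached side file.
        Configuration conf = context.getConfiguration();
        Path[] cached = DistributedCache.getLocalCacheFiles(conf);
        sideDataReader = new BufferedReader(new FileReader(cached[0].toString()));
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Called once per input record.
        context.write(new Text("1"), new Text(value.toString() + "~" + sideDataReader.readLine()));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Called once per task, after the last map() call: release the reader.
        if (sideDataReader != null) {
            sideDataReader.close();
        }
    }
}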

So the JobConf, the Job and the Configuration involved must in effect be a single object, or at least be linked to one another.
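To make that concrete, here is a small driver-side sketch (mine, not from the original post) contrasting the broken and the working way of registering a cache file; only the Configuration obtained from job.getConfiguration() actually travels with the job:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class CacheConfSketch {
    public static void main(String[] args) throws Exception {
        Job job = new Job();

        // Wrong: this Configuration is a brand-new object with no link to the job,
        // so the cache-file URI never makes it into the submitted job configuration.
        Configuration unrelatedConf = new Configuration();
        DistributedCache.addCacheFile(new Path("/testDisFile").toUri(), unrelatedConf);

        // Right: register the file on the job's own Configuration; the tasks can then
        // retrieve the local copies via DistributedCache.getLocalCacheFiles(context.getConfiguration()).
        Configuration jobConf = job.getConfiguration();
        DistributedCache.addCacheFile(new Path("/testDisFile").toUri(), jobConf);
    }
}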

---

After testing quite a few more variations, it finally worked:

import java.io.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class getBinaryFpAndPredict {

    // private BufferedReader modelBR = new BufferedReader(new FileReader("/home/mjiang/java/eclipse/hadoop/Target-1/data/models.txt"));

    public static class TargetMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Path[] modelPath;
        private BufferedReader modelBR;

        public void setup(Context context) throws IOException, InterruptedException {
            // Configuration conf = new Configuration();     // with a fresh Configuration the cached files come back empty
            Configuration conf = context.getConfiguration(); // use the job's own Configuration instead
            boolean test = conf.getBoolean("wordcount.skip.patterns", false);
            modelPath = DistributedCache.getLocalCacheFiles(conf);

            // Open the locally cached model file.
            modelBR = new BufferedReader(new FileReader(modelPath[0].toString()));
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Strip tabs and spaces from the input fingerprint, then pair it with one line of the model file.
            String hexFp = value.toString().replaceAll("\t", "").replaceAll(" ", "");

            context.write(new Text("1"), new Text(hexFp + "~~~~~~~" + modelBR.readLine()));
        }
    }

    public static class TargetReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

    public static void main(String[] args) throws Exception {

        Job job = new Job();
        Configuration conf = job.getConfiguration();
        conf.setBoolean("wordcount.skip.patterns", true);
        // Register the model file on the job's own Configuration.
        DistributedCache.addCacheFile(new Path("target/model/all.model").toUri(), conf);

        job.setJarByClass(getBinaryFpAndPredict.class);

        FileInputFormat.addInputPath(job, new Path("target/hexFp/all.hexFp"));
        FileOutputFormat.setOutputPath(job, new Path("/testOut"));

        job.setInputFormatClass(hexFpTextInputFormat.class); // custom InputFormat defined elsewhere in this project

        job.setMapperClass(TargetMapper.class);
        job.setReducerClass(TargetReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        boolean test = conf.getBoolean("wordcount.skip.patterns", false);
        // MapContext jobConf = new MapContext(conf, null);  // another abandoned experiment
        Path[] modelPath = DistributedCache.getLocalCacheFiles(conf); // still null on the client side; only tasks see the local copies

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

It still fails when run under the debugger, but works fine when run normally. Did the change not get saved?? Frustrating.

