边数据分布:
边数据是作业所需的额外的只读数据,以辅助处理主数据集。所面临的挑战是如何使所有Map和Reduce任务都能够方便而高效地使用边数据。
我们用 DistributedCache API 实现:
1)刚开始用1.0.0的api怎么也不行,运行到patternsFiles = DistributedCache.getLocalCacheFiles(job);时patternsFiles一直为空。google后得知老版本的API才有这种实现方式,就实验了下老版本:
- import java.io.*;
- import java.util.*;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.conf.*;
- import org.apache.hadoop.io.*;
- import org.apache.hadoop.mapred.*;
- import org.apache.hadoop.util.*;
- public class testDistributeCache extends Configured implements Tool {
- public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
- public void configure(JobConf job) {
- Path[] patternsFiles;
- boolean test = job.getBoolean("wordcount.skip.patterns", false);
- try {
- patternsFiles = DistributedCache.getLocalCacheFiles(job);
- int test4=0;
- } catch (IOException ioe) {
- System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe));
- }
- }
- @Override
- public void map(LongWritable arg0, Text arg1,
- OutputCollector<Text, IntWritable> arg2, Reporter arg3)
- throws IOException {
- arg2.collect(arg1, new IntWritable(1));
- // TODO Auto-generated method stub
- }
- }
- public int run(String[] args) throws Exception {
- JobConf conf = new JobConf();
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(LongWritable.class);
- conf.setMapperClass(Map.class);
- conf.setInputFormat(TextInputFormat.class);
- conf.setOutputFormat(TextOutputFormat.class);
- DistributedCache.addCacheFile(new Path("/testDisFile").toUri(), conf);
- conf.setBoolean("wordcount.skip.patterns", true);
- FileInputFormat.setInputPaths(conf,new Path("/testDisIn") );
- FileOutputFormat.setOutputPath(conf, new Path("/testDisOut"));
- JobClient.runJob(conf);
- return 0;
- }
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new testDistributeCache(), args);
- System.exit(res);
- }
- }
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class testDistributeCache extends Configured implements Tool {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
public void configure(JobConf job) {
Path[] patternsFiles;
boolean test = job.getBoolean("wordcount.skip.patterns", false);
try {
<span style="color:#ff6666;">patternsFiles = DistributedCache.getLocalCacheFiles(job);
</span> int test4=0;
} catch (IOException ioe) {
System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe));
}
}
@Override
public void map(LongWritable arg0, Text arg1,
OutputCollector<Text, IntWritable> arg2, Reporter arg3)
throws IOException {
arg2.collect(arg1, new IntWritable(1));
// TODO Auto-generated method stub
}
}
public int run(String[] args) throws Exception {
JobConf conf = new JobConf();
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(LongWritable.class);
conf.setMapperClass(Map.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
DistributedCache.addCacheFile(new Path("/testDisFile").toUri(), conf);
conf.setBoolean("wordcount.skip.patterns", true);
FileInputFormat.setInputPaths(conf,new Path("/testDisIn") );
FileOutputFormat.setOutputPath(conf, new Path("/testDisOut"));
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new testDistributeCache(), args);
System.exit(res);
}
}
测试
12/04/11 08:40:32 INFO filecache.TrackerDistributedCacheManager: Creating testDisFile in /tmp/hadoop-mjiang/mapred/local/archive/-3764210432234018452_-1245135191_482447420/localhost-work-9116784916847441351 with rwxr-xr-x
12/04/11 08:40:32 INFO filecache.TrackerDistributedCacheManager: Cached /testDisFile as /tmp/hadoop-mjiang/mapred/local/archive/-3764210432234018452_-1245135191_482447420/localhost/testDisFile
12/04/11 08:40:32 INFO filecache.TrackerDistributedCacheManager: Cached /testDisFile as /tmp/hadoop-mjiang/mapred/local/archive/-3764210432234018452_-1245135191_482447420/localhost/testDisFile
12/04/11 08:40:32
debug时
- patternsFiles = DistributedCache.getLocalCacheFiles(job);
显示patternsFiles有值,说明是conf的问题
2)
---The Hadoop Map-Reduce framework spawns one map task for each InputSplit generated by the InputFormat for the job. Mapper implementations can access the Configuration for the job via Context.getConfiguration().
The framework first calls setup(org.apache.hadoop.mapreduce.Mapper.Context), followed by map(Object, Object, Context) for each key/value pair in the InputSplit. Finally cleanup(Context) is called.
---
JobConf、Job 或者 Configuration 只能存在一个,或者说三者必须是相互关联的同一份配置。
---
又连续测试了很多情况最后终于OK了
- import java.io.*;
- import java.net.URI;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.*;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.*;
- public class getBinaryFpAndPredict {
- //private BufferedReader modelBR = new BufferedReader(new FileReader("/home/mjiang/java/eclipse/hadoop/Target-1/data/models.txt"));
- public static class TargetMapper extends Mapper<LongWritable, Text, Text, Text> {
- private Path[] modelPath;
- private BufferedReader modelBR ;
- public void setup(Context context) throws IOException,InterruptedException{
- //Configuration conf = new Configuration(); /testS 为空
- Configuration conf = context.getConfiguration(); //testS 为空
- boolean test = conf.getBoolean("wordcount.skip.patterns", false);
- modelPath = DistributedCache.getLocalCacheFiles(conf);
- int testi = 0;
- modelBR = new BufferedReader(new FileReader(modelPath[0].toString()));
- }
- public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- //获取model的文件
- String hexFp = value.toString().replaceAll("\t", "").replaceAll(" ", "");
- context.write(new Text("1"),new Text(hexFp+"~~~~~~~"+modelBR.readLine()));
- }
- }
- static class TargetReducer extends Reducer<Text, Text, Text, Text> {
- public void reduce(Text key, Text values, Context context) throws IOException, InterruptedException {
- context.write(key, values);
- }
- }
- public static void main(String[] args) throws Exception {
- Job job = new Job();
- Configuration conf = job.getConfiguration();
- conf.setBoolean("wordcount.skip.patterns", true);
- DistributedCache.addCacheFile(new Path("target/model/all.model").toUri(), conf);
- job.setJarByClass(getBinaryFpAndPredict.class);
- FileInputFormat.addInputPath(job, new Path("target/hexFp/all.hexFp"));
- FileOutputFormat.setOutputPath(job, new Path("/testOut"));
- job.setInputFormatClass(hexFpTextInputFormat.class);
- job.setMapperClass(TargetMapper.class);
- job.setReducerClass(TargetReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- boolean test = conf.getBoolean("wordcount.skip.patterns", false);
- //MapContext jobConf= = new MapContext(conf, null);
- Path[] modelPath = DistributedCache.getLocalCacheFiles(conf);
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.*;

/**
 * New-API (org.apache.hadoop.mapreduce) job: reads hex fingerprints, pairs
 * each one with a line from a model file distributed via DistributedCache.
 * Key point: the cache file must be added to the Job's OWN Configuration
 * (job.getConfiguration()), and read back in setup() via
 * context.getConfiguration() — a fresh Configuration sees nothing.
 */
public class getBinaryFpAndPredict {

    public static class TargetMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Path[] modelPath;
        private BufferedReader modelBR;

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // Must use the job's Configuration from the context; a
            // `new Configuration()` here would not contain the cache entries.
            Configuration conf = context.getConfiguration();
            boolean test = conf.getBoolean("wordcount.skip.patterns", false);
            modelPath = DistributedCache.getLocalCacheFiles(conf);
            int testi = 0; // breakpoint anchor for inspecting modelPath
            modelBR = new BufferedReader(new FileReader(modelPath[0].toString()));
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Strip tabs and spaces from the fingerprint, then pair it with
            // the next line of the cached model file.
            String hexFp = value.toString().replaceAll("\t", "").replaceAll(" ", "");
            context.write(new Text("1"), new Text(hexFp + "~~~~~~~" + modelBR.readLine()));
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // FIX: close the model reader instead of leaking it.
            if (modelBR != null) {
                modelBR.close();
            }
        }
    }

    static class TargetReducer extends Reducer<Text, Text, Text, Text> {
        // FIX: the original signature was reduce(Text, Text, Context), which
        // does NOT override Reducer.reduce(Text, Iterable<Text>, Context) —
        // the default identity reduce ran silently instead. The corrected
        // override writes each value, which matches that identity behavior.
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = new Job();
        // Add the cache file to the job's own Configuration so tasks see it.
        Configuration conf = job.getConfiguration();
        conf.setBoolean("wordcount.skip.patterns", true);
        DistributedCache.addCacheFile(new Path("target/model/all.model").toUri(), conf);
        job.setJarByClass(getBinaryFpAndPredict.class);
        FileInputFormat.addInputPath(job, new Path("target/hexFp/all.hexFp"));
        FileOutputFormat.setOutputPath(job, new Path("/testOut"));
        job.setInputFormatClass(hexFpTextInputFormat.class);
        job.setMapperClass(TargetMapper.class);
        job.setReducerClass(TargetReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        boolean test = conf.getBoolean("wordcount.skip.patterns", false);
        // NOTE: on the client side the files are not localized yet, so this
        // returns the raw cache entries (or null), not local paths.
        Path[] modelPath = DistributedCache.getLocalCacheFiles(conf);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
在调试时还不行,运行就OK了。木有保存吗??郁闷