I have been working through Mahout in Action recently and have just finished Part One. Chapter 6 contains an example that implements collaborative filtering for recommendations, but the book's version only handles Boolean input data. The Mahout distribution also ships detailed source code for this algorithm, but as production source it is rather hard to read, so I followed the book's example (whose structure is much clearer) and wrote a version that also handles non-Boolean input data.
Below are the approach and the code in detail.
Input data:
The first column is the user ID, followed by the item ID and the preference value, separated by commas:
1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0
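WiKiDriver1 below reads its input from hdfs://fansyonepc:9000/user/fansy/input/<file>, so the rating file has to be uploaded to that directory first. A minimal sketch of doing that from Java (the class name UploadInput, the file name wiki.dat and the local path are only examples, not part of the original code):

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadInput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // destination under the input directory that WiKiDriver1 expects; "wiki.dat" is just an example name
        Path dst = new Path("hdfs://fansyonepc:9000/user/fansy/input/wiki.dat");
        FileSystem fs = dst.getFileSystem(conf);
        // local source path is hypothetical as well
        fs.copyFromLocalFile(new Path("/home/fansy/wiki.dat"), dst);
        fs.close();
    }
}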
The first MR job gathers each user's records from the input data into a single preference vector, like this:
userid:1,vector:{103:2.5,102:3.0,101:5.0}
userid:2,vector:{104:2.0,103:5.0,102:2.5,101:2.0}
userid:3,vector:{107:5.0,105:4.5,104:4.0,101:2.5}
userid:4,vector:{106:4.0,104:4.5,103:3.0,101:5.0}
userid:5,vector:{106:4.0,105:3.5,104:4.0,103:2.0,102:3.0,101:4.0}
The file holding the global constants:
WiKiUtils.java:
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

public class WiKiUtils {
    public static final String PATH = "hdfs://fansyonepc:9000/user/fansy/date1012/wikifirst/";
    public static int RECOMMENDATIONSPERUSER = 5;
    // used in WiKi5Reducer's setup() to load the items the user has already rated
    public static String JOB1OUTPATH = PATH + "job1/part-r-00000";
}
WiKiDriver1.java:
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.PATH;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

public class WiKiDriver1 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf1 = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: WiKiDriver1 <in> <out>");
            System.exit(2);
        }
        Job job1 = new Job(conf1, "wiki job one");
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        job1.setNumReduceTasks(1);
        job1.setJarByClass(WiKiDriver1.class);
        job1.setMapperClass(WikiMapper1.class);
        job1.setMapOutputKeyClass(VarLongWritable.class);
        job1.setMapOutputValueClass(LongAndFloat.class);
        job1.setReducerClass(WiKiReducer1.class);
        job1.setOutputKeyClass(VarLongWritable.class);
        job1.setOutputValueClass(VectorWritable.class);
        FileInputFormat.addInputPath(job1, new Path("hdfs://fansyonepc:9000/user/fansy/input/" + otherArgs[0]));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH + otherArgs[1]));
        if (!job1.waitForCompletion(true)) {
            System.exit(1); // exit on job failure
        }
    }
}
WikiMapper1.java:
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;

public class WikiMapper1 extends Mapper<LongWritable, Text, VarLongWritable, LongAndFloat> {

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        VarLongWritable userID = new VarLongWritable();
        LongWritable itemID = new LongWritable();
        FloatWritable itemValue = new FloatWritable();
        String line = value.toString();
        String[] info = line.split(",");
        if (info.length != 3) {
            return; // skip malformed lines
        }
        userID.set(Long.parseLong(info[0]));
        itemID.set(Long.parseLong(info[1]));
        itemValue.set(Float.parseFloat(info[2]));
        context.write(userID, new LongAndFloat(itemID, itemValue));
    }
}
WiKiReducer1.java:
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WiKiReducer1 extends Reducer<VarLongWritable, LongAndFloat, VarLongWritable, VectorWritable> {

    public void reduce(VarLongWritable userID, Iterable<LongAndFloat> itemPrefs, Context context)
            throws IOException, InterruptedException {
        // RandomAccessSparseVector(int cardinality, int initialCapacity)
        Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
        for (LongAndFloat itemPref : itemPrefs) {
            userVector.set(Integer.parseInt(itemPref.getFirst().toString()),
                    Float.parseFloat(itemPref.getSecond().toString()));
        }
        context.write(userID, new VectorWritable(userVector));
        // System.out.println("userid:" + userID + ",vector:" + userVector);
    }
}
LongAndFloat.java: a custom data type implementing WritableComparable, used to hold an (item ID, preference value) pair:
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;

public class LongAndFloat implements WritableComparable<LongAndFloat> {

    private LongWritable first;   // item ID
    private FloatWritable second; // preference value

    public LongAndFloat() {
        set(new LongWritable(), new FloatWritable());
    }

    public LongAndFloat(LongWritable l, FloatWritable f) {
        set(l, f);
    }

    public void set(LongWritable longWritable, FloatWritable floatWritable) {
        this.first = longWritable;
        this.second = floatWritable;
    }

    public LongWritable getFirst() {
        return first;
    }

    public FloatWritable getSecond() {
        return second;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public int compareTo(LongAndFloat o) {
        int cmp = first.compareTo(o.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(o.second);
    }
}
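To check that job 1 really wrote the userid/vector pairs shown earlier, its SequenceFile output can be dumped with a small utility. This is just a sketch of mine, reusing JOB1OUTPATH from WiKiUtils (which points at part-r-00000 because job 1 runs with a single reducer):

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

public class DumpJob1Output {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(WiKiUtils.JOB1OUTPATH); // job 1's part-r-00000
        FileSystem fs = path.getFileSystem(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        VarLongWritable userID = new VarLongWritable();
        VectorWritable userVector = new VectorWritable();
        // print one "userid:...,vector:{...}" line per record
        while (reader.next(userID, userVector)) {
            System.out.println("userid:" + userID.get() + ",vector:" + userVector.get());
        }
        reader.close();
    }
}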
The second MR job:
Its input is the output of MR(1). It computes only the item co-occurrence ("similarity") counts: the user ID is ignored, and every pair of items appearing in the same user vector is emitted and counted.
The output should look like this:
101,{107:1.0,106:2.0,105:2.0,104:4.0,103:4.0,102:3.0,101:5.0}
102,{106:1.0,105:1.0,104:2.0,103:3.0,102:3.0,101:3.0}
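Before looking at the MapReduce code, the counting idea can be seen in a standalone sketch (plain Java, no Hadoop, with the sample data hard-coded; the class name is mine). For each user it walks over all ordered item pairs, including an item paired with itself, and counts how many users produced each pair; the diagonal entry 101:5.0 above is simply the number of users who rated item 101:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class CooccurrenceSketch {
    public static void main(String[] args) {
        // user -> items that user has rated (taken from the sample input above)
        Map<Integer, int[]> userItems = new LinkedHashMap<Integer, int[]>();
        userItems.put(1, new int[]{101, 102, 103});
        userItems.put(2, new int[]{101, 102, 103, 104});
        userItems.put(3, new int[]{101, 104, 105, 107});
        userItems.put(4, new int[]{101, 103, 104, 106});
        userItems.put(5, new int[]{101, 102, 103, 104, 105, 106});

        // count.get(a).get(b) = number of users who rated both item a and item b
        Map<Integer, Map<Integer, Integer>> count = new TreeMap<Integer, Map<Integer, Integer>>();
        for (int[] items : userItems.values()) {
            for (int a : items) {
                for (int b : items) { // includes a == b, exactly like WikiMapper2 below
                    Map<Integer, Integer> row = count.get(a);
                    if (row == null) {
                        row = new TreeMap<Integer, Integer>();
                        count.put(a, row);
                    }
                    Integer old = row.get(b);
                    row.put(b, old == null ? 1 : old + 1);
                }
            }
        }
        // prints {101=5, 102=3, 103=4, 104=4, 105=2, 106=2, 107=1}, matching row 101 above
        System.out.println(count.get(101));
    }
}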
WiKiDriver2.java:
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.*;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.math.VectorWritable;

public class WiKiDriver2 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf1 = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: WiKiDriver2 <in> <out>");
            System.exit(2);
        }
        Job job1 = new Job(conf1, "wiki job two");
        job1.setNumReduceTasks(1);
        job1.setJarByClass(WiKiDriver2.class);
        job1.setInputFormatClass(SequenceFileInputFormat.class);
        job1.setMapperClass(WikiMapper2.class);
        job1.setMapOutputKeyClass(IntWritable.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setReducerClass(WiKiReducer2.class);
        job1.setOutputKeyClass(IntWritable.class);
        job1.setOutputValueClass(VectorWritable.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileInputFormat.addInputPath(job1, new Path(PATH + otherArgs[0]));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH + otherArgs[1]));
        if (!job1.waitForCompletion(true)) {
            System.exit(1); // exit on job failure
        }
    }
}
WikiMapper2.java:
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WikiMapper2 extends Mapper<VarLongWritable, VectorWritable, IntWritable, IntWritable> {

    public void map(VarLongWritable userID, VectorWritable userVector, Context context)
            throws IOException, InterruptedException {
        // emit every ordered pair of items (including an item paired with itself)
        // that appears in this user's preference vector
        Iterator<Vector.Element> it = userVector.get().iterateNonZero();
        while (it.hasNext()) {
            int index1 = it.next().index();
            Iterator<Vector.Element> it2 = userVector.get().iterateNonZero();
            while (it2.hasNext()) {
                int index2 = it2.next().index();
                context.write(new IntWritable(index1), new IntWritable(index2));
            }
        }
    }
}
WiKiReducer2.java:
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WiKiReducer2 extends Reducer<IntWritable, IntWritable, IntWritable, VectorWritable> {

    public void reduce(IntWritable itemIndex1, Iterable<IntWritable> itemPrefs, Context context)
            throws IOException, InterruptedException {
        // RandomAccessSparseVector(int cardinality, int initialCapacity)
        Vector itemVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
        // count how often each item co-occurs with itemIndex1
        for (IntWritable itemPref : itemPrefs) {
            int itemIndex2 = itemPref.get();
            itemVector.set(itemIndex2, itemVector.get(itemIndex2) + 1.0);
        }
        context.write(itemIndex1, new VectorWritable(itemVector));
        // System.out.println(itemIndex1 + "," + itemVector);
    }
}
The third MR job:
It consists of two mapper jobs. The first, MR(31), converts the output of MR(2) into VectorOrPrefWritable format;
MR(32) works on the output of MR(1) and, for every item a user has rated, emits the item ID together with the (user ID, preference value) pair; its output format is also VectorOrPrefWritable.
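VectorOrPrefWritable is Mahout's wrapper type that carries either a whole co-occurrence column or a single (user ID, preference) pair. Below is a tiny sketch of the two cases, using the same constructors that the mappers further down use; the getter names in the comments are from Mahout 0.x and should be checked against your Mahout version:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class VectorOrPrefDemo {
    public static void main(String[] args) {
        // Case 1: wrapping a co-occurrence column, as WikiMapper31 does
        Vector cooccurrenceColumn = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
        cooccurrenceColumn.set(102, 3.0);
        VectorOrPrefWritable asVector = new VectorOrPrefWritable(cooccurrenceColumn);

        // Case 2: wrapping a (userID, preference) pair, as WikiMapper32 does
        VectorOrPrefWritable asPref = new VectorOrPrefWritable(1L, 5.0f);

        // Downstream code tells the two apart by which side is filled
        // (getVector() vs getUserID()/getValue(); verify these against your Mahout version)
        System.out.println(asVector.getVector());                          // the co-occurrence column
        System.out.println(asPref.getUserID() + ":" + asPref.getValue());  // 1:5.0, vector side is empty
    }
}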
WiKiDriver31.java:
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.PATH;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

public class WiKiDriver31 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf1 = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: WiKiDriver31 <in> <out>");
            System.exit(2);
        }
        Job job1 = new Job(conf1, "wiki job three1");
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        job1.setInputFormatClass(SequenceFileInputFormat.class);
        job1.setNumReduceTasks(1);
        job1.setJarByClass(WiKiDriver31.class);
        job1.setMapperClass(WikiMapper31.class);
        job1.setMapOutputKeyClass(IntWritable.class);
        job1.setMapOutputValueClass(VectorOrPrefWritable.class);
        // a reducer is set only so that SequenceFileOutputFormat can be used
        job1.setReducerClass(WiKiReducer31.class);
        job1.setOutputKeyClass(IntWritable.class);
        job1.setOutputValueClass(VectorOrPrefWritable.class);
        // this job's input is MR(2)'s output
        SequenceFileInputFormat.addInputPath(job1, new Path(PATH + otherArgs[0]));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH + otherArgs[1]));
        if (!job1.waitForCompletion(true)) {
            System.exit(1); // exit on job failure
        }
    }
}
WikiMapper31.java:
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VectorWritable;

public class WikiMapper31 extends Mapper<IntWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

    public void map(IntWritable key, VectorWritable value, Context context)
            throws IOException, InterruptedException {
        // wrap the co-occurrence vector in a VectorOrPrefWritable
        context.write(key, new VectorOrPrefWritable(value.get()));
        // System.out.println("key:" + key.toString() + ",value:" + value.get());
    }
}
WiKiReducer31.java:
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

public class WiKiReducer31 extends Reducer<IntWritable, VectorOrPrefWritable, IntWritable, VectorOrPrefWritable> {

    public void reduce(IntWritable key, Iterable<VectorOrPrefWritable> values, Context context)
            throws IOException, InterruptedException {
        // identity reducer: simply re-emit every value
        for (VectorOrPrefWritable va : values) {
            context.write(key, va);
        }
    }
}
WiKiDriver32.java:
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.PATH;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

public class WiKiDriver32 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf1 = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: WiKiDriver32 <in> <out>");
            System.exit(2);
        }
        Job job1 = new Job(conf1, "wiki job three2");
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        job1.setInputFormatClass(SequenceFileInputFormat.class);
        job1.setNumReduceTasks(1);
        job1.setJarByClass(WiKiDriver32.class);
        job1.setMapperClass(WikiMapper32.class);
        job1.setMapOutputKeyClass(IntWritable.class);
        job1.setMapOutputValueClass(VectorOrPrefWritable.class);
        job1.setReducerClass(WiKiReducer32.class);
        job1.setOutputKeyClass(IntWritable.class);
        job1.setOutputValueClass(VectorOrPrefWritable.class);
        // WiKiDriver1's output is this job's input
        SequenceFileInputFormat.addInputPath(job1, new Path(PATH + otherArgs[0]));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH + otherArgs[1]));
        if (!job1.waitForCompletion(true)) {
            System.exit(1); // exit on job failure
        }
    }
}
WikiMapper32.java:
package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WikiMapper32 extends Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

    public void map(VarLongWritable key, VectorWritable value, Context context)
            throws IOException, InterruptedException {
        long userID = key.get();
        Vector userVector = value.get();
        Iterator<Vector.Element> it = userVector.iterateNonZero();
        IntWritable itemi = new IntWritable();
        // for every item the user rated, emit (itemID, (userID, preference))
        while (it.hasNext()) {
            Vector.Element e = it.next();
            int itemIndex = e.index();
            float preferenceValue = (float) e.get();
            itemi.set(itemIndex);
            context.write(itemi, new VectorOrPrefWritable(userID, preferenceValue));
            // System.out.println("item:" + itemi + ",user and value:" + userID + "," + preferenceValue);
        }
    }
}
WiKiReducer32.java is exactly the same as WiKiReducer31.java, so it is not repeated here.
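For convenience, the four jobs so far can be chained with a small runner like the one below. The class name RunPart1 and the argument values are my own choices (the output directory names job1, job2, job31 and job32 live under WiKiUtils.PATH and must not already exist on HDFS, and wiki.dat stands for whatever rating file was uploaded); the drivers already call System.exit on bad arguments or a failed job, so the chain stops on error:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

public class RunPart1 {
    public static void main(String[] args) throws Exception {
        // job 1: raw ratings file (under .../user/fansy/input/) -> user preference vectors
        WiKiDriver1.main(new String[]{"wiki.dat", "job1"});
        // job 2: user vectors -> item co-occurrence vectors
        WiKiDriver2.main(new String[]{"job1", "job2"});
        // job 3.1: co-occurrence vectors -> VectorOrPrefWritable
        WiKiDriver31.main(new String[]{"job2", "job31"});
        // job 3.2: user vectors (job 1 output) -> VectorOrPrefWritable
        WiKiDriver32.main(new String[]{"job1", "job32"});
    }
}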
Continued in Part 2: Hadoop 实现协同过滤 (example in <Mahout in Action> chapter 6) Part 2