
Implementing Collaborative Filtering with Hadoop (example in Mahout in Action chapter 6), Part 1


I have been working through Mahout in Action recently and only finished Part 1 today. Chapter 6 contains an example that implements collaborative filtering for recommendation, but the book's version is written for Boolean (preference-less) input data. The Mahout distribution also ships the full source of this algorithm, yet the production source is rather hard to read, so I followed the book's example (whose structure is much clearer) and wrote a version that handles real preference values instead of only Boolean input.

The approach and the code are described in detail below.

Input data:

Each line is comma separated: the first column is the user ID, followed by the item ID and the preference value.
1,101,5.0 
1,102,3.0 
1,103,2.5 
2,101,2.0 
2,102,2.5 
2,103,5.0 
2,104,2.0 
3,101,2.5 
3,104,4.0 
3,105,4.5 
3,107,5.0 
4,101,5.0 
4,103,3.0 
4,104,4.5 
4,106,4.0 
5,101,4.0 
5,102,3.0 
5,103,2.0 
5,104,4.0 
5,105,3.5 
5,106,4.0 

The first MR job consolidates all of a user's preferences into a single vector per user.

Its output looks like the following (a small sketch for inspecting this SequenceFile output comes right after the sample):

userid:1,vector:{103:2.5,102:3.0,101:5.0}
userid:2,vector:{104:2.0,103:5.0,102:2.5,101:2.0}
userid:3,vector:{107:5.0,105:4.5,104:4.0,101:2.5}
userid:4,vector:{106:4.0,104:4.5,103:3.0,101:5.0}
userid:5,vector:{106:4.0,105:3.5,104:4.0,103:2.0,102:3.0,101:4.0}
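
The printed lines above come from the commented-out System.out.println in WiKiReducer1 below. Because the job writes its result as a SequenceFile, the same content can also be inspected with a small reader program. Here is a minimal sketch, assuming the JOB1OUTPATH constant from the WiKiUtils class shown next (i.e. a single reducer producing part-r-00000) and the old-style SequenceFile.Reader constructor; the class name DumpJob1Output is my own and not part of the original code.

DumpJob1Output.java (hypothetical helper):

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

// Hypothetical helper (not part of the original post): dumps the output of MR(1).
public class DumpJob1Output {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Path path = new Path(WiKiUtils.JOB1OUTPATH);    // assumes one reducer, hence part-r-00000
		FileSystem fs = FileSystem.get(path.toUri(), conf);
		SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
		try {
			VarLongWritable userID = new VarLongWritable();
			VectorWritable vector = new VectorWritable();
			while (reader.next(userID, vector)) {
				// prints e.g. "userid:1,vector:{103:2.5,102:3.0,101:5.0}"
				System.out.println("userid:" + userID + ",vector:" + vector.get());
			}
		} finally {
			reader.close();
		}
	}
}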

The file that holds the global constants:

WiKiUtils.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

public class WiKiUtils {
		
		public static final String PATH="hdfs://fansyonepc:9000/user/fansy/date1012/wikifirst/";
		
		public static int RECOMMENDATIONSPERUSER=5;
		
		public static String JOB1OUTPATH=PATH+"job1/part-r-00000";  // used in WiKi5Reducer's setup() (Part 2) to load the items the user has already rated
}

WiKiDriver1.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.PATH;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;


public class WiKiDriver1 {

	/**
	 * @param args
	 * @throws IOException 
	 * @throws InterruptedException 
	 * @throws ClassNotFoundException 
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// TODO Auto-generated method stub
		Configuration conf1 = new Configuration();
		
	    String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();      
	    if (otherArgs.length != 2) {
	      System.err.println("Usage: WiKiDriver1 <in> <out>");
	      System.exit(2);
	    }
	    Job job1 = new Job(conf1, "wiki  job one");
	    job1.setOutputFormatClass(SequenceFileOutputFormat.class);
	    job1.setNumReduceTasks(1);
	    job1.setJarByClass(WiKiDriver1.class);
	    job1.setMapperClass(WikiMapper1.class);
	    job1.setMapOutputKeyClass(VarLongWritable.class);
		job1.setMapOutputValueClass(LongAndFloat.class);
	    job1.setReducerClass(WiKiReducer1.class);
	    job1.setOutputKeyClass(VarLongWritable.class);
	    job1.setOutputValueClass(VectorWritable.class);
	    
	    FileInputFormat.addInputPath(job1, new Path("hdfs://fansyonepc:9000/user/fansy/input/"+otherArgs[0]));
	    SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH+otherArgs[1]));   
	    if(!job1.waitForCompletion(true)){
	    	System.exit(1); // run error then exit
	    }
	}

}

WikiMapper1.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;

public class WikiMapper1 extends Mapper<LongWritable ,Text,VarLongWritable,LongAndFloat>{

//	private static final Pattern NUMBERS=Pattern.compile("(\\d+)");
	
	public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
		
		VarLongWritable userID=new VarLongWritable();
		LongWritable itemID=new LongWritable();
		FloatWritable itemValue=new FloatWritable();
		String line=value.toString();
		String[] info=line.split(",");
		if(info.length!=3){
			return;
		}
		userID.set(Long.parseLong(info[0]));
		itemID.set(Long.parseLong(info[1]));
		itemValue.set(Float.parseFloat(info[2]));
		context.write(userID, new LongAndFloat(itemID,itemValue));
		
	}
}

WiKiReducer1.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WiKiReducer1 extends Reducer<VarLongWritable,LongAndFloat,VarLongWritable,VectorWritable> {
	
		public void reduce(VarLongWritable userID,Iterable<LongAndFloat> itemPrefs,Context context) throws IOException, InterruptedException{
			// RandomAccessSparseVector(int cardinality, int initialCapacity) 
			Vector userVector=new RandomAccessSparseVector(Integer.MAX_VALUE,10);
			for(LongAndFloat itemPref:itemPrefs){
				userVector.set(Integer.parseInt(itemPref.getFirst().toString()),Float.parseFloat(itemPref.getSecond().toString()) );
			}
			context.write(userID, new VectorWritable(userVector));
	//		System.out.println("userid:"+userID+",vector:"+userVector);
		}
}
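
A note on the vector construction in WiKiReducer1: the first argument of RandomAccessSparseVector is the cardinality (Integer.MAX_VALUE here, so raw item IDs can be used directly as indices) and the second is only an initial capacity hint. The following standalone sketch (my own, not from the original post) shows the Vector calls the reducer and the later mappers rely on: set(index, value), get(index) and iterateNonZero().

SparseVectorDemo.java (hypothetical):

import java.util.Iterator;

import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class SparseVectorDemo {

	public static void main(String[] args) {
		// cardinality = Integer.MAX_VALUE so any item ID fits; 10 is just a capacity hint
		Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
		userVector.set(101, 5.0);
		userVector.set(102, 3.0);
		userVector.set(103, 2.5);

		System.out.println(userVector.get(102));        // 3.0

		Iterator<Vector.Element> it = userVector.iterateNonZero();
		while (it.hasNext()) {
			Vector.Element e = it.next();
			System.out.println(e.index() + ":" + e.get());    // e.g. 103:2.5
		}
	}
}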

LongAndFloat.java: a custom value type that stores an (item ID, preference) pair and implements WritableComparable (a small serialization round-trip sketch follows the class):

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;

public class LongAndFloat implements WritableComparable<LongAndFloat> {
	private LongWritable first;
	private FloatWritable second;
	public LongAndFloat(){
		set(new LongWritable(),new FloatWritable());
	}
	public LongAndFloat(LongWritable l,FloatWritable f){
		set(l,f);
	}
	public void set(LongWritable longWritable, FloatWritable floatWritable) {
		this.first=longWritable;
		this.second=floatWritable;
	}
	public LongWritable getFirst(){
		return first;
	}
	public FloatWritable getSecond(){
		return second;
	}
	@Override
	public void readFields(DataInput arg0) throws IOException {
		// TODO Auto-generated method stub
		first.readFields(arg0);
		second.readFields(arg0);
	}
	@Override
	public void write(DataOutput arg0) throws IOException {
		// TODO Auto-generated method stub
		first.write(arg0);
		second.write(arg0);
	}
	@Override
	public int compareTo(LongAndFloat o) {
		// TODO Auto-generated method stub
		int cmp=first.compareTo(o.first);
		if(cmp!=0){
			return cmp;
		}
		return second.compareTo(o.second);
	}	
}
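
LongAndFloat is only used as a map output value in MR(1), but it still has to round-trip correctly through write()/readFields(). A quick way to sanity-check that locally is with Hadoop's DataOutputBuffer/DataInputBuffer; the test class below is my own sketch and not part of the original post.

LongAndFloatRoundTrip.java (hypothetical):

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;

public class LongAndFloatRoundTrip {

	public static void main(String[] args) throws Exception {
		LongAndFloat original = new LongAndFloat(new LongWritable(101L), new FloatWritable(5.0f));

		// serialize
		DataOutputBuffer out = new DataOutputBuffer();
		original.write(out);

		// deserialize into a fresh instance
		DataInputBuffer in = new DataInputBuffer();
		in.reset(out.getData(), out.getLength());
		LongAndFloat copy = new LongAndFloat();
		copy.readFields(in);

		System.out.println(copy.getFirst() + "," + copy.getSecond());    // expected: 101,5.0
	}
}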

The second MR job:

Its input is the output of MR(1), and it computes only the item-to-item co-occurrence counts (the "similarity" part): the user ID is ignored, and every pair of items appearing in the same user vector is emitted (a plain-Java sketch of this counting comes right after the sample output).

The output should look something like this:

101,{107:1.0,106:2.0,105:2.0,104:4.0,103:4.0,102:3.0,101:5.0}
102,{106:1.0,105:1.0,104:2.0,103:3.0,102:3.0,101:3.0}
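
Before the MapReduce version, here is a tiny in-memory sketch (my own, not from the original post) of what MR(2) computes: for every user vector, each pair of items the user rated, including an item paired with itself, increments a co-occurrence count. WikiMapper2 emits the pairs and WiKiReducer2 does the counting.

CooccurrenceSketch.java (hypothetical):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CooccurrenceSketch {

	public static void main(String[] args) {
		// the item sets of the five users in the sample input
		List<int[]> userItems = Arrays.asList(
				new int[]{101, 102, 103},
				new int[]{101, 102, 103, 104},
				new int[]{101, 104, 105, 107},
				new int[]{101, 103, 104, 106},
				new int[]{101, 102, 103, 104, 105, 106});

		// cooccurrence.get(i).get(j) = number of users who rated both item i and item j
		Map<Integer, Map<Integer, Integer>> cooccurrence = new HashMap<Integer, Map<Integer, Integer>>();
		for (int[] items : userItems) {
			for (int i : items) {
				for (int j : items) {            // includes i == j, which produces the diagonal counts
					Map<Integer, Integer> row = cooccurrence.get(i);
					if (row == null) {
						row = new HashMap<Integer, Integer>();
						cooccurrence.put(i, row);
					}
					Integer c = row.get(j);
					row.put(j, c == null ? 1 : c + 1);
				}
			}
		}
		// item 101: 5 with itself, 3 with 102, 4 with 103, 4 with 104, 2 with 105, 2 with 106, 1 with 107
		System.out.println(cooccurrence.get(101));
	}
}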

WiKiDriver2.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.*;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.math.VectorWritable;


public class WiKiDriver2 {

	/**
	 * @param args
	 * @throws IOException 
	 * @throws InterruptedException 
	 * @throws ClassNotFoundException 
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// TODO Auto-generated method stub
		Configuration conf1 = new Configuration();
	    String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();      
	    if (otherArgs.length != 2) {
	      System.err.println("Usage: WiKiDriver2 <in> <out>");
	      System.exit(2);
	    }
	    Job job1 = new Job(conf1, "wiki  job two");
	    job1.setNumReduceTasks(1);
	    job1.setJarByClass(WiKiDriver2.class);
	    job1.setInputFormatClass(SequenceFileInputFormat.class);
	    job1.setMapperClass(WikiMapper2.class);
	    job1.setMapOutputKeyClass(IntWritable.class);
		job1.setMapOutputValueClass(IntWritable.class);
	    job1.setReducerClass(WiKiReducer2.class);
	    job1.setOutputKeyClass(IntWritable.class);
	    job1.setOutputValueClass(VectorWritable.class);
	    job1.setOutputFormatClass(SequenceFileOutputFormat.class);
	    SequenceFileInputFormat.addInputPath(job1, new Path(PATH+otherArgs[0]));
	    SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH+otherArgs[1]));   
	    if(!job1.waitForCompletion(true)){
	    	System.exit(1); // run error then exit
	    }
	}

}

WikiMapper2.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WikiMapper2 extends Mapper<VarLongWritable ,VectorWritable,IntWritable,IntWritable>{
	
	public void map(VarLongWritable userID,VectorWritable userVector,Context context) throws IOException, InterruptedException{
		Iterator<Vector.Element> it=userVector.get().iterateNonZero();
		while(it.hasNext()){
			int index1=it.next().index();
	//		System.out.println("index1:"+index1);
			Iterator<Vector.Element> it2=userVector.get().iterateNonZero();
			while(it2.hasNext()){
				int index2=it2.next().index();
				
				//  test
				/*if(index1==101){
					System.out.println("index1:"+index1+",index2:"+index2);
				}*/
				context.write(new IntWritable(index1), new IntWritable(index2));
			}
		}
	}
}

WiKiReducer2.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WiKiReducer2 extends Reducer<IntWritable,IntWritable,IntWritable,VectorWritable> {
	
		public void reduce(IntWritable itemIndex1,Iterable<IntWritable> itemPrefs,Context context) throws IOException, InterruptedException{
			// RandomAccessSparseVector(int cardinality, int initialCapacity) 
			Vector itemVector=new RandomAccessSparseVector(Integer.MAX_VALUE,10);
			for(IntWritable itemPref:itemPrefs){
				int itemIndex2=itemPref.get();
				itemVector.set(itemIndex2, itemVector.get(itemIndex2)+1.0);
			}
			context.write(itemIndex1, new VectorWritable(itemVector));
	//		System.out.println(itemIndex1+","+itemVector);
		}
}

The third MR stage:

It consists of two mapper jobs. The first, MR(31), converts the output of MR(2) into VectorOrPrefWritable; the second, MR(32), takes the output of MR(1) and, for every item a user rated, emits the item ID as the key with the (user ID, preference value) pair as the value, also wrapped in VectorOrPrefWritable. A short sketch of this wrapper type follows.
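
Both mappers emit the item ID as the key, but the values carry two different payloads: MR(31) wraps an item's co-occurrence column, while MR(32) wraps a (user ID, preference) pair. VectorOrPrefWritable is the shared container that lets a later job read both under one value class. The sketch below is my own, not from the original post; the getter names come from Mahout's org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable and should be checked against the Mahout version in use.

VectorOrPrefDemo.java (hypothetical):

import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class VectorOrPrefDemo {

	public static void main(String[] args) {
		// the form WikiMapper31 emits: a co-occurrence column for one item
		Vector column = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
		column.set(102, 3.0);
		column.set(103, 4.0);
		VectorOrPrefWritable asVector = new VectorOrPrefWritable(column);

		// the form WikiMapper32 emits: one user's preference for this item
		VectorOrPrefWritable asPref = new VectorOrPrefWritable(1L, 5.0f);

		// a consumer can tell them apart: getVector() is null for the preference form
		System.out.println(asVector.getVector());                            // the column vector
		System.out.println(asPref.getUserID() + ":" + asPref.getValue());    // 1:5.0
	}
}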

WiKiDriver31.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.PATH;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;


public class WiKiDriver31 {

	/**
	 * @param args
	 * @throws IOException 
	 * @throws InterruptedException 
	 * @throws ClassNotFoundException 
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// TODO Auto-generated method stub
		Configuration conf1 = new Configuration();
		
	    String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();      
	    if (otherArgs.length != 2) {
	      System.err.println("Usage: WiKiDriver31 <in> <out>");
	      System.exit(2);
	    }
	    Job job1 = new Job(conf1, "wiki  job three1");
	    job1.setOutputFormatClass(SequenceFileOutputFormat.class);
	    job1.setInputFormatClass(SequenceFileInputFormat.class);
	    job1.setNumReduceTasks(1);
	    job1.setJarByClass(WiKiDriver31.class);
	    job1.setMapperClass(WikiMapper31.class);
	    job1.setMapOutputKeyClass(IntWritable.class);
		job1.setMapOutputValueClass(VectorOrPrefWritable.class);
		
		// a pass-through reducer is set only so that the output is written with SequenceFileOutputFormat
	    job1.setReducerClass(WiKiReducer31.class);
	    job1.setOutputKeyClass(IntWritable.class);
	    job1.setOutputValueClass(VectorOrPrefWritable.class);
	    
	    // this job's input is MR(2)'s output
	    SequenceFileInputFormat.addInputPath(job1, new Path(PATH+otherArgs[0]));
	    SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH+otherArgs[1]));   
	    if(!job1.waitForCompletion(true)){
	    	System.exit(1); // run error then exit
	    }
	}

}

WikiMapper31.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VectorWritable;

public class WikiMapper31 extends Mapper<IntWritable ,VectorWritable,IntWritable,VectorOrPrefWritable>{
	
	public void map(IntWritable key,VectorWritable value,Context context) throws IOException, InterruptedException{
		context.write(key, new VectorOrPrefWritable(value.get()));
		// System.out.println("key:"+key.toString()+",value:"+value.get());
	}
}

WiKiReducer31.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

public class WiKiReducer31 extends Reducer<IntWritable ,VectorOrPrefWritable,IntWritable,VectorOrPrefWritable> {
	public void reduce(IntWritable key,Iterable<VectorOrPrefWritable> values ,Context context ) throws IOException, InterruptedException{
		
		for(VectorOrPrefWritable va:values){
			context.write(key, va);
		}
	}

}

WiKiDriver32.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import static org.fansy.date1012.mahoutinaction.chapter6.sourcecode.WiKiUtils.PATH;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;


public class WiKiDriver32 {

	/**
	 * @param args
	 * @throws IOException 
	 * @throws InterruptedException 
	 * @throws ClassNotFoundException 
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// TODO Auto-generated method stub
		Configuration conf1 = new Configuration();
		
	    String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();      
	    if (otherArgs.length != 2) {
	      System.err.println("Usage: WiKiDriver32 <in> <out>");
	      System.exit(2);
	    }
	    Job job1 = new Job(conf1, "wiki job three2");
	    job1.setOutputFormatClass(SequenceFileOutputFormat.class);
	    job1.setInputFormatClass(SequenceFileInputFormat.class);
	    job1.setNumReduceTasks(1);
	    job1.setJarByClass(WiKiDriver32.class);
	    job1.setMapperClass(WikiMapper32.class);
	    job1.setMapOutputKeyClass(IntWritable.class);
		job1.setMapOutputValueClass(VectorOrPrefWritable.class);

		job1.setReducerClass(WiKiReducer32.class);
	    job1.setOutputKeyClass(IntWritable.class);
	    job1.setOutputValueClass(VectorOrPrefWritable.class);
	    
	    // the output of WiKiDriver1 (MR(1)) is this job's input
	    SequenceFileInputFormat.addInputPath(job1, new Path(PATH+otherArgs[0]));
	    SequenceFileOutputFormat.setOutputPath(job1, new Path(PATH+otherArgs[1]));   
	    if(!job1.waitForCompletion(true)){
	    	System.exit(1); // run error then exit
	    }
	}

}

WikiMapper32.java:

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class WikiMapper32 extends Mapper<VarLongWritable ,VectorWritable,IntWritable,VectorOrPrefWritable>{
	
	public void map(VarLongWritable key,VectorWritable value,Context context) throws IOException, InterruptedException{
		
			long userID=key.get();
			Vector userVector=value.get();
			Iterator<Vector.Element> it=userVector.iterateNonZero();
			IntWritable itemi=new IntWritable();
			while(it.hasNext()){
				Vector.Element e=it.next();
				int itemIndex=e.index();
				float preferenceValue=(float)e.get();
				itemi.set(itemIndex);
				context.write(itemi, new VectorOrPrefWritable(userID,preferenceValue));
		//		System.out.println("item :"+itemi+",userand val:"+userID+","+preferenceValue);
			}
		//	System.out.println();
		
	}
}

WiKiReducer32.java is identical to WiKiReducer31.java, so it is not repeated here.
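
For completeness, here is a hedged sketch of how the Part 1 jobs could be chained. The directory names job1, job2, job31, job32 and the ratings file name wiki.txt are my own assumptions, not values from the original post; WiKiDriver1 reads the ratings file from the HDFS input directory hard-coded in that driver, and every other job reads and writes under WiKiUtils.PATH.

RunPart1.java (hypothetical):

package org.fansy.date1012.mahoutinaction.chapter6.sourcecode;

public class RunPart1 {

	public static void main(String[] args) throws Exception {
		// MR(1): raw ratings file -> one preference vector per user
		WiKiDriver1.main(new String[]{"wiki.txt", "job1"});
		// MR(2): user vectors -> item co-occurrence rows
		WiKiDriver2.main(new String[]{"job1", "job2"});
		// MR(31): co-occurrence rows re-wrapped as VectorOrPrefWritable
		WiKiDriver31.main(new String[]{"job2", "job31"});
		// MR(32): user vectors split into (item, user + preference) pairs
		WiKiDriver32.main(new String[]{"job1", "job32"});
	}
}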

Continued in Part 2: Implementing Collaborative Filtering with Hadoop (example in Mahout in Action chapter 6).
