
Entry-level Hadoop code



I. The "hello, world" of Hadoop

/**
 * WordCount.java
 * The canonical "hello, world" of MapReduce: count how many times each word occurs
 * in the input, using the old org.apache.hadoop.mapred API.
 */

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.conf.*;
import java.util.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.Path;

public class WordCount {
	// Mapper: split each input line into tokens and emit (word, 1) for every token.
	public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();
		public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) 
				throws IOException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				output.collect(word, one);
			}
		}
	}

	// Reducer: sum the per-word counts produced by the mappers.
	public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
		public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
				Reporter reporter) 
				throws IOException {
			int sum = 0;
			while (values.hasNext()) {
				sum += values.next().get();
			}
			output.collect(key, new IntWritable(sum));
		}
	}

	public static void main(String[] args) throws Exception {
		JobConf conf = new JobConf(WordCount.class);
		conf.setJobName("wordcount");

		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);
		conf.setMapperClass(Map.class);
		conf.setReducerClass(Reduce.class);
		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);
		FileInputFormat.setInputPaths(conf, new Path(args[0]));
		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		JobClient.runJob(conf);
	}
}
/* vim: set ts=4 sw=4 sts=4 tw=100 */

Package the job with ant; a jar built by hand with raw commands may fail to run. The MapReduce idea itself is simple, so I will not go on about it here; the code above is copied as-is.
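Still, a tiny walk-through of the data flow (worked out by hand from the code above, not taken from an actual run) shows what happens. For the single input line "hello world hello":

map output: (hello, 1) (world, 1) (hello, 1)
after the shuffle, the reducer is handed the grouped values: (hello, [1, 1]) and (world, [1])
reduce output:
hello	2
world	1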

II. Examples from the book Hadoop实战:

1. Single-table join
The single-table join is a good example for understanding MapReduce: a child-parent table is joined with itself to produce every grandchild-grandparent pair, using the person in the middle as the join key.
Input file:
child parent
Tom Lucy
Lucy Mary
Expected output:
Tom Mary

The code is as follows:
package com.baidu.rawget;
import java.io.IOException;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.io.*;
import java.util.*;
import org.apache.hadoop.util.*;

import java.io.*;

public class STjoin {
	public static int time = 0;
	// Mapper: emit each child-parent record twice -- keyed on the parent and tagged "1"
	// (the child is a candidate grandchild), and keyed on the child and tagged "2"
	// (the parent is a candidate grandparent). The header line "child parent" is skipped.
	public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
		public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
			String childname = new String();
			String parentname = new String();
			String relationtype = new String();
			String line = value.toString();
			int i = 0;
			while (line.charAt(i) != ' ') {
				i++;
			}
			String[] values = {line.substring(0, i), line.substring(i + 1)};
			if (values[0].compareTo("child") != 0) {
				childname = values[0];
				parentname = values[1];
				relationtype = "1";
				output.collect(new Text(values[1]), new Text(relationtype + "+" + childname + "+" + parentname));
				relationtype = "2";
				output.collect(new Text(values[0]), new Text(relationtype + "+" + childname + "+" + parentname));
			}
		}   
	}   

	// Reducer: for each person (the key), split the tagged records into a grandchild list
	// (tag '1') and a grandparent list (tag '2'), then emit their cross product.
	public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
			if (time == 0) {
				output.collect(new Text("grandchild"), new Text("grandparent"));
				time++;
			}
			
			int grandchildnum = 0;
			String grandchild[] = new String[10];
			int grandparentnum = 0;
			String grandparent[] = new String[10];
			while (values.hasNext()) {
				String record = values.next().toString();
				System.err.println("record: " + record);
				int len = record.length();
				int i = 2;
				if (len == 0)
					continue;
				char relationtype = record.charAt(0);
				String childname = new String();
				String parentname = new String();
				while (record.charAt(i) != '+') {
					childname = childname + record.charAt(i);
					i++;
				}
				i = i + 1;
				while (i < len) {
					parentname = parentname + record.charAt(i);
					i++;
				}
				
				if (relationtype == '1') {
					grandchild[grandchildnum] = childname;
					grandchildnum++;
				}
				else {
					grandparent[grandparentnum] = parentname;
					grandparentnum++;
				}
			}

			System.err.println("table 1:");
			for (int m = 0; m < grandchildnum; m++) {
				System.err.println(grandchild[m]);
			}

			System.err.println("table 2:");
			for (int m = 0; m < grandparentnum; m++) {
				System.err.println(grandparent[m]);
			}

			if (grandparentnum != 0 && grandchildnum != 0) {
				for (int m = 0; m < grandchildnum; m++) {
					for (int n = 0; n < grandparentnum; n++) {
						output.collect(new Text(grandchild[m]), new Text(grandparent[n]));
					}
				}
			}
		}
	}

	public static void main(String[] args) throws Exception {
		JobConf conf = new JobConf(STjoin.class);
		conf.setJobName("STjoin");
		//conf.setNumReduceTasks(3);

		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(Text.class);
		conf.setMapperClass(Map.class);
		conf.setReducerClass(Reduce.class);
		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);
		FileInputFormat.setInputPaths(conf, new Path(args[0]));
		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		JobClient.runJob(conf);
	}
}

To make the code easier to follow, I printed each reduce call's input records together with the grandchild and grandparent tables it builds:

record: 2+Lucy+Mary
record: 1+Tom+Lucy
table 1:
Tom
table 2:
Mary
record: 1+Lucy+Mary
table 1:
Lucy
table 2:
record: 2+Tom+Lucy
table 1:
table 2:
Lucy

It is easy to see that all values sharing the same key are grouped into a single reduce call. The call for the key Lucy, for example, receives both 1+Tom+Lucy and 2+Lucy+Mary, so the cross product of its two tables yields the expected pair Tom Mary.

2. Sorting
How can MapReduce be used to sort? The book gives an example of this too, and it is easy to follow, but its code has a few small problems; the corrected version is below:
package com.baidu.rawget;
import java.io.IOException;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.io.*;
import java.util.*;
import org.apache.hadoop.util.*;
import java.io.*;

public class Sort
{
	// Mapper: parse each line as an integer and emit (number, 1); the framework's
	// sort-by-key during the shuffle does the actual sorting.
	public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, IntWritable> {
		private static IntWritable data = new IntWritable();
		public void map(LongWritable key, Text value, OutputCollector<IntWritable, IntWritable> output, Reporter reporter) throws IOException {
			String line = value.toString();
			data.set(Integer.parseInt(line));
			output.collect(data, new IntWritable(1));
		}   
	}   

	// Reducer: keys arrive in ascending order, so emit (rank, key) once per occurrence.
	// linenum is the running rank within this reduce task; the values.next() call only
	// advances the iterator (every value is 1).
	public static class Reduce extends MapReduceBase implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
		private static IntWritable linenum = new IntWritable(1);
		static int linenum2;
		public void reduce(IntWritable key, Iterator<IntWritable> values, OutputCollector<IntWritable, IntWritable> output, Reporter reporter) throws IOException {
			System.err.println("Reduce");
			while (values.hasNext()) {
				System.err.println("key = ");
				System.err.println(key);
				output.collect(linenum, key);
				linenum = new IntWritable(linenum.get() + 1);
				linenum2 = values.next().get();
			}
		}   
	}

	// Partitioner: range-partition the keys so that reducer i only receives keys in
	// [bound*i, bound*(i+1)); concatenating the reducer outputs then gives a total order.
	public static class Partition implements Partitioner<IntWritable, IntWritable> {
		public void configure(JobConf job) {}

		public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
			System.err.println("numPartitions = ");
			System.err.println(numPartitions);
			int Maxnumber = 65223;
			int bound = Maxnumber / numPartitions + 1;
			int keynumber = key.get();
			for (int i = 1; i <= numPartitions; i++) {
				if (keynumber < bound * i && keynumber >= bound * (i - 1)) {
					return i - 1;
				}
			}

			return -1;
		}
	}

	public static void main(String[] args) throws Exception {
		JobConf conf = new JobConf(Sort.class);
		conf.setJobName("Sort");
		conf.setNumReduceTasks(3);

		conf.setOutputKeyClass(IntWritable.class);
		conf.setOutputValueClass(IntWritable.class);
		conf.setMapperClass(Map.class);
		conf.setReducerClass(Reduce.class);
		conf.setPartitionerClass(Partition.class);
		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);
		FileInputFormat.setInputPaths(conf, new Path(args[0]));
		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		JobClient.runJob(conf);
	}
}
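The Partition class is what makes the output globally sorted when the reducer files are read back in order: instead of hashing, it range-partitions the keys. Working the arithmetic out by hand for the settings above: with numPartitions = 3, bound = 65223 / 3 + 1 = 21742, so keys 0 through 21741 go to the first reducer, 21742 through 43483 to the second, and 43484 and up (to the hard-coded maximum) to the third. Each reducer receives its keys in ascending order and numbers them with linenum, so the rank column restarts at 1 in every part file; the code also assumes no key exceeds Maxnumber (65223), since far larger keys would make getPartition return -1.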

III. Going further with MapReduce:

To get a deeper feel for the MapReduce model, it is worth implementing algorithms such as matrix multiplication and k-means on top of it.
1. Matrix multiplication:
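What follows is only a minimal sketch of the classic one-pass approach, written against the same old mapred API as the examples above; it is not a tuned implementation. It assumes both matrices are stored together as text lines of the form "A,i,j,value" and "B,i,j,value", and the class name MatrixMultiply, the JobConf keys matrix.m / matrix.n, and the argument order in main are all choices made here for illustration. The idea: to compute C[i][k] = sum over j of A[i][j] * B[j][k], the mapper replicates every element of A to all result cells in its row and every element of B to all result cells in its column, and the reducer for cell (i,k) pairs the A and B values on the shared index j and sums the products.

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class MatrixMultiply {
	public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
		private int m;  // number of rows of A (and of the result)
		private int n;  // number of columns of B (and of the result)
		public void configure(JobConf job) {
			m = job.getInt("matrix.m", 0);
			n = job.getInt("matrix.n", 0);
		}
		public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
			String[] f = value.toString().split(",");
			String tag = f[0];               // "A" or "B"
			int row = Integer.parseInt(f[1]);
			int col = Integer.parseInt(f[2]);
			String v = f[3];
			if (tag.equals("A")) {
				// A[row][col] contributes to every result cell (row, k); col is the shared index j.
				for (int k = 0; k < n; k++) {
					output.collect(new Text(row + "," + k), new Text("A," + col + "," + v));
				}
			} else {
				// B[row][col] contributes to every result cell (i, col); row is the shared index j.
				for (int i = 0; i < m; i++) {
					output.collect(new Text(i + "," + col), new Text("B," + row + "," + v));
				}
			}
		}
	}

	public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, DoubleWritable> {
		public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException {
			// For result cell (i, k), pair A[i][j] with B[j][k] on the shared index j and sum the products.
			HashMap<Integer, Double> a = new HashMap<Integer, Double>();
			HashMap<Integer, Double> b = new HashMap<Integer, Double>();
			while (values.hasNext()) {
				String[] f = values.next().toString().split(",");
				int j = Integer.parseInt(f[1]);
				double v = Double.parseDouble(f[2]);
				if (f[0].equals("A")) {
					a.put(j, v);
				} else {
					b.put(j, v);
				}
			}
			double sum = 0.0;
			for (java.util.Map.Entry<Integer, Double> e : a.entrySet()) {
				Double bv = b.get(e.getKey());
				if (bv != null) {
					sum += e.getValue() * bv;
				}
			}
			output.collect(key, new DoubleWritable(sum));
		}
	}

	public static void main(String[] args) throws Exception {
		// args: <input path> <output path> <rows of A> <columns of B>
		JobConf conf = new JobConf(MatrixMultiply.class);
		conf.setJobName("MatrixMultiply");
		conf.setInt("matrix.m", Integer.parseInt(args[2]));
		conf.setInt("matrix.n", Integer.parseInt(args[3]));

		conf.setMapOutputKeyClass(Text.class);
		conf.setMapOutputValueClass(Text.class);
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(DoubleWritable.class);
		conf.setMapperClass(Map.class);
		conf.setReducerClass(Reduce.class);
		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);
		FileInputFormat.setInputPaths(conf, new Path(args[0]));
		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		JobClient.runJob(conf);
	}
}

Replicating every element like this makes the shuffle carry on the order of 2*m*k*n records, so it is only meant to illustrate the data flow; practical jobs partition the matrices into blocks instead.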
