Beginner-Level Hadoop Code
I. Hello, World in Hadoop
import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // Emit (word, 1) for every whitespace-separated token in the line.
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // Sum the counts for one word.
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
Package the job with ant; a jar built by hand on the command line may fail to run. The MapReduce idea itself is simple, so I won't belabor it here; the code is copied verbatim.
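For example (my own illustration; the jar name and paths are placeholders), given an input file containing the single line

hello hadoop hello

running the job as

hadoop jar wordcount.jar WordCount /user/test/input /user/test/output

produces the output

hadoop	1
hello	2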
II. Examples from Hadoop实战 (Hadoop in Action):
1. Single-table join
The single-table join is an excellent example for understanding MapReduce. The trick is to join the table with itself: map emits each (child, parent) record twice, once keyed by the parent and tagged "1", and once keyed by the child and tagged "2", so that reduce sees, for every person, both their children and their parents.
Input file:
child parent
Tom Lucy
Lucy Mary
Expected output:
grandchild grandparent
Tom Mary
The code is as follows:
package com.baidu.rawget;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class STjoin {

    public static int time = 0;

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            // Split the line into child and parent at the first space.
            int i = 0;
            while (line.charAt(i) != ' ') {
                i++;
            }
            String[] values = { line.substring(0, i), line.substring(i + 1) };
            if (values[0].compareTo("child") != 0) { // skip the header line
                String childname = values[0];
                String parentname = values[1];
                // "1" record, keyed by the parent: childname is a child of the key.
                output.collect(new Text(values[1]),
                        new Text("1" + "+" + childname + "+" + parentname));
                // "2" record, keyed by the child: parentname is a parent of the key.
                output.collect(new Text(values[0]),
                        new Text("2" + "+" + childname + "+" + parentname));
            }
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            if (time == 0) { // write the header row exactly once
                output.collect(new Text("grandchild"), new Text("grandparent"));
                time++;
            }
            int grandchildnum = 0;
            String[] grandchild = new String[10];
            int grandparentnum = 0;
            String[] grandparent = new String[10];
            while (values.hasNext()) {
                String record = values.next().toString();
                System.err.println("record: " + record);
                int len = record.length();
                if (len == 0)
                    continue;
                // Parse "type+childname+parentname".
                char relationtype = record.charAt(0);
                String childname = "";
                String parentname = "";
                int i = 2;
                while (record.charAt(i) != '+') {
                    childname = childname + record.charAt(i);
                    i++;
                }
                i = i + 1;
                while (i < len) {
                    parentname = parentname + record.charAt(i);
                    i++;
                }
                if (relationtype == '1') {
                    // key is childname's parent, so childname is a grandchild candidate.
                    grandchild[grandchildnum] = childname;
                    grandchildnum++;
                } else {
                    // key is parentname's child, so parentname is a grandparent candidate.
                    grandparent[grandparentnum] = parentname;
                    grandparentnum++;
                }
            }
            System.err.println("table 1:");
            for (int m = 0; m < grandchildnum; m++) {
                System.err.println(grandchild[m]);
            }
            System.err.println("table 2:");
            for (int m = 0; m < grandparentnum; m++) {
                System.err.println(grandparent[m]);
            }
            // Cross product: pair every grandchild with every grandparent.
            if (grandparentnum != 0 && grandchildnum != 0) {
                for (int m = 0; m < grandchildnum; m++) {
                    for (int n = 0; n < grandparentnum; n++) {
                        output.collect(new Text(grandchild[m]),
                                new Text(grandparent[n]));
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(STjoin.class);
        conf.setJobName("STjoin");
        // conf.setNumReduceTasks(3);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
To make the code easier to follow, I print each reduce call's input records along with the grandchild and grandparent tables:
record: 2+Lucy+Mary
record: 1+Tom+Lucy
table 1:
Tom
table 2:
Mary

record: 1+Lucy+Mary
table 1:
Lucy
table 2:

record: 2+Tom+Lucy
table 1:
table 2:
Lucy
It is easy to see at a glance that records sharing the same key have been grouped together by the shuffle before reaching reduce.
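As a side note, the character-by-character scanning in map can be written much more compactly with String.split; here is my own equivalent sketch of the Map class (same output, not the book's version):

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            String[] fields = value.toString().split(" ", 2); // child, parent
            if (fields.length == 2 && !fields[0].equals("child")) { // skip header
                // "1" record, keyed by the parent: fields[0] is a child of the key.
                output.collect(new Text(fields[1]),
                        new Text("1+" + fields[0] + "+" + fields[1]));
                // "2" record, keyed by the child: fields[1] is a parent of the key.
                output.collect(new Text(fields[0]),
                        new Text("2+" + fields[0] + "+" + fields[1]));
            }
        }
    }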
2. Sorting
How do you sort with MapReduce? The book gives an example that is also easy to follow, but its code has a few small problems; the corrected code is below. The key observation is that the shuffle already hands each reducer its keys in sorted order, so a partitioner that routes smaller keys to lower-numbered reducers makes the concatenation of the part files a globally sorted list.
package com.baidu.rawget;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class Sort {

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, IntWritable, IntWritable> {
        private static IntWritable data = new IntWritable();

        public void map(LongWritable key, Text value,
                OutputCollector<IntWritable, IntWritable> output, Reporter reporter)
                throws IOException {
            // Each input line holds one integer; use it as the key so the
            // shuffle sorts the numbers for us. The value is a dummy 1.
            String line = value.toString();
            data.set(Integer.parseInt(line.trim()));
            output.collect(data, new IntWritable(1));
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        // Running rank; with several reduce tasks the numbering restarts
        // at 1 inside each reducer's output file.
        private static IntWritable linenum = new IntWritable(1);

        public void reduce(IntWritable key, Iterator<IntWritable> values,
                OutputCollector<IntWritable, IntWritable> output, Reporter reporter)
                throws IOException {
            // Emit (rank, number) once per occurrence, so duplicates are kept.
            while (values.hasNext()) {
                values.next();
                output.collect(linenum, key);
                linenum = new IntWritable(linenum.get() + 1);
            }
        }
    }

    public static class Partition implements Partitioner<IntWritable, IntWritable> {
        public void configure(JobConf job) {
        }

        // Range partitioning preserves the global order: smaller keys go to
        // lower-numbered reducers. Assumes all keys lie in [0, 65223).
        public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
            int Maxnumber = 65223;
            int bound = Maxnumber / numPartitions + 1;
            int keynumber = key.get();
            for (int i = 1; i <= numPartitions; i++) {
                if (keynumber < bound * i && keynumber >= bound * (i - 1)) {
                    return i - 1;
                }
            }
            return -1; // unreachable for keys inside the assumed range
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(Sort.class);
        conf.setJobName("Sort");
        conf.setNumReduceTasks(3);

        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setPartitionerClass(Partition.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
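The loop inside getPartition is integer division in disguise: with three reducers, bound = 65223 / 3 + 1 = 21742, so keys 0-21741 go to reducer 0, keys 21742-43483 to reducer 1, and the rest to reducer 2. An equivalent one-liner (my own rewrite, under the same assumption that keys fall in [0, 65223)):

    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int bound = 65223 / numPartitions + 1; // assumed exclusive upper bound on keys
        return key.get() / bound;              // partition i covers [i*bound, (i+1)*bound)
    }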
III. Advanced MapReduce:
To get a deeper feel for the MapReduce paradigm, the next step is to implement algorithms such as matrix multiplication and k-means on top of it.
1. Matrix multiplication:
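A minimal sketch of the classic one-pass approach (my own sketch, not from the book; it assumes each input line has the form matrixName,row,col,value and that the dimensions, A being M x N and B being N x P, are small and known up front):

package com.baidu.rawget;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class MatrixMultiply {

    // Assumed dimensions: A is M x N, B is N x P.
    static final int M = 2, N = 2, P = 2;

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            // Input line: "A,i,k,value" or "B,k,j,value".
            String[] t = value.toString().split(",");
            int row = Integer.parseInt(t[1]);
            int col = Integer.parseInt(t[2]);
            if (t[0].equals("A")) {
                // A(i,k) is needed by every output cell C(i, j).
                for (int j = 0; j < P; j++)
                    output.collect(new Text(row + "," + j),
                            new Text("A," + col + "," + t[3]));
            } else {
                // B(k,j) is needed by every output cell C(i, j).
                for (int i = 0; i < M; i++)
                    output.collect(new Text(i + "," + col),
                            new Text("B," + row + "," + t[3]));
            }
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, DoubleWritable> {
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, DoubleWritable> output, Reporter reporter)
                throws IOException {
            // For cell C(i,j), pair up A(i,k) and B(k,j) by k, then sum the products.
            double[] a = new double[N];
            double[] b = new double[N];
            while (values.hasNext()) {
                String[] t = values.next().toString().split(",");
                int k = Integer.parseInt(t[1]);
                double v = Double.parseDouble(t[2]);
                if (t[0].equals("A")) a[k] = v;
                else b[k] = v;
            }
            double sum = 0;
            for (int k = 0; k < N; k++)
                sum += a[k] * b[k];
            output.collect(key, new DoubleWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(MatrixMultiply.class);
        conf.setJobName("MatrixMultiply");

        // Map output types differ from the final output types here.
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(Text.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(DoubleWritable.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

Each map call fans an entry of A out to all P cells of its output row (and an entry of B to all M cells of its output column), so the reducer for cell (i, j) receives exactly the row of A and column of B it needs and computes their dot product. This fan-out is wasteful for large matrices; the usual refinement is a blocked decomposition.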