MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(化简)",和他们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。他极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(化简)函数,用来保证所有映射的键值对中的每一个共享相同的键组,下面我将用一个小例子,作为学习的总结,不过我有一个疑问:主线程为什么要停止1s呢,希望大家可以给我解答疑问
新建MyMapReduce.java
package mapReduce; import java.util.*; public class MyMapReduce { List buckets = new ArrayList(); List intermediateresults = new ArrayList(); List values = new ArrayList(); public void init() { // init()方法创建了一些测试数据,作为测试数据。实际应用中会是海量数据处理 for (int i = 1; i <= 30; i++) { values.add("javaxp:" + new Integer(i).toString()); } System.out.println("拆分成不同的Buckets"); List b = step1ConvertIntoBuckets(values, 5); System.out.println("Map Function:同时处理所有的Buckets"); List res = step2RunMapFunctionForAllBuckets(b); System.out.println("*Reduce Function:收集中间的结果,并打印输出"); step3RunReduceFunctionForAllBuckets(res); } public List step1ConvertIntoBuckets(List list, int numberofbuckets) // 将测试数据拆分到5个 bucket中,每个bucket是一个ArrayList(包含6个String数据)。bucket可以保存在内存,磁盘,或者集群中的其他节点 { int n = list.size(); int m = n / numberofbuckets; int rem = n % numberofbuckets; int count = 0; System.out.println("BUCKETS"); for (int j = 1; j <= numberofbuckets; j++) { List temp = new ArrayList(); for (int i = 1; i <= m; i++) { temp.add((String) values.get(count)); count++; } buckets.add(temp); temp = new ArrayList(); } if (rem != 0) { List temp = new ArrayList(); for (int i = 1; i <= rem; i++) { temp.add((String) values.get(count)); count++; } buckets.add(temp); } System.out.println(buckets); return buckets; } public List step2RunMapFunctionForAllBuckets(List list) { //创建了5个线程(每个bucket一个),每个线程StartThread处理每个bucket并把处理结果放在intermediateresults这个arraylist中 for (int i = 0; i < list.size(); i++) { List elementList = (ArrayList) list.get(i); new StartThread(elementList).start(); } try { Thread.currentThread().sleep(1000); //为什么要停止 1s呢? } catch (Exception e) { } return intermediateresults; } public void step3RunReduceFunctionForAllBuckets(List list) { // 加载intermediateresults中间处理结果,并进行汇总处理,最后得到最终的计算结果,如果bucket分配给不同的节点处理,必须有一个master主控节点监控各个节点的计算,汇总各个节点的处理结果,若有节点失败,master必须能够分配计算任务给其他节点计算 int sum = 0; for (int i = 0; i < list.size(); i++) { // you can do some processing here, like finding max of all results // etc int t = Integer.parseInt((String) list.get(i)); sum += t; } System.out.println("Total Count is " + sum); } class StartThread extends Thread { //分进程处理拆分下来的list private List tempList = new ArrayList(); public StartThread(List list) { tempList = list; } public void run() { for (int i = 0; i < tempList.size(); i++) { String str = (String) tempList.get(i); synchronized (this) { intermediateresults.add(new Integer(str.length()).toString()); } } } } }
另建一个调用类:Main.java
package mapReduce; /* * 参考资料:http://pconline900.iteye.com/blog/246680 */ public class Main { public static void main(String[] args) { MyMapReduce my = new MyMapReduce(); my.init(); } }