现在的位置: 首页 > 综合 > 正文

hadoop初学之MapReduce编程模型学习

2017年08月22日 ⁄ 综合 ⁄ 共 2617字 ⁄ 字号 评论关闭

MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)""Reduce(化简)",和他们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。他极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(化简)函数,用来保证所有映射的键值对中的每一个共享相同的键组,下面我将用一个小例子,作为学习的总结,不过我有一个疑问:主线程为什么要停止1s呢,希望大家可以给我解答疑问

新建MyMapReduce.java

package mapReduce;

import java.util.*;

public class MyMapReduce {
	List buckets = new ArrayList();
	List intermediateresults = new ArrayList();
	List values = new ArrayList();

	public void init() {      //	init()方法创建了一些测试数据,作为测试数据。实际应用中会是海量数据处理
		for (int i = 1; i <= 30; i++) {
			values.add("javaxp:" + new Integer(i).toString());
		}

		System.out.println("拆分成不同的Buckets");
		List b = step1ConvertIntoBuckets(values, 5);

		System.out.println("Map Function:同时处理所有的Buckets");
		List res = step2RunMapFunctionForAllBuckets(b);

		System.out.println("*Reduce Function:收集中间的结果,并打印输出");
		
		step3RunReduceFunctionForAllBuckets(res);

	}

	public List step1ConvertIntoBuckets(List list, int numberofbuckets) // 将测试数据拆分到5个 bucket中,每个bucket是一个ArrayList(包含6个String数据)。bucket可以保存在内存,磁盘,或者集群中的其他节点	
	{
		int n = list.size();
		int m = n / numberofbuckets;
		int rem = n % numberofbuckets;

		int count = 0;
		System.out.println("BUCKETS");
		for (int j = 1; j <= numberofbuckets; j++) {
			List temp = new ArrayList();
			for (int i = 1; i <= m; i++) {

				temp.add((String) values.get(count));
				count++;

			}
			buckets.add(temp);
			temp = new ArrayList();
		}
		if (rem != 0) {
			List temp = new ArrayList();
			for (int i = 1; i <= rem; i++) {

				temp.add((String) values.get(count));
				count++;
			}
			buckets.add(temp);
		}
		System.out.println(buckets);
		return buckets;

	}

	public List step2RunMapFunctionForAllBuckets(List list) {  //创建了5个线程(每个bucket一个),每个线程StartThread处理每个bucket并把处理结果放在intermediateresults这个arraylist中
		for (int i = 0; i < list.size(); i++) {
			List elementList = (ArrayList) list.get(i);
			new StartThread(elementList).start();
		}

		try {
			Thread.currentThread().sleep(1000);       //为什么要停止 1s呢?
		} catch (Exception e) {
		}
		return intermediateresults;
	}

	public void step3RunReduceFunctionForAllBuckets(List list) {
// 加载intermediateresults中间处理结果,并进行汇总处理,最后得到最终的计算结果,如果bucket分配给不同的节点处理,必须有一个master主控节点监控各个节点的计算,汇总各个节点的处理结果,若有节点失败,master必须能够分配计算任务给其他节点计算
		int sum = 0;
		for (int i = 0; i < list.size(); i++) {
			// you can do some processing here, like finding max of all results
			// etc
			int t = Integer.parseInt((String) list.get(i));
			sum += t;
		}

		System.out.println("Total Count is " + sum);
	}

	class StartThread extends Thread {                       //分进程处理拆分下来的list
		private List tempList = new ArrayList();

		public StartThread(List list) {
			tempList = list;
		}
		public void run() {
			for (int i = 0; i < tempList.size(); i++) {
				String str = (String) tempList.get(i);
				synchronized (this) {
					intermediateresults.add(new Integer(str.length()).toString());
				}

			}
		}

	}

}

另建一个调用类:Main.java

package mapReduce;
/*
 * 参考资料:http://pconline900.iteye.com/blog/246680
 */
public class Main 
{ 

    public static void main(String[] args) 
    { 

        MyMapReduce my = new MyMapReduce(); 
        my.init(); 

    } 
} 

抱歉!评论已关闭.