现在的位置: 首页 > 综合 > 正文

Hadoop Streaming 简介

2012年09月24日 ⁄ 综合 ⁄ 共 2883字 ⁄ 字号 评论关闭

  Hadoop是用Java实现的,但是我们也可以使用其他语言来编写MapReduce程序,比如Shell,Python,Ruby等,下面简单介绍一下Hadoop Streaming,并使用Python作为例子。

  1. Hadoop Streaming

  Hadoop Streaming的使用方式为:

1 hadoop jar hadoop-streaming.jar -D property=value -mapper mapper.py -combiner combiner.py -reducer reducer.py -input Input -output Output -file mapper.py -file reducer.py

  其中-mapper -combiner和-reducer指定Map Combine和Reduce程序.

  -input 和-output指定输入和输出文件夹 .

  -file可以将本地的mapper.py和reducer.py上传到所有的计算节点上(tasktracker node).  

  -D参数可以指定许多参数,经常用到的有:

property value 备注

mapred.reduce.tasks

  Int   Reduce个数,为0时没有Reduce  

mapred.job.queue.name

  String(Queue Name)     队列的名字

mapred.output.compress

  Boolean   是否对MapReduce的输出进行压缩  

mapred.output.compression.codec

  

org.apache.hadoop.io.compress.GzipCodec

  对MapReduce压缩的方式,此处是gzip  

stream.recordreader.compression

  gzip   是否读取压缩文件,gzip可以读取gz和bz2压缩文件  

mapred.job.reduce.memory.mb

  Int   reduce物理内存的大小

mapred.child.java.opts

  "-Xmx"+Int+"mm"   最大的Java堆栈的大小

  Hadoop Streaming的工作原理是:Mapper和Reducer(自动) 的从标准输入一行一行的读取数据(字符串),然后经过Mapper或者Reducer的逻辑处理后,把结果写入到标准输出,结果是以制表符(\t)分隔的Key-Value对。

  2. WordCount的Python Hadoop Streaming版本:

  Mapper.py

 1 import sys
 2 
 3 # input comes from STDIN (standard input)
 4 for line in sys.stdin:
 5     # remove leading and trailing whitespace
 6     line = line.strip()
 7     # split the line into words
 8     words = line.split()
 9     # increase counters
10     for word in words:
11         # write the results to STDOUT (standard output);
12         # what we output here will be the input for the
13         # Reduce step, i.e. the input for reducer.py
14         #
15         # tab-delimited; the trivial word count is 1
16         print '%s\t%s' % (word, 1)

   Mapper.py的工作原理很简单,从标准输入里读取数据(line 4),去除文本末尾的换行符(line 6),分词(line 8),对于每个单词输出一个(word,1)的Key-Value对(line 10-16).

Reducer.py

 1 import sys
 2 
 3 current_word = None
 4 current_count = 0
 5 word = None
 6 
 7 # input comes from STDIN
 8 for line in sys.stdin:
 9     # remove leading and trailing whitespace
10     line = line.strip()
11 
12     # parse the input we got from mapper.py
13     word, count = line.split('\t', 1)
14 
15     # convert count (currently a string) to int
16     try:
17         count = int(count)
18     except ValueError:
19         # count was not a number, so silently
20         # ignore/discard this line
21         continue
22 
23     # this IF-switch only works because Hadoop sorts map output
24     # by key (here: word) before it is passed to the reducer
25     if current_word == word:
26         current_count += count
27     else:
28         if current_word:
29             # write result to STDOUT
30             print '%s\t%s' % (current_word, current_count)
31         current_count = count
32         current_word = word
33 
34 # do not forget to output the last word if needed!
35 if current_word == word:
36     print '%s\t%s' % (current_word, current_count)

  Reducer.py略微复杂一些。首先还是去除换行符(line 10),分词(line 13)和类型检查(line 16-21)。然后,我们使用current_word来标示现在正在处理(亦即进行Reduce逻辑)的单词,如果遇到的新的单词和current_word一样,则进行累加(line 25-26),否则说明current_word的累加已经结束了,需要将结果写到标准输出(line 27-32)。line 28中之所以要进行判断是因为current_word的初始值是None,我们不需要输出以None为Key的Key-Value对。在最后,我们把最后一个Key-Value对输出(line 35-36)。

  3. 可以在本地测试Python Hadoop Streaming脚本的准确性:

1 cat file | python Mapper.py | sort -k1,1 | python Reducer.py

  其中sort -k1,1模拟的是Hadoop的Shuffle过程,但是因为是在本地跑,只有一台机器,所以以上测试过程和单机版的Hadoop差不多,和真正的分布式的Hadoop还是有所差距。

  参考文献:

  [1]. Hadoop Streaming

  [2]. 董的博客: Hadoop Streaming编程

  [3]. Writing A Hadoop MapReduce Program in Python

 

抱歉!评论已关闭.