Streaming是Hadoop提供的一个可以使用其他编程语言来进行MapReduce来的API,用户可以使用任何可以使用标准输入输出的语言编写mapreduce。特点是开发简便,map和reduce甚至可以使用shell命令实现。
例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -D mapred.map.tasks=1 \ -D mapred.reduce.tasks=1 \ -D mapred.job.name="Experiment" \ -input "input.txt" \ -output "out" \ -mapper "xargs cat" \ -reducer "cat"
以下是我实现的优质微博筛选排重mapreduce的实现
map:
#include <iostream> #include <stdio.h> #include <string> using namespace std; int main(int argc, char *argv[]){ string docid, dup; while(cin >> docid >> dup){ cout << dup << "\t" << docid << endl; } return 0; }
reduce:
#include <iostream> #include <stdint.h> using namespace std; int main(int argc, char *argv[]){ string key; uint64_t value; string last_key = ""; uint64_t last_value = 0; while(cin >> key >> value){ if(key == "*"){ cout << value << "\t" << "A" << endl; }else if(last_key != "" && last_key != key){ cout << last_value << "\t" << "A" << endl; last_key = key; last_value = value; }else{ last_key = key; last_value = value; } } if(last_key != ""){ cout << value << "\t" << "A" << endl; } }
执行命令:
cmd="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar" cmd=$cmd" -jt host:port" cmd=$cmd" -fs host:port" cmd=$cmd" -input $1" cmd=$cmd" -output $2" cmd=$cmd" -file uniq_map" cmd=$cmd" -file uniq_reduce" cmd=$cmd" -mapper uniq_map" cmd=$cmd" -reducer uniq_reduce" cmd=$cmd" -jobconf mapred.map.tasks.spaculative.execution=false" cmd=$cmd" -jobconf mapred.task.timeout=60000000" cmd=$cmd" -jobconf mapred.job.tracker=local" echo $cmd $cmd >> ./stat.log
其实通过脚本语言实现比C++更为容易。
注意:
-jt和-fs这种通用选项必须写在最前面(Note: Be sure to place the generic options before the streaming options, otherwise the command will fail)
-jobconf mapred.job.tracker=local 指定任务在本地执行方便用户进行调试,input和output都使用hdfs的文件
hadoop stream的调试非常简单且强大,以下是几种调试方式:
1、直接在本地进行调试,例如 cat example.txt | ./uniq_map | sort | uniq_reduce 就可以实现以上程序的调试
2、让HadoopStreaming程序跑在开发机上。(推荐在开发时使用)在jobconf中加上mapred.job.tracker=local。数据的输入和输出都是来自HDFS此时,HadoopStreaming会在本地运行程序
3、保留出错的现场(推荐在跑大数据量时使用)通过设置jobconf参数keep.failed.task.files=true,当程序出错时,可以保留现以供Debug。可以通过GUI查到到具体是在哪个节点运行失败,然后登陆到该节点<local>/taskTracker/<taskid>/work/ ,查看core文件。
4、通过script程序收集信息来调试程序(推荐在开发时使用)编写调试脚本程序,通过脚本,可以把程序执行过程中任何现场都保留下来,比如 Core文件的堆栈信息,这样可以确定程序具体是在什么地方出错。
脚本示例:
core=`find . -name 'core*'`; cp $core /home/admin/ gdb -quiet ./a.out -c $core -x ./pipes-default-gdb-commands.txt pipes-default-gdb-commands.txt注明了执行的gdb命令 info threads backtrace quit
(注明:如果要正确执行以上的脚本,必须让程序能输出core文件,可以在程序中加入如下代码段)
struct rlimit limit; limit.rlim_cur = 65535; limit.rlim_max = 65535; if (setrlimit(RLIMIT_CORE, &limit) != 0) { printf("setrlimit() failed with errno=%s\n", strerror(errno)); exit(1); }
然后在jobconf中,把要执行的script赋给变量”mapred.map.task.debug.script”或”mapred.reduce.task.debug.script”。 这样当HadoopStreaming执行过程发生core dump,就可以通过JobTracker的GUI界面看到GDB的信息了。