现在的位置: 首页 > 综合 > 正文

Hadoop MapReduce之ReduceTask任务执行(一):远程拷贝map输出

2014年09月05日 ⁄ 综合 ⁄ 共 12181字 ⁄ 字号 评论关闭

reduce执行流程经历三个阶段:copy、sort、reduce,在第一阶段reduce任务会把map的输出拷贝至本地,通过线程MapOutputCopier,该线程通过http协议将map输出拷贝至本地,该copy操作可以并行进行,默认情况下有5个线程执行此操作,如果map数量较大时可以适当调大此值,拷贝时使用http协议,此时reducetask为client,map端以jetty作为web服务器。reduce任务的执行与map一样在Child类启动,但在TaskFinal.run(job,umbilical)进入ReduceTask类执行。reduce的过程比较复杂,本节只分析copy部分,最后会分析整个reduce流程,需要注意的是每个reduce只拷贝自己需要处理那个partition数据。


拷贝map输出结果代码:ReduceTask.java 1309行

下面分析copyOutput函数:ReduceTask.java 1373行

getMapOutput函数负责拷贝输出的工作,利用URLConnection建立连接,url格式类似:http://PC-20130917RGUY:50060/mapOutput?job=job_201311261309_0003&map=attempt_201311261309_0003_m_000000_0&reduce=1
,包含协议类型:http,主机及端口:PC-20130917RGUY:50060,路径名称:mapOutput,查询参数包含作业名、map任务名、reduce编号:ob=job_201311261309_0003&map=attempt_201311261309_0003_m_000000_0&reduce=1

url会根据这个地址建立连接,并打开一个输入流读取数据。在开始读取前会判断本次的读取是否能全部放入缓存中,这部分缓存使用是有限制的:jvm_heap_size × mapred.job.shuffle.input.buffer.percent × MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION,其中jvm_heap_size可以通过mapred.job.reduce.total.mem.bytes来设置,如果没设置则通过Runtime.getRuntime().maxMemory()来获取,可以通过mapred.child.opts来影响jvm堆的大小,mapred.job.shuffle.input.buffer.percent可以在参数文件中设置,默认为0.7,MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION在当前版本中为一常量值0.25,也就是说加入我们指定jvm堆大小为1024M,那么一个ReduceTask在拷贝时用到的缓存为1024×0.7×0.25=179M,当我们的map输出大于179M时,则直接写入文件.

ReduceTask.java 1373行

如果内存足够大,则copy过来的数据直接放入内存中,首先会分配一个byte数组,然后从上面建立的输入流冲取得所需数据。

ReduceTask.java 1646行

如果内存过小不能存放本次读取的数据则直接写入磁盘文件中,我们会在相关目录中看到这个文件如:C:/hadoop/tasklog/taskTracker/hadoop/jobcache/job_201311281345_0001/attempt_201311281345_0001_r_000001_1/output/map_0.out-0

阅读笔记:

1. MapOutput类 本质上是一个指向一个数据块的指针,该数据块可以在硬盘上,也可以在内存上。(1)final boolean inMemory表示该数据块是否在内存中 (2)final Path file表示数据在硬盘上的路径 (3)byte[] data表示数据在内存中的数据块

2. ReduceTask.run() 是ReduceTask的起始点。其中分为三部分:(1).Copy阶段(由reduceCopier.fetchOutputs()完成
(2).Sort阶段(由Merger.merge()完成) (3).Reduce阶段(由runOldReducer()或者runNewReducer()完成

3. fetchOutputs()函数中启动多个(由mapred.reduce.parallel.copies属性设置,默认为5个)MapOutputCopier线程进行远程数据拷贝到本地。远程拷贝运行过程中,存在 InMemFSMergeThread线程 和 LocalFSMerger线程 进行文件合并。

4. MapOutput类中的 discard()函数即抛弃拷贝的map输出结果。若该MapOutput数据块在内存上,则将数据指针data置null,若数据块在硬盘上,则调用 fs.delete(file,true) 删除该文件。

5. 远程拷贝过程中,每次拷贝一个数据块时,若该数据块可以放入内存则放入内存,否则放入硬盘。有两个标准决定该数据块是否应该放入硬盘:(1) 数据块小于 java_heaps _size * mapred.job.shuffle.input.buffer.percent * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION(0.25) (2) 内存中有足够空间放入该数据块。

抱歉!评论已关闭.