reduce执行流程经历三个阶段:copy、sort、reduce,在第一阶段reduce任务会把map的输出拷贝至本地,通过线程MapOutputCopier,该线程通过http协议将map输出拷贝至本地,该copy操作可以并行进行,默认情况下有5个线程执行此操作,如果map数量较大时可以适当调大此值,拷贝时使用http协议,此时reducetask为client,map端以jetty作为web服务器。reduce任务的执行与map一样在Child类启动,但在TaskFinal.run(job,umbilical)进入ReduceTask类执行。reduce的过程比较复杂,本节只分析copy部分,最后会分析整个reduce流程,需要注意的是每个reduce只拷贝自己需要处理那个partition数据。
拷贝map输出结果代码:ReduceTask.java 1309行
下面分析copyOutput函数:ReduceTask.java 1373行
// 内存写满时需要用到的临时文件
TaskAttemptID reduceId = reduceTask.getTaskID();
Path filename =
new Path(String.format(
MapOutputFile.REDUCE_INPUT_FILE_FORMAT_STRING,
TaskTracker.OUTPUT, loc.getTaskId().getId()));
// Copy the map output to a temp file whose name is unique to this attempt
Path tmpMapOutput = new Path(filename+"-"+id);
// 开始拷贝map的输出
MapOutput mapOutput = getMapOutput(loc, tmpMapOutput,
reduceId.getTaskID().getId());
if (mapOutput == null) {
throw new IOException("Failed to fetch map-output for " +
loc.getTaskAttemptId() + " from " +
loc.getHost());
}
// 获得输出尺寸
long bytes = mapOutput.compressedSize;
// lock the ReduceTask while we do the rename
synchronized (ReduceTask.this) {
if (copiedMapOutputs.contains(loc.getTaskId())) {
mapOutput.discard();
return CopyResult.OBSOLETE;
}
// Special case: discard empty map-outputs
if (bytes == 0) {
try {
mapOutput.discard();
} catch (IOException ioe) {
LOG.info("Couldn't discard output of " + loc.getTaskId());
}
// Note that we successfully copied the map-output
noteCopiedMapOutput(loc.getTaskId());
return bytes;
}
// 判断是否完全在内存中,根据具体情况执行不同分支
if (mapOutput.inMemory) {
// 如果完全在内存中则放入内存文件的集合中
mapOutputsFilesInMemory.add(mapOutput);
} else {
// Rename the temporary file to the final file;
// ensure it is on the same partition
tmpMapOutput = mapOutput.file;
filename = new Path(tmpMapOutput.getParent(), filename.getName());
if (!localFileSys.rename(tmpMapOutput, filename)) {
localFileSys.delete(tmpMapOutput, true);
bytes = -1;
throw new IOException("Failed to rename map output " +
tmpMapOutput + " to " + filename);
}
synchronized (mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
}
}
// Note that we successfully copied the map-output
noteCopiedMapOutput(loc.getTaskId());
}
return bytes;
}
getMapOutput函数负责拷贝输出的工作,利用URLConnection建立连接,url格式类似:http://PC-20130917RGUY:50060/mapOutput?job=job_201311261309_0003&map=attempt_201311261309_0003_m_000000_0&reduce=1
,包含协议类型:http,主机及端口:PC-20130917RGUY:50060,路径名称:mapOutput,查询参数包含作业名、map任务名、reduce编号:ob=job_201311261309_0003&map=attempt_201311261309_0003_m_000000_0&reduce=1
url会根据这个地址建立连接,并打开一个输入流读取数据。在开始读取前会判断本次的读取是否能全部放入缓存中,这部分缓存使用是有限制的:jvm_heap_size × mapred.job.shuffle.input.buffer.percent × MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION,其中jvm_heap_size可以通过mapred.job.reduce.total.mem.bytes来设置,如果没设置则通过Runtime.getRuntime().maxMemory()来获取,可以通过mapred.child.opts来影响jvm堆的大小,mapred.job.shuffle.input.buffer.percent可以在参数文件中设置,默认为0.7,MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION在当前版本中为一常量值0.25,也就是说加入我们指定jvm堆大小为1024M,那么一个ReduceTask在拷贝时用到的缓存为1024×0.7×0.25=179M,当我们的map输出大于179M时,则直接写入文件.
ReduceTask.java 1373行
InputStream input = setupSecureConnection(mapOutputLoc, connection);
// 校验任务ID
TaskAttemptID mapId = null;
try {
mapId =
TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
} catch (IllegalArgumentException ia) {
LOG.warn("Invalid map id ", ia);
return null;
}
TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
if (!mapId.equals(expectedMapId)) {
LOG.warn("data from wrong map:" + mapId +
" arrived to reduce task " + reduce +
", where as expected map output should be from " + expectedMapId);
return null;
}
//判断返回数据长度是否异常
//取得压缩和未压缩长度,后面判断在内存还是硬盘做shuffle
long decompressedLength =
Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
long compressedLength =
Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
if (compressedLength < 0 || decompressedLength < 0) {
LOG.warn(getName() + " invalid lengths in map output header: id: " +
mapId + " compressed len: " + compressedLength +
", decompressed len: " + decompressedLength);
return null;
}
//判断reduce编号是否相同
int forReduce =
(int)Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK));
if (forReduce != reduce) {
LOG.warn("data for the wrong reduce: " + forReduce +
" with compressed len: " + compressedLength +
", decompressed len: " + decompressedLength +
" arrived to reduce task " + reduce);
return null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("header: " + mapId + ", compressed len: " + compressedLength +
", decompressed len: " + decompressedLength);
}
//We will put a file in memory if it meets certain criteria:
//1. The size of the (decompressed) file should be less than 25% of
// the total inmem fs
//2. There is space available in the inmem fs
// 判断拷贝的数据能否完全放入内存中,内存计算公式为: JVM堆尺寸×mapred.job.shuffle.input.buffer.percent(0.7)× 1/4
boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength);
// Shuffle
MapOutput mapOutput = null;
if (shuffleInMemory) {
if (LOG.isDebugEnabled()) {
LOG.debug("Shuffling " + decompressedLength + " bytes (" +
compressedLength + " raw bytes) " +
"into RAM from " + mapOutputLoc.getTaskAttemptId());
}
//如果可以放入内存,则放入新建立的byte buffer中
mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
(int)decompressedLength,
(int)compressedLength);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Shuffling " + decompressedLength + " bytes (" +
compressedLength + " raw bytes) " +
"into Local-FS from " + mapOutputLoc.getTaskAttemptId());
}
//内存中放不下则放入磁盘中
mapOutput = shuffleToDisk(mapOutputLoc, input, filename,
compressedLength);
}
return mapOutput;
}
如果内存足够大,则copy过来的数据直接放入内存中,首先会分配一个byte数组,然后从上面建立的输入流冲取得所需数据。
ReduceTask.java 1646行
//是否需要重新连接
if (!createdNow) {
// Reconnect
try {
connection = mapOutputLoc.getOutputLocation().openConnection();
input = setupSecureConnection(mapOutputLoc, connection);
} catch (IOException ioe) {
LOG.info("Failed reopen connection to fetch map-output from " +
mapOutputLoc.getHost());
// Inform the ram-manager
ramManager.closeInMemoryFile(mapOutputLength);
ramManager.unreserve(mapOutputLength);
throw ioe;
}
}
//包装输入流
IFileInputStream checksumIn =
new IFileInputStream(input,compressedLength);
input = checksumIn;
// Are map-outputs compressed?
if (codec != null) {
decompressor.reset();
input = codec.createInputStream(input, decompressor);
}
// 创建buffer,从输入流读取并填充
byte[] shuffleData = new byte[mapOutputLength];
MapOutput mapOutput =
new MapOutput(mapOutputLoc.getTaskId(),
mapOutputLoc.getTaskAttemptId(), shuffleData, compressedLength);
int bytesRead = 0;
try {
//循环读取流冲数据至缓存中
int n = input.read(shuffleData, 0, shuffleData.length);
while (n > 0) {
bytesRead += n;
shuffleClientMetrics.inputBytes(n);
// indicate we're making progress
reporter.progress();
n = input.read(shuffleData, bytesRead,
(shuffleData.length-bytesRead));
}
if (LOG.isDebugEnabled()) {
LOG.debug("Read " + bytesRead + " bytes from map-output for " +
mapOutputLoc.getTaskAttemptId());
}
//数据读取完毕则关闭输入流
input.close();
} catch (IOException ioe) {
LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(),
ioe);
.....
}
// 关闭内存文件,并唤醒内存合并线程
ramManager.closeInMemoryFile(mapOutputLength);
// 校验数据读取长度
if (bytesRead != mapOutputLength) {
// Inform the ram-manager
ramManager.unreserve(mapOutputLength);
// Discard the map-output
try {
mapOutput.discard();
} catch (IOException ignored) {
// IGNORED because we are cleaning up
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ignored);
}
mapOutput = null;
throw new IOException("Incomplete map output received for " +
mapOutputLoc.getTaskAttemptId() + " from " +
mapOutputLoc.getOutputLocation() + " (" +
bytesRead + " instead of " +
mapOutputLength + ")"
);
}
.....
return mapOutput;
}
如果内存过小不能存放本次读取的数据则直接写入磁盘文件中,我们会在相关目录中看到这个文件如:C:/hadoop/tasklog/taskTracker/hadoop/jobcache/job_201311281345_0001/attempt_201311281345_0001_r_000001_1/output/map_0.out-0
// 开始数据拷贝
OutputStream output = null;
long bytesRead = 0;
try {
output = rfs.create(localFilename);
//讲map输出直接写入磁盘时分配的缓存,固定64K
byte[] buf = new byte[64 * 1024];
int n = -1;
try {
n = input.read(buf, 0, buf.length);
} catch (IOException ioe) {
readError = true;
throw ioe;
}
while (n > 0) {
bytesRead += n;
shuffleClientMetrics.inputBytes(n);
output.write(buf, 0, n);
// indicate we're making progress
reporter.progress();
try {
n = input.read(buf, 0, buf.length);
} catch (IOException ioe) {
readError = true;
throw ioe;
}
}
LOG.info("Read " + bytesRead + " bytes from map-output for " +
mapOutputLoc.getTaskAttemptId());
output.close();
input.close();
} catch (IOException ioe) {
LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(),
ioe);
// Discard the map-output
try {
mapOutput.discard();
} catch (IOException ignored) {
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ignored);
}
mapOutput = null;
// Close the streams
IOUtils.cleanup(LOG, input, output);
// Re-throw
throw ioe;
}
// 读取数据后的检测
if (bytesRead != mapOutputLength) {
try {
mapOutput.discard();
} catch (Exception ioe) {
// IGNORED because we are cleaning up
LOG.info("Failed to discard map-output from " +
mapOutputLoc.getTaskAttemptId(), ioe);
} catch (Throwable t) {
String msg = getTaskID() + " : Failed in shuffle to disk :"
+ StringUtils.stringifyException(t);
reportFatalError(getTaskID(), t, msg);
}
mapOutput = null;
throw new IOException("Incomplete map output received for " +
mapOutputLoc.getTaskAttemptId() + " from " +
mapOutputLoc.getOutputLocation() + " (" +
bytesRead + " instead of " +
mapOutputLength + ")"
);
}
return mapOutput;
}
阅读笔记:
1. MapOutput类 本质上是一个指向一个数据块的指针,该数据块可以在硬盘上,也可以在内存上。(1)final boolean inMemory表示该数据块是否在内存中 (2)final Path file表示数据在硬盘上的路径 (3)byte[] data表示数据在内存中的数据块
2. ReduceTask.run() 是ReduceTask的起始点。其中分为三部分:(1).Copy阶段(由reduceCopier.fetchOutputs()完成)
(2).Sort阶段(由Merger.merge()完成) (3).Reduce阶段(由runOldReducer()或者runNewReducer()完成)
3. fetchOutputs()函数中启动多个(由mapred.reduce.parallel.copies属性设置,默认为5个)MapOutputCopier线程进行远程数据拷贝到本地。远程拷贝运行过程中,存在 InMemFSMergeThread线程 和 LocalFSMerger线程 进行文件合并。
4. MapOutput类中的 discard()函数即抛弃拷贝的map输出结果。若该MapOutput数据块在内存上,则将数据指针data置null,若数据块在硬盘上,则调用 fs.delete(file,true) 删除该文件。
5. 远程拷贝过程中,每次拷贝一个数据块时,若该数据块可以放入内存则放入内存,否则放入硬盘。有两个标准决定该数据块是否应该放入硬盘:(1) 数据块小于 java_heaps _size * mapred.job.shuffle.input.buffer.percent * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION(0.25) (2) 内存中有足够空间放入该数据块。