CapacityTaskScheduler
链接:
http://hadoop.apache.org/common/docs/r0.19.2/capacity_scheduler.html
http://hadoop.apache.org/common/docs/r0.20.2/capacity_scheduler.html
https://issues.apache.org/jira/browse/HADOOP-3445
特性:
支持多个队列,每个job
只会被提交到一个队列上。
每个队列被分配了集群容量的一部分容量。
空余的容量被分配给超过其容量的任何队列。
队列内的作业支持优先级。
每个队列强制地分配给每个用户受限制的容量。
在分配任务给
TT
时会考虑到
JOB
的内存要求以及在
TT
结点是
RAM
和
VM
的情况。而
TT
中都是可以通过
mapred.child.ulimit
参数来设置
child
进程的
VM
大小。
start()
// initialize our queues from the config settings
CapacitySchedulerConf schedConf =
new
CapacitySchedulerConf();
try
{
initializeQueues(queueManager.getRoot().getJobQueueInfo().getChildren(),
schedConf,
false
);
}
catch
(Throwable e) {
LOG
.error(
"Couldn't
initialize queues because of the excecption : "
+
StringUtils.stringifyException
(e));
throw
new
IOException(e);
}
// Queues are
ready. Now register jobQueuesManager with the JobTracker so as
// to listen to job
changes
taskTrackerManager
.addJobInProgressListener(
jobQueuesManager
);
//Start thread for
initialization
if
(
initializationPoller
==
null
) {
this
.
initializationPoller
=
new
JobInitializationPoller(
jobQueuesManager
,
taskTrackerManager
);
}
initializationPoller
.init(
jobQueuesManager
.getJobQueueNames(),
schedConf);
initializationPoller
.setDaemon(
true
);
initializationPoller
.start();
assignTasks
首先根据TT
信息得到TT
和集群当前的MR
运行数目以及空余数。
ClusterStatus c
=
taskTrackerManager
.getClusterStatus();
int
mapClusterCapacity =
c.getMaxMapTasks();
int
reduceClusterCapacity = c.getMaxReduceTasks();
int
maxMapSlots =
taskTrackerStatus.getMaxMapSlots();
int
currentMapSlots =
taskTrackerStatus.countOccupiedMapSlots();
int
maxReduceSlots =
taskTrackerStatus.getMaxReduceSlots();
int
currentReduceSlots =
taskTrackerStatus.
countOccupiedReduceSlots
();
LOG
.debug(
"TT asking for
task, max maps="
更新集群中的MR
的相关调度环境信息。
updateContextObjects(mapClusterCapacity,
reduceClusterCapacity);
当TT
中有空余的R
的Slots
时,调用R
的调度器分配tasks
。
if
(maxReduceSlots >
currentReduceSlots) {
//reduce slot
available , try to get a
//reduce task
tlr =
reduceScheduler
.assignTasks(taskTracker);
if
(TaskLookupResult.LookUpStatus.
TASK_FOUND
==
tlr.getLookUpStatus()) {
result.add(tlr.getTask());
}
}
同样当TT
中的M
有空余slots
时,调用M
调度器分配M
的tasks
。
if
(maxMapSlots >
currentMapSlots) {
//map slot
available , try to get a map task
tlr =
mapScheduler
.assignTasks(taskTracker);
if
(TaskLookupResult.LookUpStatus.
TASK_FOUND
==
tlr.getLookUpStatus()) {
result.add(tlr.getTask());
}
}
updateContextObjec
ts
TaskSchedulingMgr
该类是调度处理算法,也是最为重要的类。它对于M
,R
都是适用的算法。
把调度算法抽象出来了,实现类有M
,R
二种子类。
assignTasks()
该方法才是真正为TT
分配tasks
的方法。
找到该TT
正在休闲的job
,如果有job,
则拿到job
需要的slots
数。
JobInProgress job = taskTracker.getJobForFallowSlot(
type
);
int
availableSlots =
taskTracker.getAvailableSlots(
type
);
如果当前的空闲slots
数大于一个task
需要的slots
数,则可以为该job
分配task
。
如果TT
没有被分配的JOB
,则需要从多个队列中选择一个job
。
首先,对队列进行排序,依次按顺序去遍历队列,排序算法为QueueComparator
类。
然后,得到队列的调度信息,如果队列的容量为0
,则跳过。
判断当前的队列的当前占用的slots
数+
每个task
分配的slots
数
>
最大容量。
如果是,则跳过。
如果上面的条件都没有成立,则从当前队列中分配task
,同时返回分配task
状态。有找到task,
没有找到task
,由于memory
不满足而失败。
|
TaskLookupResult
tlr = getTaskFromQueue(taskTracker, qsc);
如何从队列中取到task
呢?
遍历队列正运行的job
。
for
(JobInProgress j :
scheduler
.
jobQueuesManager
.getJobQueue(qsi.getQueueName())
.getRunningJobs()) {
同样判断队列是否超过最大容量,如果是,跳过该job
。
判断job
的用户是不是超过限制,如果是,跳过该job
。
再判断当前的memory
是否满足该job
的M,R
。如果满足则从该job
中分配task
。
如果不满足,则返回失败信息。
经过上述条件,如果没有找到job
的话,则去掉用户限制,再次遍历队列,找到相应的job
。
而真正的从job
中分配task
的方法将分别由实现类MapSchedulingMgr
和ReduceSchedulingMgr
来完成。
Task
obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
throws
IOException {
synchronized
(
scheduler
) {
ClusterStatus
clusterStatus =
scheduler
.
taskTrackerManager
.getClusterStatus();
int
numTaskTrackers =
clusterStatus.getTaskTrackers();
return
job.obtainNewMapTask(taskTracker, numTaskTrackers,
scheduler
.
taskTrackerManager
.getNumberOfUniqueHosts());
}
}
就是直接调用job
的获取新的task
接口。
ReduceSchedulingMgr
跟MapSchedulingMgr
类似。
QueueSchedulingContext
为每个队列记录调度信息。
维持跟队列有关的调度信息,比如名字,容量,当前运行task
数等。
这些信息被用来决定怎么去分配task
,重装分布容量等。
private
static
abstract
class
QueueComparator
implements
Comparator<AbstractQueue> {
abstract
TaskSchedulingContext getTSC(
QueueSchedulingContext qsi);
public
int
compare(AbstractQueue q1, AbstractQueue q2) {
TaskSchedulingContext t1 = getTSC(q1.getQueueSchedulingContext());
TaskSchedulingContext t2 = getTSC(q2.getQueueSchedulingContext());
// look at how much
capacity they've filled. Treat a queue with
// capacity=0
equivalent to a queue running at capacity
double
r1 = (0 ==
t1.getCapacity())? 1.0f:
(
double
)
t1.getNumSlotsOccupied() /(
double
) t1.getCapacity();
double
r2 = (0 ==
t2.getCapacity())? 1.0f:
(
double
)
t2.getNumSlotsOccupied() /(
double
) t2.getCapacity();
if
(r1<r2)
return
-1;
else
if
(r1>r2)
return
1;
else
return
0;
}
}
DynamicPriorityScheduler
允许用户不断地增加或更改他们的队列优先顺序来满足他们当前负载的需要。
根据当前的需求,自动的调节优先级。即在运行过程中去计算队列优先级。
即可以根据当前的集群以及队列的开销,来按照这个开销比例来动态的为各个用户队列分配等比例的task
。
定时更新开销,根据以往各个队列的开销来为不同的队列分配不同数量的task
。
总结:
CapacityTaskScheduler
会为每个队列,用户给定一个最大容量,当超过最大容量时,则不给该队列再分配任务,否则会分配给队列或用户任务。再把多余的资源分配给超过最大容量的队列或用户。同时该调度还会判断
TT
中进程的内存是否满足任务所需要的内存,来作为任务分配的一个条件。
DynamicPriorityScheduler
会保存好每个队列的开销和预算,每隔一段时间进行更新,分配任务时根据队列的开销和预算来分配,如果超过预算会杀死部分任务。
对于任务调试器主要类有
:
TaskScheduler
以及各种队列,任务排序算法等。
方法有:start(),assignTasks(TaskTracker)
等。另外还有几个任务监听类JobInProgress
。
大致把调试器的源码过了一遍,接下来要看集群监控系统,以及hdfs的相关代码。