现在的位置: 首页 > 综合 > 正文

Capacity Scheduler and Dynamic Scheduler

2013年02月14日 ⁄ 综合 ⁄ 共 6242字 ⁄ 字号 评论关闭



CapacityTaskScheduler

链接:

http://hadoop.apache.org/common/docs/r0.19.2/capacity_scheduler.html

http://hadoop.apache.org/common/docs/r0.20.2/capacity_scheduler.html



https://issues.apache.org/jira/browse/HADOOP-3445

 

特性:

支持多个队列,每个job
只会被提交到一个队列上。

每个队列被分配了集群容量的一部分容量。

空余的容量被分配给超过其容量的任何队列。

队列内的作业支持优先级。

每个队列强制地分配给每个用户受限制的容量。

在分配任务给
TT
时会考虑到
JOB
的内存要求以及在
TT
结点是
RAM

VM
的情况。而
TT
中都是可以通过
mapred.child.ulimit

参数来设置
child
进程的
VM
大小。

 

start()

// initialize our queues from the config settings

   

CapacitySchedulerConf schedConf =

new


CapacitySchedulerConf();

   

try

{

     

initializeQueues(queueManager.getRoot().getJobQueueInfo().getChildren(),

         

schedConf,

false

);

   
}

catch

(Throwable e) {

     

LOG

.error(
"Couldn't
initialize queues because of the excecption : "

         
+
StringUtils.stringifyException
(e));

     

throw


new

IOException(e);

}

 

 

 

// Queues are
ready. Now register jobQueuesManager with the JobTracker so as

   

// to listen to job
changes

   

taskTrackerManager
.addJobInProgressListener(
jobQueuesManager
);

 

   

//Start thread for
initialization

   

if

(
initializationPoller
==
null

) {

     

this

.
initializationPoller
=
new


JobInitializationPoller(

         

jobQueuesManager
,
taskTrackerManager
);

   
}

   

initializationPoller
.init(
jobQueuesManager
.getJobQueueNames(),
schedConf);

   

initializationPoller
.setDaemon(
true

);

   

initializationPoller
.start();

 

assignTasks

首先根据TT
信息得到TT
和集群当前的MR
运行数目以及空余数。

   
ClusterStatus c
=

taskTrackerManager
.getClusterStatus();

   

int

mapClusterCapacity =
c.getMaxMapTasks();

   

int


reduceClusterCapacity = c.getMaxReduceTasks();

   

int

maxMapSlots =
taskTrackerStatus.getMaxMapSlots();

   

int

currentMapSlots =
taskTrackerStatus.countOccupiedMapSlots();

   

int

maxReduceSlots =
taskTrackerStatus.getMaxReduceSlots();

   

int

currentReduceSlots =
taskTrackerStatus.

countOccupiedReduceSlots

();

LOG

.debug(
"TT asking for
task, max maps="

更新集群中的MR
的相关调度环境信息。

updateContextObjects(mapClusterCapacity,
reduceClusterCapacity);

TT
中有空余的R
Slots
时,调用R
的调度器分配tasks

if

(maxReduceSlots >
currentReduceSlots) {

     

//reduce slot
available , try to get a

     

//reduce task

     
tlr =

reduceScheduler
.assignTasks(taskTracker);

     

if


(TaskLookupResult.LookUpStatus.

TASK_FOUND

==

       

tlr.getLookUpStatus()) {

       

result.add(tlr.getTask());

     
}

}

同样当TT
中的M
有空余slots
时,调用M
调度器分配M
tasks

if

(maxMapSlots >
currentMapSlots) {

     

//map slot
available , try to get a map task

     
tlr =

mapScheduler
.assignTasks(taskTracker);

     

if


(TaskLookupResult.LookUpStatus.

TASK_FOUND

==

       

tlr.getLookUpStatus()) {

       

result.add(tlr.getTask());

     
}

   
}

 

updateContextObjec
ts

 

TaskSchedulingMgr

该类是调度处理算法,也是最为重要的类。它对于M
R
都是适用的算法。

把调度算法抽象出来了,实现类有M
R
二种子类。

assignTasks()

该方法才是真正为TT
分配tasks
的方法。

找到该TT
正在休闲的job
,如果有job,
则拿到job
需要的slots
数。

JobInProgress job = taskTracker.getJobForFallowSlot(
type
);

int

availableSlots =
taskTracker.getAvailableSlots(

type
);

如果当前的空闲slots
数大于一个task
需要的slots
数,则可以为该job
分配task

如果TT
没有被分配的JOB
,则需要从多个队列中选择一个job

首先,对队列进行排序,依次按顺序去遍历队列,排序算法为QueueComparator
类。

然后,得到队列的调度信息,如果队列的容量为0
,则跳过。

判断当前的队列的当前占用的slots
+
每个task
分配的slots

 
>
最大容量。

如果是,则跳过。

如果上面的条件都没有成立,则从当前队列中分配task
,同时返回分配task
状态。有找到task,
没有找到task
,由于memory
不满足而失败。

 

 

for

(AbstractQueue q :
getOrderedJobQueues()) {

       

QueueSchedulingContext qsc = q.getQueueSchedulingContext();

       


// we may have queues with capacity=0. We shouldn't
look at jobs from

       


// these queues

       


if

(0 == getTSC(qsc).getCapacity()) {

         


continue

;

       

}

 

   

    

//This call is
important for optimization purposes , if we

       


//have reached the limit already no need for traversing
the queue.

       


if

(
this

.areTasksInQueueOverMaxCapacity(qsc,1))
{

         


continue

;

       

}

       

       

TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsc);

       

TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();

 

       


if

(lookUpStatus ==
TaskLookupResult.LookUpStatus.

NO_TASK_FOUND

) {

         


continue

;
// Look in other
queues.

       

}

 

  

     

// if we find a
task, return

       


if

(lookUpStatus ==
TaskLookupResult.LookUpStatus.

TASK_FOUND

) {

         


return

tlr;

       

}

       


// if there was a memory mismatch, return

       


else


if

(lookUpStatus ==

         

TaskLookupResult.LookUpStatus.

TASK_FAILING_MEMORY_REQUIREMENT

) {

           

return

tlr;

       

}

     
}

 

  
TaskLookupResult
tlr = getTaskFromQueue(taskTracker, qsc);

如何从队列中取到task
呢?

遍历队列正运行的job

for

(JobInProgress j :

       

scheduler
.
jobQueuesManager
.getJobQueue(qsi.getQueueName())

         

.getRunningJobs()) {

同样判断队列是否超过最大容量,如果是,跳过该job

判断job
的用户是不是超过限制,如果是,跳过该job

再判断当前的memory
是否满足该job
M,R
。如果满足则从该job
中分配task

如果不满足,则返回失败信息。

经过上述条件,如果没有找到job
的话,则去掉用户限制,再次遍历队列,找到相应的job

而真正的从job
中分配task
的方法将分别由实现类MapSchedulingMgr
ReduceSchedulingMgr
来完成。

 

 


MapSchedulingMgr

 

   
Task
obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)

   

throws

IOException {

     

synchronized

(
scheduler
) {

       
ClusterStatus
clusterStatus =

scheduler
.
taskTrackerManager

           

.getClusterStatus();

       

int

numTaskTrackers =
clusterStatus.getTaskTrackers();

       

return


job.obtainNewMapTask(taskTracker, numTaskTrackers,

           

scheduler
.
taskTrackerManager
.getNumberOfUniqueHosts());

     
}

}

就是直接调用job
的获取新的task
接口。


ReduceSchedulingMgr


MapSchedulingMgr
类似。


 

 

QueueSchedulingContext

为每个队列记录调度信息。

维持跟队列有关的调度信息,比如名字,容量,当前运行task
数等。

这些信息被用来决定怎么去分配task
,重装分布容量等。

 

 

private


static


abstract


class

QueueComparator

     

implements


Comparator<AbstractQueue> {

     

abstract


TaskSchedulingContext getTSC(

    
   
QueueSchedulingContext qsi);

  

  

public


int


compare(AbstractQueue q1, AbstractQueue q2) {

       

TaskSchedulingContext t1 = getTSC(q1.getQueueSchedulingContext());

       

TaskSchedulingContext t2 = getTSC(q2.getQueueSchedulingContext());

       

// look at how much
capacity they've filled. Treat a queue with

       

// capacity=0
equivalent to a queue running at capacity

       

double

r1 = (0 ==
t1.getCapacity())? 1.0f:

         
(

double

)
t1.getNumSlotsOccupied() /(

double

) t1.getCapacity();

      
 

double

r2 = (0 ==
t2.getCapacity())? 1.0f:

         
(

double

)
t2.getNumSlotsOccupied() /(

double

) t2.getCapacity();

       

if

(r1<r2)
return

-1;

       

else


if

(r1>r2)
return

1;

       

else


return

0;

     
}

}

 

 

 

DynamicPriorityScheduler

允许用户不断地增加或更改他们的队列优先顺序来满足他们当前负载的需要。

根据当前的需求,自动的调节优先级。即在运行过程中去计算队列优先级。

即可以根据当前的集群以及队列的开销,来按照这个开销比例来动态的为各个用户队列分配等比例的task

定时更新开销,根据以往各个队列的开销来为不同的队列分配不同数量的task

 

总结:

CapacityTaskScheduler
会为每个队列,用户给定一个最大容量,当超过最大容量时,则不给该队列再分配任务,否则会分配给队列或用户任务。再把多余的资源分配给超过最大容量的队列或用户。同时该调度还会判断
TT
中进程的内存是否满足任务所需要的内存,来作为任务分配的一个条件。

DynamicPriorityScheduler
会保存好每个队列的开销和预算,每隔一段时间进行更新,分配任务时根据队列的开销和预算来分配,如果超过预算会杀死部分任务。

对于任务调试器主要类有
:
TaskScheduler
以及各种队列,任务排序算法等。

方法有:start(),assignTasks(TaskTracker)
等。另外还有几个任务监听类JobInProgress

 

大致把调试器的源码过了一遍,接下来要看集群监控系统,以及hdfs的相关代码。

抱歉!评论已关闭.