正如前面所说,DAG图计算会RDD的以来关系转化为需要执行的stage,而每个stage对应的是一组没有shuffle的RDD,所以说不是只划分了stage就会执行,所以还需要将一个能触发stage执行的东西,那就是job:
什么是job呢,其实job的触发就是当你在程序中运行一个action,而action是什么呢:“我们现在需要运行的结果”,下面列出了一些action的说明:
具体的一个job执行的流程:
RunJob-----》DAG图计算生成stage(stage有父子关系,所以提交有先后顺序)-----》执行
接下来我们具体的说说job的提交:
1. rdd. acti on() 会调用 DAGScheduler.runJob( rdd , p r o c e s s P a r t i t i o n , r e s u l t H a n d l e r ) 来生成 j ob。
2. runJob() 会首先通过 r d d . g e t P a r t i t i o n s ( ) 来得到 fi nal RDD 中应该存在的 parti ti on 的个数和类型:Array[Parti ti on]。
然后根据 parti ti on 个数 new 出来将来要持有 resul t 的数组 A r r a y [ R e s u l t ] ( p a r t i t i o n s . s i z e ) 。
3. 最后调用 DAGSchedul er 的 r u n J o b ( r d d , c l e a n e d F u n c , p a r t i t i o n s , a l l o w L o c a l , r e s u l t H a n d l e r ) 来提交 j ob。
cl eanedFunc 是 processPari tti on 经过闭包清理后的结果,这样可以被序列化后传递给不同节点的 task。
4. DAGSchedul er 的 runJob 继续调用 s u b m i t J o b ( r d d , f u n c , p a r t i t i o n s , a l l o w L o c a l , r e s u l t H a n d l e r ) 来提交 j ob。
5. submi tJob() 首先得到一个 j obId,然后再次包装 func,向 DAGSchedul erEventProcessActor 发送 JobSubmi tted 信
息,该 actor 收到信息后进一步调用 d a g S c h e d u l e r . h a n d l e J o b S u b m i t t e d ( ) 来处理提交的 j ob。之所以这么麻烦,是为
了符合事件驱动模型。
6. handl eJobSubmmi tted() 首先调用 fi nal Stage = newStage() 来划分 stage,然后submi tStage(fi nal Stage)。由于
fi nal Stage 可能有 parent stages,实际先提交 parent stages,等到他们执行完,fi nal Stage 需要再次提交执行。再次
提交由 handl eJobSubmmi tted() 最后的 submi tWai ti ngStages() 负责。
分析一下 newStage() 如何划分 stage:
1. 该方法在 new Stage() 的时候会调用 fi nal RDD 的 getParentStages()。
2. getParentStages() 从 fi nal RDD 出发,反向 vi si t 逻辑执行图,遇到 NarrowDependency 就将依赖的 RDD 加入到
stage,遇到 Shuffl eDependency 切开 stage,并递归到 Shuffl eDepedency 依赖的 stage。
3. 一个 Shuffl eMapStage(不是最后形成 resul t 的 stage)形成后,会将该 stage 最后一个 RDD 注册
到 M a p O u t p u t T r a c k e r M a s t e r . r e g i s t e r S h u f f l e ( s h u f f l e D e p . s h u f f l e I d , r d d . p a r t i t i o n s . s i z e ) ,这一步很重要,因
为 shuffl e 过程需要 MapOutputTrackerMaster 来指示 Shuffl eMapTask 输出数据的位置。
分析一下 submi tStage(stage) 如何提交 stage 和 task:
1. 先确定该 stage 的 mi ssi ngParentStages,使用 g e t M i s s i n g P a r e n t S t a g e s ( s t a g e ) 。如果 parentStages 都可能已经执
行过了,那么就为空了。
2. 如果 mi ssi ngParentStages 不为空,那么先递归提交 mi ssi ng 的 parent stages,并将自己加入到 wai ti ngStages 里
面,等到 parent stages 执行结束后,会触发提交 wai ti ngStages 里面的 stage。
3. 如果 mi ssi ngParentStages 为空,说明该 stage 可以立即执行,那么就调用 s u b m i t M i s s i n g T a s k s ( s t a g e , j o b I d ) 来
生成和提交具体的 task。如果 stage 是 Shuffl eMapStage,那么 new 出来与该 stage 最后一个 RDD 的 parti ti on 数相
同的 Shuffl eMapTasks。如果 stage 是 Resul tStage,那么 new 出来与 stage 最后一个 RDD 的 parti ti on 个数相同的
Resul tTasks。一个 stage 里面的 task 组成一个 TaskSet,最后调用 t a s k S c h e d u l e r . s u b m i t T a s k s ( t a s k S e t ) 来提交一
整个 taskSet。
4. 这个 taskSchedul er 类型是 TaskSchedul erImpl,在 submi tTasks() 里面,每一个 taskSet 被包装成 manager:
TaskSetMananger,然后交给 s c h e d u l a b l e B u i l d e r . a d d T a s k S e t M a n a g e r ( m a n a g e r ) 。schedul abl eBui l der 可以是
FIFOSchedul abl eBui l der 或者 Fai rSchedul abl eBui l der 调度器。submi tTasks() 最后一步是通
知 b a c k e n d . r e v i v e O f f e r s ( ) 去执行 task,backend 的类型是 Schedul erBackend。如果在集群上运行,那么这个
backend 类型是 SparkDepl oySchedul erBackend。
5. SparkDepl oySchedul erBackend 是 CoarseGrai nedSchedul erBackend 的子类, b a c k e n d . r e v i v e O f f e r s ( ) 其实是向
Dri verActor 发送 Revi veOffers 信息。SparkDepl oySchedul erBackend 在 start() 的时候,会启动 Dri verActor。
Dri verActor 收到 Revi veOffers 消息后,会调用 l a u n c h T a s k s ( s c h e d u l e r . r e s o u r c e O f f e r s ( S e q ( n e w
W o r k e r O f f e r ( e x e c u t o r I d , e x e c u t o r H o s t ( e x e c u t o r I d ) , f r e e C o r e s ( e x e c u t o r I d ) ) ) ) ) 来 l aunch tasks。schedul er
就是 TaskSchedul erImpl。 s c h e d u l e r . r e s o u r c e O f f e r s ( ) 从 FIFO 或者 Fai r 调度器那里获得排序后的
TaskSetManager,并经过 T a s k S c h e d u l e r I m p l . r e s o u r c e O f f e r ( ) ,考虑 l ocal i ty 等因素来确定 task 的全部信息
TaskDescri pti on。调度细节这里暂不讨论。
6. Dri verActor 中的 l aunchTasks() 将每个 task 序列化,如果序列化大小不超过 Akka 的 akkaFrameSi ze,那么直接将
task 送到 executor 那里执行 e x e c u t o r A c t o r ( t a s k . e x e c u t o r I d ) ! L a u n c h T a s k ( n e w
S e r i a l i z a b l e B u f f e r ( s e r i a l i z e d T a s k ) ) 。