现在的位置: 首页 > 综合 > 正文

cloud foundry dea源码解析

2014年06月11日 ⁄ 综合 ⁄ 共 14155字 ⁄ 字号 评论关闭
文章目录

准确的讲,不全是原创,参考了一些blog,主要是对源代码进行整理。

在CF上部署Tuscany应用时,纠结很久,因此,将自己之前整理的文档先发布出来。

欢迎大家讨论,批评指正!



1. dea介绍

DEA(DropletExecution Agent)

  DEA manages anapplication
instance’slifecycle
, instructed by cc to start/stop application instances,broadcast instances’stateover NATS. 
Requests for directories/files are handled by DEA, which respondswith
a HTTP redirect or a URL that hits the directory server directly. 
The URLis signed by DEA, and the directory server checks the validity of the URL withthe DEA before serving it.


1.1 dea组件

简单来说,dea主要负责以下4个操作:

a1 DEA负责管理warden容器,应用的部署和运行都是在warden容器中。

a2 DEA部署app

a3 DEA运行app

a4 DEA监视app状态,并且定期向HM发送状态信息。dea将container的ip,host,port报告给router。

  

如果app需要部署,或者扩展到其它节点,CC会发布一条有droplet需要run的消息。DEA接收到消息,查看droplet的metadata,确定能否满足内存,运行时等要求。

如果满足,DEA发送一条确认消息。CC选择DEA运行,DEA启动start脚本。

  同一个应用的不同实例,跑在不同的端口上。

1.2 dea file_server

  DEA启动时会附带启动一个file api server,负责验证并返回真实路径。

dea directory server单独启动,启动后router注册

dea上传droplet,下载log,都是通过deadirectory server进行的。

1.3 dea与router的交互

Router:

·      router启动时会订阅 “router.register 这个channel同时也会定时向 “router.start 这个channel发送消息

·      其他需要向router注册的组件启动时会订阅“router.start”这个channel.一旦接收到消息会立刻收集需要注册的信息ipport),然后向router.register发送消息

·      router接收到“router.register消息后立即更新路由信息

·      以上过程不停循环使router的状态时刻保持最新.

1.4 dea与cloud-controller交互


cloud controller指挥dea进行打包和运行:

·      所有组件包括dea启动时会生成一个唯一的UUID来标识组件.dea会定时的将自身情况和ID发送到“stager.advertise  “dea.advertise这两个channel同时会订阅 “staging.<uuid>.start” 和“dea.<uuid>.start这两个channel.

·      cloud controller订阅这“staging.<uuid>.start”和“dea.<uuid>.start”这两个channel并根据channel的信息构造并时刻更新dea_poolstager_pool

·      当有打包或者运行应用的请求时cloudcontroller会去查询这两个pool如果有合适的dea

就会向对应的“[dea|stager].<uuid>.start发送消息.

·      对应uuiddea收到消息根据消息执行打包或运行任务

1.5 dea与nats通信

nats是CF使用的轻量通信器,dea也有一个client,负责与nats进行通信

natsdroplet.exited信号

有三种场景下droplet.exited会接收到:

·      app被显式地stop这种情况不需要对app进行调整。

·      DEA撤离.当有DEA撤离的时候属于该DEAinstance应该在其他地方重启HM应该初始化这个重启.
HM
CC发送消息CC发送启动instance的消息

·      app崩溃.崩溃的instance应该被重启(HM调用nudger.start_instance,向cc发消息)除非这个instance短时间内崩溃多次就会被标识为flapping
policy
加入重启delay。 flapping状态的instance,判断是否放弃重启,如果没有放弃重启,schedule_delayed_restart.

1.6 dea的heartbeat处理

DEA周期性的会把heartbeat信息发送到Nats总线上。这些heartbeat包括了DEA的识别消息和所有该DEA上的instance的信息。
heartbeat
会被用来管理missingextrainstanceMissing
instance
会被要求启动extrainstance会被要求停止Droplet对象会跟踪自身每个instanceheartbeat的每个version一个当前versioninstance如果没有在droplet_lost的秒数内收到heartbeat,就被认为是missing

2 dea源码分析

下面主要是参考的CF v2 , dea_ng的源码。
代码主要是ruby源码

2.1 dea启动

首先从启动dea开始,bin/dea脚本,其实就是ruby源码

require rubygems, bundler/setup, eventmachine,yaml,dea/bootstrap

主要操作,loadconfig:Bootstrap.new(config)

EM.epoll
帮助解决大并发连接的问题

EM的分析

EventMachine是一个基于Reactor(单线程)
模式的网络编程和并发编程框架。

EM.run do

  bootstrap.setup&& bootstrap.start

end

  这里就是将启动任务都交给Bootstrap类,源码为lib/dea/bootstrap.rb。

2.1.1 bootstrap.rb


a. boostrap.setup

 boostrap.setup包括setuplog,droplet_registry, instance_registry,staging_task_registry,resource_manager, directory_server,signal_handlers,directories,pid_files,
nats, router_client

  创建一个config实例,nats实例,responders实例,directory_server实例,directory_server_v2实例,droplet_registry实例,instance_registry实例,resource_manager实例,staging_task_registry实例,router_client实例,

  创建DeaLocator,StagingLocator,Staging实例。

  setup_directories创建db、droplets、instances、tmp、staging文件夹,

  setup_pid_file创建pid文件.

  setup_sweepers,EM添加一个心跳计数器10s,并向instance_registry发送心跳。

  setup_directory_server_v2创建DirectoryServerV2,并configure_endipoint(instance_registry, staging_task_registry)

  setup_direcotry_server创建DirectoryServer


b. bootstrap.start

 启动component,nats,directory_server,注册directory_server,

 启动 directory_server_v2,setup_varz(varz是一个safeHash,就是一组参数表:设置varz[:stack],EM每隔10s更新一次varz),start_finish启动完成后向nats发消息。

  启动directory_server,

  启动nats(启动DeaLocator,StagingLocator,Staging实例)


下面具体将一下每一个启动操作:

  • start_component: VCAP::Component.register(type,local_ip,index,nats,port,user,pwd,logger) ,这个向NATS announce the availability of this component,Returns the published
    configurationof the component。生成一个uuid。
  •  directory_server向router_client注册自己的local_ip,port,host_name
  •  start_finish, 启动结束后,发布一条主题为”dea.start”的hello msg。并调用staging_locator和dea_locator的advertise方法,publish主题为staging.advertise消息,
    汇报这个dea的id,app个数,可使用内存等信息和dea.advertise的消息, 汇报可用内存和平台信息。

c. 一组handle方法

处理dea_status,dea_directed_start(create_instance,并启动),

处理dea_stop(将每个running的instance stop),

处理dea_discover,EM.add_timer(delay){message.respond}

处理dea_update(droplet绑定到新的url,向router_client重新注册),

处理dea_find_droplet,droplet_status,这些在nats.rb中被调用。

 handle_health_manager_start(instance),send_heart_beat(instance_registry).

 handle_router_start,将instance_registry中running的instance注册到router_client.

d.  create_instance方法

create_instance方法由handle_dea_directed_start方法调用,创建Instance < Task,instance.setup,这里会调用到instance.rb中的方法,进行具体的部署任务,后面会详细介绍。

    instance.on(Transition.new(:starting,:crashed)) do … end

1 状态由starting转为crashed时,send_exited_message

2 状态由starting转为running时,send_heartbeat([instance]),并向router_client注册自己.

3 状态由running转为crashed时,向router_client deregister,并send_exited_message.

4 状态由running转为stopping时,router_client.unregister(instance),

send_exited_message(Shutdown|stopped) 

5 状态由starting转为running时,save_snapshot.

6 状态由running转为stopping或crashed时,save_snapshot.

7 状态由stopping转为stopped时,

instance_register.unregister(instance),EM.next_tick(instance.destroy)在主线程同步删除instance。

e. 一组send方法

staging_stop(stopstaging_task_registry中每一个注册的stagingtask),

  shutdown_message(在nats上发布dea.shutdown消息),exited_message(发布droplet.exited消息),heartbeat(发布dea.dropletbeat消息)


f. shutdown方法

  shutdown:send_shutdown_message,ignore_signals,nats.stop,unregister_directory_server)方法。

 

2.2 dea打包staging

a. dea/staging_task.rb && task.rb这两个类可以一起看

StagingTask < Task 在dea/responders/staging.rb中handle(message)方法,创建StagingTask对象,传入message.data

   attr_reader:bootstrap, :dir_server, :task_id

  start创建Promise,调用resolve_staging_setup,resolve_staging, p.deliver

调用Promise.resolve(staging_promise)删除workspace的临时文件

  stop(&callback)

promise_prepare_staging_log

promise_app_dir调用promise_warden_run(:app,script,true)

run_plugin_path方法,返回”buildpack_dir/bin/run”

promise_stage调用promise_warden_run(:app,script),这里的script传入了environment,dea_ruby,run_plugin_path,workspace.plugin_config_path,

执行的脚本是

/usr/bin/rubybuildpacks/bin/run/tmp/dea/staing/d2013/plugin_config

promise_task_log调用copy_out_request

promise_staging_info调用copy_out_request

promise_unpack_app调用promise_warden_run(:app,<<-BASH)

promise_pack_app调用promise_warden_run(:app,<<-BASH)

promise_app_download: Download.new

promise_log_uoload_started调用promise_warden_run(:app,<<-BASH).resolve

promise_app_upload: Upload.new

promise_buildpack_cache_upload: 创建一个Promise,创建一个Upload

promise_buildpack_cache_download调用download

promise_log_upload_finished调用promise_warden_run(:app,<<-BASH)

promise_copy_out:调用copy_out_request

promise_container_info调用promise_warden_call

promise_save_buildpack_cache

promise_pack_buildpack_cache调用promise_warden_run(:app,<<-BASH)

promise_unpack_buildpack_cache调用promise_warden_run(:app,<<-BASH)

promise_copy_out_buildpack_cache调用copy_out_request

resolve_staging调用Promise.run_serially(promise_unpack_app,

unpack_buildpack_cache,stage,pack_app,copy_out,log_upload_started,

 app_upload,save_buildpack_cache, log_upload_finished, staing_info)

resolve_staging_setup并行执行promise_app_download,promise_create_container,

如果存在buildpack_cache_download_uri,则promise_buildpack_cache_download,

并行执行promise_prepre_staging_log,

prepare_app_dir,prepare_container_info.

 -----  task.rb ------

定义start方法,让子类继承

@warden_connections

find_warden_connection,cache_warden_connection,close_warden_connection

close_warden_connections,

promise_warden_connection(name)创建Promise

  如果没有cache,创建connection并缓存,::EM.connect_unix_domain

promise_warden_call(connection_name,reuqest):第一个是函数指针?

 先调用promise_warden_connection,connection.call(request)

promise_warden_call_with_retry:rescueretry

promise_create_container返回bind_mount,CreateRequest::BindMount.new

promise_limit_disk, promise_limit_memory,

promise_warden_run(connection_name,script,privileged)调用promise_warden_call

promise_stop,promise_destroy,destroy,

resolve:resolvea promise,保证一次只运行一个promise。调用Promise.resolve(p)

copy_out_request调用promise_warden_call_with_retry

b. instance.rb
  

 这里的instance指DEA实例的一次打包任务

 class Instance< Task任务

 initialize(bootstrap)初始化

  def state=(state)

  def droplet :bootstrap.droplet_registry[droplet_sha1]

  promise_state: Promise.new, promise_state.deliver

  promise_droplet_download: 创建一个Promise,droplet.download,promise.deliver

  promise_setup_network:创建Promise,调用promise_warden_call

  promise_setup_environment在warden中执行script,创建::Warden::Protocal::NetInRequest

  promise_extract_droplet执行script,解压缩droplet包

  promise_start在warden中执行./startup。创建::Warden::Protocal::SpawnRequest

  promise_container_info创建::Warden::Protocal::InfoRequest,并在warden中执行

  promise_exec_hook_script

 start(&callback)并发执行下载droplet和SetupContainer,调用promise_droplet,

 promise_container方法。然后执行network初始化,解压缩droplet,hook脚本,

 执行promise_setup_network,promise_extract_droplet(执行tarzxf droplet),

 promise_exec_hook_script,promise_start。然后执行link, 检测droplet是否healthy,调用promise_health_check。  promise_health_check会首先调用promise_container_info获得warden_container_path,然后以此为参数,调用

promise_read_instance_manifest获取一个manifest对象,如果manifest[“state_file”]为true,

返回promise_state_file_ready,否则,如果application_uris.empty?= false,返回promise_port_open,否则直接返回true。

 如果droplet.healthy,则状态从STARTING转为RUNNING;否则p.fail

 promise_container创建Promsie,并调用promise_create_container.resolve,setup_network,promise_limit_disk,promise_limit_memory,  promise_setup_environment.resolve.最后调用p.deliver

  promise_dropelt如果不存在droplet,下载

  promise_container分别执行promise_create_container(从task.rb中继承的),

  promise_setup_network,

  promise_limit_disk,promise_limit_memory,这两个也是从task.rb中继承的。

  promise_setup_environment.

 

  stop(&callback)方法

  promise_copy_out

  promise_link

  Stage: BORN, STARTING, RUNNING, STOPPING, STOPPED, CRASHED,DELETED, RESUMING

  self.from_external和self.to_external

Transition < Struct.new(:from,:to)

  setup_stat_collector方法

   on(Transition.new(:starting,:running))do

 start_stat_collector

   end

状态由running-> stopping时,stop_stat_collector

状态由running->crashed时,stop_stat_collector

promise_collect_stats方法,调用promise_container_info方法,

run_stat_collector方法会定期的检测instance的状态,如果App有其它请求可以放在stat的返回值中return。

c. resonders/staging.rb打包

start(subscribe主题为staging、staging.#{@dea_id}.start、staging.stop)的消息,

stop,handle(msg),handle_stop,

handle(msg),创建staging task并注册,调用task完成setup、complete、stop的回调方法,启动task。

·      staging.advertise:Stagers (now DEA's) broadcast their capacity/capability

·      staging.locate:Stagers respond to any message on this subject with a staging.advertise message
(CC uses this to bootstrap)  --- 
CC打包前,必须要有用于stagingdea存在。

staging.<uuid>.start: Stagers respond to requests on this subject to stage apps.启动打包流程

·      staging:Stagers (in aqueue
group) respond to requests to stage an app (old protocol)

下图为参考图,等找到楼主的blog,会将链接附到最后



d. responders/staging_locator.rb

    

  advertise(在nats上发布”staging.advertise”消息),

  subscribe_to_staging_locate(subscribe主题为“staging.locate”的消息,并且respondwithstaging.advertise),

  unsubscribe,

  start_period_staging_advertise(EM.add_periodic_timer)定期发布advertise消息。

  stop_period_staging_advertise(EM.cancel_timer)方法

e. responders/dea_locator.rb

   

  跟staging_locator基本一样,有start,stop方法

  advertise发布的是”dea.advertise”消息

  subscribe,unsubscribe方法,这里预订的是内容为dea.locate的消息,并advertise

  start_periodic_dea_advertise,stop_periodic_dea_advertise

2.3 dea/nats.rb

这是dea用来与nats-server通信的客户端

start方法订阅”healthmanager.start”, ”router.start”,”dea.status”,”dea.#{bootstrap.uuid}.start”,“dea.stop”,”dea.discover该消息cc会发布”,”dea.update”,”dea.find.droplet”,并分别调用bootstrap各种handle方法

  handle_health_manager_start向instance_registry发送heartbeat

  handle_router_start向router_client注册instance_registry中的instance。

  handle_dea_status返回dea状态

  handle_dea_directed_start在接收到cc要求启动app请求时,创建Instance<Task实例,并启动(启动一个droplet)。

  handle_dea_stop关闭instance

  handle_dea_discover先检测资源是否够用,然后delay几秒后,EM.add_timer(dea生成一个hellomsg)

  handle_dea_update将instance新的uri注册到router_client,将原来的uri unregister。通过NATS告诉router,新的app来了.

  handle_dea_find_droplet返回dea生成的response

  handle_droplet_status查看注册的instance中有无starting或者running状态的instance。并且返回这个response.

  create_nats_clients(::NATS.connect)

  class Message包括nats,subject,data,respond_to

  respond方法在nats上发布一个newmsg(调用response生成该msg),

  response方法,self.class.new(nats,respond_to,data,nil)

2.4   directory_server 

2.4.1 dea/directory_server.rb

目录服务器,dea启动时,会启动一个目录服务器,

initialize时,创建Thin::Server(localhost:4385)

map “/ instances” do

  runDirectory.new(instance_registry)

start方法,server.start

2.4.2 dea/directory_server_v2.rb

通过file server验证uri

mountDirectoryServerV2::InstancePaths什么意思还不是很清楚。

mountDirectoryServerV2::StagingTasks

file_api_server = Thin::Server.new(“127.0.0.1:1234”),目录验证服务器

file_api_server.start

hmaced_url_for生成验证(hmac)的url和verify_hmaced_url

staging_task_file_url_for方法和instance_file_url_for方法,调用hmaced_url_for

verify_instance_file_url,和verify_staging_task_file_url调用verify_hmaced_url

HMACHelper

2.4.3 dea/directory.rb

FileServer < Rack::File查找root下的目录,如果找到,以html形式给出。

Directory利用Rack::File提供了一个对root目录的定制访问功能,在dea启动时,让DEA不断地启动droplets的文件访问功能。

2.4.4 go版本directory_server

 新版本的dea,使用go重构directoryserver,替代原先嵌在dea中的directory_server.

go/src/runner/runner.go使用go语言实现directoryserver

函数入口main,调用directoryserver.Start

2.4.5  go/src/direcotryserver.go

Start方法,startdirectory server, validateHTTP request with dea’s  HTTP serverwhich servers requests on the same host and specified DEA port.

这里我将dea/config/dea.xml中的local_router改为8.8.8.8,以前是127.0.0.1与上面的说法一致。后来因为出现dea与cc竞争冒险,参考googlegroups修改的。

以tcp形式监听port端口,获得一个listener

http.Server(listener, handler)

v1_port:4385, v2_port:5678, file_api_port:1234

2.5  buildpack

buildpack是CF v2新出的技巧,参考heroku,用于让CF迅速的支持多个语言框架

2.5.1 buildpack.rb

buildpacks/bin/run,执行buildpack.rb中stage_application

Buildpack < StagingPlugin

stage_application方法

chdir(destination)

create_app_directories

copy_source_files

compile_with_timeout调用build_pack.compile

create_startup_script

save_buildpack_info

build_pack方法返回一个buildpack,如果@buildpack为空,根据custom_url判断,如果url不为空,调用clone_buildpack;否则调用installers.detect

cf push nodejs时,custom_url,因此调用installers.detect

installers方法,为每一个vendor中的buildpack,新建一个Installer

2.5.2 staging_plugin.rb

generate_startup_script生成start脚本

create_start_script

2.5.3  installer.rb

command(name)方法返回”#{path}/bin/#{name}#{app_dir}”

detect方法调用Open3.capture2command(‘detect’)

compile方法,执行System“#{command(‘compile’)} #{cache_dir}”

release_info调用Open3.capture2command(‘release’)

3 参考文献

http://blog.csdn.net/cherry_sun/article/details/7696482
http://blog.csdn.net/tibelf/article/details/12998145 文中的staging图就是该作者原创的
还有一些参考文献记不得了,以后会陆续补上的

抱歉!评论已关闭.