现在的位置: 首页 > 综合 > 正文

一个简单的集群任务调度框架

2013年09月05日 ⁄ 综合 ⁄ 共 7011字 ⁄ 字号 评论关闭

说到服务器集群后台的任务调度,这是很多网站或计算密集型系统经常用到的。

本文不讨论map/reduce级别的任务拆分和调度,本文设计的调度框架只满足以下几点特性:

1)轻量级,代码框架及实现原理非常简单,容易部署

2)集群可扩展,理论上集群机器数量,以及每台机器上的执行任务数都可扩展

3)业务单元化:业务下发的任务是具体的、细粒度的;本框架不负责任务或工作流的拆分,只接受已拆分到最细粒度的任务

实现原理:

1)所有计算节点(这里指一个程序实例)均地位平等

2)任务以一个文件的形式存在,计算节点通过共享文件系统去“抢”任务。

3)所有的计算节点均常驻运行,不断地扫描任务文件

4)业务系统下发任务,即直接生成一个文件

我们将计算节点定义为worker,那么worker的主逻辑如下

While(true){

         If(find(以前未完成的任务文件)||find(任务文件)){

                   为该文件追加扩展名“.本机ip.实例号”(即抢占该任务)

                   处理任务

                   按处理结果将任务文件改名为.finish(成功)或.error(失败)

       }

}

以下为Python 2的实现,供参考。

#encoding=utf8
'''
Created on 2011-9-24

@author: chenggong

worker基类
'''
import time
import os
import re
from optparse import OptionParser
import filelocker

class WorkerBase(object):
    """Base class for a worker that competes for task files in a shared directory.

    Every worker instance scans ``self.options.dir`` in an endless loop:

    * a file whose extension is ``.<uuid>`` was claimed by this instance
      earlier but not finished, so it is run again;
    * a file whose extension is ``self.taskexname`` is a new task; it is
      claimed by renaming it to ``<file>.<ip>.<uuid>`` and then run.

    Only files whose task name (the base name up to its first '.') matches
    ``self.patten`` are considered.  After :meth:`dowork` returns, the task
    file is renamed to ``<taskname>.finish`` (truthy result) or
    ``<taskname>.error`` (otherwise) in the same directory.

    Subclasses override :meth:`dowork`.  :meth:`set_options` must be called
    before :meth:`start` or :meth:`log`, since both read ``self.options``.
    """

    # Seconds to sleep after a scan that ran no task, so idle workers do not
    # spin on the (possibly network) file system.  Kept as a class attribute
    # so subclasses that skip WorkerBase.__init__ still have it.
    scan_interval = 1

    def __init__(self):
        # Regex applied with re.match (anchored at the start only) to the
        # task name.
        self.patten = ".*"
        # Extension, including the leading dot, of unclaimed task files.
        self.taskexname = ".txt"

    def set_task_patten(self, patten):
        """Set the regex a task name must match (via ``re.match``) to be picked up."""
        self.patten = patten

    def set_task_exname(self, exname):
        """Set the extension of new task files; dots in *exname* are dropped.

        ``"txt"`` and ``".txt"`` both yield ``".txt"``.
        """
        self.taskexname = "." + exname.replace(".", "")

    def dowork(self, filename, content):
        """Process one task; subclasses override this.

        :param filename: task name (base name of the task file up to its
            first '.').
        :param content: full text of the task file.
        :returns: truthy on success, falsy on failure.  This default returns
            None, so every task is recorded as failed.
        """
        pass

    @staticmethod
    def _taskname(filepath):
        """Return the task name: the base name of *filepath* up to its first '.'."""
        return os.path.basename(filepath).split(".")[0]

    @staticmethod
    def _result_path(filepath, suffix):
        """Return ``<dir of filepath>/<taskname><suffix>``.

        Only the base name is split on '.', so dots in the directory part
        (e.g. ``\\\\10.1.10.254\\storage`` or ``./tasks``) are preserved.
        """
        dirname, basename = os.path.split(filepath)
        return os.path.join(dirname, basename.split(".")[0] + suffix)

    def _rename_until_done(self, filepath, target, overwrite_msg):
        """Rename *filepath* to *target*, replacing *target*; retry every 5 s on failure."""
        while True:
            try:
                if os.path.exists(target):
                    self.log("warning", 0, overwrite_msg)
                    # Remove first: os.rename does not replace an existing
                    # file on Windows.
                    os.remove(target)
                os.rename(filepath, target)
                return
            except Exception as e:
                self.log("error", 0, "任务执行完毕后改名失败,文件系统异常或任务文件已被损坏!except=%s" % str(e))
                time.sleep(5)

    def tasklogic(self, filepath):
        """Run the claimed task at *filepath* and rename it by outcome.

        The file is exclusively locked (non-blocking) while :meth:`dowork`
        runs.  Exceptions from :meth:`dowork` are logged and count as a
        failure.  Renaming to ``.finish``/``.error`` is retried every 5 s
        until it succeeds.

        :raises filelocker.LockException: if the lock is held elsewhere; the
            file is then left as it is.
        """
        success = False  # stays False if dowork() (or logging) raises
        with open(filepath, "r") as filehandle:
            # Exclusive and non-blocking: fail at once if another process
            # holds the task.  LOCK_NB on its own is not a valid flock()
            # operation on POSIX, and on Windows it only asks for a shared lock.
            filelocker.lock(filehandle, filelocker.LOCK_EX | filelocker.LOCK_NB)
            try:
                self.log("normal", 0, "开始执行任务%s" % filepath)
                success = self.dowork(self._taskname(filepath), filehandle.read())
            except Exception as e:
                self.log("warning", 0, "派生类未捕获异常%s" % str(e))
            finally:
                filelocker.unlock(filehandle)

        if success:
            self.log("normal", 0, "任务%s结束,完成成功" % filepath)
            self._rename_until_done(filepath, self._result_path(filepath, ".finish"),
                                    "该任务.finish文件已存在,进行覆盖")
        else:
            self.log("normal", 0, "任务%s结束,完成失败" % filepath)
            self._rename_until_done(filepath, self._result_path(filepath, ".error"),
                                    "该任务.error文件已存在,进行覆盖")

    def start(self):
        """Scan the task directory forever, running every task this instance can claim.

        Returns only if ``self.patten`` is not a valid regular expression.
        Errors while scanning the directory (e.g. a lost network share) are
        logged and retried after 30 s.  Raises TypeError if no ``-u`` value
        was given, since the instance id is needed to recognise own tasks.
        """
        taskDir = self.options.dir
        uuid = self.options.uuid
        ip = self.options.ip
        try:
            matcher = re.compile(self.patten)
        except (re.error, TypeError):
            self.log("fatal", 0, "patten=%s,正则表达式格式匹配失败" % self.patten)
            return
        mine = "." + uuid  # extension of tasks already claimed by this instance

        while True:
            ran_task = False
            try:
                for f in os.listdir(taskDir):
                    filepath = os.path.join(taskDir, f)
                    if not matcher.match(self._taskname(filepath)):
                        continue

                    fex = os.path.splitext(f)[1]
                    if fex == mine:  # claimed earlier by this instance
                        self.log("normal", 0, "找到未完成任务%s" % str(f))
                        try:
                            self.tasklogic(filepath)
                            ran_task = True
                        except Exception:
                            self.log("warning", 0, "尝试锁定该任务失败,该任务可能已被锁定,uuid=%s可能被多次启用!" % uuid)
                    elif fex == self.taskexname:  # new, unclaimed task
                        claimed = "%s.%s.%s" % (filepath, ip, uuid)
                        try:
                            # The rename is the claim; a failed rename
                            # presumably means another worker got it first.
                            os.rename(filepath, claimed)
                            self.tasklogic(claimed)
                            ran_task = True
                        except Exception:
                            self.log("warning", 0, "任务文件%s锁定失败,或已被占有" % filepath)
            except Exception:
                self.log("error", 0, "连接任务文件夹%s失败,可能网络已断开.." % taskDir)
                time.sleep(30)
                continue
            if not ran_task:
                time.sleep(self.scan_interval)

    def log(self, level, typeid, msg):
        """Append a line to today's log file and echo it to the console.

        The line ``[HH:MM:SS][uuid][level][typeid]msg`` is appended to
        ``<options.log>/YYYY-MM-DD.log``; the directory (and parents) is
        created if missing.  The write happens under a blocking exclusive
        file lock, presumably because several instances share the log
        directory.
        """
        logdir = self.options.log
        if not os.path.isdir(logdir):
            try:
                os.makedirs(logdir)
            except OSError:
                # Another instance may have created it after our check.
                if not os.path.isdir(logdir):
                    raise
        now = time.localtime()  # one snapshot so file date and line time agree
        filepath = os.path.join(logdir, time.strftime('%Y-%m-%d', now) + ".log")
        logmsg = "[%8s][%s][%s][%d]%s" % (time.strftime('%H:%M:%S', now),
                                          self.options.uuid, level, typeid, msg)

        with open(filepath, "a") as f:
            filelocker.lock(f, filelocker.LOCK_EX)  # blocking lock
            try:
                f.write(logmsg + "\n")
            finally:
                filelocker.unlock(f)

        console = logmsg
        if isinstance(console, bytes):
            # Python 2 byte string: literals in this file are UTF-8 and the
            # console is assumed to be GBK.  Replace undecodable/unencodable
            # characters (e.g. OS error text) instead of raising from the logger.
            console = console.decode("utf8", "replace").encode("gbk", "replace")
        print(console)

    def set_options(self, options):
        """Parse the command line into ``self.options``.

        :param options: extra options, each a dict ``{"option": flag,
            "value": dest}``; the parsed value is available as
            ``self.options.<dest>``.

        The common options are always added: ``-d`` task directory,
        ``-i`` local IP, ``-u`` instance id, ``-l`` log directory.
        """
        parser = OptionParser()
        for opt in options:
            parser.add_option(opt['option'], dest=opt['value'])
        # Common options
        parser.add_option("-d", dest="dir")
        parser.add_option("-i", dest="ip")
        parser.add_option("-u", dest="uuid")
        parser.add_option("-l", dest="log")
        (self.options, _) = parser.parse_args()
    

用到的跨平台文件锁模块filelocker(即portalocker.py):

#encoding=utf8
# portalocker.py - Cross-platform (posix/nt) API for flock-style file locking.
#                  Requires python 1.5.2 or better.
"""Cross-platform (posix/nt) API for flock-style file locking.

Synopsis:

   import portalocker
   file = open("somefile", "r+")
   portalocker.lock(file, portalocker.LOCK_EX)
   file.seek(12)
   file.write("foo")
   file.close()

If you know what you're doing, you may choose to

   portalocker.unlock(file)

before closing the file, but why?

Methods:

   lock( file, flags )
   unlock( file )

Constants:

   LOCK_EX
   LOCK_SH
   LOCK_NB

Exceptions:

    LockException

Notes:

For the 'nt' platform, this module requires the Python Extensions for Windows.
Be aware that this may not work as expected on Windows 95/98/ME.

History:

I learned the win32 technique for locking files from sample code
provided by John Nielsen <nielsenjf@my-deja.com> in the documentation
that accompanies the win32 modules.

Author: Jonathan Feinberg <jdf@pobox.com>,
        Lowell Alleman <lalleman@mfps.com>
Version: $Id: portalocker.py 5474 2008-05-16 20:53:50Z lowell $

"""


# Public API of this module: the names exported by ``from <module> import *``.
__all__ = [
    "lock",
    "unlock",
    "LOCK_EX",
    "LOCK_SH",
    "LOCK_NB",
    "LockException",
]

import os

class LockException(Exception):
    """Raised by lock() when LOCK_NB was given and the lock is held elsewhere.

    The exception arguments are ``(LOCK_FAILED, message)``.
    """

    # Error codes:
    LOCK_FAILED = 1


if os.name == 'nt':
    import win32con
    import win32file
    import pywintypes
    LOCK_EX = win32con.LOCKFILE_EXCLUSIVE_LOCK
    LOCK_SH = 0  # the default
    LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY
    # is there any reason not to reuse the following structure?
    __overlapped = pywintypes.OVERLAPPED()
elif os.name == 'posix':
    import errno
    import fcntl
    LOCK_EX = fcntl.LOCK_EX
    LOCK_SH = fcntl.LOCK_SH
    LOCK_NB = fcntl.LOCK_NB
    # errno values meaning "already locked".  EAGAIN is 11 on Linux but 35
    # on BSD/macOS, so the number must not be hard-coded.
    _LOCKED_ERRNOS = (errno.EAGAIN, errno.EACCES, errno.EWOULDBLOCK)
else:
    raise RuntimeError("PortaLocker only defined for nt and posix platforms")

if os.name == 'nt':
    def lock(file, flags):
        """Lock the open file object *file*.

        *flags* is LOCK_EX or LOCK_SH, optionally ORed with LOCK_NB.
        Raises LockException when the lock is held elsewhere (win32 error
        33); any other win32 error propagates unchanged.
        """
        hfile = win32file._get_osfhandle(file.fileno())
        try:
            # NOTE(review): newer pywin32 builds may reject a negative DWORD
            # for the high length word -- confirm before upgrading.
            win32file.LockFileEx(hfile, flags, 0, -0x10000, __overlapped)
        except pywintypes.error as exc_value:
            # error: (33, 'LockFileEx', 'The process cannot access the file because another process has locked a portion of the file.')
            if exc_value.args[0] == 33:
                raise LockException(LockException.LOCK_FAILED, exc_value.args[2])
            # Q:  Are there exceptions/codes we should be dealing with here?
            raise

    def unlock(file):
        """Release a lock taken with lock(); unlocking an unlocked file is ignored."""
        hfile = win32file._get_osfhandle(file.fileno())
        try:
            win32file.UnlockFileEx(hfile, 0, -0x10000, __overlapped)
        except pywintypes.error as exc_value:
            if exc_value.args[0] == 158:
                # error: (158, 'UnlockFileEx', 'The segment is already unlocked.')
                # To match the 'posix' implementation, silently ignore this error
                pass
            else:
                # Q:  Are there exceptions/codes we should be dealing with here?
                raise

elif os.name == 'posix':
    def lock(file, flags):
        """Lock the open file object *file* with flock().

        *flags* is LOCK_EX or LOCK_SH, optionally ORed with LOCK_NB.
        Raises LockException when the lock is held elsewhere; any other
        OS error propagates unchanged.
        """
        try:
            fcntl.flock(file.fileno(), flags)
        except IOError as exc_value:  # OSError on Python 3
            if exc_value.errno in _LOCKED_ERRNOS:
                raise LockException(LockException.LOCK_FAILED, exc_value.strerror)
            raise

    def unlock(file):
        """Release a lock taken with lock()."""
        fcntl.flock(file.fileno(), fcntl.LOCK_UN)



if __name__ == '__main__':
    # Manual demo: take a blocking exclusive lock on a shared log file,
    # append a timestamp, and hold the lock until Enter is pressed.
    # An optional first command-line argument overrides the default path.
    from time import time, strftime, localtime
    import sys

    path = sys.argv[1] if len(sys.argv) > 1 else '\\\\10.1.10.254\\storage\\log.txt'
    log = open(path, "a+")
    try:
        lock(log, LOCK_EX)

        timestamp = strftime("%m/%d/%Y %H:%M:%S\n", localtime(time()))
        log.write(timestamp)

        print("Wrote lines. Hit enter to release lock.")
        sys.stdin.readline()
    finally:
        # Closing the file also releases the lock.
        log.close()

workerbase使用样例

#encoding=utf8
'''
Created on 2011-9-24

@author: chenggong

worker例程
'''


from workerbase import WorkerBase
import time

#派生WorkBase
class SampleWorker(WorkerBase):
    """Example worker: echoes the task, sleeps 2 s, logs a line, reports failure."""

    def dowork(self, filepath, content):
        """Handle one task (overrides WorkerBase.dowork).

        :param filepath: task name, i.e. the task file's base name up to its
            first '.'.
        :param content: full text of the task file.
        :returns: True for success, False for failure; this sample always
            returns False.
        """
        print("dowork file=%s content=%s" % (filepath, content))
        print("doing...")

        # Custom command-line options show up as self.options.<dest>.
        opts = self.options
        print("myparam=%s %s" % (opts.test1, opts.test2))

        time.sleep(2)

        # Write through the shared logging helper.
        self.log("debug", 0, "可以这样提交日志")

        return False

'''
基本命令行参数,调用至少要有以下几个参数
-d 任务文件夹
-l 日志输出文件夹
-i 本机IP
-u uuid
'''

if __name__ == "__main__":
    worker = SampleWorker()

    # Only task names matching "xxx-xxx-cut" are picked up
    # (without this call every name matches).
    worker.set_task_patten(".*-.*-cut")

    # New task files end in ".txt" (also the default when not set).
    worker.set_task_exname("txt")

    # Custom options: -a is stored as options.test1, -b as options.test2.
    custom_options = [
        {"option": "-a", "value": "test1"},
        {"option": "-b", "value": "test2"},
    ]
    worker.set_options(custom_options)

    # Enter the scan loop; it only returns if the task pattern is invalid.
    worker.start()

抱歉!评论已关闭.