现在的位置: 首页 > 综合 > 正文

php多线程.

2013年12月02日 ⁄ 综合 ⁄ 共 6612字 ⁄ 字号 评论关闭
代码实现了一个简单的多进程管理机制,比向WEB服务器发送多个请求要实现多进程要方便很多。只能使用在cli模式。可以用在特殊场合,如邮件发送任务等。
资源的共享访问使用了文件锁,并不是很可靠,主要是为了能够在Windwos下使用,如果确实有必要可以考虑自己改用相应的信号灯机制(这个扩展只能用于xUNIX)。

实例
复制PHP内容到剪贴板
PHP代码:
define('DIR_PHP_EXEC', 'php');
define('DIR_MAIN_EXEC', __FILE__);
define('DIR_TMP', '/tmp');
require_once('my_process.php');

class pp extends my_process_base {
    public function run($param = null) {
        for ($i = 0; $i < 4; $i++) {
            echo "111 $param/n";
            sleep(1);
        }
    }
}

init_my_process();
$obj = $GLOBALS['gal_obj_process_m'];
if ($obj->is_main()) {
    $obj->run_task('pp', 'a');
    $obj->run_task('pp', 'b');
    $obj->run_task('pp', 'c');
    $obj->run_task('pp', 'd');
    //$obj->run_task('pp', 'b');
    $obj->set_max_run(10);
    $obj->run();
}

进程管理类
复制PHP内容到剪贴板
PHP代码:
/**
 * @copyright 2007 movivi
 * @author  徐智  <[email=xzfred@gmail.com]xzfred@gmail.com[/email]>
 *
 * $Id: getPage.php 11 2007-09-21 02:15:01Z fred $
 */
if (!defined('DIR_PHP_EXEC')) define('DIR_PHP_EXEC', 'php');
//if (!defined('DIR_MAIN_EXEC')) define('DIR_MAIN_EXEC', '');
if (!defined('DIR_TMP')) define('DIR_TMP', '');
/*****************************************************************************/
/* 初始化 */
define('CMD_MAIN_PROCESS_KEY', 'main_process_key');
define('CMD_CHILD_PROCESS_NAME', 'child_process_name');
define('CMD_CHILD_PROCESS_PARAM', 'child_process_param');

function init_my_process() {
    $GLOBALS['gal_obj_cmd'] = new my_cmd_argv();
    $key = $GLOBALS['gal_obj_cmd']->get_value(CMD_MAIN_PROCESS_KEY);
    $key = $key === false ? '' : $key;
    $GLOBALS['gal_obj_process_m'] = new my_process_m($key);
    if (!$GLOBALS['gal_obj_process_m']->is_main()) $GLOBALS['gal_obj_process_m']->run() ;
}

/**
 * php多进程类
 *
 * 你需要从这个对象继承,然后实现你自己的run处理
 */
abstract class my_process_base {
    public function __construct($auto_run=true, $name='') {
    }

    public function __destruct() {
        echo "@end/n";
    }

    abstract public function run($param = null);
}

class my_cmd_argv {
    private $cmd_argv = array();
    public function __construct() {
        $argv = $_SERVER['argv'];
        for ($i = 1; $i < count($argv); $i++) {
            $cmd = explode('=', $argv[$i]);
            $this->cmd_argv[$cmd[0]] = isset($cmd[1]) ? $cmd[1] : '';
        }
    }

    public function get_key($key) {
        return isset($this->cmd_argv[$key]);
    }

    public function get_value($key) {
        return isset($this->cmd_argv[$key]) ? $this->cmd_argv[$key] : false;
    }
}

/**
 * php多进程管理类
 * 可以在PHP中实现多进程处理,只限在控制台方式使用
 * 当前的信号实现机制采用文件方式
 *
 */
class my_process_m {
    /**
     * @var array $task_list
     * 进程列表
     */
    private $task_list = array();
    private $lock_list = array();
    private $lock = null;
    private $is_main = false;
    private $max_run = 3600000;

    private function release_lock($key = null) {
        $lock = &$this->lock_list;
        if (!is_null($key)) {
            $key = md5($this->build_lock_id($key));
            if (isset($lock[$key])) {
                if (is_resource($lock[$key][0])) fclose($lock[$key][0]);
                unlink($lock[$key][1]);
                unset($lock[$key]);
            }
            return true;
        }

        foreach ($lock as $k => $h) {
            if (is_resource($h)) fclose($h);
            unset($lock[$k]);
        }
        return true;
    }

    private function release_task($key = null) {
        $task = &$this->task_list;
        if (!is_null($key) && isset($task[$key])) {
            if (is_resource($task[$key])) pclose($task[$key]);
            unset($task[$key]);
        } else {
            foreach ($task as $k => $h) {
                if (is_resource($h)) pclose($h);
                unset($task[$k]);
            }
        }
        return true;
    }
           
    private function build_lock_id($key) {
        return DIR_TMP . DIRECTORY_SEPARATOR . $key . '_sem.lock';
    }

    protected function run_child_process() {
        $class = $GLOBALS['gal_obj_cmd']->get_value(CMD_CHILD_PROCESS_NAME);
        $param = $GLOBALS['gal_obj_cmd']->get_value(CMD_CHILD_PROCESS_PARAM);
        $param = $param == '' ? null : unserialize(base64_decode(trim($param)));
        $obj = new $class();
        $obj->run($param);
        $this->task_list[] = $obj;
    }

    public function __construct($lock='') {
        if ($lock === '') {
            $this->is_main = true;
            $key = md5(uniqid()) . '_main.my_process';
            $lock = array($key, $this->get($key));
        } else {
            $this->is_main = false;
            $lock = array($lock, 0);
        }
        $this->lock = $lock;
    }

    public function __destruct() {
        $this->release_lock();
        $this->release_task();
    }

    /**
     * 停止所有进程
     *
     */
    public function stop_all() {
    }

    /**
     * 是否是主进程
     *
     */
    public function is_main() {
        return $this->is_main;
    }

    /**
     * 是不是已经存在一个活动信号
     *
     * @param   string      $key
     * @return  bool       
     */
    public function exist($key) {
        return file_exists($this->build_lock_id($key));
    }

    /**
     * 获取一个信号
     *
     * @param   string      $key    
     * @param   int         $max_acquire    最大请求阻塞数量
     * @return mix 如果成功返回一个信号ID
     *
     */
    public function get($key, $max_acquire=5) {
        $fn = $this->build_lock_id($key);
        if (isset($this->lock_list[md5($fn)])) return false;
        $id = fopen($fn, 'a+');
        if ($id) $this->lock_list[md5($fn)] = array($id, $fn);
        return $id;
    }

    /**
     * 释放一个信号
     *
     * @param   string      $key    
     * @return  bool        如果成功返回一个信号true
     *
     */
    public function remove($key) {
        return $this->release_lock($key);
    }

    /**
     * 获取一个信号
     *
     * @param string    $id         信号ID
     * @param bool      $block      是否阻塞
     */
    public function acquire($id, $block=false) {
        if ($block) {
            return flock($id, LOCK_EX);
        } else {
            return flock($id, LOCK_EX + LOCK_NB);
        }
    }

    /**
     * 释放一个信号
     *
     */
    public function release($id) {
        flock($id, LOCK_UN);
    }
    public function run_task($process_name, $param=null) {
        $this->task_list[] = popen(DIR_PHP_EXEC . ' -f ' . DIR_MAIN_EXEC . ' -- '
            . CMD_CHILD_PROCESS_NAME . '=' . $process_name . ' '
            . CMD_CHILD_PROCESS_PARAM . '="' . base64_encode(serialize($param)) . '" '
            . CMD_MAIN_PROCESS_KEY . '="' . $this->lock[0] . '" ',
            'r');
    }

    public function run($auto_run = true) {
        if ($this->is_main) {
            $ps = &$this->task_list;
            $max_run = &$this->max_run;
            $id = 0;
            do {
                //echo "process----------------------------------------: /n";
                $c = 0;
                foreach ($ps as $k => $h) {
                    $c++;
                    $msg = fread($h, 8000);
                    if (substr($msg, -5, 4) === '@end') {
                        echo "end process:[$k][$id] echo /n{$msg} /n";
                        $this->release_task($k);
                    } else {
                        echo "process:[$k][$id] echo /n{$msg} /n";
                    }
                }
                sleep(1);
            } while ($auto_run && $id++ < $max_run && $c > 0);
        } else {
            $this->run_child_process();
        }
    }

    public function set_max_run($max=1000) {
        $this->max_run = $max;
    }
}
 

抱歉!评论已关闭.