现在的位置: 首页 > 综合 > 正文

Memcache 中实现消息队列

2012年05月23日 ⁄ 综合 ⁄ 共 3824字 ⁄ 字号 评论关闭

 

    Memcache 一般用于缓存服务。但是很多时候,比如一个消息广播系统,需要一个消息队列。直接从数据库取消息,负载往往不行。如果将整个消息队列用一个key缓存到memcache里面,

对于一个很大的消息队列,频繁进行进行大数据库的序列化 和 反序列化,有太耗费。下面是我用PHP 实现的一个消息队列,只需要在尾部插入一个数据,就操作尾部,不用操作整个消息队列进行读取,与操作。但是,这个消息队列不是线程安全的,我只是尽量的避免了冲突的可能性。如果消息不是非常的密集,比如几秒钟才一个,还是可以考虑这样使用的。

    如果你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作。下面是代码:

 

class Memcache_Queue
{
    
private $memcache;

    private $name;

    private $prefix;

    function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__")
    {
        
if ($memcache == null) {
            
throw new Exception("memcache object is null, new the object first.");
        }
        
$this->memcache = $memcache;
        
$this->name = $name;
        
$this->prefix = $prefix;

        $this->maxSize = $maxSize;
        
$this->front = 0;
        
$this->real = 0;
        
$this->size = 0;
    }

    function __get($name)
    {
        
return $this->get($name);
    }

    function __set($name, $value
    {
        
$this->add($name, $value);
        
return $this;
    }

    function isEmpty() 
    {
        
return $this->size == 0;
    }

    function isFull()
    {
        
return $this->size == $this->maxSize;
    }

    function enQueue($data)
    {
        
if ($this->isFull()) {
            
throw new Exception("Queue is Full");
        }
        
$this->increment("size");
        
$this->set($this->real, $data);
        
$this->set("real", ($this->real + 1% $this->maxSize);
        
return $this;
    }

    function deQueue()
    {
        
if ($this->isEmpty()) {
            
throw new Exception("Queue is Empty");
        }
        
$this->decrement("size");
        
$this->delete($this->front);
        
$this->set("front", ($this->front + 1% $this->maxSize);
        
return $this;
    }

    function getTop()
    {
        
return $this->get($this->front);
    }

    function getAll()
    {
        
return $this->getPage();
    }

    function getPage($offset = 0, $limit = 0)
    {
        
if ($this->isEmpty() || $this->size < $offset) {
            
return null;
        }
        
$keys[] = $this->getKeyByPos(($this->front + $offset% $this->maxSize);
        
$num = 1;
        
for ($pos = ($this->front + $offset + 1% $this->maxSize; $pos != $this->real; $pos = ($pos + 1% $this->maxSize) 
        {
             
$keys[] = $this->getKeyByPos($pos);
             
$num++;
             
if ($limit > 0 && $limit == $num) {
                 
break;
             }
        }
        
return array_values($this->memcache->get($keys));
    }

    function makeEmpty()
    {
        
$keys = $this->getAllKeys();
        
foreach ($keys as $value) {
            
$this->delete($value);
        }
        
$this->delete("real");
        
$this->delete("front");
        
$this->delete("size");
        
$this->delete("maxSize");
    }

    private function getAllKeys()
    {
        
if ($this->isEmpty()) 
        {
            
return array();
        }
        
$keys[] = $this->getKeyByPos($this->front);
        
for ($pos = ($this->front + 1% $this->maxSize; $pos != $this->real; $pos = ($pos + 1% $this->maxSize) 
        {
            
$keys[] = $this->getKeyByPos($pos);
        }
        
return $keys;
    }

    private function add($pos, $data
    {
        
$this->memcache->add($this->getKeyByPos($pos), $data);
        
return $this;
    }

    private function increment($pos)
    {
        
return $this->memcache->increment($this->getKeyByPos($pos));
    }

    private function decrement($pos
    {
        
$this->memcache->decrement($this->getKeyByPos($pos));
    }

    private function set($pos, $data
    {
        
$this->memcache->set($this->getKeyByPos($pos), $data);
        
return $this;
    }

    private function get($pos)
    {
        
return $this->memcache->get($this->getKeyByPos($pos));
    }

    private function delete($pos)
    {
        
return $this->memcache->delete($this->getKeyByPos($pos));
    }

    private function getKeyByPos($pos)
    {
        
return $this->prefix . $this->name . $pos;
    }
}

抱歉!评论已关闭.