php Memcache中實現消息隊列

對於一個很大的消息隊列,頻繁進行進行大數據庫的序列化 和 反序列化,有太耗費。下面是我用PHP 實現的一個消息隊列,只需要在尾部插入一個數據,就操作尾部,不用操作整個消息隊列進行讀取,與操作。但是,這個消息隊列不是線程安全的,我只是儘量的避免了衝突的`可能性。如果消息不是非常的密集,比如幾秒鐘才一個,還是可以考慮這樣使用的。就跟隨本站小編一起去了解下吧,想了解更多相關信息請持續關注我們應屆畢業生考試網!

php Memcache中實現消息隊列

如果你要實現線程安全的,一個建議是通過文件進行鎖定,然後進行操作。下面是代碼

複製代碼 代碼如下:

class Memcache_Queue

{

private $memcache;

private $name;

private $prefix;

function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__")

{

if ($memcache == null) {

throw new Exception("memcache object is null, new the object first.");

}

$this->memcache = $memcache;

$this->name = $name;

$this->prefix = $prefix;

$this->maxSize = $maxSize;

$this->front = 0;

$this->real = 0;

$this->size = 0;

}

function __get($name)

{

return $this->get($name);

}

function __set($name, $value)

{

$this->add($name, $value);

return $this;

}

function isEmpty()

{

return $this->size == 0;

}

function isFull()

{

return $this->size == $this->maxSize;

}

function enQueue($data)

{

if ($this->isFull()) {

throw new Exception("Queue is Full");

}

$this->increment("size");

$this->set($this->real, $data);

$this->set("real", ($this->real + 1) % $this->maxSize);

return $this;

}

function deQueue()

{

if ($this->isEmpty()) {

throw new Exception("Queue is Empty");

}

$this->decrement("size");

$this->delete($this->front);

$this->set("front", ($this->front + 1) % $this->maxSize);

return $this;

}

function getTop()

{

return $this->get($this->front);

}

function getAll()

{

return $this->getPage();

}

function getPage($offset = 0, $limit = 0)

{

if ($this->isEmpty() || $this->size < $offset) {

return null;

}

$keys[] = $this->getKeyByPos(($this->front + $offset) % $this->maxSize);

$num = 1;

for ($pos = ($this->front + $offset + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize)

{

$keys[] = $this->getKeyByPos($pos);

$num++;

if ($limit > 0 && $limit == $num) {

break;

}

}

return array_values($this->memcache->get($keys));

}

function makeEmpty()

{

$keys = $this->getAllKeys();

foreach ($keys as $value) {

$this->delete($value);

}

$this->delete("real");

$this->delete("front");

$this->delete("size");

$this->delete("maxSize");

}

private function getAllKeys()

{

if ($this->isEmpty())

{

return array();

}

$keys[] = $this->getKeyByPos($this->front);

for ($pos = ($this->front + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize)

{

$keys[] = $this->getKeyByPos($pos);

}

return $keys;

}

private function add($pos, $data)

{

$this->memcache->add($this->getKeyByPos($pos), $data);

return $this;

}

private function increment($pos)

{

return $this->memcache->increment($this->getKeyByPos($pos));

}

private function decrement($pos)

{

$this->memcache->decrement($this->getKeyByPos($pos));

}

private function set($pos, $data)

{

$this->memcache->set($this->getKeyByPos($pos), $data);

return $this;

}

private function get($pos)

{

return $this->memcache->get($this->getKeyByPos($pos));

}

private function delete($pos)

{

return $this->memcache->delete($this->getKeyByPos($pos));

}

private function getKeyByPos($pos)

{

return $this->prefix . $this->name . $pos;

}

} </p