-- 前面的队列文章 --
php队列使用php-resque(1)
php队列使用php-resque(2)
php队列使用php-resque(3)- by supervisor
-- 总体说明 --
下面是一个网上的中文的说明
www.fzb.me/2015-7-31-beanstalkd-faq.html
这里分为我们常见的
服务端和客户端。
服务端是使用人很多的beanstalkd,其实叫做消息队列比较合适。
它和我们前面介绍的php-resque差别:
1、php-resque不区分明显的客户端和服务端,beanstalkd区分,较符合我们常用的思维方式。
2、php-resque只需安装redis并运行,而beanstalkd是自己写的服务端与其他软件无关,也不需要。
3、php-resque不能持久化,而beanstalkd可以,但实际不用为好,因为太慢。
4、php-resque和beanstalkd都很轻量级,都很容易使用。
5、最重要的php-resque不能使用延时队列(或者说需要其他组件补充),而beanstalkd可以。
6、php-resque许久不更新了,而beanstalkd在composer上次更新是2015年,好不少。
7、php-resque没有优先级,而beanstalkd有优先级,但我也不用。
8、为了防止某个consumer长时间占用任务但不能处理的情况,Beanstalkd为reserve操作设置了timeout时间,如果该consumer不能在指定时间内完成job,job将被迁移回READY状态,供其他consumer执行。
介绍一下延时队列,我可以放一个消息,但希望该消息在某个特定时点才真正进入队列,从而被取出。
而普通的队列,你放入消息到队列里,就无法控制它,只要它前面没有别的消息,它就立刻会被取出。
客户端可以选用多种语言,php也有多个类库,但composer仓库下载
最多的是pda/pheanstalk
==============================
服务端说明:
1、一个消息有READY, RESERVED, DELAYED, BURIED四种状态
2、当producer直接put一个消息时,消息就处于READY状态
3、如果选择延迟put,消息就先到DELAYED状态,等待时间过后才迁移到READY状态。任务状态会从 READY 变为 RESERVED(预定),其他人就无法获取。 PUT 产生消息的时候,携带了 ttr(time to run),如果这个时间内,
消费者没有发送 delete, release 或者 buried 命令。 任务会自动回到 READY 状态,其他人可以继续获取。(注意:消费者返回 release 命令或者不返回,就回到 READY/DELAYED 状态,可以重新被消费!!这可能是我们不希望的)
4、如果获取了当前READY的消息后,该消息的状态就迁移到RESERVED,这样其他的任何进程就不能再操作该消息。确保唯一使用。
5、BURIED这个,为防止一个消息被调用多次,可以使用此
接口。
6、程序有一个delete操作,一个消息被delete之后,就完全消失,不属于上面的4种情况中任何一种,并且,一般来说,就应该在获取消息之后对其delete,绝对不可以省略。
Beanstalkd的job状态多样化,支持任务优先级 (priority), 延时 (delay),
超时重发 (time-to-run) 和预留 (buried), 能够很好的支持分布式的后台任务和定时任务处理。
它的内部实现采用 libevent, 服务器-客户端之间用类似 memcached 的轻量级通讯
协议,因此有很高的性能。
关于服务端使用的
内存,只取决于队列大小,不
限制。
缺点:无最大内存控制, 如果有消息堆积或者业务使用方式有误,而导致内存暴涨拖垮机器
-- 服务端安装 --
完整安装步骤,必须是类unix系统。
yum install beanstalkd
service beanstalkd start
上面的写法,会读取下列配置文件。
配置文件 /etc/sysconfig/beanstalkd
还有一种启动方法是直接启动,自己设置参数
当
开启-b启动持久化后,其会将binlog每隔N秒(N可配置为0)刷入到
硬盘,类似redis的aof持久化方式,一般不需要的。
-- 服务端监控 --
有很多
https://github.com/kr/beanstalkd/wiki/Tools
我挑选第一个下载
使用极其简单,无需安装。
只要把文件解压即可,执行命令
./beanstalkd-cli stats
或者
./beanstalkd-cli monitor
但是不太直观。
-- 客户端安装 --
composer安装
"pda/pheanstalk":"3.1.0"
-- 客户端代码 --
为简单,写一个文件里了,真实项目肯定分开
class="php">
<?php
namespace app\test\controller;
use Pheanstalk\Pheanstalk;
/**
* 这是门面程序。
* 使用之前,需要beanstalkd服务已经在本机启动。
* 我是用框架执行的,如果没有框架,需写成两个php文件。
*
* @author Administrator
*
*/
class Beanstalkd
{
/**
* 这是添加消息到队列。
*/
public function add() {
$beanstalkd = new Pheanstalk('127.0.0.1', '11300');
//这是消息数据,在本demo中,type不能省略。区分任务类型。
$data = array(
'type' => mt_rand(1,2),
'mobile' => '13051662435',
'id' => mt_rand(100,200),
'time' => date("Y-m-d H:i:s"),
);
$delay = (int) strtotime($data['time']) - time();
$delay=0;
// 把消息放入队列,1024 是优先级
//$delay 非常重要,假设设置10,则该消息10秒后才被放入队列!非常好使。
$beanstalkd->useTube( Worker::queue_name )
->put(serialize($data), 1024, $delay);
}
/**
* 这是运行队列管理器。
*/
public function run()
{
$worker = new Worker();
$worker->run();
}
}
/**
* 这是真正的任务执行1
* @author Administrator
*
*/
class WorkerJob1
{
public function excute($data)
{
echo "---\n";
echo ('job:type1:'.print_r($data, 1));
echo "---\n";
}
}
/**
* 这是真正的任务执行2
* @author Administrator
*
*/
class WorkerJob2
{
public function excute($data)
{
echo "---\n";
echo ('job:type2:'.print_r($data, 1));
echo "---\n";
}
}
/**
* 这是队列管理器守护进程,最好用supervisord这个软件支持一下。
且应该放在linux的后台执行
* @author Administrator
*
*/
class Worker {
private $job1; //任务对象
private $job2; //这是任务对象
private $pheanstalk;//这是服务
// 设置队列名称。随意起
const queue_name ="queue1";
public function __construct() {
$this->log('starting');
$this->pheanstalk = new Pheanstalk('127.0.0.1:11300');
$this->job1 = new WorkerJob1();
$this->job2 = new WorkerJob2();
}
public function excute($data)
{
if ($data['type']==1) {
$this->job1->excute($data);
}elseif ($data['type']==2) {
$this->job2->excute($data);
}
}
public function run() {
$this->log('starting to run');
while(true) {
$job = $this->pheanstalk->watch( self::queue_name )->ignore('default')->reserve();
$job_data = unserialize( $job->getData());
// 真正工作的就这句。
$this->excute($job_data);
//删除千万不能忘记。
$this->pheanstalk->delete($job);
$memory = memory_get_usage();
//这是一个保险措施。也可以去除。
if($memory > 100000000) {
$this->log('exiting run due to memory limit');
exit;
}
}
}
private function log($txt) {
echo "worker: ".$txt."\n";
}
}
-- 浏览器输出 --
- 大小: 15.5 KB