RibbitMQ php扩展使用 实现队列生产消费_PHP_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > PHP > RibbitMQ php扩展使用 实现队列生产消费

RibbitMQ php扩展使用 实现队列生产消费

 2017/12/20 16:56:23  18237103303  程序员俱乐部  我要评论(0)
  • 摘要:一般的队列系统,是指linux中的crontab定时启动脚本来处理任务:首先下载一个rabbitmq的客户端,他相当于一个容器,装排队数据的容器http://www.rabbitmq.com/download.html默认的端口是55672访问地址http://127.0.0.1:55672/默认帐号密码guestguest你可以看到rabbitmq的管理界面mq的任务是一个不浪费资源,的一个队列系统!php使用需要下载一个amqp扩展,或者直接点击下面的地址找到适合自己的版本,下载http
  • 标签:PHP 实现 使用 队列

一般的队列系统,是指linux中的crontab定时启动脚本来处理任务:

首先下载一个rabbitmq的客户端,他相当于一个容器,装排队数据的容器

http://www.rabbitmq.com/download.html

默认的端口是55672?? 访问地址http://127.0.0.1:55672/

默认帐号密码?? guest??? guest

你可以看到rabbitmq 的管理界面

mq的任务是一个不浪费资源,的一个队列系统!

?

php使用需要下载一个amqp扩展,或者直接点击下面的地址找到适合自己的版本,下载

? ? ? ? ?http://pecl.php.net/package/amqp/1.4.0/windows

根据当前使用的php版本选择相应的扩展dll,下载后是一个压缩包,里面有两个dll扩展(php_amqp.dll和rabbitmq.1.dll)。

php_amqpclass="lazy aligncenter size-full wp-image-15519" style="max-width: 900px;" width="681" height="229">

队列开启的过程:

rabbitmq.1.dll?? 放在C盘windows下

php_amqp.dll??? 放入php扩展中

开启php_amqp.dll的引用

详细步骤如下:

1.将rabbitmq.1.dll文件放在php的根目录里(也就是ext目录的父级目录),然后修改apache的httpd.con文件,文件尾部添加一行,这里的路径根据情况修改,我这里使用的wampserver软件。

?

1 monospace !important; font-size: 12px !important; white-space: pre-wrap; border-radius: 0px !important; background: none !important; border: 0px !important; float: none !important; height: auto !important; line-height: 1.8em !important; margin: 0px !important; overflow: visible !important; padding: 0px !important; vertical-align: baseline !important; width: auto !important; color: #000000 !important;">LoadFile? "d:/wamp/bin/php/php5.5.12/rabbitmq.1.dll"

?

? ? ? ? 2.将php_amqp.dll放在php的ext目录里,然后修改php.ini文件,在文件的最后面添加两行

?

1 2 [amqp] extension=php_amqp.dll

?

3.重启apache,并查看phpinfo信息。只要看到amqp 字样即可。

amqp

?

?

?

?

首先是rabbitmq的生产者:

生产者的逻辑:创建连接-->创建channel-->创建交换机对象-->发送消息

?

创建第一个index文件:然后去mq中查看,如果添加一个test001的队列名信息,就说明已经添加进去了,xx22的信息已经在mq中存储!

????接下来就需要跑数据了。

????createQueue(array('xxx','2222'),'test001');

????echo "ok";

?????function createQueue($message,$queueName,$exchangeName = '', $queueKey = '')

????{

????????$queueName = self::getQueueName($queueName);

????????$conn_args = array('host' =>'localhost', 'port'=> '5672',

????????????'login' =>'guest',??????? //mq帐号

????????????'password'=> '',??????? //mq密码

?????????????'vhost' => '/');

????????$conn = new AMQPConnection($conn_args);

????????$conn->connect();

????????$channel = new AMQPChannel($conn);

????????if (!$exchangeName) {

????????????$exchangeName = $queueName;

????????}

????????$queueName = $queueName;

????????if (!$queueKey) {

????????????$queueKey = $queueName;

????????}

????????$ex = new AMQPExchange($channel);

????????$ex->setName($exchangeName);

????????$ex->setType(AMQP_EX_TYPE_TOPIC);

????????$ex->setFlags(AMQP_DURABLE); //exchange持久化

????????$ex->declareExchange();

????????$q = new AMQPQueue($channel);

????????$q->setName($queueName);

????????$q->setFlags(AMQP_DURABLE); //queue持久化

????????$q->declareQueue();

????????$q->bind($exchangeName, $queueKey);

????????$channel->startTransaction();

????????/**

?????????* 消息持久化,delivery_mode:2持久化、delivery_mode:1非持久化,其中priority是设置消息的优先级,测试中发现并未起作用。

?????????* 消息还有其他属性,请参考http://www.php.net/manual/zh/amqpexchange.publish.php

?????????*/

????????$result = $ex->publish(json_encode($message), $queueKey, AMQP_NOPARAM, array('delivery_mode'=>2, 'priority'=> 9));

????????$channel->commitTransaction();

????????$conn->disconnect();

????}??

?

有了生产者,那就有消费者

脚本如果没有其他的修改或问题,基本上都是常年启动的:

消费者逻辑:创建连接-->创建channel-->创建交换机-->创建队列-->绑定交换机/队列/路由键-->接收消息

?

消费者基类:

????????class WorkerCommand{

????????function qInit($q_name,$e_name='',$k_route=''){

????????????????$q_name = Utils::getQueueName($q_name);

????????????????$conn_args = array(

????????????????????'host' => '127.0.0.1',??????????? //mq的配置

????????????????????'port' => '5672',

????????????????????'login' => 'guest',

????????????????????'password' => 'huoxingxing',

????????????????????'vhost' => '/'

????????????????);

??????????????

??????????

????????//创建连接和channel

????????????????$conn = new AMQPConnection($conn_args);

????????????????if (!$conn->connect()) {

????????????????????die("Cannot connect to the broker!\n");

????????????????}

????????????????$channel = new AMQPChannel($conn);

????????//创建交换机

????????????????$ex = new AMQPExchange($channel);

????????????????if (!$e_name) {

????????????????????$e_name = $q_name;

????????????????}

????????????????$ex->setName($e_name);

????????????????$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型

????????????????$ex->setFlags(AMQP_DURABLE); //持久化

???????????????// echo "Exchange Status:" . $ex->declareExchange() . "\n";

????????//创建队列

????????????????$q = new AMQPQueue($channel);

????????????????$q->setName($q_name);

????????????????$q->setFlags(AMQP_DURABLE); //持久化

???????????????// echo "Message Total:" . $q->declareExchange() . "\n";

????????????????if (!$k_route) {

????????????????????$k_route = $q_name;

????????????????}

????????//绑定交换机与队列,并指定路由键

???????????????// echo 'Queue Bind: ' . $q->declareQueue($e_name, $k_route) . "\n";

????????//阻塞模式接收消息

????????????????echo "Message:\n";

????????????????while (True) {

????????????????????$q->consume(array($this,'processMessage'));

????????????????????//$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答

????????????????}

????????????????$conn->disconnect();

????????}

}????

?

消费者:

class WorkerWareSyncBackUpCommand extends WorkerCommand {

????function actionIndex()

????{

????????$this->qInit('SyncWareBackup');

????}

????function processMessage($envelope, $queue)

????{

????????$msg = json_decode($envelope->getBody());

????????Utils::doBackUp('back',$msg,'');

????????$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答

????}

}

发表评论
用户名: 匿名