队列应用场景,自己实现队列(二)_PHP_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > PHP > 队列应用场景,自己实现队列(二)

队列应用场景,自己实现队列(二)

 2014/6/3 13:11:49  cyber4cn  程序员俱乐部  我要评论(0)
  • 摘要:原文详见:http://www.ucai.cn/blogdetail/7026?mid=1&f=12可以在线运行查看效果哦!五、队列具体实现二:定义调用接口[php]viewplaincopy<?php/***优才网公开课示例代码**任务队列实现**@author优才网全栈工程师教研组张友林*@seehttp://www.ucai.cn*/include_once('db.php');classQueue
  • 标签:实现 应用 自己 队列

原文详见:http://www.ucai.cn/blogdetail/7026?mid=1&f=12

可以在线运行查看效果哦! ? ?

?

?

?

五、队列具体实现二:定义调用接口

?

?

[php]?view plaincopy在CODE上查看代码片派生到我的代码片 ?
    class="dp-c">
  1. <?php??
  2. /**?
  3. ?*?优才网公开课示例代码?
  4. ?*?
  5. ?*?任务队列实现?
  6. ?*?
  7. ?*?@author?优才网全栈工程师教研组??张友林?
  8. ?*?@see?http://www.ucai.cn?
  9. ?*/??
  10. ??
  11. include_once('db.php');??
  12. class?Queue??
  13. {??
  14. ??
  15. ????/**?
  16. ?????*?把任务扔到队列?
  17. ?????*?
  18. ?????*?@param?string?$taskphp???执行任务的程序?
  19. ?????*?@param?string?$param?????执行任务程序所用的参数?
  20. ?
  21. ?????*?例如,群发消息加入队列:?
  22. ?????*?$arr?=?array(?
  23. ?????*??????"uid"?=>?4,//发信息的人的UID?
  24. ?????*??????"uids"?=>?array(6,234,34,67,7888,2355),?//接收信息的人的UID?
  25. ?????*??????"content"?=>?'xxxxx',//信息内容?
  26. ?????*??);?
  27. ?????*?$cqueue?=?new?Queue();?
  28. ?????*?$cqueue->add("/app/send_msg.php",?serialize($arr));?
  29. ?????*?
  30. ?????*/??
  31. ????public?function?add($taskphp,$param)??
  32. ????{??
  33. ????????$taskphp?=?mysql_real_escape_string($taskphp);??
  34. ????????//$param?=?mysql_real_escape_string($param);??
  35. ????????$param?=?$param;??
  36. ????????$sql?=?"insert?into?queue?(taskphp,?param)?values('".$taskphp."',?'".$param."')";??
  37. ????????$re?=?execute($sql);??
  38. ????????if?($re)??
  39. ????????{??
  40. ????????????$pid?=?mysql_insert_id();??
  41. ????????????return?$pid;??
  42. ????????}??
  43. ????????else??
  44. ????????{??
  45. ????????????return?false;??
  46. ????????}??
  47. ????}??
  48. ??
  49. ??
  50. ????/**?
  51. ?????*?读取任务队列?
  52. ?????*??
  53. ?????*?@param?string?$limit?一次取多少条?
  54. ?????*/??
  55. ?????public?function?getQueueTask($limit?=?1000)??
  56. ?????{??
  57. ????????$limit?=?(int)$limit;??
  58. ????????$sql?=?"select?id,?taskphp,?param?from?queue??where?status?=?0?order?by?id?asc";??
  59. ????????$re?=?query($sql);??
  60. ????????return?$re;??
  61. ?????}??
  62. ??
  63. ????/**?
  64. ?????*?更新任务状态?
  65. ?????*??
  66. ?????*?@param?string?$limit?一次取多少条?
  67. ?????*/??
  68. ?????public?function?updateTaskByID($id)??
  69. ?????{??
  70. ????????$id?=?(int)$id;??
  71. ????????$mtime?=?time();??
  72. ????????$sql?=?"update?queue??set?status?=1,?mtime?=?".$mtime."?where?id?=?".$id;??
  73. ????????$re?=?execute($sql);??
  74. ????????return?$re;??
  75. ?????}??
  76. ??
  77. ??
  78. ?????public?static?function?a2s($arr)??
  79. ????{??
  80. ????????$str?=?"";??
  81. ????????foreach?($arr?as?$key?=>?$value)??
  82. ????????{??
  83. ????????????if?(is_array($value))??
  84. ????????????{??
  85. ????????????????foreach?($value?as?$value2)??
  86. ????????????????{??
  87. ????????????????????$str?.=?urlencode($key)?.?"[]="?.?urlencode($value2)?.?"&";??
  88. ????????????????}??
  89. ????????????}??
  90. ????????????else??
  91. ????????????{??
  92. ????????????????$str?.=?urlencode($key)?.?"="?.?urlencode($value)?.?"&";??
  93. ????????????}??
  94. ????????}??
  95. ????????return?$str;??
  96. ????}??
  97. ??
  98. ????public?static?function?s2a($str)??
  99. ????{??
  100. ????????$arr?=?array();??
  101. ????????parse_str($str,?$arr);??
  102. ????????return?$arr;??
  103. ????}??
  104. ??
  105. }??
  106. ??
  107. ?>??
  108. ??
  109. $cqueue?=?new?Queue();??

?

?

1:加入队列接口

? //$param1 为执行任务的程序,$param2 为程序参数,可以为序列化的数据

? $cqueue->add($param1,$param2);

?

2:? 读取队列接口

? $tasks = $cqueue->getQueueTask($limit = 1000);

?

3:更新任务状态

? $cqueue->updateTaskStatus($id);

?

4:a2s是自定义的一个数组转换字符串方法,这里不要使用json_encode,容易出现问题,同样,从数据库中取出转换为数组的时候,使用s2a方法

? $re = $cqueue->add("sendMsg.php", Queue::a2s($arr));

?

?

六、队列具体实现三:写执行队列的程序

?

根据设计,执行队列的程序文件是?do_queue.php , 它的主要功能是把任务从队列表里取出来,并且在后台执行。

do_queue.php部分代码:

[php]?view plaincopy在CODE上查看代码片派生到我的代码片 ?
  1. $phpcmd?=?exec("which?php");????//查找到php安装位置??
  2. $cqueue?=?new?Queue();??
  3. $tasks?=?$cqueue->getQueueTask(200);??
  4. foreach?($tasks?as?$t)??
  5. {??
  6. ????$taskphp?=?$t['taskphp'];??
  7. ????$param?=?$t['param'];??
  8. ????$job?=?$phpcmd?.?"?"?.?escapeshellarg($taskphp)?.?"?"?.?escapeshellarg($param);??
  9. ????system($job);??
  10. }??

?

七、具体任务的业务实现

还是拿群发消息来做例子,我们需要写好一个群发消息的程序,这个程序接收事先定义好的参数,然后根据参数调用发消息的接口把消息发送出去。

这个一般由做业务功能的工程师实现。但是架构师事先得写文档例子,教会别人使用。

?

[php]?view plaincopy在CODE上查看代码片派生到我的代码片 ?
  1. send_msg.php:??
  2. ??
  3. $para?=?$argv[1];??
  4. $arr?=?unserialize($para);??
  5. $cmessage?=?new?Message();??
  6. foreach($arr['uids']?as?$touid)??
  7. {??
  8. ????$cmessage->send($arr['uid'],?$touid,?$arr['content']);??
  9. }??


八 、服务器部署一:配置crontab

? ? ? ? 咱们执行队列的程序都写好了, 这个程序怎么触发呢,当然就要用到linux的定时任务,每隔一定的时间,执行do_queue.php一次。但是呢,这里不是直do_queue.php,咱们再提高一层,加个调度程序cron_mission.php, 在cron_mission.php里面调用do_queue.php。


配置定时任务 crontab:

crontab –e

* * * * * ?cd /ucai/schedule; php cron_mission.php >> cron_mission.log


#可以先使用crontab -l查看本机已经使用的定时任务

?

?

九、服务器部署二:写定时任务调度程序

思路:将定时任务写入到任务调度程序cron_mission.php中,这样可以在cron_mission.php中灵活控制队列任务。相比较直接通过crontab控制doQueue.php而言,避免了频繁修改服务器上的crontab,从安全,便于维护等角度来说,都是上策。

?

[php]?view plaincopy在CODE上查看代码片派生到我的代码片 ?
  1. cron_mission.php?示例:??
  2. ??
  3. if?($minute?%?5?==?0)??
  4. {??
  5. ????if(chdir($site_dir."app/"))?{??
  6. ????????$cmd?=?"$phpcmd?do_queue.php?>?do_queue.log?&";??
  7. ????????echo?'['?,?$ymd?,?'?'?,?$hour?,?':'?,?$minute?,?']?'?,?$cmd?,?"\n";??
  8. ????????system($cmd);??
  9. ????}??
  10. }??

?

十、开启多进程并发执行队列

?

思路:对任务序列进行编号,数据库中执行的时候

where条件加上id%每个队列要执行任务总数=队列编号

这样可以避免重复处理

?

例如:每个进程执行10条任务,修改如下

?

1:定时任务的修改

修改前:

[php]?view plaincopy在CODE上查看代码片派生到我的代码片 ?
  1. if?($minute?%?5?==?0)??
  2. {??
  3. ????if(chdir($site_dir."app/"))?{??
  4. ????????$cmd?=?"$phpcmd?do_queue.php?>?do_queue.log?&";??
  5. ????????echo?'['?,?$ymd?,?'?'?,?$hour?,?':'?,?$minute?,?']?'?,?$cmd?,?"\n";??
  6. ????????system($cmd);??
  7. ????}??
  8. }??

修改后:

[php]?view plaincopy在CODE上查看代码片派生到我的代码片 ?
  1. if?($minute?%?5?==?0)??
  2. {??
  3. ????for?($i=0;?$i?<?10;?$i++)?{???
  4. ????????$cmd?=?"$phpcmd?doQueue.php?10?$i>>?doQueueMission".date('Y-m-d').".log??";??
  5. ????????echo??date("Y-m-d?H:i:s")?.?"\t?:?"?.$cmd."\n";??
  6. ????????system($cmd);??
  7. ????}??
  8. }??

//每次进行10个进程,$i来区分是当前的进程标示

?

?

2:队列执行程序的修改

修改前:

[php]?view plaincopy在CODE上查看代码片派生到我的代码片 ?
  1. $phpcmd?=?'D:\work\wamp\bin\php\php5.3.10\php?';??
  2. $cqueue?=?new?Queue();??
  3. $tasks?=?$cqueue->getQueueTask(200);??

修改后:

?

[php]?view plaincopy在CODE上查看代码片派生到我的代码片 ?
  1. $phpcmd?=?'D:\work\wamp\bin\php\php5.3.10\php?';??
  2. $total=$argv[1];??
  3. $i=$argb[2];??
  4. $cqueue?=?new?Queue();??
  5. $tasks?=?$cqueue->getQueueTask($total,$i,200);??

?

?

3:取队列接口的修改

修改前:

?

[php]?view plaincopy在CODE上查看代码片派生到我的代码片 ?
  1. public?function?getQueueTask($limit?=?1000)??
  2. ?????{??
  3. ????????$limit?=?(int)$limit;??
  4. ????????$sql?=?"select?id,?taskphp,?param?from?queue??where?status?=?0?order?by?id?asc";??
  5. ????????$re?=?query($sql);??
  6. ????????return?$re;??
  7. ?????}??


修改后:

?

?

[php]?view plaincopy在CODE上查看代码片派生到我的代码片 ?
  1. public?function?getQueueTask($total,$i,$limit?=?1000)??
  2. ?????{??
  3. ????????$limit?=?(int)$limit;??
  4. ????????$sql?=?"select?id,?taskphp,?param?from?queue??where?status?=?0?and?id%$total=$i?order?by?id?asc";??
  5. ????????$re?=?query($sql);??
  6. ????????return?$re;??
  7. ?????}??

?

?

4:需要关注服务器压力

进程数定为多少,取决于服务器压力

?

十一、实现任务优先级?

1:任务存储表加优先级字段

? ? ? ?在数据表里,加一个优先级字段,按字段值的数值大小来区分优先级。

2:修改取队列任务接口,按优先级取

? ? ? ?同样是在sql语句中增加order by。

十二、记录队列日志

1:关键地方加echo

2:shell脚本的>>和>的各自作用?

总结:

? ? ? ? 我们这里的队列实现借助了服务器的计划任务来实现,例如linux中的crontab,这本身是linux系统中的一个程序,平时我们还可以使用他来进行定时执行.sh脚本,例如将数据库备份打包并ftp传送到指定服务器上,这个功能不需要借助php脚本,直接用.sh脚本就可以实现。在这里我们巧妙的将crontab和php脚本结合,并且使用crontab来不断调用一个队列调度接口cronMission.php,再通过cronMission.php直接来控制具体什么时候或者是满足什么条件来执行什么队列任务。

这里面几个需要注意的地方

? ? ? ?1:往数据库中存取数据时,不要直接使用json_encode或者json_decode,容易造成一些意外问题,在代码中,我们定义了a2s和s2a两个方法,分别是处理数组转为字符串,和从数据库中读取字符串后转为数组。

? ? ? ?2:当任务量比较大,同时服务器负载又没有充分利用的时候,可以使用多进程并发处理,在并发处理的时候需要考虑一个问题,就是如何避免重复,在这里我们使用了,对队列任务进行标记,每次从数据库中读取一个进程需要处理的一批任务,使用数据库中id与批次标示取余等于0的方法来区分,避免不同批次的队列,重复处理相同任务。(上面步骤10中有具体实现)

?

发表评论
用户名: 匿名