rabbitmq学习2:Work Queues_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > rabbitmq学习2:Work Queues

rabbitmq学习2:Work Queues

 2013/12/10 12:26:10  yimeng528  程序员俱乐部  我要评论(0)
  • 摘要:在前面的已经提到了一对一的情况;现在一个生产者与多个消费者的情况(WorkQueues)。WorkQueues的示意图如下:对于上图的模型中对于c端的worker来说。RabbitMQ服务器可能一直发送多个消息给一个worker,而另一个可能几乎不做任何事情。这样就会导致一个worker很忙,而另一个却很空闲。这种情况可能都不想出现。如何解决这个问题呢。当然最理想的情况是均匀分配消息给每个worker。我们可能通过channel.basicQos(1)方法(prefetchCount=1
  • 标签:学习 学习2

在前面的已经提到了一对一的情况;现在一个生产者与多个消费者的情况(Work Queues)。
Work Queues的示意图如下:

?

对于上图的模型中对于c端的worker来说。RabbitMQ服务器可能一直发送多个消息给一个worker,而另一个可能几乎不做任何事情。这样就会导致一个worker很忙,而另一个却很空闲。这种情况可能都不想出现。如何解决这个问题呢。当然最理想的情况是均匀分配消息给每个worker。我们可能通过class="n">channel?.?basicQos(1)方法(prefetchCount?=?1?)来设置同一时间每次发给一个消息给一个worker。示意图如下:

?

P端的程序如下:

Java代码??收藏代码
  1. package?com.abin.rabbitmq;??
  2. ??
  3. import?com.rabbitmq.client.Channel;??
  4. import?com.rabbitmq.client.Connection;??
  5. import?com.rabbitmq.client.ConnectionFactory;??
  6. import?com.rabbitmq.client.MessageProperties;??
  7. ??
  8. public?class?NewTask?{??
  9. ????private?static?final?String?TASK_QUEUE_NAME?=?"task_queue";??
  10. ??
  11. ????public?static?void?main(String[]?argv)?throws?Exception?{??
  12. ??
  13. ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
  14. ????????factory.setHost("localhost");??
  15. ????????Connection?connection?=?factory.newConnection();??
  16. ????????Channel?channel?=?connection.createChannel();??
  17. ????????//声明此队列并且持久化??
  18. ????????channel.queueDeclare(TASK_QUEUE_NAME,?true,?false,?false,?null);??
  19. ??
  20. ????????String?message?=?getMessage(argv);??
  21. ??
  22. ????????channel.basicPublish("",?TASK_QUEUE_NAME,??
  23. ????????????????MessageProperties.PERSISTENT_TEXT_PLAIN,?message.getBytes());//持久化消息??
  24. ????????System.out.println("?[x]?Sent?'"?+?message?+?"'");??
  25. ??
  26. ????????channel.close();??
  27. ????????connection.close();??
  28. ????}??
  29. ??
  30. ????private?static?String?getMessage(String[]?strings)?{??
  31. ????????if?(strings.length?<?1)??
  32. ????????????return?"Hello?World!";??
  33. ????????return?joinStrings(strings,?"?");??
  34. ????}??
  35. ??
  36. ????private?static?String?joinStrings(String[]?strings,?String?delimiter)?{??
  37. ????????int?length?=?strings.length;??
  38. ????????if?(length?==?0)??
  39. ????????????return?"";??
  40. ????????StringBuilder?words?=?new?StringBuilder(strings[0]);??
  41. ????????for?(int?i?=?1;?i?<?length;?i++)?{??
  42. ????????????words.append(delimiter).append(strings[i]);??
  43. ????????}??
  44. ????????return?words.toString();??
  45. ????}??
  46. }??

??? 多次运行此程序并传入的参数分别为“First message?”,“Secondmessage?”,“Third?message?”,“Fourth message?”,“Fifth message?”

?

C端的程序如下:

Java代码??收藏代码
  1. package?com.abin.rabbitmq;??
  2. ??
  3. import?com.rabbitmq.client.Channel;??
  4. import?com.rabbitmq.client.Connection;??
  5. import?com.rabbitmq.client.ConnectionFactory;??
  6. import?com.rabbitmq.client.QueueingConsumer;??
  7. ??
  8. public?class?Worker?{??
  9. ????private?static?final?String?TASK_QUEUE_NAME?=?"task_queue";??
  10. ????public?static?void?main(String[]?argv)?throws?Exception?{??
  11. ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
  12. ????????factory.setHost("localhost");??
  13. ????????Connection?connection?=?factory.newConnection();??
  14. ????????Channel?channel?=?connection.createChannel();??
  15. ????????//声明此队列并且持久化??
  16. ????????channel.queueDeclare(TASK_QUEUE_NAME,?true,?false,?false,?null);??
  17. ????????System.out.println("?[*]?Waiting?for?messages.?To?exit?press?CTRL+C");??
  18. ??
  19. ????????channel.basicQos(1);//告诉RabbitMQ同一时间给一个消息给消费者??
  20. ????????/*?We're?about?to?tell?the?server?to?deliver?us?the?messages?from?the?queue.??
  21. ?????????*?Since?it?will?push?us?messages?asynchronously,??
  22. ?????????*?we?provide?a?callback?in?the?form?of?an?object?that?will?buffer?the?messages??
  23. ?????????*?until?we're?ready?to?use?them.?That?is?what?QueueingConsumer?does.*/??
  24. ????????QueueingConsumer?consumer?=?new?QueueingConsumer(channel);??
  25. ????????/*?
  26. ??????????把名字为TASK_QUEUE_NAME的Channel的值回调给QueueingConsumer,即使一个worker在处理消息的过程中停止了,这个消息也不会失效?
  27. ????????*/??
  28. ????????channel.basicConsume(TASK_QUEUE_NAME,?false,?consumer);??
  29. ??
  30. ????????while?(true)?{??
  31. ????????????QueueingConsumer.Delivery?delivery?=?consumer.nextDelivery();//得到消息传输信息??
  32. ????????????String?message?=?new?String(delivery.getBody());??
  33. ??
  34. ????????????System.out.println("?[x]?Received?'"?+?message?+?"'");??
  35. ????????????doWork(message);??
  36. ????????????System.out.println("?[x]?Done");??
  37. ??
  38. ????????????channel.basicAck(delivery.getEnvelope().getDeliveryTag(),?false);//下一个消息??
  39. ????????}??
  40. ????}??
  41. ??
  42. ????private?static?void?doWork(String?task)?throws?InterruptedException?{??
  43. ????????for?(char?ch?:?task.toCharArray())?{??
  44. ????????????if?(ch?==?'.')??
  45. ????????????????Thread.sleep(1000);//这里是假装我们很忙??
  46. ????????}??
  47. ????}??
  48. }??

? 开启两个worker分别运行。运行结果如:

c1的结果:

Java代码??收藏代码
  1. [*]?Waiting?for?messages.?To?exit?press?CTRL+C??
  2. ?[x]?Received?'First?message'??
  3. ?[x]?Received?'Third?message'??
  4. ?[x]?Received?'Fifth?message'??

?c2的结果

Java代码??收藏代码
  1. [*]?Waiting?for?messages.?To?exit?press?CTRL+C??
  2. ?[x]?Received?'Second?message'??
  3. ?[x]?Received?'Fourth?message'??

?

?

发表评论
用户名: 匿名