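The previous post covered the one-to-one case. This post covers one producer with multiple consumers (Work Queues).
The Work Queues diagram is shown below:
[Figure: Work Queues diagram]
In this model, RabbitMQ by default hands messages to the workers on the consumer (C) side in turn, without considering how many unacknowledged messages each worker still holds. If some tasks are heavy and others light, one worker can end up constantly busy while the other does almost nothing, which is rarely what we want. How can this be solved? Ideally, the work is spread evenly across the workers. We can call channel.basicQos(1) (prefetchCount = 1) so that RabbitMQ gives a worker only one message at a time: a worker receives its next message only after it has acknowledged the previous one. The diagram is shown below:
[Figure: message dispatch with prefetchCount = 1]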
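To make the difference concrete, here is a minimal, broker-free sketch that compares the two dispatch strategies. It does not use the RabbitMQ client at all; the class name DispatchSimulation, both method names, and the task durations are assumptions made up for illustration. It also assumes all messages are already waiting in the queue, so that with a prefetch of 1 the next message always goes to whichever worker becomes free first.

```java
import java.util.Arrays;

// Hypothetical simulation, not part of the RabbitMQ API.
public class DispatchSimulation {

    /** Round-robin: message i goes to worker i % workers, regardless of load. */
    public static int[] roundRobin(int[] taskSeconds, int workers) {
        int[] busy = new int[workers];
        for (int i = 0; i < taskSeconds.length; i++) {
            busy[i % workers] += taskSeconds[i];
        }
        return busy;
    }

    /**
     * Prefetch of 1: each message goes to the worker that becomes free first
     * (ties go to the lowest index). busy[w] is the time at which worker w
     * finishes everything it has been given so far.
     */
    public static int[] fairDispatch(int[] taskSeconds, int workers) {
        int[] busy = new int[workers];
        for (int seconds : taskSeconds) {
            int next = 0;
            for (int w = 1; w < workers; w++) {
                if (busy[w] < busy[next]) {
                    next = w;
                }
            }
            busy[next] += seconds;
        }
        return busy;
    }

    public static void main(String[] args) {
        // Alternating heavy (5 s) and light (1 s) tasks, chosen for illustration.
        int[] tasks = {5, 1, 5, 1, 5, 1};
        System.out.println(Arrays.toString(roundRobin(tasks, 2)));   // prints [15, 3]
        System.out.println(Arrays.toString(fairDispatch(tasks, 2))); // prints [11, 7]
    }
}
```

With round-robin, every heavy task lands on the same worker; with one message in flight per worker, the load ends up closer to even, although not necessarily perfectly balanced.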
The producer (P) program is as follows:
Java code
    package com.abin.rabbitmq;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;

    public class NewTask {
        private static final String TASK_QUEUE_NAME = "task_queue";

        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            // durable = true: the queue definition survives a broker restart
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            String message = getMessage(argv);

            // Publish through the default exchange, routed by queue name;
            // PERSISTENT_TEXT_PLAIN marks the message itself as persistent
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            channel.close();
            connection.close();
        }

        // Build the message from the command-line arguments, with a default
        private static String getMessage(String[] strings) {
            if (strings.length < 1)
                return "Hello World!";
            return joinStrings(strings, " ");
        }

        private static String joinStrings(String[] strings, String delimiter) {
            int length = strings.length;
            if (length == 0)
                return "";
            StringBuilder words = new StringBuilder(strings[0]);
            for (int i = 1; i < length; i++) {
                words.append(delimiter).append(strings[i]);
            }
            return words.toString();
        }
    }
Run this program several times, passing in turn the arguments "First message", "Second message", "Third message", "Fourth message", and "Fifth message".
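As a side note on how the arguments become the message body: when the shell passes an unquoted phrase as separate arguments, the producer joins them back together with single spaces. The sketch below reproduces that behavior with String.join (available since Java 8) in place of the hand-written joinStrings helper; the class name MessageFromArgs is an assumption for illustration only.

```java
// Hypothetical stand-alone version of the producer's getMessage logic.
public class MessageFromArgs {

    /** Joins the arguments with single spaces, or falls back to a default. */
    public static String getMessage(String[] args) {
        if (args.length < 1) {
            return "Hello World!";
        }
        return String.join(" ", args);
    }

    public static void main(String[] args) {
        System.out.println(getMessage(new String[] {"First", "message"})); // prints First message
        System.out.println(getMessage(new String[0]));                     // prints Hello World!
    }
}
```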
The consumer (C) program is as follows:
Java code
    package com.abin.rabbitmq;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;

    public class Worker {
        private static final String TASK_QUEUE_NAME = "task_queue";

        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            // Must match the producer's declaration (durable = true)
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            // prefetchCount = 1: at most one unacknowledged message per worker
            channel.basicQos(1);

            // Note: QueueingConsumer exists only in amqp-client versions before 5.0
            QueueingConsumer consumer = new QueueingConsumer(channel);

            // autoAck = false: messages are acknowledged manually below
            channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

            while (true) {
                // Blocks until the next message arrives
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());

                System.out.println(" [x] Received '" + message + "'");
                doWork(message);
                System.out.println(" [x] Done");

                // Acknowledge only after the work is finished
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }

        // Simulate work: sleep one second for every '.' in the message
        private static void doWork(String task) throws InterruptedException {
            for (char ch : task.toCharArray()) {
                if (ch == '.')
                    Thread.sleep(1000);
            }
        }
    }
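The doWork method above simulates load by sleeping one second for every '.' in the message, so a message without dots finishes immediately. A small sketch of that rule, without the actual sleeping, is shown here; the class name WorkloadEstimate, the method name simulatedSeconds, and the dotted sample input are assumptions for illustration and do not appear in the original programs.

```java
// Hypothetical helper mirroring the dot-counting rule in Worker.doWork.
public class WorkloadEstimate {

    /** Returns how many seconds doWork would sleep for the given message. */
    public static int simulatedSeconds(String task) {
        int seconds = 0;
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                seconds++;
            }
        }
        return seconds;
    }

    public static void main(String[] args) {
        System.out.println(simulatedSeconds("First message"));    // prints 0
        System.out.println(simulatedSeconds("Third message...")); // prints 3
    }
}
```

Appending dots to a message is therefore a simple way to make one task take longer than another when experimenting with the two workers.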
Start two workers and run them separately. The results are as follows:
Output of c1:
    [*] Waiting for messages. To exit press CTRL+C
    [x] Received 'First message'
    [x] Received 'Third message'
    [x] Received 'Fifth message'
Output of c2:
    [*] Waiting for messages. To exit press CTRL+C
    [x] Received 'Second message'
    [x] Received 'Fourth message'