在前面的Work Queue中的消息是均匀分配消息给消费者;如果我想把消息分发给所有的消费者呢?那应当怎么操作呢?这就是要下面提到的Publish/Subscribe(分布/订阅)。让我们开始Publish/Subscribe之旅吧!
Publish/Subscribe的工作示意图如下:
在上图中的X表示Exchange(交换区);Exchange的类型有:class="code ">direct?,?topic?,?headers?和?fanout
Publish/Subscribe的Exchang的类型为fanout;声明Publish/Subscribe的Exchang代码如下:
Java代码??
- channel.exchangeDeclare("logs",?"fanout");??
?
对于Work Queue中提到的发布消息的代码如下:
Java代码??
- channel.basicPublish("",?queueName,???null,?message.getBytes());??
?但对于Publish/Subscribe中发布消息中的Queue的使用的是默认的;代码如下:
Java代码??
- channel.basicPublish(?"logs",?"",?null,?message.getBytes());??
?
Exchange和各Queue之间是如何通信的呢?主要是通过把Exchange和各Queue绑定(binding);示意代码如下:
Java代码??
- channel.queueBind(queueName,?exchangeName,?"");??
Publish/Subscribe加入绑定的工作示意图如下:
?
那我们就开始程序代码吧:P端的代码如下:
?
Java代码??
- package?com.abin.rabbitmq;??
- ??
- import?com.rabbitmq.client.Channel;??
- import?com.rabbitmq.client.Connection;??
- import?com.rabbitmq.client.ConnectionFactory;??
- ??
- public?class?EmitLog?{??
- ????private?static?final?String?EXCHANGE_NAME?=?"logs";??
- ??
- ????public?static?void?main(String[]?argv)?throws?Exception?{??
- ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
- ????????factory.setHost("localhost");??
- ????????Connection?connection?=?factory.newConnection();??
- ????????Channel?channel?=?connection.createChannel();??
- ????????channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");??
- ????????for?(int?i?=?0;?i?<=?2;?i++)?{??
- ????????????String?message?=?"hello?word!"?+?i;??
- ????????????channel.basicPublish(EXCHANGE_NAME,?"",?null,?message.getBytes());??
- ????????????System.out.println("?[x]?Sent?'"?+?message?+?"'");??
- ????????}??
- ????????channel.close();??
- ????????connection.close();??
- ????}??
- ??
- }??
?运行结果如下:
Java代码??
- [x]?Sent?'hello?word!0'??
- [x]?Sent?'hello?word!1'??
- [x]?Sent?'hello?word!2'??
?
C端的代码如下:
Java代码??
- package?com.abin.rabbitmq;??
- ??
- import?com.rabbitmq.client.Channel;??
- import?com.rabbitmq.client.Connection;??
- import?com.rabbitmq.client.ConnectionFactory;??
- import?com.rabbitmq.client.QueueingConsumer;??
- ??
- public?class?ReceiveLogsOne?{??
- ????private?static?final?String?EXCHANGE_NAME?=?"logs";??
- ??
- ????public?static?void?main(String[]?argv)?throws?Exception?{??
- ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
- ????????factory.setHost("localhost");??
- ????????Connection?connection?=?factory.newConnection();??
- ????????Channel?channel?=?connection.createChannel();??
- ????????channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");??
- ????????String?queueName?=?"log-fb1";??
- ????????channel.queueDeclare(queueName,?false,?false,?false,?null);??
- ????????channel.queueBind(queueName,?EXCHANGE_NAME,?"");??
- ????????QueueingConsumer?consumer?=?new?QueueingConsumer(channel);??
- ????????channel.basicConsume(queueName,?true,?consumer);??
- ????????while?(true)?{??
- ????????????QueueingConsumer.Delivery?delivery?=?consumer.nextDelivery();??
- ????????????String?message?=?new?String(delivery.getBody());??
- ????????????System.out.println("?[x]?Received?'"?+?message?+?"'");??
- ????????}??
- ????}??
- }??
?
对于C端的代码我写了二个差不多的程序,只需要修改一下queueName。这样就形成了二个Queue;运行结果相同;
运行结果可能如下:
Java代码??
- [x]?Received?'hello?word!0'??
- [x]?Received?'hello?word!1'??
- [x]?Received?'hello?word!2'??
?
?
- 大小: 5.7 KB
- 大小: 5.5 KB
- 大小: 8.5 KB