rabbitmq学习3:Publish/Subscribe_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > rabbitmq学习3:Publish/Subscribe

rabbitmq学习3:Publish/Subscribe

 2013/12/10 12:26:14  yimeng528  程序员俱乐部  我要评论(0)
  • 摘要:在前面的WorkQueue中的消息是均匀分配消息给消费者;如果我想把消息分发给所有的消费者呢?那应当怎么操作呢?这就是要下面提到的Publish/Subscribe(分布/订阅)。让我们开始Publish/Subscribe之旅吧!Publish/Subscribe的工作示意图如下:在上图中的X表示Exchange(交换区);Exchange的类型有:direct,topic,headers和fanoutPublish/Subscribe的Exchang的类型为fanout
  • 标签:学习

在前面的Work Queue中的消息是均匀分配消息给消费者;如果我想把消息分发给所有的消费者呢?那应当怎么操作呢?这就是要下面提到的Publish/Subscribe(分布/订阅)。让我们开始Publish/Subscribe之旅吧!

Publish/Subscribe的工作示意图如下:


在上图中的X表示Exchange(交换区);Exchange的类型有:class="code ">direct?,?topic?,?headers?和?fanout

Publish/Subscribe的Exchang的类型为fanout;声明Publish/Subscribe的Exchang代码如下:

Java代码??收藏代码
  1. channel.exchangeDeclare("logs",?"fanout");??

?

对于Work Queue中提到的发布消息的代码如下:

Java代码??收藏代码
  1. channel.basicPublish("",?queueName,???null,?message.getBytes());??

?但对于Publish/Subscribe中发布消息中的Queue的使用的是默认的;代码如下:

Java代码??收藏代码
  1. channel.basicPublish(?"logs",?"",?null,?message.getBytes());??

?

Exchange和各Queue之间是如何通信的呢?主要是通过把Exchange和各Queue绑定(binding);示意代码如下:

Java代码??收藏代码
  1. channel.queueBind(queueName,?exchangeName,?"");??

Publish/Subscribe加入绑定的工作示意图如下:


?


那我们就开始程序代码吧:P端的代码如下:

?

Java代码??收藏代码
  1. package?com.abin.rabbitmq;??
  2. ??
  3. import?com.rabbitmq.client.Channel;??
  4. import?com.rabbitmq.client.Connection;??
  5. import?com.rabbitmq.client.ConnectionFactory;??
  6. ??
  7. public?class?EmitLog?{??
  8. ????private?static?final?String?EXCHANGE_NAME?=?"logs";??
  9. ??
  10. ????public?static?void?main(String[]?argv)?throws?Exception?{??
  11. ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
  12. ????????factory.setHost("localhost");??
  13. ????????Connection?connection?=?factory.newConnection();??
  14. ????????Channel?channel?=?connection.createChannel();??
  15. ????????channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");//声明Exchange??
  16. ????????for?(int?i?=?0;?i?<=?2;?i++)?{??
  17. ????????????String?message?=?"hello?word!"?+?i;??
  18. ????????????channel.basicPublish(EXCHANGE_NAME,?"",?null,?message.getBytes());??
  19. ????????????System.out.println("?[x]?Sent?'"?+?message?+?"'");??
  20. ????????}??
  21. ????????channel.close();??
  22. ????????connection.close();??
  23. ????}??
  24. ??
  25. }??

?运行结果如下:

Java代码??收藏代码
  1. [x]?Sent?'hello?word!0'??
  2. [x]?Sent?'hello?word!1'??
  3. [x]?Sent?'hello?word!2'??

?

C端的代码如下:

Java代码??收藏代码
  1. package?com.abin.rabbitmq;??
  2. ??
  3. import?com.rabbitmq.client.Channel;??
  4. import?com.rabbitmq.client.Connection;??
  5. import?com.rabbitmq.client.ConnectionFactory;??
  6. import?com.rabbitmq.client.QueueingConsumer;??
  7. ??
  8. public?class?ReceiveLogsOne?{??
  9. ????private?static?final?String?EXCHANGE_NAME?=?"logs";??
  10. ??
  11. ????public?static?void?main(String[]?argv)?throws?Exception?{??
  12. ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
  13. ????????factory.setHost("localhost");??
  14. ????????Connection?connection?=?factory.newConnection();??
  15. ????????Channel?channel?=?connection.createChannel();??
  16. ????????channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");??
  17. ????????String?queueName?=?"log-fb1";??
  18. ????????channel.queueDeclare(queueName,?false,?false,?false,?null);??
  19. ????????channel.queueBind(queueName,?EXCHANGE_NAME,?"");//把Queue、Exchange绑定??
  20. ????????QueueingConsumer?consumer?=?new?QueueingConsumer(channel);??
  21. ????????channel.basicConsume(queueName,?true,?consumer);??
  22. ????????while?(true)?{??
  23. ????????????QueueingConsumer.Delivery?delivery?=?consumer.nextDelivery();??
  24. ????????????String?message?=?new?String(delivery.getBody());??
  25. ????????????System.out.println("?[x]?Received?'"?+?message?+?"'");??
  26. ????????}??
  27. ????}??
  28. }??

?

对于C端的代码我写了二个差不多的程序,只需要修改一下queueName。这样就形成了二个Queue;运行结果相同;

运行结果可能如下:

Java代码??收藏代码
  1. [x]?Received?'hello?word!0'??
  2. [x]?Received?'hello?word!1'??
  3. [x]?Received?'hello?word!2'??

?

?

  • 大小: 5.7 KB
  • 大小: 5.5 KB
  • 大小: 8.5 KB
发表评论
用户名: 匿名