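The previous post, 《rabbitmq学习4:Routing》, routed messages by plain routing keys that had to match exactly. What if we want consumers to receive only the messages that match certain routing rules? For that we need an Exchange of type topic.
The diagram below shows how Topics works:
As the diagram shows, there are two wildcards, * and #. A routing key is a list of words separated by dots; * matches exactly one word, and # matches zero or more words.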
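To make the wildcard rules concrete, here is a minimal sketch of the matching logic in plain Java. It is not RabbitMQ's own implementation: the class `TopicMatchSketch` and its `matches` method are hypothetical names introduced only for illustration, and the sketch assumes that words are separated by dots.

```java
public class TopicMatchSketch {

    // Splits a dot-separated string into words; an empty string has no words.
    static String[] words(String s) {
        return s.isEmpty() ? new String[0] : s.split("\\.", -1);
    }

    // Returns true if routingKey matches bindingPattern under the topic rules:
    // "*" stands for exactly one word, "#" for zero or more words.
    static boolean matches(String bindingPattern, String routingKey) {
        return matches(words(bindingPattern), 0, words(routingKey), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // "#" may absorb 0, 1, 2, ... of the remaining words
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // the pattern expects a word, but none is left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("logs.*.one", "logs.error.one")); // true
        System.out.println(matches("logs.*.one", "logs.error.two")); // false
        System.out.println(matches("logs.#", "logs"));               // true
        System.out.println(matches("logs.*", "logs.error.one"));     // false
    }
}
```

The last two calls show the difference between the wildcards: # also matches when no further word follows, while * can never stand for two words.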
Now let's look at how a Topics program is implemented.
The producer (P) side looks like this:

    package com.abin.rabbitmq;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class EmitLogTopic {
        private static final String EXCHANGE_NAME = "topic_logs";

        public static void main(String[] argv) throws Exception {

            // Connect to the broker on localhost and open a channel
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            // Declare an exchange of type "topic"
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            // Two messages with routing key "logs.error.one"
            String routingKeyOne = "logs.error.one";
            for (int i = 0; i <= 1; i++) {
                String messageOne = "this is one error logs:" + i;
                channel.basicPublish(EXCHANGE_NAME, routingKeyOne, null, messageOne
                        .getBytes());
                System.out.println(" [x] Sent '" + routingKeyOne + "':'"
                        + messageOne + "'");
            }

            System.out.println("################################");
            // Three messages with routing key "logs.error.two"
            String routingKeyTwo = "logs.error.two";
            for (int i = 0; i <= 2; i++) {
                String messageTwo = "this is two error logs:" + i;
                channel.basicPublish(EXCHANGE_NAME, routingKeyTwo, null, messageTwo
                        .getBytes());
                System.out.println(" [x] Sent '" + routingKeyTwo + "':'"
                        + messageTwo + "'");
            }

            System.out.println("################################");
            // Four messages with routing key "logs.info.one"
            String routingKeyThree = "logs.info.one";
            for (int i = 0; i <= 3; i++) {
                String messageThree = "this is one info logs:" + i;
                channel.basicPublish(EXCHANGE_NAME, routingKeyThree, null,
                        messageThree.getBytes());
                System.out.println(" [x] Sent '" + routingKeyThree + "':'"
                        + messageThree + "'");
            }

            channel.close();
            connection.close();
        }
    }

The output may look like this:

     [x] Sent 'logs.error.one':'this is one error logs:0'
     [x] Sent 'logs.error.one':'this is one error logs:1'
    ################################
     [x] Sent 'logs.error.two':'this is two error logs:0'
     [x] Sent 'logs.error.two':'this is two error logs:1'
     [x] Sent 'logs.error.two':'this is two error logs:2'
    ################################
     [x] Sent 'logs.info.one':'this is one info logs:0'
     [x] Sent 'logs.info.one':'this is one info logs:1'
     [x] Sent 'logs.info.one':'this is one info logs:2'
     [x] Sent 'logs.info.one':'this is one info logs:3'

The code for the first consumer (C) is as follows:

    package com.abin.rabbitmq;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;

    public class ReceiveLogsTopic {
        private static final String EXCHANGE_NAME = "topic_logs";

        public static void main(String[] argv) throws Exception {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            // Named queue: not durable, not exclusive, not auto-deleted
            String queueName = "queue_topic_logs1";
            channel.queueDeclare(queueName, false, false, false, null);

            // Bind with "logs.*.one": "*" matches exactly one word in the middle
            String routingKeyTwo = "logs.*.one";
            channel.queueBind(queueName, EXCHANGE_NAME, routingKeyTwo);

            System.out.println(" [*] Waiting for messages.");

            // QueueingConsumer belongs to older Java client releases
            // (deprecated in 4.x, removed in 5.x)
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, consumer); // autoAck = true

            while (true) {
                // Blocks until the next message arrives
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                String routingKey = delivery.getEnvelope().getRoutingKey();

                System.out.println(" [x] Received '" + routingKey + "':'" + message
                        + "'");
            }
        }
    }

The first consumer prints the following:

     [*] Waiting for messages.
     [x] Received 'logs.error.one':'this is one error logs:0'
     [x] Received 'logs.error.one':'this is one error logs:1'
     [x] Received 'logs.info.one':'this is one info logs:0'
     [x] Received 'logs.info.one':'this is one info logs:1'
     [x] Received 'logs.info.one':'this is one info logs:2'
     [x] Received 'logs.info.one':'this is one info logs:3'

The program for the second consumer (C) is as follows:

    package com.abin.rabbitmq;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;

    public class ReceiveLogsTopicTwo {
        private static final String EXCHANGE_NAME = "topic_logs";

        public static void main(String[] argv) throws Exception {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            // Named queue: not durable, not exclusive, not auto-deleted
            String queueName = "queue_topic_logs2";
            channel.queueDeclare(queueName, false, false, false, null);
            // Bind with "logs.#": "#" matches zero or more words after "logs"
            String routingKeyOne = "logs.#";
            channel.queueBind(queueName, EXCHANGE_NAME, routingKeyOne);

            System.out.println(" [*] Waiting for messages.");

            // QueueingConsumer belongs to older Java client releases
            // (deprecated in 4.x, removed in 5.x)
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, consumer); // autoAck = true

            while (true) {
                // Blocks until the next message arrives
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                String routingKey = delivery.getEnvelope().getRoutingKey();

                System.out.println(" [x] Received '" + routingKey + "':'" + message
                        + "'");
            }
        }
    }

The second consumer prints the following:

     [*] Waiting for messages.
     [x] Received 'logs.error.one':'this is one error logs:0'
     [x] Received 'logs.error.one':'this is one error logs:1'
     [x] Received 'logs.error.two':'this is two error logs:0'
     [x] Received 'logs.error.two':'this is two error logs:1'
     [x] Received 'logs.error.two':'this is two error logs:2'
     [x] Received 'logs.info.one':'this is one info logs:0'
     [x] Received 'logs.info.one':'this is one info logs:1'
     [x] Received 'logs.info.one':'this is one info logs:2'
     [x] Received 'logs.info.one':'this is one info logs:3'

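The two result listings can be cross-checked without a broker. The sketch below only imitates a topic exchange in memory: it sends the same nine routing keys as EmitLogTopic through the two bindings and counts what each queue would receive. The class `TopicRoutingSketch` and its methods are hypothetical names, and translating a binding pattern into a regular expression is an assumption made for illustration (it also assumes dot-separated, non-empty words), not how RabbitMQ itself routes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class TopicRoutingSketch {

    // Translates a binding pattern into a regex that is applied to
    // "." + routingKey, so that every word is preceded by a dot.
    static Pattern toRegex(String bindingPattern) {
        StringBuilder sb = new StringBuilder();
        for (String word : bindingPattern.split("\\.")) {
            if (word.equals("#")) {
                sb.append("(?:\\.[^.]+)*");                   // zero or more words
            } else if (word.equals("*")) {
                sb.append("\\.[^.]+");                        // exactly one word
            } else {
                sb.append("\\.").append(Pattern.quote(word)); // literal word
            }
        }
        return Pattern.compile(sb.toString());
    }

    // Gives every queue one copy of each message whose key matches its binding.
    static Map<String, List<String>> route(Map<String, String> bindings,
                                           List<String> routingKeys) {
        Map<String, List<String>> queues = new LinkedHashMap<>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            Pattern regex = toRegex(binding.getValue());
            List<String> delivered = new ArrayList<>();
            for (String key : routingKeys) {
                if (regex.matcher("." + key).matches()) {
                    delivered.add(key);
                }
            }
            queues.put(binding.getKey(), delivered);
        }
        return queues;
    }

    // The routing keys published by EmitLogTopic, in order: 2 + 3 + 4 messages.
    static List<String> sentKeys() {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i <= 1; i++) keys.add("logs.error.one");
        for (int i = 0; i <= 2; i++) keys.add("logs.error.two");
        for (int i = 0; i <= 3; i++) keys.add("logs.info.one");
        return keys;
    }

    public static void main(String[] args) {
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("queue_topic_logs1", "logs.*.one");
        bindings.put("queue_topic_logs2", "logs.#");

        for (Map.Entry<String, List<String>> q : route(bindings, sentKeys()).entrySet()) {
            System.out.println(q.getKey() + " received " + q.getValue().size() + " message(s)");
        }
        // prints:
        // queue_topic_logs1 received 6 message(s)
        // queue_topic_logs2 received 9 message(s)
    }
}
```

The counts agree with the listings: the first queue misses only the three logs.error.two messages, while logs.# catches everything. A message whose key matches both bindings is delivered to both queues.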