实际上在AQMP(高级消息队列协议)中,共有direct、fanout、topic、headers 四种不同类型的交换机。
topic交换机 称为主题交换机,能够让消费者以通配符的方式订阅相关主题。topic交换机上的路由key通常是由点号隔开的一系列的标识符。
* 可以匹配一个标识符。
# 可以匹配0个或多个标识符。
如 "audit.#"能够匹配到"audit.irs.corporate",但是“audit.*"只会匹配到"audit.irs"。
示例代码如下。
消息生产者:
package com.tingcream.www.test.rabbitmq.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * topic 生产者 * 这种应该属于模糊匹配 * (星号)可以替代一个词 # (井号)可以替代0或者更多的词 */ public class TopicSender { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Connection connection = null; Channel channel = null; try{ ConnectionFactory factory=new ConnectionFactory(); factory.setHost("192.168.9.130"); //指定用户 密码 factory.setUsername("zhangsan"); factory.setPassword("123456"); connection=factory.newConnection(); channel=connection.createChannel(); //声明一个匹配模式的交换机 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); //待发送的消息 String[] routingKeys=new String[]{ "quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox", "quick.brown.fox", "quick.orange.male.rabbit", "lazy.orange.male.rabbit" }; //发送消息 for(String routingKey :routingKeys){ String message = "来自 "+routingKey+" routingKey' s message!"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println("TopicSender 发送消息 " + routingKey + ":" + message ); } }catch (Exception e){ e.printStackTrace(); if (connection!=null){ channel.close(); connection.close(); } }finally { if (connection!=null){ channel.close(); connection.close(); } } } }
消息消费者A
package com.tingcream.www.test.rabbitmq.topic; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class TopicReceiverA { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.9.130"); //指定用户 密码 factory.setUsername("lisi"); factory.setPassword("123456"); // factory.setPort(AMQP.PROTOCOL.PORT); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明一个匹配模式的交换机 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); //路由关键字 String[] routingKeys = new String[]{"*.orange.*"}; //绑定路由 for (String routingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME, routingKey); System.out.println("绑定:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + routingKey); } System.out.println("TopicReceiverA Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("TopicReceiverA 接收消息 " + envelope.getRoutingKey() + ":" + message ); } }; channel.basicConsume(queueName, true, consumer); } }
消息消费者B
package com.tingcream.www.test.rabbitmq.topic; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class TopicReceiverB { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.9.130"); //指定用户 密码 factory.setUsername("lisi"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明一个匹配模式的交换机 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); //路由关键字 String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"}; //绑定路由 for (String routingKey : routingKeys) { channel.queueBind(queueName, EXCHANGE_NAME, routingKey); System.out.println("绑定:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + routingKey); } System.out.println("TopicReceiverB Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("TopicReceiverB 接收消息 " + envelope.getRoutingKey() + ":" + message ); } }; channel.basicConsume(queueName, true, consumer); } }
依次启动消费者A、消费者B和生产者,控制台输出如下:
TopicSender:
TopicSender 发送消息 quick.orange.rabbit:来自 quick.orange.rabbit routingKey' s message!
TopicSender 发送消息 lazy.orange.elephant:来自 lazy.orange.elephant routingKey' s message!
TopicSender 发送消息 quick.orange.fox:来自 quick.orange.fox routingKey' s message!
TopicSender 发送消息 lazy.brown.fox:来自 lazy.brown.fox routingKey' s message!
TopicSender 发送消息 quick.brown.fox:来自 quick.brown.fox routingKey' s message!
TopicSender 发送消息 quick.orange.male.rabbit:来自 quick.orange.male.rabbit routingKey' s message!
TopicSender 发送消息 lazy.orange.male.rabbit:来自 lazy.orange.male.rabbit routingKey' s message!
TopicReceiverA:
绑定:topic_logs, queue:amq.gen-xFQbtZcDKv6uDcmn8hHzSQ, BindRoutingKey:*.orange.*
TopicReceiverA Waiting for messages
TopicReceiverA 接收消息 quick.orange.rabbit:来自 quick.orange.rabbit routingKey' s message!
TopicReceiverA 接收消息 lazy.orange.elephant:来自 lazy.orange.elephant routingKey' s message!
TopicReceiverA 接收消息 quick.orange.fox:来自 quick.orange.fox routingKey' s message!
TopicReceiverB:
绑定:topic_logs, queue:amq.gen-IqPkcqc-vV-pGyTcajPYxw, BindRoutingKey:*.*.rabbit
绑定:topic_logs, queue:amq.gen-IqPkcqc-vV-pGyTcajPYxw, BindRoutingKey:lazy.#
TopicReceiverB Waiting for messages
TopicReceiverB 接收消息 quick.orange.rabbit:来自 quick.orange.rabbit routingKey' s message!
TopicReceiverB 接收消息 lazy.orange.elephant:来自 lazy.orange.elephant routingKey' s message!
TopicReceiverB 接收消息 lazy.brown.fox:来自 lazy.brown.fox routingKey' s message!
TopicReceiverB 接收消息 lazy.orange.male.rabbit:来自 lazy.orange.male.rabbit routingKey' s message!
登上rabbit管理控制台,查看topic_logs 交换机(topic类型)
Copyright © 叮叮声的奶酪 版权所有
备案号:鄂ICP备17018671号-1