在前面介绍的消息队列中,都没有涉及路由选择的问题,因为消费者监听的队列名称与生产者发布的队列名称是相同的,路由key等于队列名称,因而也是相同的。
如果消费者不关心队列名称(这时队列名称是匿名或随机的),而只关心路由key,就需要用到路由key的绑定。关键代码就是下面这行:
channel.queueBind(queue,EXCHANGE_NAME,routingKey);//channel 中 绑定 queue和路由key
示例代码如下:
日志生产者
package com.tingcream.www.test.rabbitmq.directRouting; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 日志生产者 * routing logs * @author jelly */ public class RoutingLogSender { private static final String EXCHANGE_NAME = "direct_routing_logs"; // 路由关键字 private static final String[] routingKeys = new String[]{"info" ,"warning", "error"}; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.9.130"); //指定用户 密码 factory.setUsername("zhangsan"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"direct");//注意是direct //发送信息 for (String routingKey:routingKeys){ String message = "这是一条消息 " ; channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes()); System.out.println("发送: "+routingKey +":" + message); } channel.close(); connection.close(); } }
日志消费者A
package com.tingcream.www.test.rabbitmq.directRouting; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * direct 消息。 * 消费者A * 客户端根据路由key 筛选 需要接收处理的消息 */ public class RoutingLogReceiverA { // 交换器名称 private static final String EXCHANGE_NAME = "direct_routing_logs"; // 路由关键字 private static final String[] routingKeys = new String[]{"info" ,"warning"}; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.9.130"); //指定用户 密码 factory.setUsername("lisi"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明交换器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //获取匿名队列名称 String queueName=channel.queueDeclare().getQueue(); //根据路由关键字进行绑定 for (String routingKey:routingKeys){ channel.queueBind(queueName,EXCHANGE_NAME,routingKey);//通道 绑定 queue System.out.println("绑定 :"+EXCHANGE_NAME+"," + " queue:"+queueName+", BindRoutingKey:" + routingKey); } System.out.println("RoutingLogReceiverA Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("收到 :" + envelope.getRoutingKey() + ":" + message ); } }; channel.basicConsume(queueName, true, consumer); } }
日志消费者B
package com.tingcream.www.test.rabbitmq.directRouting; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * direct 消息。 * 消费者B * 客户端根据路由key 筛选 需要接收处理的消息 */ public class RoutingLogReceiverB { // 交换器名称 private static final String EXCHANGE_NAME = "direct_routing_logs"; // 路由关键字 private static final String[] routingKeys = new String[]{"error"}; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.9.130"); //指定用户 密码 factory.setUsername("lisi"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明交换器 channel.exchangeDeclare(EXCHANGE_NAME, "direct");//注意这里的交换机类型为direct 类型 //获取匿名队列名称 String queue=channel.queueDeclare().getQueue(); //根据路由关键字进行绑定 for (String routingKey:routingKeys){ channel.queueBind(queue,EXCHANGE_NAME,routingKey);//通道 绑定 queue System.out.println("绑定:"+EXCHANGE_NAME+"," + " queue:"+queue+", BindRoutingKey:" + routingKey); } System.out.println("RoutingLogReceiverB Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("收到 :" + envelope.getRoutingKey() + ":" + message ); } }; channel.basicConsume(queue, true, consumer); } }
分别启动消费者A、消费者B 和生产者 。
Copyright © 叮叮声的奶酪 版权所有
备案号:鄂ICP备17018671号-1