博客详情

rabbitmq高级消息队列-路由绑定 (原创)

作者: 朝如青丝暮成雪
发布时间:2017-09-24 09:22:24  文章分类:rabbitMQ   阅读(922)  评论(0)

在前面介绍的消息队列中,都没有涉及路由选择的问题,因为消费者监听的队列名称与生产者发布的队列名称是相同的,路由key等于队列名称,因而也是相同的。

如果消费者不关心队列名称(这时队列名称是匿名或随机的),而只关心路由key,就需要用到路由key的绑定。关键代码就是下面这行: 

channel.queueBind(queue,EXCHANGE_NAME,routingKey);//channel 中 绑定 queue和路由key

示例代码如下:

日志生产者


package com.tingcream.www.test.rabbitmq.directRouting;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 日志生产者    
 * routing logs
 * @author jelly
 */
public class RoutingLogSender {
	   private static final String EXCHANGE_NAME = "direct_routing_logs";
	    // 路由关键字
	    private static final String[] routingKeys = new String[]{"info" ,"warning", "error"};
	    public static void main(String[] args) throws  Exception {
	        ConnectionFactory factory = new ConnectionFactory();

	        factory.setHost("192.168.9.130");    
	        //指定用户 密码  
	        factory.setUsername("zhangsan");  
	        factory.setPassword("123456"); 
	        
	        Connection connection = factory.newConnection();
	        Channel channel = connection.createChannel();
	        //声明交换机
	        channel.exchangeDeclare(EXCHANGE_NAME,"direct");//注意是direct
	        //发送信息
	        for (String routingKey:routingKeys){
	            String message = "这是一条消息 " ;
	            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
	            System.out.println("发送: "+routingKey +":" + message);
	        }
	        channel.close();
	        connection.close();
	    }
}


日志消费者A


package com.tingcream.www.test.rabbitmq.directRouting;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
/**
 * direct 消息。
 * 消费者A  
 * 客户端根据路由key 筛选   需要接收处理的消息
 */
public class RoutingLogReceiverA {
	 // 交换器名称
	  private static final String EXCHANGE_NAME = "direct_routing_logs";
    // 路由关键字
    private static final String[] routingKeys = new String[]{"info" ,"warning"};

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("192.168.9.130");    
        //指定用户 密码  
        factory.setUsername("lisi");  
        factory.setPassword("123456"); 
        
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //获取匿名队列名称
        String queueName=channel.queueDeclare().getQueue();

        //根据路由关键字进行绑定
        for (String routingKey:routingKeys){
            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);//通道 绑定 queue
            System.out.println("绑定 :"+EXCHANGE_NAME+"," +
                    " queue:"+queueName+", BindRoutingKey:" + routingKey);
        }
        
        
        System.out.println("RoutingLogReceiverA  Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到 :" + envelope.getRoutingKey() + ":" + message );
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}



日志消费者B


package com.tingcream.www.test.rabbitmq.directRouting;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
/**
 * direct 消息。
 * 消费者B
 * 客户端根据路由key 筛选   需要接收处理的消息
 */
public class RoutingLogReceiverB {
	 // 交换器名称
	  private static final String EXCHANGE_NAME = "direct_routing_logs";
    // 路由关键字
    private static final String[] routingKeys = new String[]{"error"};

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("192.168.9.130");    
        //指定用户 密码  
        factory.setUsername("lisi");  
        factory.setPassword("123456"); 
        
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");//注意这里的交换机类型为direct 类型
        //获取匿名队列名称
        String queue=channel.queueDeclare().getQueue();

        //根据路由关键字进行绑定
        for (String routingKey:routingKeys){
            channel.queueBind(queue,EXCHANGE_NAME,routingKey);//通道 绑定 queue
            System.out.println("绑定:"+EXCHANGE_NAME+"," +
                    " queue:"+queue+", BindRoutingKey:" + routingKey);
        }
        
        
        System.out.println("RoutingLogReceiverB  Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到  :" + envelope.getRoutingKey() + ":" + message );
            }
        };
        channel.basicConsume(queue, true, consumer);
    }
}


分别启动消费者A、消费者B 和生产者 。




关键字:  rabbitmq
评论信息
暂无评论
发表评论

亲,您还没有登陆,暂不能评论哦! 去 登陆 | 注册

博主信息
   
数据加载中,请稍候...
文章分类
   
数据加载中,请稍候...
阅读排行
 
数据加载中,请稍候...
评论排行
 
数据加载中,请稍候...

Copyright © 叮叮声的奶酪 版权所有
备案号:鄂ICP备17018671号-1

鄂公网安备 42011102000739号