博客详情

rabbitmq高级消息队列-topic主题队列 (原创)

作者: 朝如青丝暮成雪
发布时间:2017-10-08 08:25:42  文章分类:rabbitMQ   阅读(1278)  评论(0)

 实际上在AQMP(高级消息队列协议)中,共有direct、fanout、topic、headers 四种不同类型的交换机。

topic交换机 称为主题交换机,能够让消费者以通配符的方式订阅相关主题。topic交换机上的路由key通常是由点号隔开的一系列的标识符。

* 可以匹配一个标识符。
# 可以匹配0个或多个标识符。

如  "audit.#"能够匹配到"audit.irs.corporate",但是“audit.*"只会匹配到"audit.irs"。

示例代码如下。
消息生产者:


package com.tingcream.www.test.rabbitmq.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * topic  生产者
 * 这种应该属于模糊匹配

   * (星号)可以替代一个词
   # (井号)可以替代0或者更多的词
   
 */
public class TopicSender {
	 private static final String EXCHANGE_NAME = "topic_logs";

	    public static void main(String[] args) throws  Exception {
	        Connection connection = null;
	        Channel channel = null;
	        try{
	            ConnectionFactory factory=new ConnectionFactory();
	            
	            factory.setHost("192.168.9.130");    
	            //指定用户 密码  
	            factory.setUsername("zhangsan");  
	            factory.setPassword("123456"); 
	            
	            connection=factory.newConnection();
	            channel=connection.createChannel();

	            //声明一个匹配模式的交换机
	            channel.exchangeDeclare(EXCHANGE_NAME,"topic");
	            //待发送的消息
	            String[] routingKeys=new String[]{
	                    "quick.orange.rabbit",
	                    "lazy.orange.elephant",
	                    "quick.orange.fox",
	                    "lazy.brown.fox",
	                    "quick.brown.fox",
	                    "quick.orange.male.rabbit",
	                    "lazy.orange.male.rabbit"
	            };
	            //发送消息
	            for(String routingKey :routingKeys){
	                String message = "来自 "+routingKey+" routingKey' s message!";
	                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
	                System.out.println("TopicSender 发送消息 " + routingKey + ":" + message );
	            }
	        }catch (Exception e){
	            e.printStackTrace();
	            if (connection!=null){
	                channel.close();
	                connection.close();
	            }
	        }finally {
	            if (connection!=null){
	                channel.close();
	                connection.close();
	            }
	        }
	    }
}


消息消费者A


package com.tingcream.www.test.rabbitmq.topic;

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class TopicReceiverA {

	private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("192.168.9.130");    
        //指定用户 密码  
        factory.setUsername("lisi");  
        factory.setPassword("123456"); 
     //   factory.setPort(AMQP.PROTOCOL.PORT);  
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明一个匹配模式的交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
        //路由关键字
        String[] routingKeys = new String[]{"*.orange.*"};
        //绑定路由
        for (String routingKey : routingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
            System.out.println("绑定:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + routingKey);
        }
        System.out.println("TopicReceiverA Waiting for messages");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("TopicReceiverA 接收消息 " + envelope.getRoutingKey() + ":" + message );
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}



消息消费者B


package com.tingcream.www.test.rabbitmq.topic;

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class TopicReceiverB {

	private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        
        factory.setHost("192.168.9.130");    
        //指定用户 密码  
        factory.setUsername("lisi");  
        factory.setPassword("123456"); 
        
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明一个匹配模式的交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
        //路由关键字
        String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
        //绑定路由
        for (String routingKey : routingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
            System.out.println("绑定:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + routingKey);
        }
        System.out.println("TopicReceiverB Waiting for messages");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("TopicReceiverB 接收消息 " + envelope.getRoutingKey() + ":" + message );
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}


依次启动消费者A、消费者B和生产者,控制台输出如下:


TopicSender:

TopicSender 发送消息 quick.orange.rabbit:来自 quick.orange.rabbit routingKey' s message!
TopicSender 发送消息 lazy.orange.elephant:来自 lazy.orange.elephant routingKey' s message!
TopicSender 发送消息 quick.orange.fox:来自 quick.orange.fox routingKey' s message!
TopicSender 发送消息 lazy.brown.fox:来自 lazy.brown.fox routingKey' s message!
TopicSender 发送消息 quick.brown.fox:来自 quick.brown.fox routingKey' s message!
TopicSender 发送消息 quick.orange.male.rabbit:来自 quick.orange.male.rabbit routingKey' s message!
TopicSender 发送消息 lazy.orange.male.rabbit:来自 lazy.orange.male.rabbit routingKey' s message!



TopicReceiverA:

绑定:topic_logs, queue:amq.gen-xFQbtZcDKv6uDcmn8hHzSQ, BindRoutingKey:*.orange.*
TopicReceiverA Waiting for messages
TopicReceiverA 接收消息 quick.orange.rabbit:来自 quick.orange.rabbit routingKey' s message!
TopicReceiverA 接收消息 lazy.orange.elephant:来自 lazy.orange.elephant routingKey' s message!
TopicReceiverA 接收消息 quick.orange.fox:来自 quick.orange.fox routingKey' s message!


TopicReceiverB:

绑定:topic_logs, queue:amq.gen-IqPkcqc-vV-pGyTcajPYxw, BindRoutingKey:*.*.rabbit
绑定:topic_logs, queue:amq.gen-IqPkcqc-vV-pGyTcajPYxw, BindRoutingKey:lazy.#
TopicReceiverB Waiting for messages
TopicReceiverB 接收消息 quick.orange.rabbit:来自 quick.orange.rabbit routingKey' s message!
TopicReceiverB 接收消息 lazy.orange.elephant:来自 lazy.orange.elephant routingKey' s message!
TopicReceiverB 接收消息 lazy.brown.fox:来自 lazy.brown.fox routingKey' s message!
TopicReceiverB 接收消息 lazy.orange.male.rabbit:来自 lazy.orange.male.rabbit routingKey' s message!


登上rabbit管理控制台,查看topic_logs 交换机(topic类型)









关键字:  rabbitmq
评论信息
暂无评论
发表评论

亲,您还没有登陆,暂不能评论哦! 去 登陆 | 注册

博主信息
   
数据加载中,请稍候...
文章分类
   
数据加载中,请稍候...
阅读排行
 
数据加载中,请稍候...
评论排行
 
数据加载中,请稍候...

Copyright © 叮叮声的奶酪 版权所有
备案号:鄂ICP备17018671号-1

鄂公网安备 42011102000739号