博客详情

springboot中集成rabbitmq(direct、fanout、topic交换机模式) (原创)

作者: 朝如青丝暮成雪
发布时间:2025-10-22 16:38:39  文章分类:rabbitMQ   阅读(52)  评论(0)

rabbitmq中交换机有direct 、fanout、topic、headers四种类型。本文我们重点介绍下前三种的使用案例。


项目准备

1、项目中pom.xml引入依赖

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2、bootstrap.yml配置 

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /yourvhost
    username:   youruser
    password:   yourpassword


=========================================

一、direct交换机

direct交换机 发送消息时必须指定routingKey(消息路由),且需要声明队列绑定到routingKey ,消费者程序监听消息队列。

DirectExchangeConfig 配置类


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * direct 交换机
 */
@Configuration
public class DirectExchangeConfig {

    public static final String DIRECT_EXCHANGE = "directExchange";

    public static final String DIRECT_QUEUE1 = "directQueue1";
    public static final String DIRECT_QUEUE2 = "directQueue2";

    public static final String DIRECT_ROUTING_KEY = "direct";


    // Direct交换机:持久化
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    // 队列1:持久化
    @Bean
    public Queue directQueue1() {
        return new Queue(DIRECT_QUEUE1, true);
    }
    // 队列2:持久化
    @Bean
    public Queue directQueue2() {
        return new Queue(DIRECT_QUEUE2, true);
    }


    // 绑定队列到交换机,并设置路由键
    @Bean
    public Binding bindingDirectExchange1() {
        return BindingBuilder.bind(directQueue1()).to(directExchange()).with(DIRECT_ROUTING_KEY);
    }

    // 绑定队列到交换机,并设置路由键
    @Bean
    public Binding bindingDirectExchange2() {
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with(DIRECT_ROUTING_KEY);
    }

}



DirectExController.java  类


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
 
/**
 * directmq消息发送
 */
@Slf4j
@RestController
@RequestMapping("/directEx")
public class DirectExController {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    /**
     * direct交换机为直连模式交换机
     *      根据消息携带的路由键将消息投递给对应队列
     *
     * @return
     */
    @TokenIgnore
    @RequestMapping(value = "/send",method = RequestMethod.GET)
    public Resp send() {
         rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DirectExchangeConfig.DIRECT_ROUTING_KEY, "发送一条测试消息:direct");
         return Resp.success();
    }
}


DirectQueueListener.java 监听器类 


import com.riskmage.rmccloud.ins.rabbitmq.config.DirectExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 *消息监听器
 */
@Slf4j
@Component
public class DirectQueueListener {

    @RabbitHandler
    @RabbitListener(queues = "directQueue1")
    public void process1(String message) {
        log.info("directQueue1接收到消息: " + message);
    }

    @RabbitHandler
    @RabbitListener(queues = "directQueue2")
    public void process2(String message) {
        log.info("directQueue2接收到消息: " + message);
    }
}


监听的方法中除了可以接收普通字符串作为消息体,也支持Map、普通Java POJO类(实现Serializable接口)作为消息体。

@Payload  指定消息体对象,  @Header 可以接收指定消息的Http头   ,也可以接收MessageProperties 拿到所有消息附加属性(含所有消息http请求头)等。 channel表示连接通道,可以回复ACK


      @RabbitHandler
    @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE1)
    public void onMessage(@Payload Map<String, Object> message, Channel channel,
                          @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
                          MessageProperties messageProperties ) {
        try {
            String str = JSON.toJSONString(message);
            log.info("directQueue1接收到消息对象:"+str);
            log.info("deliveryTag:{}",deliveryTag);

            Object value1 = messageProperties.getHeader("key1");
            log.info("获取到自定义请求头key1的值: "+value1);

            //消息成功处理完成  ,回复ack
            log.info("消息成功处理完成 ,回复ack");
            channel.basicAck(deliveryTag,false);

        }catch (Exception e){
            log.error(e.getMessage(),e);
        }
    }

http://localhost:8074/directEx/send   发送请求试下,能推送消息到mq,消费者程序能监听到消息内容并输出控制台。



二、fanout交换机

fanout交换机为扇形模式交换机 ,消息会广播到所有绑定的队列上,此时不需要指定routingKey。

FanoutExchangeConfig配置类


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * fanout交换机为扇形模式交换机 ,消息会发送到所有绑定的队列上。此时不需要指定routingKey
 */
@Configuration
public class FanoutExchangeConfig {

    public static final String FANOUT_EXCHANGE = "fanoutExchange";

    public static final String FANOUT_QUEUE1 = "fanoutQueue1";
    public static final String FANOUT_QUEUE2 = "fanoutQueue2";

 


    //fanout交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE, true, false);
    }

    //fanout队列1
    @Bean
    public Queue fanoutQueue1() {
        return new Queue(FANOUT_QUEUE1, true);
    }

    //fanout队列2
    @Bean
    public Queue fanoutQueue2() {
        return new Queue(FANOUT_QUEUE2, true);
    }

    @Bean
    public Binding bindingFanoutExchange1() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }

    @Bean
    public Binding bindingFanoutExchange2() {
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }

}
FanoutExController类
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import com.riskmage.rmcbase.Resp;
import com.riskmage.rmcbase.mq.QMessage;
import com.riskmage.rmccloud.ins.common.annotation.TokenIgnore;
import com.riskmage.rmccloud.ins.rabbitmq.config.FanoutExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/fanoutEx")
public class FanoutExController {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    /**
     * fanout交换机为扇形模式交换机
     *      消息会发送到所有绑定的队列上。
     * @return
     */
    @TokenIgnore
    @RequestMapping(value = "/send",method = RequestMethod.GET)
    public Resp send() {
        QMessage message  =new QMessage();
        message.setMessageId(IdUtil.fastSimpleUUID());
        message.setSendTime(DateUtil.now());
        message.setData(new DemoPO("张三",23));
        rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, null, message);
        return Resp.successMsg("fanout消息发送成功!!") ;
    }

}


QMessage需要实现序列化接口


public class QMessage implements Serializable {
    private String messageId;
    private String sendTime;
    private Object data;

    public QMessage() {
    }

    public QMessage(String messageId, String sendTime, Object data) {
        this.messageId = messageId;
        this.sendTime = sendTime;
        this.data = data;
    }

    public String getMessageId() {
        return this.messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getSendTime() {
        return this.sendTime;
    }

    public void setSendTime(String sendTime) {
        this.sendTime = sendTime;
    }

    public Object getData() {
        return this.data;
    }

    public void setData(Object data) {
        this.data = data;
    }
}


FanoutQueueListener监听类


import com.alibaba.fastjson.JSON;
import com.riskmage.rmcbase.mq.QMessage;
import com.riskmage.rmccloud.ins.rabbitmq.config.FanoutExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class FanoutQueueListener {


    @RabbitHandler
    @RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE1)
    public void process1(@Payload QMessage message) {
        String str = JSON.toJSONString(message);
        log.info("收到来自fanoutQueue1的消息: " + str);
    }


    @RabbitHandler
    @RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE2)
    public void process2(@Payload  QMessage message) {
        String str = JSON.toJSONString(message);
        log.info("收到来自fanoutQueue2的消息: " + str);
    }
}


http://localhost:8074/fanoutEx/send    发送请求试下。 


三、topic交换机

topic交换机于fanout交换机类似,同样都是广播消息给多个队列(一对多)。但是推送消息需要指定routingKey, 队列绑定路由关键字支持* 和# 模式匹配。

(星号 *) 用来表示一个单词 (必须出现的)
(井号 #) 用来表示任意数量(零个或多个)单词


TopicExchangeConfig类


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * topic 交换机
 *
 *  (星号 *) 用来表示一个单词 (必须出现的)
 *  (井号 #) 用来表示任意数量(零个或多个)单词
 */
@Configuration
public class TopicExchangeConfig {

    public static final String TOPIC_QUEUE1 = "topicQueue1";
    public static final String TOPIC_QUEUE2 = "topicQueue2";
    public static final String TOPIC_QUEUE3 = "topicQueue3";

    public static final String TOPIC_EXCHANGE = "topicExchange";
   // public static final String TOPIC_ROUTING_KEY = "logs.*";


    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE, true, false);
    }


    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE1, true);
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE2, true);
    }

    @Bean
    public Queue topicQueue3() {
        return new Queue(TOPIC_QUEUE3, true);
    }


    @Bean
    public Binding bindingTopicExchange1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("dev.#");
    }

    @Bean
    public Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("test.#");
    }

    @Bean
    public Binding bindingTopicExchange3(Queue topicQueue3, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue3).to(topicExchange).with("#");
    }
 
}


TopicExController类


import com.riskmage.rmcbase.Resp;
import com.riskmage.rmccloud.ins.common.annotation.TokenIgnore;
import com.riskmage.rmccloud.ins.rabbitmq.config.TopicExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/topicEx")
public class TopicExController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @TokenIgnore
    @RequestMapping(value = "/send",method = RequestMethod.GET)
    public Resp send(String routingKey) {
        rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, routingKey, "这是一条topic测试消息,消息routingkey为"+routingKey);
        return Resp.successMsg("topic消息发送成功!!") ;
    }

}
TopicQueueListener监听类



import com.riskmage.rmccloud.ins.rabbitmq.config.TopicExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class TopicQueueListener {


    @RabbitHandler
    @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE1)
    public void process1(String message) {
        log.info("topicQueue1收到消息  : " + message);
    }

    @RabbitHandler
    @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE2)
    public void process2(String message) {
        log.info("topicQueue2收到消息  : " + message);
    }

    @RabbitHandler
    @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE3)
    public void process3(String message) {
        log.info("topicQueue3收到消息  : " + message);
    }

}


发送请求推送消息 试下。

http://localhost:8074/topicEx/send?routingKey=dev.goods
http://localhost:8074/topicEx/send?routingKey=test.goods

关键字:  rabbitmq  topic  fanout  direct
评论信息
暂无评论
发表评论

亲,您还没有登陆,暂不能评论哦! 去 登陆 | 注册

博主信息
   
数据加载中,请稍候...
文章分类
   
数据加载中,请稍候...
阅读排行
 
数据加载中,请稍候...
评论排行
 
数据加载中,请稍候...

Copyright © 叮叮声的奶酪 版权所有
备案号:鄂ICP备17018671号-1

鄂公网安备 42011102000739号