400-035-6699
当前位置: 首页 » 技术支持 » 博文资讯 »

RabbitMQ路由模型详解:深入理解Direct交换器机制

RabbitMQ 是一种广泛使用的消息队列中间件,它支持多种通信模型以满足不同场景的需求。在之前的文章中,我们探讨了发布订阅模型,这次我们将深入了解另一种常见的通信模型——路由模型(direct)。
路由模型实际上是分布订阅模型的一个升级版本,它引入了路由键(routing Key)的概念,用以精确控制消息的流向。在路由模型中,生产者将消息发送到交换机,交换机根据消息携带的路由键,将消息精确地转发到与其匹配的队列中。
### 路由模型的工作原理
生产者发送消息时,需要指定交换机和路由键。交换机收到消息后,会检查消息的路由键,并将其与队列绑定的路由键进行比对。如果匹配,交换机就会将消息推送到对应的队列。值得注意的是,一个队列可以绑定多个路由键,同时一个路由键也可以绑定到多个队列上。这意味着,当一个路由键被多个队列使用时,交换机会将消息复制到所有绑定了该路由键的队列中。
### 适用场景
路由模型非常适合点对点通信的场景,其中一些典型的应用案例包括:
- 系统监控告警通知:不同类型的告警信息可以通过不同的路由键分发到相应的处理系统。 - 任务分发:任务可以根据类型或优先级分配到不同的处理队列中。 - 用户私信系统:私信可以根据用户的ID或组别进行分类处理。 - 订单确认通知:订单状态更新可以根据订单类型或处理阶段发送到不同的通知队列。
### 演示
以下是一个简化的演示过程,展示了如何在RabbitMQ中使用路由模型。
#### 生产者
生产者负责创建消息并发送到交换机。以下是生产者的核心代码片段:
```java // 创建连接和通道 Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 发送消息 for (int i = 0; i < 100; i++) { String routingKey = i % 2 == 0 ? "even" : "odd"; channel.bASICPublish(EXCHANGE_NAME, routingKey, null, ("路由模型发送的第 " + i + " 条信息").getBytes()); } ```
#### 消费者
消费者则根据绑定的路由键接收消息。以下是两个消费者的核心代码片段:
```java // 消费者1,只接收偶数路由键的消息 Connection connection1 = ConnectionUtils.getConnection(); Channel channel1 = connection1.createChannel(); channel1.queueDeclare(QUEUE_NAME1, false, false, false, null); channel1.exchangeDeclare(EXCHANGE_NAME, "direct"); channel1.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "even");
// 消费者2,只接收奇数路由键的消息 Connection connection2 = ConnectionUtils.getConnection(); Channel channel2 = connection2.createChannel(); channel2.queueDeclare(QUEUE_NAME2, false, false, false, null); channel2.exchangeDeclare(EXCHANGE_NAME, "direct"); channel2.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "odd"); ```
### 小结
通过使用路由模型,RabbitMQ 允许我们实现精确的消息传递,确保消息能够到达正确的队列。在使用时,需要注意路由键的设置必须与队列绑定的路由键一致,以确保消息能够被正确接收。此外,通过灵活配置交换机和路由键,我们能够构建更加复杂和高效的消息处理流程。

路由模型

RabbitMQ 提供了五种不同的通信模型,上一篇文章中,简单的介绍了一下RabbitMQ的发布订阅模型模型。这篇文章来学习一下RabbitMQ中的路由模型(direct)。

RabbitMQ路由模型详解:深入理解Direct交换器机制

路由模型(direct):路由模式相当于是分布订阅模式的升级版,多了一个 路由key来约束队列与交换机的绑定。

在路由模型中,生产者将消息发送到交换机,交换机根据消息的路由键将消息转发到对应的队列中。每个队列可以绑定多个路由键,每个路由键可以绑定到多个队列中。消费者从队列中接收消息并处理。当一个路由键被多个队列绑定时,交换机会将消息发送到所有绑定的队列中。当一个队列绑定多个路由键时,该队列将能够接收到所有路由键对应的消息。

适用场景

路由模型适用于需要点对点通信的场景,例如:

  1. 系统监控告警通知;
  2. 任务分发;
  3. 用户私信系统;
  4. 订单确认通知等。

演示

  1. 生产者

    // 生产者
    public class Producer {
        private static final String EXCHANGE_NAME = "exchange_direct_1";
        // 定义路由的key,key值是可以随意定义的
        private static final String EXCHANGE_ROUTING_KEY1 = "direct_km1";
        private static final String EXCHANGE_ROUTING_KEY2 = "direct_km2";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            for (int i = 0; i < 100; i++) {
                if (i % 2 == 0) {
                    channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1, null, ("路由模型发送的第 " + i + " 条信息").getBytes());
                } else {
                    channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2, null, ("路由模型发送的第 " + i + " 条信息").getBytes());
                }
            }
            channel.close();
            connection.close();
        }
    }
    
  2. 消费者

    // 消费者1
    public class Consumer {
        private static final String QUEUE_NAME = "queue_direct_1";
        private static final String EXCHANGE_NAME = "exchange_direct_1";
        private static final String EXCHANGE_ROUTING_KEY1 = "direct_km1";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handLEDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1接收到的消息是:" + new String(body));
                }
            };
            channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
        }
    }
    
    // 消费者2
    public class Consumer2 {
        private static final String QUEUE_NAME = "queue_direct_2";
        private static final String EXCHANGE_NAME = "exchange_direct_1";
        private static final String EXCHANGE_ROUTING_KEY2 = "direct_km2";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2接收到的消息是:" + new String(body));
                }
            };
            channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
        }
    }
    
  3. 测试
    先启动2个消费者,再启动生产者
    可以得到结果是消费者1得到了序号是偶数的消息
    消费者2得到了序号是奇数的消息

小结

本文介绍了 RabbitMQ 通信模型中的路由模型的使用,通过交换机和路由键实现点对点通信,适合于需要点对点通信的场景。在实际使用过程中,需要注意以下几点:

  1. 路由键必须要与消费者绑定队列时的路由键相同,否则无法接收到消息;
  2. 可以通过多个交换机和路由键来实现更灵活的消息路由。

【限时免费】一键获取网络规划系统模板+传输架构设计+连通性评估方案

点对点通信相关文章

服务电话:
400-035-6699
企服商城