400-035-6699
当前位置: 首页 » 技术支持 » 博文资讯 »

RabbitMQ路由模型详解:Direct交换器的工作原理与配置

RabbitMQ作为一款消息队列中间件,支持多种消息传递模式,其中之一便是路由模式(Direct)。这种模式在发布/订阅模式的基础上,通过引入路由键(routing Key)对消息的传输路径进行约束,从而实现更加精细化的消息分发。在路由模式中,生产者将带有特定路由键的消息发送到交换机,交换机则根据这些路由键将消息投递到对应队列。通过灵活设置路由键和队列绑定,可以构建起多样化的消息传递路径,满足不同场景下的需求。
以一个简单的场景为例,假设我们需要构建一个用户私信系统,系统中包含多个用户,每个用户发送的私信都需要传递给特定的接收者。此时,我们可以使用路由模式来实现。首先,为每个用户创建一个队列,代表其接收私信的“信箱”。然后,定义一个交换机,用于接收生产者发送的消息。生产者在发送消息时,需要指定一个路由键,该键值可以是接收者的ID,也可以是其他能唯一标识接收者的信息。交换机根据这个路由键,将消息转发到对应用户的队列中,从而确保消息能够被正确投递。
在路由模式中,队列可以绑定多个路由键,而每个路由键也可以绑定到多个队列。这意味着一个消息可以被投递到多个队列中,从而实现“广播”效果。例如,在任务分发系统中,可以将任务按照类型(如计算任务、IO任务等)设置不同的路由键,并为每种类型的任务创建一个队列。这样,当生产者发送一个任务消息时,只需要指定其类型作为路由键,交换机便会将该任务消息发送到所有绑定该类型路由键的队列中,从而确保所有相关任务都能被正确处理。
当然,在使用路由模式时,我们也需要注意一些问题。首先,确保生产者发送消息时使用的路由键与消费者绑定队列时使用的路由键保持一致,否则消息将无法被正确投递。其次,可以通过多个交换机和路由键的组合,实现更加灵活的消息路由。例如,可以为不同的任务类型创建不同的交换机,从而实现不同类型任务之间的隔离。
总之,RabbitMQ的路由模式提供了一种简单而强大的消息传递机制,通过灵活设置路由键和队列绑定,可以实现点对点通信和广播效果,满足多样化场景的需求。在实际应用中,我们需要根据具体需求选择合适的路由策略,确保消息的快速、准确传递。

路由模型

RabbitMQ 提供了五种不同的通信模型,上一篇文章中,简单的介绍了一下RabbitMQ的发布订阅模型模型。这篇文章来学习一下RabbitMQ中的路由模型(direct)。

RabbitMQ路由模型详解:Direct交换器的工作原理与配置

路由模型(direct):路由模式相当于是分布订阅模式的升级版,多了一个 路由key来约束队列与交换机的绑定。

在路由模型中,生产者将消息发送到交换机,交换机根据消息的路由键将消息转发到对应的队列中。每个队列可以绑定多个路由键,每个路由键可以绑定到多个队列中。消费者从队列中接收消息并处理。当一个路由键被多个队列绑定时,交换机会将消息发送到所有绑定的队列中。当一个队列绑定多个路由键时,该队列将能够接收到所有路由键对应的消息。

适用场景

路由模型适用于需要点对点通信的场景,例如:

  1. 系统监控告警通知;
  2. 任务分发;
  3. 用户私信系统;
  4. 订单确认通知等。

演示

  1. 生产者

    // 生产者
    public class Producer {
        private static final String EXCHANGE_NAME = "exchange_direct_1";
        // 定义路由的key,key值是可以随意定义的
        private static final String EXCHANGE_ROUTING_KEY1 = "direct_km1";
        private static final String EXCHANGE_ROUTING_KEY2 = "direct_km2";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            for (int i = 0; i < 100; i++) {
                if (i % 2 == 0) {
                    channel.bASICPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1, null, ("路由模型发送的第 " + i + " 条信息").getBytes());
                } else {
                    channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2, null, ("路由模型发送的第 " + i + " 条信息").getBytes());
                }
            }
            channel.close();
            connection.close();
        }
    }
    
  2. 消费者

    // 消费者1
    public class Consumer {
        private static final String QUEUE_NAME = "queue_direct_1";
        private static final String EXCHANGE_NAME = "exchange_direct_1";
        private static final String EXCHANGE_ROUTING_KEY1 = "direct_km1";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handLEDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1接收到的消息是:" + new String(body));
                }
            };
            channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
        }
    }
    
    // 消费者2
    public class Consumer2 {
        private static final String QUEUE_NAME = "queue_direct_2";
        private static final String EXCHANGE_NAME = "exchange_direct_1";
        private static final String EXCHANGE_ROUTING_KEY2 = "direct_km2";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2接收到的消息是:" + new String(body));
                }
            };
            channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
        }
    }
    
  3. 测试
    先启动2个消费者,再启动生产者
    可以得到结果是消费者1得到了序号是偶数的消息
    消费者2得到了序号是奇数的消息

小结

本文介绍了 RabbitMQ 通信模型中的路由模型的使用,通过交换机和路由键实现点对点通信,适合于需要点对点通信的场景。在实际使用过程中,需要注意以下几点:

  1. 路由键必须要与消费者绑定队列时的路由键相同,否则无法接收到消息;
  2. 可以通过多个交换机和路由键来实现更灵活的消息路由。

【限时免费】一键获取网络规划系统模板+传输架构设计+连通性评估方案

相关文章

服务电话:
400-035-6699
企服商城