RabbitMQ学习第四记:路由模式(direct)

1、什么是路由模式(direct)

  路由模式是在使用交换机的同时,生产者指定路由发送数据,消费者绑定路由接受数据。与发布/订阅模式不同的是,发布/订阅模式只要是绑定了交换机的队列都会收到生产者向交换机推送过来的数据。而路由模式下加了一个路由设置,生产者向交换机发送数据时,会声明发送给交换机下的那个路由,并且只有当消费者的队列绑定了交换机并且声明了路由,才会收到数据。下图取自于官方网站(RabbitMQ)的路由模式的图例

P:消息的生产者

X:交换机

红色:队列

C1,C2:消息消费者

error,info,warning:路由

  举个日志处理例子:系统需要针对日志做分析,首先所有的日志级别的日志都需要保存,其次error日志级别的日志需要单独做处理。这时就可以使用路由模式来处理了,声明交换机使用路由模式,每个日志级别的日志对应一个路由(error,info,warning)。声明一个保存日志队列用于接受所有日志,绑定交换机并绑定所有路由。声明第二个队列用于处理error级别日志,绑定交换机且只绑定error路由。以下是代码讲解。(先运行两个消费者,在运行生产者。如果没有提前将队列绑定到交换机,那么直接运行生产者的话,消息是不会发到任何队列里的)

2、生产者(Send)代码

复制代码
public class Send
{
//交换机名称
private final static String EXCHANGE_NAME = "test_exchange_direct";

//路由名称warning
private final static String ROUTING_KEY_WARNING = "warning";

//路由名称info
private final static String ROUTING_KEY_INFO    = "info";

//路由名称error
private final static String ROUTING_KEY_ERROR   = "error";

public static void main(String[] args)
{
    try
    {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //从连接中获取一个通道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String message = "this is warning log";
        //发送消息(warning级别日志)
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_WARNING, null, message.getBytes("utf-8"));
        System.out.println("[send]:" + message);
        //发送消息(info级别日志)
        message = "this is info log";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_INFO, null, message.getBytes("utf-8"));
        System.out.println("[send]:" + message);
        //发送消息(error级别日志)
        message = "this is error log";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_ERROR, null, message.getBytes("utf-8"));
        System.out.println("[send]:" + message);
        channel.close();
        connection.close();
    }
    catch (IOException | TimeoutException e)
    {
        e.printStackTrace();
    }
}

}

运行结果:
[send]:this is warning log
[send]:this is info log
[send]:this is error log

复制代码
3、消费者1(ReceiveAllLog)

复制代码
public class ReceiveAllLog
{
//交换机名称
private final static String EXCHANGE_NAME = "test_exchange_direct";

//路由名称warning
private final static String ROUTING_KEY_WARNING = "warning";

//路由名称info
private final static String ROUTING_KEY_INFO    = "info";

//路由名称error
private final static String ROUTING_KEY_ERROR   = "error";

//队列名称
private static final String QUEUE_NAME          = "test_queue_save_all_log";

public static void main(String[] args)
{
    try
    {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //从连接中获取一个通道
        final Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //将队列绑定到交换机(指定路由info,error,warning)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_INFO);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ERROR);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_WARNING);
        //保证一次只分发一个
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        //定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel)
        {
            //当消息到达时执行回调方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException
            {
                String message = new String(body, "utf-8");
                System.out.println("[test_queue_save_all_log] Receive message:" + message);
                try
                {
                    //消费者休息2s处理业务
                    Thread.sleep(1000);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
                finally
                {
                    //手动应答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        //设置手动应答
        boolean autoAck = false;
        //监听队列
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
    catch (IOException e)
    {
        e.printStackTrace();
    }
}

}

运行结果:
[test_queue_save_all_log] Receive message:this is warning log
[test_queue_save_all_log] Receive message:this is info log
[test_queue_save_all_log] Receive message:this is error log

复制代码
4、消费者2(ReceiveErrorLog)

复制代码
public class ReceiveErrorLog
{
//交换机名称
private final static String EXCHANGE_NAME = "test_exchange_direct";

//路由名称error
private final static String ROUTING_KEY_ERROR = "error";

//队列名称
private static final String QUEUE_NAME        = "test_queue_handel_error";

public static void main(String[] args)
{
    try
    {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //从连接中获取一个通道
        final Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //将队列绑定到交换机(指定路由error)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ERROR);
        //保证一次只分发一个
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        //定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel)
        {
            //当消息到达时执行回调方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException
            {
                String message = new String(body, "utf-8");
                System.out.println("[test_queue_handel_error] Receive message:" + message);
                try
                {
                    //消费者休息2s处理业务
                    Thread.sleep(1000);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
                finally
                {
                    //手动应答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        //设置手动应答
        boolean autoAck = false;
        //监听队列
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
    catch (IOException e)
    {
        e.printStackTrace();
    }
}

}

运行结果:
[test_queue_handel_error] Receive message:this is error log
复制代码
总结:

  1.两个队列消费者设置的路由不一样,接收到的消息就不一样。路由模式下,决定消息向队列推送的主要取决于路由,而不是交换机了。

  2.该模式必须设置交换机,且声明路由模式:channel.exchangeDeclare(EXCHANGE_NAME, "direct");

原文地址:http://blog.51cto.com/13954634/2173373

时间: 2024-07-31 05:02:49

RabbitMQ学习第四记:路由模式(direct)的相关文章

RabbitMQ (六) : 订阅者模式之路由模式 ( direct )

路由模式下,生产者发送消息时需要指定一个路由键(routingKey),交换机只会把消息转发给包含该路由键的队列 这里,我们改变一下声明交换机的方式. 我们通过管理后台添加一个交换机. 添加后,生产者和消费者的代码中就不需要再声明交换机了.同样,也可以通过管理后台添加队列,那么代码中也不需要声明队列了. 生产者 public class Producer { private const string ExchangeName = "test_exchange_direct"; publ

python使用rabbitMQ介绍四(路由模式)

一.模式介绍 路由模式,与发布-订阅模式一样,消息发送到exchange中,消费者把队列绑定到exchange上. 这种模式在exchange上添加添加了一个路由键(routing-key),生产者发布消息的时候添加路由键(routing-key),消费者绑定队列到交换机时添加键值(routing-key),这样就可以接收到对应的消息. 路由模式的direct exchange. 队列模型: 与发布-订阅模式不同的是,每个消费者队列接收的消息不同,根据消息的routing-key把消息发送到不同

RabbitMQ学习之:(六)Direct Exchange (转贴+我的评论)

From: http://lostechies.com/derekgreer/2012/04/02/rabbitmq-for-windows-direct-exchanges/ RabbitMQ for Windows: Direct Exchanges Posted by Derek Greer on April 2, 2012 This is the fifth installment to the series: RabbitMQ for Windows.  In thelast inst

RabbitMQ学习第三记:发布/订阅模式(Publish/Subscribe)

工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者. 举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败).如果在一个系统中,用户注册信息有邮箱.手机号,那么在注册完后会向邮箱和手机号都发送注册完成信息.利用MQ实现业务异步处理,如果是用工作队列的话,就会声明一个注册信息队列.注册完成之后生产者会向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息.但是实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应该放在一块处

rabbitMQ学习笔记(四) 发布/订阅消息

前面都是一条消息只会被一个消费者处理. 如果要每个消费者都处理同一个消息,rabbitMq也提供了相应的方法. 在以前的程序中,不管是生产者端还是消费者端都必须知道一个指定的QueueName才能发送.获取消息.  而rabbitMQ消息模型的核心思想是生产者不会将消息直接发送给队列. 因为,生产者通常不会知道消息将会被哪些消费者接收. 生产者的消息虽然不是直接发送给Queue,但是消息会交给Exchange,所以需要定义Exchange的消息分发模式 ,之前的程序中,有如下一行代码: chan

RabbitMQ学习(四).NET Client之Routing

4 Routing Receiving messages selectively Python | Java | Ruby | PHP| C# 转载请注明出处:jiq?钦's technical Blog Routing(路由) (using the .NET client) 上一节教程我们已经构建好了一个简单的日志系统,能够广播日志消息到多个receiver. 这一节我们将给这个日志系统加一个功能,即我们只订阅这些消息中的一个子集,而不是全部. 例如,前面我们是将订阅到的所有日志消息要么存储到

rabbitMQ学习笔记(五) 消息路由

生产者会生产出很多消息 , 但是不同的消费者可能会有不同的需求,只需要接收指定的消息,其他的消息需要被过滤掉. 这时候就可以对消息进行过滤了. 在消费者端设置好需要接收的消息类型. 如果不使用默认的Exchange发送消息,而是使用我们自定定义的Exchange发送消息,那么下面这个方法的第二个参数就不是QueueName了,而是消息的类型. channel.basicPublish( exchangeName , messageType , null , msg.getBytes()); 示例

rabbitmq学习(四):利用rabbitmq实现远程rpc调用

一.rabbitmq实现rpc调用的原理 ·rabbitmq实现rpc的原理是:客户端向一个队列中发送消息,并注册一个回调的队列用于接收服务端返回的消息,该消息需要声明一个叫做correaltionId的属性,该属性将是该次请求的唯一标识.服务端在接受到消息(在需要时可以验证correaltionId)后,处理消息,并将消息发送到客户端注册的回调队列中.原理图如下: 二.代码实现 下面我们将模拟实现一个rpc客户端和rpc服务端.客户端给服务端发送message,服务端收到后处理message,

RabbitMQ学习系列(四): 几种Exchange 模式

上一篇,讲了RabbitMQ的具体用法,可以看看这篇文章:RabbitMQ学习系列(三): C# 如何使用 RabbitMQ.今天说些理论的东西,Exchange 的几种模式. AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列.生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机.先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储.同理,消费者也是如此.Exchange 就类似于一个交换机,转发各个消息分发到相