RabbitMQ - topic

在publish/subscribe模式中使用fanout类型有个缺陷,就是不能选择性接收的消息。
我们可以让consumer获得所有已发布的消息中指定的几个消息。

在之前的例子中我们这样绑定exchange和队列:

channel.queueBind(queueName, EXCHANGE_NAME, "");

暂且不论该代码中绑定的exchange类型,这里空着的参数就是routing key。
routing key的意义与exchange类型有关,比如使用fanout类型就会忽略掉routing key。

而解决这一问题的就是direct类型。
direct exchange并不复杂,只不过是producer和consumer双方的exchange对应时还需要对应routing key。

以下代码中,同一个exchange和两个队列进行绑定,两个队列分别和不同的binding key绑定。
(PS:当然,我们也可以将同一个routing key绑定给不同的队列也没有问题。)
另外,SERVERITY变量是rounting数组,假设将日志通过exchange发送出去,consumer根据自己的需要获取不同级别的日志:

final class ChannelFactory_{
    private final static ConnectionFactory connFactory = new ConnectionFactory();

    public final static String EXCHANGE_NAME = "direct_exchange";
    public final static String[] SEVERITY = {"info","warning","error"};

    static {
        Channel temp = getChannel();
        try {
            temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static Channel getChannel(int channelNumber){
        try {
            Connection connection = connFactory.newConnection();
            return connection.createChannel(channelNumber);
        } catch (IOException e) {
            e.printStackTrace();
        }return null;
    }

    public static Channel getChannel(){
        try {
            Connection connection = connFactory.newConnection();
            return connection.createChannel();
        } catch (IOException e) {
            e.printStackTrace();
        }return null;
    }

    public static void  closeChannel(Channel channel) throws IOException {
        channel.close();
        channel.getConnection().close();
    }

}

确认定义:

consumer只需要warning和error级别(routing)的日志消息:

public static void main(String[] args) throws IOException, InterruptedException {
        Channel channel = ChannelFactory_.getChannel();

        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"warning");
        channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"error");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName,true,consumer);
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received ‘" + routingKey + "‘:‘" + message + "‘");
        }

    }

producer将所有级别的日志都发送出去:

public static void main(String[] args) throws IOException {
        Channel channel = ChannelFactory_.getChannel();
        String content = "message "+new Date();

        for (int i = 0; i <ChannelFactory_.SEVERITY.length ; i++) {
            channel.basicPublish(EXCHANGE_NAME,ChannelFactory_.SEVERITY[i],null,content.getBytes());
        }
        ChannelFactory_.closeChannel(channel);
    }

运行结果:

direct exchange可以让我们有选择性地接受消息。
但这样做仍然有缺陷。
虽然我可以只要求error和warning级别的日志,但是我不能再进行细分。
比如我只想要数据库相关的error和warning级别的日志。

为了实现这一点,我们需要使用另一个exchange类型——Topic。
exchange类型为topic时,routing key是一组用"."隔开的词,但仅限255bytes。
比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"

topic和direct的不同点还有在consumer中定义routing key时我们可以使用通配符,比如:
符号‘*‘:可以匹配某一个词。
符号‘#‘:可以匹配0~N个词。

举个例子说明,假设我们用rounting key描述一个动物。
格式为: <性格>.<颜色>.<种类>
用符号‘*‘,我想要得到桔***的动物,即:"*.orange.*"
用符号‘#‘,我想要得到懒散的动物,即:"lazy.#"
如果使用过程中有人破坏了格式,即使rounting key为"lazy.orange.male.rabbit"也可以匹配"lazy.#"。

稍微修改上面的代码,首先定义一个topic exchange。

public  final static String EXCHANGE_NAME = "topic_exchange";
temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC);

确认定义:

发送sql相关的log:

public static void main(String[] args) throws IOException {
        Channel channel = ChannelFactory_.getChannel();
        String content = "message #$#$#$#$#$#$";

        channel.basicPublish(EXCHANGE_NAME,"warning.sql.connection.close",null,content.getBytes());
        channel.basicPublish(EXCHANGE_NAME,"error.sql.syntax",null,content.getBytes());

        ChannelFactory_.closeChannel(channel);
    }

consumer接收所有sql相关的warning和所有error:

public static void main(String[] args) throws IOException, InterruptedException {
        Channel channel = ChannelFactory_.getChannel();

        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"warning.sql.#");
        channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"error.#");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName,true,consumer);
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received ‘" + routingKey + "‘:‘" + message + "‘");
        }

    }

运行结果:

时间: 2024-08-11 09:47:53

RabbitMQ - topic的相关文章

RabbitMQ Topic exchange

Topic exchange topic与之前的每个类型都不同(ps:废话每个都是不同的).Topic解决了我们另一个需求.举个例子,有一个做资讯的公司,他们会收集各种科技公司的动态并且第一时间转发出来.小编A负责微软公司,小编B负责谷歌公司,手工去搜索文章并且看标题是否匹配再进行转发是非常的低效的,可能小编们想要偷懒,写一个程序去各大网站进行爬数据,对于标题中含有微软的交给小编A,对于标题中含有谷歌的交给小编B. 是的,topic就是类似与正则进行模糊匹配routingkey,对于key需求是

Java使用RabbitMQ之订阅分发(Topic)

使用RabbitMQ进行消息发布和订阅,生产者将消息发送给转发器(exchange),转发器根据路由键匹配已绑定的消息队列并转发消息,主题模式支持路由键的通配. 生产者代码: 1 package org.study.exchange3.topic3; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import org.junit.Test; 6 import org.study

RabbitMQ指南之五:主题交换器(Topic Exchange)

在上一章中,我们完善了我们的日志系统,用direct交换器替换了fanout交换器,使得我们可以有选择性地接收消息.尽管如此,仍然还有限制:不能基于多个标准进行路由.在我们的日志系统中,我们可能不仅希望根据日志等级订阅日志,还希望根据日志来源订阅日志.这个概念来自于unix工具syslog,它不仅可以根据日志等级(info/warn/crit...)来路由日志,同时还可以根据设备(auth/cron/kern...)来路由日志.这将更加灵活,我们可能希望只监听来自'cron'的error级别日志

SpringBoot ( 八 ) :RabbitMQ 详解

原文出处: 纯洁的微笑 RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用. 消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将RocketMQ捐献给了apache,当然了今天的主角还是讲RabbitMQ.消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的.在分布式的系统中,消息队列也会被用在很多其它的方

springboot学习笔记-6 springboot整合RabbitMQ

一 RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache). 消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下: 从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息

python学习笔记-Day12 (上下文管理、redis发布订阅、rabbitmq、pymysql模块、SQLAchemy)

上下文管理 import contextlib # 上下文管理 @contextlib.contextmanager def worker_state(state_list, worker_thread): """ :param state_list: :param worker_thread: :return: """ state_list.append(worker_thread) # 2. 进入执行函数体 try: yield # 3. 遇

rabbitmq文章源

网易杭研后台技术中心的博客 rabbitmq topic简单demo 版权声明:本文为博主原创文章,未经博主允许不得转载.

springboot系列-springboot整合RabbitMQ

一 RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache). 消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下: 从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息

springboot(八):RabbitMQ详解

http://www.cnblogs.com/ityouknow/p/6120544.html ******************************************************* RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用. 消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将RocketMQ捐献给了apache,当然了今天的主角还是讲RabbitMQ.消息中间件最主要的作用是解耦,中间件最标准的