Rabbitmq(6) 主题模式

* 匹配1个

# 匹配所有

发送者:

package com.aynu.bootamqp.service;

import com.aynu.bootamqp.commons.utils.Amqp;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    private final static String Exchange_NAME ="hello";
    public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = Amqp.getConnection();
            Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(Exchange_NAME,"topic");
            //在手动确认机制之前
            //一次只发送一条消息,给不同的消费者
            channel.basicQos(1);

            String message = "hello ps";
            String routingKey ="goods.delete";
            channel.basicPublish(Exchange_NAME,routingKey,null,message.getBytes("utf-8"));
            System.out.println(message);
            channel.close();
            connection.close();
    }
}

接受者1

package com.aynu.bootamqp.service;

import com.aynu.bootamqp.commons.utils.Amqp;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

@SuppressWarnings("all")
public class Receive {

    private final static String QUEUE_NAME ="hello";
    private final static String Exchange_NAME ="hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = Amqp.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定队列

        channel.queueBind(QUEUE_NAME,Exchange_NAME,"goods.add");
       // 一次只处理一个消息
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body,"utf-8");
                System.out.println("receive"+msg);
                try {
                    Thread.sleep(1000*2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    // 手动发送消息确认机制
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        // 自动应答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

接受者2

package com.aynu.bootamqp.service;

import com.aynu.bootamqp.commons.utils.Amqp;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
@SuppressWarnings("all")
public class Receive2 {

    private final static String QUEUE_NAME ="hello1";
    private final static String Exchange_NAME ="hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = Amqp.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.queueBind(QUEUE_NAME,Exchange_NAME,"goods.#");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body,"utf-8");
                System.out.println("receive2222"+msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    // 手动发送消息确认机制
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

原文地址:https://www.cnblogs.com/mm163/p/10703709.html

时间: 2024-11-08 06:37:54

Rabbitmq(6) 主题模式的相关文章

RabbitMQ六种队列模式-简单队列模式

前言 RabbitMQ六种队列模式-简单队列 [本文]RabbitMQ六种队列模式-工作队列RabbitMQ六种队列模式-发布订阅RabbitMQ六种队列模式-路由模式RabbitMQ六种队列模式-主题模式 在官网的教程中,描述了如上六类工作队列模式: 简单队列模式:最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列.也称为点对点模式 工作模式:一个消息生产者,一个交换器,一个消息队列,多个消费者.同样也称为点对点模式 发布/订阅模式:无选择接收消息,一个消息生产者,一个交换器,多个

activeMQ队列模式和主题模式的Java实现

一.队列模式 生产者 import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activ

使用Java编写ActiveMQ的队列模式和主题模式

队列模式的消息演示 本小节简单演示一下如何使用JMS接口规范连接ActiveMQ,首先创建一个Maven工程,在pom.xml文件中,添加activemq的依赖: <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version

ActiveMQ队列、主题模式区别

1.ActiveMQ队列模式如下图,生产者创建消息到消息中间件,再"均分给消费者". 2.ActiveMQ主题模式如下图,生产者创建消息到消息中间件,消费者会接受到订阅的主题中所有的消息.在主题模式下,消费者获取不到订阅之前的中间件中的消息. 原文地址:https://www.cnblogs.com/GrapefruitTea/p/9941169.html

RabbitMQ之消息模式(下)

目的: RabbitMQ之消息模式(上):https://www.cnblogs.com/huangting/p/11994539.html 消费端限流 消息的ACK与重回队列 TTL消息 死信队列 消费端限流 什么是消费端的限流? 假设一个场景,首先,我们RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据 消费端限流RabbitMQ提供的解决方案 RabbitMQ提供了一种qos(服务

ActiveMQ--模式(队列模式/主题模式)

两种模式:队列模式/主题模式 pom.xml <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> </dependency> 队列模式,其实就是分食模式. 比如生产方发了 10条消息到 activeMQ 服务器, 而此时有多个 消费方

主题模式

Topic(主题模式) Topic exchange direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求.topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但匹配规则有些不同 routing_key-它必须是单词列表,以点分隔.这些词可以是任何东西,但是通常它们指定

RabbitMQ发布订阅模式

这个可能是消息队列中最重要的队列了,其他的都是在它的基础上进行了扩展.功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者. 思路解读(重点理解): (1)一个生产者,多个消费者(2)每一个消费者都有自己的一个队列(3)生产者没有直接发消息到队列中,而是发送到交换机(4)每个消费者的队列都绑定到交换机上(5)消息通过交换机到达每个消费者的队列该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的所有队列以用户发

Github开源:Sheng.RabbitMQ.CommandExecuter (RabbitMQ 的命令模式实现)

[Github]:https://github.com/iccb1013/Sheng.RabbitMQ.CommandExecuter Sheng.RabbitMQ.CommandExecuter 是使用 .Net 对 RabbitMQ 的一个简单封装. 它通过XML配置文件定义Exchange及队列等信息,根据此配置文件自动声明及初始化相关队列信息,方便 .Net 开发人员使用 RabbitMQ. 并实现了一个基于 MQ 的命令执行器,将 MQ 消息抽象化为命令,发布端和订阅端通过命令进行交互