Java操作RabbitMQ添加队列、消费队列和三个交换机

一、发送消息到队列(生产者)

新建一个maven项目,在pom.xml文件加入以下依赖

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.6.5</version>
    </dependency>
</dependencies><br>

新建一个P1类

package com.rabbitMQ.test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @create 2019/11/20-11:23
 */
public class P1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //消息队列名字
        String queueName="queue";
        //实例连接工厂
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //设置地址
        connectionFactory.setHost("192.168.128.233");
        //设置端口
        connectionFactory.setPort(5672);
        //设置用户名
        connectionFactory.setUsername("mowen");
        //设置密码
        connectionFactory.setPassword("123456");
        //获取连接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明队列。
        //参数1:队列名
        //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
        //参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
        //参数4:当所有消费者客户端连接断开时是否自动删除队列
        //参数5:队列的其他参数
        channel.queueDeclare(queueName,true,false,false,null);

        for (int i = 0; i < 10; i++) {
            String msg="msg"+i;
            // 基本发布消息
            // 第一个参数为交换机名称、
            // 第二个参数为队列映射的路由key、
            // 第三个参数为消息的其他属性、
            // 第四个参数为发送信息的主体
            channel.basicPublish("",queueName,null,msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}

运行后再浏览器进入RabbitMQ的控制台,切换到queue看到

二、获取队列消息(消费者)

新建一个C1类

package com.rabbitMQ.test;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @create 2019/11/20-13:12
 */
public class C1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //消息队列名字
        String queueName="queue";
        //实例连接工厂
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //设置地址
        connectionFactory.setHost("192.168.128.233");
        //设置端口
        connectionFactory.setPort(5672);
        //设置用户名
        connectionFactory.setUsername("mowen");
        //设置密码
        connectionFactory.setPassword("123456");
        //获取连接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //创建通道
        Channel channel = connection.createChannel();

        // 创建一个消费者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消费收到消息的时候调用的回调
                System.out.println("C3接收到:" + new String(body));
            }
        };

        //把消费着绑定到指定队列
        //第一个是队列名
        //第二个是 是否自动确认
        //第三个是消费者
        channel.basicConsume(queueName,true,consumer);

    }
}

运行后输出为

消费者一般都不会关闭,会一直等待队列消息,可以手动关闭程序。

channel.basicConsume(queueName,true,consumer);中的true为收到消息后自动确认,改为false取消自动确认。

在handleDelivery方法最后面用

channel.basicAck(envelope.getDeliveryTag(),false);

来收到手动确认消息。消费者可以有多个并且可以同时消费一个队列;

当有多个消费者同时消费同一个队列时,收到的消息是平均分配的(消费者没收到之前已经确认每个消费者受到的消息),

但当其中一个消费者性能差的话,会影响其他的消费者,因为还要等它收完消息,这样会拖累其他消费者。

可以设置channel 的basicQos方法

//设置最多接受消息数量
// 设置了这个参数之后要吧自动确认关掉
channel.basicQos(1);

三、扇形(fanout)交换机

扇形交换机是基本的交换机类型,会把收到的消息以广播的形式发送到绑定的队列里,因为不需要经过条件筛选,所以它的速度最快。

在生产者项目新建一个fanout类

package com.rabbitMQ.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @date  2019/11/20-11:23
 */
public class fanout {
    public static void main(String[] args) throws IOException, TimeoutException {
        //交换机名字
        String exchangeName="fanout";
        //交换机名字类型
        String exchangeType="fanout";
        //消息队列名字
        String queueName1="fanout.queue1";
        String queueName2="fanout.queue2";
        String queueName3="fanout.queue3";
        //实例连接工厂
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //设置地址
        connectionFactory.setHost("192.168.128.233");
        //设置端口
        connectionFactory.setPort(5672);
        //设置用户名
        connectionFactory.setUsername("mowen");
        //设置密码
        connectionFactory.setPassword("123456");
        //获取连接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明队列。
        //参数1:队列名
        //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
        //参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
        //参数4:当所有消费者客户端连接断开时是否自动删除队列
        //参数5:队列的其他参数
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        channel.queueDeclare(queueName3,true,false,false,null);

        //声明交换机
        channel.exchangeDeclare(exchangeName,exchangeType);

        //队列绑定到交换机
        channel.queueBind(queueName1,exchangeName,"");
        channel.queueBind(queueName2,exchangeName,"");
        channel.queueBind(queueName3,exchangeName,"");

        for (int i = 0; i < 10; i++) {
            String msg="msg"+i;
            // 基本发布消息
            // 第一个参数为交换机名称、
            // 第二个参数为队列映射的路由key、
            // 第三个参数为消息的其他属性、
            // 第四个参数为发送信息的主体
            channel.basicPublish(exchangeName,"",null,msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}

运行后在RabbitMQ网页管理后台的queue会看到

切换到Exchanges会看到一个

就是我们声明的交换机,点击会看到我们绑定的队列

四、直连(direct)交换机

直连交换机会带路由功能,队列通过routing_key与直连交换机绑定,发送消息需要指定routing_key,交换机收到消息时,交换机会根据routing_key发送到指定队列里,同样的routing_key可以支持多个队列。

在生产者项目新建direct类

package com.rabbitMQ.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @date  2019/11/20-11:23
 */
public class direct {
    public static void main(String[] args) throws IOException, TimeoutException {
        String exchangeName="direct";
        String exchangeType="direct";
        //消息队列名字
        String queueName1="direct.queue1";
        String queueName2="direct.queue2";
        String queueName3="direct.queue3";
        //实例连接工厂
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //设置地址
        connectionFactory.setHost("192.168.128.233");
        //设置端口
        connectionFactory.setPort(5672);
        //设置用户名
        connectionFactory.setUsername("mowen");
        //设置密码
        connectionFactory.setPassword("123456");
        //获取连接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明队列。
        //参数1:队列名
        //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
        //参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
        //参数4:当所有消费者客户端连接断开时是否自动删除队列
        //参数5:队列的其他参数
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        channel.queueDeclare(queueName3,true,false,false,null);

        //声明交换机
        channel.exchangeDeclare(exchangeName,exchangeType);

        //队列绑定到交换机并指定rouing_key
        channel.queueBind(queueName1,exchangeName,"key1");
        channel.queueBind(queueName2,exchangeName,"key2");
        channel.queueBind(queueName3,exchangeName,"key1");

        for (int i = 0; i < 10; i++) {
            String msg="msg"+i;
            // 基本发布消息
            // 第一个参数为交换机名称、
            // 第二个参数为队列映射的路由key、
            // 第三个参数为消息的其他属性、
            // 第四个参数为发送信息的主体
            channel.basicPublish(exchangeName,"key1",null,msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}

运行后到后台的queue会看到

切换到Exchanges会看到

点击进去

五、主题(topic)交换机

主题交换机的routing_key可以有一定的规则,交换机和队列的routing_key需要采用.#.…..的格式

每个部分用.分开

*代表一个单词(不是字符)

#代表任意数量(0或n个)单词

在生产者项目新进topic类

package com.rabbitMQ.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author mowen
 * @date  2019/11/20-11:23
 */
public class topic {
    public static void main(String[] args) throws IOException, TimeoutException {
        String exchangeName="topic";
        String exchangeType="topic";
        //消息队列名字
        String queueName1="topic.queue1";
        String queueName2="topic.queue2";
        String queueName3="topic.queue3";
        //实例连接工厂
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //设置地址
        connectionFactory.setHost("192.168.128.233");
        //设置端口
        connectionFactory.setPort(5672);
        //设置用户名
        connectionFactory.setUsername("mowen");
        //设置密码
        connectionFactory.setPassword("123456");
        //获取连接(跟jdbc很像)
        Connection connection = connectionFactory.newConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明队列。
        //参数1:队列名
        //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
        //参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
        //参数4:当所有消费者客户端连接断开时是否自动删除队列
        //参数5:队列的其他参数
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        channel.queueDeclare(queueName3,true,false,false,null);

        //声明交换机
        channel.exchangeDeclare(exchangeName,exchangeType);

        //队列绑定到交换机并指定rouing_key
        channel.queueBind(queueName1,exchangeName,"com.aaa.*");
        channel.queueBind(queueName2,exchangeName,"com.*.topic");
        channel.queueBind(queueName3,exchangeName,"com.bbb.*");

        for (int i = 0; i < 10; i++) {
            String msg="msg"+i;
            // 基本发布消息
            // 第一个参数为交换机名称、
            // 第二个参数为队列映射的路由key、
            // 第三个参数为消息的其他属性、
            // 第四个参数为发送信息的主体
            channel.basicPublish(exchangeName,"com.aaa.topic",null,msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}

运行后,到后台queue会看到

切换到Exchanges会看到

点击进入会看到

原文地址:https://blog.51cto.com/14230003/2456647

时间: 2024-10-10 13:43:49

Java操作RabbitMQ添加队列、消费队列和三个交换机的相关文章

Rabbitmq中的优先级队列操作

1.%% 普通队列操作 in(X, 0, {queue, [_] = In, [], 1}) ->{queue, [X], In, 2}; in(X, 0, {queue, In, Out, Len}) when is_list(In), is_list(Out) -> {queue, [X|In], Out, Len + 1}; 优先级队列操作: in(X, Priority, Q = {queue, [], [], 0}) -> in(X, Priority, {pqueue, []

spring boot Rabbitmq集成,延时消息队列实现

本篇主要记录Spring boot 集成Rabbitmq,分为两部分, 第一部分为创建普通消息队列, 第二部分为延时消息队列实现: spring boot提供对mq消息队列支持amqp相关包,引入即可: [html] view plain copy <!-- rabbit mq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-

JAVA多线程提高十二:阻塞队列应用

一.类相关属性 接口BlockingQueue<E>定义: public interface BlockingQueue<E> extends Queue<E> { boolean add(E e); boolean offer(E e); void put(E e) throws InterruptedException; boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedExcepti

RabbitMQ学习笔记五:RabbitMQ之优先级消息队列

RabbitMQ优先级队列注意点: 1.只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效 2.RabbitMQ3.5以后才支持优先级队列 代码在博客:RabbitMQ学习笔记三:Java实现RabbitMQ之与Spring集成 最后面有下载地址,只是做了少许改变,改变的代码如下: 消费者 spring-config.xml(还需要增加一个QueueListener监听器,代码就不复制到这里了,可以参考项目中的其他监听器) <!-- =========================

java面向对象的栈 队列 优先级队列的比较

栈 队列 有序队列数据结构的生命周期比那些数据库类型的结构(比如链表,树)要短得多.在程序操作执行期间他们才被创建,通常用他们去执行某项特殊的任务:当完成任务之后,他们就会被销毁.这三个数据结构还有一个特点就是访问是受到限制的,即在特定时刻只有一个数据项可以被读取或者被删除,但是所谓的移除并不是真的删除,数据项依然在这些数据结构中,只不过因为指针已经指向其他数据项,没有办法访问到,当添加新的数据项时,当初移除的数据项被替代从而永远消失. 栈 队列 优先级队列的模拟思想 1.栈:栈遵循先进后出(F

RabbitMQ与Redis做队列比较

本文仅针对RabbitMQ与Redis做队列应用时的情况进行对比 具体采用什么方式实现,还需要取决于系统的实际需求 简要介绍 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面表现不俗.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然. Redis 是一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库

RabbitMQ五:生产者--队列--多消费者

一.生成者-队列-多消费者(前言) 上篇文章,我们做了一个简单的Demo,一个生产者对应一个消费者,本篇文章就介绍 生产者-队列-多个消费者,下面简单示意图 P 生产者    C 消费者  中间队列 需求背景:工厂某部门需要生产n个零件,部门下面有2个小组,每个小组需要生产n/2个 公平派遣 每个小组的情况下,当所有奇怪的信息都很重,甚至信息很轻的时候,一个工作人员将不断忙碌,另一个工作人员几乎不会做任何工作.那么,RabbitMQ不知道什么,还会平均分配消息. 这是因为当消息进入队列时,Rab

RabbitMQ:伪延时队列

目录 一.什么是延时队列 二.RabbitMQ实现 三. 延时队列的问题 四.解决RabbitMQ的伪延时方案 ps:伪延时队列先卖个关子,我们先了解下延时队列. 一.什么是延时队列 所谓延时队列是指消息push到队列后,监听的消费者不能第一时间获取消息,需要等到指定时间才能消费. 一般在业务里面需要对某些消息做定时发送,不想走定时任务或者是用户下单之后多长时间自动失效类似的场景可以考虑通过延时队列实现. 二.RabbitMQ实现 MQ本身并不支持直接的延时队列实现,但是我们可以通过Rabbit

数据结构Java实现07----队列:顺序队列&amp;顺序循环队列、链式队列、顺序优先队列

数据结构Java实现07----队列:顺序队列&顺序循环队列.链式队列.顺序优先队列 一.队列的概念: 队列(简称作队,Queue)也是一种特殊的线性表,队列的数据元素以及数据元素间的逻辑关系和线性表完全相同,其差别是线性表允许在任意位置插入和删除,而队列只允许在其一端进行插入操作在其另一端进行删除操作. 队列中允许进行插入操作的一端称为队尾,允许进行删除操作的一端称为队头.队列的插入操作通常称作入队列,队列的删除操作通常称作出队列. 下图是一个依次向队列中插入数据元素a0,a1,...,an-