RabbitMQ客户端开发向导

Ⅰ、高层接口

  • ConnectionFactory
  • Connection
  • Channel
  • Consumor

Ⅱ、操作流程及API

【一】创建连接工厂ConnectionFactory

ConnectionFactory factory = new ConnectionFactory();

? 我们可以为actory设置各种参数来进行连接初始化

factory.setUsername("guest");//设置服务器登录账号
factory.setPassword("guest");//设置服务器登录密码
factory.setHost("127.0.0.1");//设置服务器IP
factory.setPort("15427");//设置服务器端口
factory.setVirtualHost("/");//设置虚拟主机

【二】创建连接Connection

Connection connection = factory.newConnection();

【三】创建信道Channel

Channel channel = connection.createChannel();

知识点:

? 连接和信道的关系:连接是客户端与服务器开启的TCP连接,由于TCP连接的开启和销毁都需要消耗大量的性能,所以我们在连接的基础上使用了信道的概念,对连接进行逻辑再分,每个连接都可以创建多个信道,这些信道共用一个连接。

? 信道就是RabbitMQ中最实用的组件了,几乎所有针对交换器、队列、生产者、消费者的操作全部都在这里面执行。

【四】创建交换器Exchange

Exchange.DeclareOK exchangeDeclare(
    String exchange, // 声明交换器名称
    String type, // 声明交换器类型
    boolean durable, // 声明是否持久化
    boolean autoDelete, // 声明是否自动删除
    boolean internal, // 声明是否内置
    Map<String, Object> arguments // 创建交换器的其他参数
    ) throws IOException
Exchange.DeclareOK exchangeDeclare(
    String exchange, // 声明交换器名称
    BuiltinExchangeType type, // 声明交换器类型
    boolean durable, // 声明是否持久化
    boolean autoDelete, // 声明是否自动删除
    boolean internal, // 声明是否内置
    Map<String, Object> arguments // 创建交换器的其他参数
    ) throws IOException

? 创建交换器的方法有上面两种,这两种的区别仅在于声明交换器的类型type参数的类型不同,第一种常用。其实上面的方法每个都有很多个重载的方法,采用一些默认的参数。这里只列举参数最全的方法,来介绍其各个参数的意义。

知识点:交换器类型

? RabbitMQ的交换器拥有四种类型,分别为fanoutdirecttopicheadersfanout类型的交换器会将消息路由到所有与其绑定的队列中,类似于广播;direct类型是默认的交换器类型,它只会将消息路由到指定的队列中,这个队列的绑定key必须与消息的路由key完全一致;topic类型的交换器是最常使用的交换器类型,是一种模糊匹配的交换器,它会将消息路由到所有绑定key与消息路由key可匹配的队列上;至于headers类型的交换器并不常用,因为其性能较差,并不实用。

? 这里BuiltinExchangeType类型是一个枚举,它里面定义了这四种 类型的枚举值:FANOUTDIRECTTOPICHEADERS

知识点:交换器持久化

? 开启交换器的持久化,那么交换器在创建好之后会持久化到磁盘,一般对于生产环境中长期使用的交换器,最好开启持久化功能,用以提升RabbitMQ的可用性(高可用要点之一),服务器异常宕机的情况下可以硬件恢复。

知识点:交换器自动删除

? 交换器在不再使用的情况下是可以自动删除的,只要在创建交换器的时候设置autoDelete属性为true即可开启自动删除功能,默认为false

? 自动删除功能必须要在交换器曾经绑定过队列或者交换器的情况下,处于不再使用的时候才会自动删除,如果是刚刚创建的尚未绑定队列或者交换器的交换器或者早已创建只是未进行队列或者交换器绑定的交换器是不会自动删除的。

? 不再使用的交换器指的即使那些曾经绑定过队列或者交换器,现在已经没有任何绑定队列或者交换器的情况下的交换器。

知识点:内置交换器

? 内置交换器是一种特殊的交换器,这种交换器不能直接接收生产者发送的消息,只能作为类似于队列的方式绑定到另一个交换器,来接收这个交换器中路由的消息,内置交换器同样可以绑定队列和路由消息,只是其接收消息的来源与普通交换器不同。

? 交换器的创建可以在代码中实现,也可以不再代码中实现。推荐在代码中实现,因为当要创建的交换器已存在于RabbitMQ服务器中时,是不会再次创建的,而是直接返回创建成功。

【五】绑定交换器exchangeBind

Exchange.bindOK exchangeBind(
    String destination, // 指定目标交换器
    String source, // 指定源交换器
    String routingKey, // 指定绑定Key
    Map<String, Object> arguments // 其他一些结构化参数
    ) throws IOException

知识点:交换器绑定

? 交换器一般用来被队列绑定,但其实它也可以被另一个交换器绑定,绑定其实就是将二者关联起来,绑定两个交换器,就是将两个交换器关联起来,这里面有一个主动方,一个被动方,主动方是要执行绑定的交换器(目标交换器),被动方是被绑定的交换器(源交换器),绑定的过程其实就是将目标交换器的绑定key送给源交换器,这时其实可以将目标交换器看成是一个队列,将自己绑定到源交换器上,依靠的也是绑定key,不过这里的绑定key是没明确的路由Key,而非topic类似的模糊key。源交换器会将目标交换器当做一个队列进行看待,将接收到的路由key与目标交换器绑定key完全匹配消息路由到这个目标交换器中。

? 这个方法同样有重载的方法,来默认化一些参数。

【六】创建队列Queue

Queue.DeclareOK queueDeclare(
    String queue, // 声明队列名称
    boolean durable, // 声明是否持久化
    boolean exclusive, // 声明是否排他
    boolean autoDelete, // 声明是否自动删除
    Map<String, Object> arguments // 设置队列的其他一些参数
    ) throws IOException

知识点:队列持久化

? 创建队列的时候也可以设置是否支持持久化到磁盘,生产环境中我们一般都会将其设置为持久化,这也保证了服务器宕机的情况下重启后,队列可以恢复,以保证数据安全(高可用要点之二)

知识点:排他队列

? 创建队列的时候有一个排他参数exclusive,排他队列只对首次创建该队列的信道所在的连接可见,并且该连接内的所有信道都可以访问这个排他队列,在这个连接断开之后,该队列自动删除,由此可见这个队列可以说是绑到连接上的,对同一服务器的其他连接不可见。

? 这种排他优先于持久化,即使设置了队列持久化,在连接断开后,该队列也会自动删除。

? 非排他队列不依附于连接而存在,同一服务器上的多个连接都可以访问这个队列。

知识点:队列自动删除

? 队列的自动删除类似于交换器的自动删除,都必须是曾经使用过的队列才能执行自动删除,如果是创建之后根本就没有用过的队列是不会触发自动删除的。

? 这个方法同样有重载的方法,来默认化一些参数。

【七】绑定队列queueBind

Queue.BindOK queueBind(
    String queue, // 指定队列名称
    String exchange, // 指定交换器名称
    String routingKey, // 声明绑定key
    Map<String,Object> arguments // 定义绑定的一些参数
    ) throws IOException

? 这里routingKey的值需要由指定的交换器的类型累决定使用什么形式的key,如果是fanout类型的交换器,会忽略routingKey的值,如果是direct类型的交换器,使用明确的绑定Key,如果是topic类型的交换器,则使用带有匹配符*#的模糊key。

【八】发布消息basicPublish

void basicPublish(
    String exchange, // 指定目的交换器
    String routingKey, // 声明消息的路由key
    boolean mandatory, // 是否为无法路由的消息进行返回处理
    boolean immediate, // 是否对路由到无消费者队列的消息进行返回处理
    BasicProperties props, // 消息的一些基本属性设置
    byte[] body // 消息体
    ) throws IOException

知识点:mandatory

? 消息发布的时候设置消息的mandatory属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式,设置为true表示将消息返回到生产者,否则直接丢弃消息。

? 上述无法路由的情况可以是在无法找到匹配消息路由key的队列,导致消息无法路由到队列中

? 当mandatory=true时,出现无法路由消息被返回,那么返回的消息又回到生产者,怎么接收呢,这就要靠返回监听器ReturnListener

知识点:ReturnListener

? 我们在设置消息的mandatory=true的时候,就需要对返回的消息进行处理了,我们可以在信道中添加返回监听器来监听返回的消息,一旦监听到这些消息,我们就着手对其进行再处理,一般我们会进行重发。

channel.addReturnListener(new ReturnListener(){
    @Override
    publish void handleReturn(
        int repayCode, // 回应码
        String repayText, // 回应内容
        String exchange, // 来源交换器
        String routingKey, // 消息的路由key
        AMQP.BasicProperties basicProperties, // 消息的其他属性
        byte[] body // 消息体
    ) throws IOException {
        // do some thing to handle the return message
    }
});

【九】消费消息basicConsumer、basicGet

1、推送消息basicConsume

String basicConsume(
    String queue, // 指定队列名称
    boolean autoAck, // 是否自动回应
    String consumerTag, // 声明消费者标签,区分不同的消费者
    boolean noLocal, // 是否不能将消息推送给同一个连接内的消费者
    boolean exclusive, // 是否排他
    Map<String, Object> arguments, // 设置消费者的其他参数
    Consumer callback // 设置消费者回调函数,处理消息
    ) throws IOException

知识点:自动回应

? 推送消息时设置消息自动回应,那么消息在推送给消费者后就会自动回应服务器,服务器收到回回应就会删除这条消息,这样无法保证消息能被正确处理,因为消费者虽然收到了消息,但它可能无法处理、或者拒绝等,这时服务器中消息却已被删除,导致消息丢失。(高可用要点之三)

? 一般情况下我们将其设置为false,然后在回调函数中消息处理完毕之后手动进行回应。

void basicAck(
    long deliveryTag, // 指定推送标识编号
    boolean multiple // 是否批量回应
    ) throws IOException

注意:推送标识

? deliveryTag表示的是推送编号,是一个单调递增正整数编号,它用来标识channel中一次消息推送,与消息绑在一起。

? 当一个消费者向服务器注册basicConsume之后,服务器就会使用basic.delivery给消费者推送消息,deliveryTag就用来标识这样一个推送。但要注意,它只在当前信道channel内有效。

? 消费者受到消息的时候同时会收到这个推送标识,当要回应、拒绝消息的时候就需要带着这个标识。

注意:批量回应

? multiple参数为boolean值,用来表示是否进行批量回应,当值为true时表示进行批量回应,它会对推送编号小于给定编号的所有消息进行回应。false表示只回应指定编号的消息。

知识点:推送范围

? noLocal这个参数表示是否可以将消息推送给与消息发布者同一连接内的消费者,如果noLocal=false表示可以推送,noLocal=true则表示不能推送,那么就只能推送到其他连接中的消费者。

知识点:排他

知识点:回调函数

? 回调函数主要用于定义针对消息的处理逻辑,一般采用如下方式定义:

Consumer consumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(
        String consumerTag, // 消费者标签,可用于消费者验证发送目标的正确性
        Envelope envelope, // AMQP操作参数,可以获取其中参数进行消息处理
        Amqp.BasicProperties properties, // 消息的基本属性
        Byte[] body // 消息体,用于存放推送的消息
    ){
        // do something to handle deleveried message
    }
}

知识点:AMQP操作参数

? EnvelopeRabbitMQ定义的用于封装AMQP操作参数的类,里面主要封装了四个参数:

private final long _deliveryTag;// 推送编号
private final boolean _redeliver; // 是否重发标签
private final String _exchange; // 当前操作对应的交换器
private final String _routingKey; // 关联的路由Key

? 其中_redeliber=true表示这是一个失败的basicAck之后的消息的重新推送。这里面的这四个参数我们都可以在消费者客户端处理消息的时候使用。

? 其实basicConsume是消费者订阅方法,目的在于订阅某个队列,是消息推送的前提,真正的消息推送并没有在Channel类中实现,因为推送操作是由RabbitMQ服务器自动发起的,不需要生产者或者消费者手动触发,所以也就没用提供接口。

? 真正的消息推送是由RabbitMQ服务器的Basic.delivery方法实现的。

2、拉取消息basicGet

GetResponse basicGet(
    String queue, // 指定队列名称
    boolean autoAck // 是否自动回应
    ) throws IOException;

? 拉取消息就是指消费者主动从服务器获取消息,每次只能获取一条消息。推送消息是被动的获取。这里的autoAck和之前推送的设置一样,一般设置为false,表示不主动回应,采用手动回应(高可用要点之三)

【十】拒绝消息basicReject、basicNack

1、拒绝一个消息basicReject

void basicReject(
    long deliveryTag, // 消息推送编号
    boolean requeue // 是否重新入队
    ) throws IOException;

2、拒绝多个消息basicNack

void basicNack(
    long deliveryTag, // 消息推送编号
    boolean multiple, // 是否批量拒绝
    boolean requeue // 是否重新入队
    )throws IOException;

知识点:重新入队

? 当一个消息推送到某一个消费者,这个消费者无法处理时它进行了拒绝操作,如果指定requeue值为true,表示被拒绝的消息还可以重新发送到队列,可被继续推送到其他消费者,如果设置为false,那么这条消息会被立刻从队列删除。

? 如果将这两个方法的requeue参数设置为false,那么可以启用死信队列功能,因为这样的话,返回的消息会变成死信,如果服务器中设置有死信交换器DLX,并且已关联到该队列,那么这个消息就会被发送到死信交换器,从而被路由到绑定的死信队列中得以保留,我们可以通过排查这些消息来进行服务器优化。

知识点:批量拒绝

? 消息的单个拒绝与批量拒绝使用的不是同一个方法,批量拒绝的方法basicNack中有个决定是否批量拒绝的参数mutiple,如果设置为false,表示不执行批量拒绝,那么它的效果等同于basicReject方法,如果设置为true,表示拒绝小于指定推送编号的所有未被当前消费者消费的消息。

【十一】取消消费者basicCancel

void basicCancel(String consumerTag // 指定要取消的消费者标签
    ) throws IOException;

【十二】关闭连接

channel.close();// 关闭信道
connection.close();// 关闭TCP连接

原文地址:https://www.cnblogs.com/V1haoge/p/9642105.html

时间: 2024-10-15 12:21:27

RabbitMQ客户端开发向导的相关文章

RabbitMQ 客户端开发向导

准备工作:composer 引入 php-amqplib 说明:本文说明基于 Java(主要说明原理),实现使用 php RabbitMQ Java 客户端使用 com.rabbitmq.client 作为顶级包名,关键的 Class 和 Interface 有 Cahnnel.Connection.ConnectionFactory.Consumer 等.AMQP 协议层面的操作通过 Channel 接口实现.Connection 是用来开启 Channel(信道)的,可以注册事件处理器,也可

Unity3D技术之本地客户端开发入门

欢迎来到unity学习.unity培训.unity企业培训教育专区,这里有很多U3D资源.U3D培训视频.U3D教程.U3D常见问题.U3D项目源码,我们致力于打造业内unity3d培训.学习第一品牌. 本地客户端开发入门 本地客户端 (NaCl) 是 Google 提供的新技术,其允许您在 Web 页面嵌入本地可执行代码,以便您在无需安装插件的情况下部署性能非常强的 web 应用程序.目前,NaCl 仅支持在 Windows.Mac OS X 和 Linux(含可用的 Chrome 操作系统支

windows客户端开发调试工具

本文介绍windows常用开发与调试工具. 1.windows常用开发与调试工具 1.1 Sysinternals 内核大神打造,含大量windows系统工具,windows开发必备神器,大神被MS招安. 下载地址:http://technet.microsoft.com/en-us/sysinternals Procmon.exe 监视程序运行过程中的动作,可用于性能监控. procexp.exe 相当于升级版的任务管理器,可以查看加载模块,模块查找,线程列表(含CPU百分比), 创建dump

使用HTML客户端 开发的一些应用

使用HTML   客户端 开发的一些应用:   http://www.lightswitchextras.com/ 价格:   9.99 DALLAR 本人看中其首页(HOME)的设计. 使用HTML客户端 开发的一些应用,布布扣,bubuko.com

Yii2.0中文开发向导——高级应用程序模板

高级应用程序模板这个模板用在大型的团队开发项目中,而且后台从前台独立分离出来以便于部署在多个服务器中.由于YIi2.0的一些新的特性,这个程序模板的功能要更深一点.提供了基本的数据库的支持,注册.密码找回等功能.安装可以通过Composer来安装如果没有安装Composer,先安装 curl -s http://getcomposer.org/installer | php 然后用如下命令来获取 php composer.phar create-project --prefer-dist --s

第一次做网页客户端开发的体会

实验室组织了一个软件设计大赛,考虑了很久,决定选择网页客户端开发,又考虑了很久,决定写汽车网页.无意中想到网页可能会因为网速,网页内容,电脑配置等原因而加载慢,而用户面对的就是一个空白页面,无趣,不耐烦,这种等待可能不会长久,于是就想在页面加载完成之前加上一个等待页面,可以是有趣的文字,动态的图片等等,增强用户体验性.带着这个想法查阅了部分资料,也看到了很多优秀的网站,内心充满热情.事情不总是顺利的,这时才觉得,书到用时方很少,之前学习的脚本语言单一而简单,没有全局概念,做起来很费劲.写了一个简

Android应用开发-小巫CSDN博客客户端开发开篇

Android应用开发-小巫CSDN博客客户端开发开篇 2014年9月8日 八月十五 祝各位中秋节快乐 小巫断断续续花了几个星期的时间开发了这么一款应用--小巫CSDN博客,属于私人定制的这样的一款应用,整个客户端的数据全部来自本人博客,是通过爬取本人博客地址html页面,然后解析html把数据提取出来,整个客户端的技术难点主要是如何对html界面进行分析和使用Jsoup对html代码进行解析.目前本人的这款应用已经开发出来了,近段时间会提交应用商店进行审核,不久大家就可以看到这么一款逼格满满的

Yii2.0中文开发向导——控制器(Controller)

本节包含以下方面的内容 基本概念 路由 默认路由 动作的参数 在动作中定义参数 从请求(request)中获取参数 独立动作 动作过滤器(Action Filters) 捕获所有的请求 自定义响应类 控制器(Control)是应用程序中最关键的部分之一,它决定了如何处理传递进来的请求(Request),以及生成相应的响应(Response).大部分的控制器都会处理一个Http的请求,然后返回Html或者Json或者Xml格式的数据作为响应.1.基本概念控制器文件一般在应用程序的controlle

windows客户端开发--也许是一条不归路

如今的Windows客户端开发,已经被同行嘲笑为鸡肋,甚至有些人认识做Windows客户端就是一个笑柄. 食之无味,弃之可惜. 不可否认,PC端没落的很快. 但是想说的是,任何一门技术都有存在的道理. 微软就是所有Windows客户端开发人员的大腿,虽然这个大腿让人捉摸不定,主方向总是变化. 换言之,Windows客户端开发难度不小.如果你能轻松的驾驭指针.内存.类等等,即使有一个Windows客户端彻底完蛋了,你也许只用一个星期或是一个月就掌握了另一种编程语言开发. 重要的是思想~ 我个人认为