初识rabbitMQ(一)

19/5/29  对于rabbitMQ ,我已经研究了几天。 之前完全的没有接触过,所以有很多的概念,很多的坑要踩

首先是安装 rabbitmq 这个就不记录了。

1、引入 Maven

<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-amqp</artifactId> <version>2.0.3.RELEASE</version></dependency><!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit --><dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.3.RELEASE</version></dependency>

2、配置 ,写配置文件
<!--步骤1、配置链接工厂--><bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">    <property name="host" value="${mq.address}"/>    <property name="port" value="${mq.port}"/>    <property name="password" value="${mq.pwd}"/>    <property name="username" value="${mq.user}"/>    <property name="publisherConfirms" value="true"/>    <property name="publisherReturns" value="true"/>    <property name="virtualHost" value="${mq.vhost}"/>    <property name="requestedHeartBeat" value="50"/></bean><!--步骤2、创建rabbitTemplate 消息模板--><bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">    <!--构造方法需要链接信息-->    <constructor-arg ref="connectionFactory"/>    <!--配置交换机-->    <property name="exchange" value="${mq.exchange}"/>    <!--配置路由键-->    <property name="routingKey" value="${mq.routingKey}"/>    <!--配置队列-->    <property name="queue" value="${mq.queue}"/>    <!--配置消息转换-->    <property name="messageConverter" ref="serializerMessageConverter"/>    <property name="confirmCallback" ref="rabbitTemplateConfig" />    <property name="returnCallback" ref="rabbitTemplateConfig" />    <property name="mandatory" value="true" /></bean><bean id="rabbitTemplateConfig" class="mq.RabbitTemplateConfig"/><!--注入消息转换器--><bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"/><!--引入元素文件--><bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">    <property name="properties">        <bean class="org.springframework.beans.factory.config.PropertiesFactoryBean">            <property name="locations">                <list>                    <value>classpath:conf/value.properties</value>                </list>            </property>            <property name="fileEncoding" value="UTF-8"/>        </bean>    </property></bean><!--申明消费者--><bean id="rmqConsumer" class="mq.RmqConsumer" /><bean id="messageListenerAdapter"  class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">    <constructor-arg ref="rmqConsumer" />    <property name="defaultListenerMethod" value="rmqConsumeMessage"/>    <property name="messageConverter" ref="serializerMessageConverter"/></bean><!--注册监听--><bean id="listener" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">    <property name="connectionFactory" ref="connectionFactory"/>    <property name="queueNames" value="ceshiQueues,ceshi1,ceshi2"/>    <property name="messageListener" ref="messageListenerAdapter"/>    <property name="acknowledgeMode" value="MANUAL"/></bean>

这个是我关于rabbitMQ 所用的配置,下面记录一下具体的作用。(1、)配置链接  通过配置链接工厂从而链接到rabbitMQ
<!--步骤1、配置链接工厂--><bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">    <property name="host" value="${mq.address}"/>//链接的地址 127.0.0.1    <property name="port" value="${mq.port}"/>//端口号 5627    <property name="password" value="${mq.pwd}"/> //密码    <property name="username" value="${mq.user}"/> //用户名     <property name="publisherConfirms" value="true"/> //是否开启提交到交换机的回调    <property name="publisherReturns" value="true"/> //是否开启发送到队列的错误回调    <property name="virtualHost" value="${mq.vhost}"/>// 虚拟机    <property name="requestedHeartBeat" value="50"/>//心跳时间(这个可删除,我不知道有什么用,以后有领悟再记录)</bean>

属性文件中的内容
mq.address=127.0.0.1mq.exchange=ceshimq.routingKey=ceshiRoutingmq.queue=ceshiQueuesmq.port=5672mq.user=***mq.pwd=t**an****mq.timeout=5000mq.vhost=testMQ

关于开启 Confirm 与 Return 的回调 还需要在模板 rabbitTemplate 中进行设置 

<!--步骤2、创建rabbitTemplate 消息模板--><bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">    <!--构造方法需要链接信息-->    <constructor-arg ref="connectionFactory"/>    <!--配置交换机-->    <property name="exchange" value="${mq.exchange}"/>    <!--配置路由键-->    <property name="routingKey" value="${mq.routingKey}"/>    <!--配置队列-->    <property name="queue" value="${mq.queue}"/>    <!--配置消息转换-->    <property name="messageConverter" ref="serializerMessageConverter"/>    <property name="confirmCallback" ref="rabbitTemplateConfig" />    <property name="returnCallback" ref="rabbitTemplateConfig" />    <property name="mandatory" value="true" /></bean>
注册模板类的bean 类 org.springframework.amqp.rabbit.core.RabbitTemplate  
在其构造方法中传入链接工厂的引用, 如上 代码  重点看 下面这几行配置 
    <property name="confirmCallback" ref="rabbitTemplateConfig" />    <property name="returnCallback" ref="rabbitTemplateConfig" />    <property name="mandatory" value="true" />
这个就是上面提到的 回调,<property name="mandatory" value="true" />  这个是一定要的 ,删除了会导致 returnCallback 不起效 ,下面贴上实现类代码
package mq;

import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.support.CorrelationData;

/** * @author tia * @date 2019/5/2910:45 */public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    /**     * 是否成功发送到交换器     * 成功、失败都会回调     * @param correlationData     * @param b     * @param s     */    @Override    public void confirm(CorrelationData correlationData, boolean b, String s) {        System.out.println("消息唯一标识:"+correlationData);        System.out.println("确认结果:"+b);        System.out.println("失败原因:"+s);    }

    /**     * 是否成功发送到队列(需要设置mandatory 为true)     * 失败回调     * @param message     * @param i     * @param s     * @param s1     * @param s2     */    @Override    public void returnedMessage(Message message, int i, String s, String s1, String s2) {        System.out.println("消息主体:"+message);        System.out.println("消息主体:"+i);        System.out.println("描述:"+s);        System.out.println("交换器:"+s1);        System.out.println("路由键:"+s2);    }}
偷了个懒,把两个回调放在了一起 implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback  这两个实现是一定要的。这两个方法的作用,就是对消息进行重新发送,或是记录没有发送出去的消息,等等,看个人安排了。

在我的配置中是没有关于 队列的创建,交换器的创建,虚拟机的创建、绑定等的内容, 这些都在RabbitMQ 的后台完成了 图个简单。

到这里,就可以向mq发送消息了。我写的一个例子:
package mq;

import com.alibaba.fastjson.JSON;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.support.CorrelationData;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import po.Message;

import javax.annotation.Resource;import java.io.UnsupportedEncodingException;import java.util.Date;

@RestController@RequestMapping(value = "/mq",produces = "text/html;charset=UTF-8")public class RmqProducer {    private static final Logger LOGGER = LoggerFactory.getLogger(RmqProducer.class);    @Resource    private RabbitTemplate rabbitTemplate;

    /**     *     发送信息     */    public void sendMessage(String queueKey,Object msg) {        try {            // 发送信息            rabbitTemplate.convertAndSend(queueKey,"1");        } catch (Exception e) {            LOGGER.error("rmq消费者任务处理出现异常", e);        }    }    @RequestMapping("/sendMessage")    public void sendActiveCount(String activeMap) throws UnsupportedEncodingException {        Message message=new Message();        message.setFrom(1234566l);        message.setTo(754964641l);        message.setText("你妹妹好漂亮");        message.setDate(new Date());        message.setFromName("你妹妹");        String s = JSON.toJSONString(message);        for (int i = 0; i <100 ; i++) {            rabbitTemplate.convertAndSend("ceshi","ceshi1Routing",s,new CorrelationData("你妹妹"+i));        }

    }}
主要的内容就是这个方法 rabbitTemplate.convertAndSend("ceshi","ceshi1Routing",s,new CorrelationData("你妹妹"+i)); 哪儿都能发送。

再看看 消费者 怎么弄,可是花了我大量的时间 去弄这个。
<!--申明消费者--><bean id="rmqConsumer" class="mq.RmqConsumer" /><bean id="messageListenerAdapter"  class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">    <constructor-arg ref="rmqConsumer" />    <property name="defaultListenerMethod" value="rmqConsumeMessage"/>    <property name="messageConverter" ref="serializerMessageConverter"/></bean><!--注册监听--><bean id="listener" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">    <property name="connectionFactory" ref="connectionFactory"/>    <property name="queueNames" value="ceshiQueues,ceshi1,ceshi2"/>    <property name="messageListener" ref="messageListenerAdapter"/>    <property name="acknowledgeMode" value="MANUAL"/></bean>
这个监听是一定要有的,或许你可以使用注解来干掉他。看到这个了吗? <property name="defaultListenerMethod" value="rmqConsumeMessage"/> 这个东西就是说默认去执行你   <constructor-arg ref="rmqConsumer" />   这个类的 这个 方法的。不过也有其他的弊端就是 通道的问题还有就是 如果实现了 implements ChannelAwareMessageListener 就不起效了。看代码:
package mq;

import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.rabbitmq.client.Channel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;;

public class RmqConsumer implements ChannelAwareMessageListener {    private static final Logger LOGGER = LoggerFactory.getLogger(RmqConsumer.class);    int  i=0;    @Override    public void onMessage(Message message, Channel channel) throws Exception {        try{            Object ddd=null;            JSONObject jsonObject=JSONObject.parseObject(new String(message.getBody(),"utf-8"));            po.Message message1 = JSON.toJavaObject(jsonObject, po.Message.class);            System.out.println(message1.toString());            if(i++%10==0)            System.out.println(ddd.toString());            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        }catch (Exception e){            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);            channel.basicPublish("ceshi","ceshi1Routing",true,true,null ,message.getBody());            System.out.println(e.getMessage());        }    }}
这个里面没有配置里提到的方法,他被我吃了。因为他不生效了。再说这个通道的问题 channel ,我这儿 消费者方法是不能抛出错误的,会停掉,所以只能处理, <property name="acknowledgeMode" value="MANUAL"/> 者个配置是在配置是否手动确认的。
MANUAL 手动确认 AUTO 自动确认(默认值) 如果开启自动确认,那么 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); 与 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 将会 报错,也就是说,他不需要手动确认的代码存在。它会默认所有的方法都进行 成功确认,这个真的很无奈。 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 成功确认 他有两个参数 ,消息Tag 与是否批量确认。如果true 则批量确认 tag值小于该值的所有信息将被成功确认。 message.getMessageProperties().getDeliveryTag() 消息的Tag如果你开启了手动确认,但并没有确认,那么你的消息就会处于未确认状态,就像这样 Unacked 100 ,Total 100, 发送100条消息,都没有确认。那rabbitMQ不会把它删除,一直堆积在内存中,后果,就看你怎么处理了.....
 
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); 失败确认 这个方法有三个参数。 消息Tag 、是否批量确认、和 是否重新回到队列。前两个参数跟 成功确认相同, 最后一个如果为true 将重新回到队列顶端注意 是队列顶端,下一次消费者就会调用返回队列的消息。如果这条消息有错误,那就意味着,程序会一直进行 失败确认 返回队列  ,死循环 。所以 看这个 channel.basicPublish("ceshi","ceshi1Routing",true,true,null ,message.getBody()); 发送消息,它会把消息发送到队列的末尾,这样最后执行,就可以避免不消费其他正确的消息了。
 


原文地址:https://www.cnblogs.com/hxz-nl/p/10945613.html

时间: 2024-11-02 18:52:18

初识rabbitMQ(一)的相关文章

初识RabbitMQ系列之一HelloWorld

Server端代码: 1 package com.helloworld; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 9 public class Send { 10 /** 发送目的地队列名称 */ 11 priva

初识RabbitMQ系列之一:简单介绍

一:RabbitMQ是什么? 众所周知,MQ是Message  Queue(消息队列)的意思,RabbitMQ就是众多MQ框架其中的一款,开源实现了AMQP协议(官网:http://www.amqp.org/),也就是说RabbitMQ是一个开源的消息队列框架. 他用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面都非常的优秀.是当前最主流的消息中间件之一. RabbitMQ的官网:http://www.rabbitmq.com 二:RabbitMQ优势&劣势? 优势: 1:安装部

初识RabbitMQ

1.安装 rabbitmq官网:http://www.rabbitmq.com/ 下载地址:https://packagecloud.io/rabbitmq 下载rabbitmq-server 安装脚本文件 # curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | bash 安装rabbitmq # yum install rabbitmq-server -y  

HelloWorld RabbitMQ

RabbitMQ入门-从HelloWorld开始 从读者的反馈谈RabbitMQ 昨天发完<RabbitMQ入门-初识RabbitMQ>,我陆陆续续收到一些反馈.鉴于部分读者希望结合实例来讲 期待下篇详细,最好结合案例.谢谢! 哪都好,唯一缺点就是不支持原生ha,配置起来太复杂 ... 上篇主要介绍了什么RabbitMQ,RabbitMQ能用来做什么,一些有关RabbitMQ的基本概念,同时还简单介绍了两种RabbitMQ的分发消息的模型.从这篇起,我们将改变原来的思路,针对每种模型详细讲解,

rabbitmq技术的一些感悟(一)

Rabbitmq 初识rabbitmq RabbitMQ是流行的开源消息队列系统,用erlang语言开发.RabbitMQ是AMQP(高级消息队列协议)的标准实现.假设不熟悉AMQP,直接看RabbitMQ的文档会比較困难.只是它也仅仅有几个关键概念,这里简介 几个概念说明: Broker:简单来说就是消息队列server实体. Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列. Queue:消息队列载体,每一个消息都会被投入到一个或多个队列. Binding:绑定,它的作用就

rabbitmq技术

Rabbitmq 初识rabbitmq RabbitMQ是流行的开源消息队列系统,用erlang语言开发.RabbitMQ是AMQP(高级消息队列协议)的标准实现.如果不熟悉AMQP,直接看RabbitMQ的文档会比较困难.不过它也只有几个关键概念,这里简单介绍 几个概念说明: Broker:简单来说就是消息队列服务器实体.Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列.Queue:消息队列载体,每个消息都会被投入到一个或多个队列.Binding:绑定,它的作用就是把exch

认识并安装RabbitMQ(以Windows系统为例)

一.初识RabbitMQ 百度百科有这么一句话: MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息, 而另一端则可以读取或者订阅队列中的消息. MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义, 而MQ则是遵循了AMQP协议的具体实现和产品. 他的官网中用大大的黑粗字体写着: RabbitMQ is the most widely deployed open source message broker. 由此可见,RabbitMQ是一个

全流程开发 GO实战电商网站高并发秒杀系统

获取资源点击这里:全流程开发 GO实战电商网站高并发秒杀系统 第1章 课程介绍[学前须知] 本章对这门课程进行说明,包括:秒杀系统涉及模块的介绍,秒杀核心的知识点的介绍,课程的学习规划等. 1-1 课程介绍试看 第2章 需求整理&系统设计 [明确需求] 本章对秒杀系统整体需求进行梳理,明确系统具体需求,讲解系统原型设计工具的使用,并结合秒杀系统进行整体架构设计. 2-1 需求分析 2-2 系统架构设计 2-3 [总结&扩展]需求整理&系统设计 2-4 [勤于思考,夯实学习成果]阶段

RabbitMQ学习之:(一)初识、概念及心得

因为网上有一篇很好的RMQ的入门帖子http://lostechies.com/derekgreer/tag/rabbitmq/,所以我就不多说了,就说说我目前看了该作者1~5章后,自己的心得.(所以要看懂我写的内容,需要看完那个作者写的1~5章.你可以先跳过这一篇,因为我后面的博文会依次分析该作者的文章的,等分析完1~5章,就可以回过头来看我写的这篇了) 首先,装好RMQ,且激活了Web管理服务之后(需要先暂停服务,然后重启,看帖子),就可以通过http://localhost:15672来访