19/5/29 对于rabbitMQ ,我已经研究了几天。 之前完全的没有接触过,所以有很多的概念,很多的坑要踩
首先是安装 rabbitmq 这个就不记录了。
1、引入 Maven
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-amqp</artifactId> <version>2.0.3.RELEASE</version></dependency><!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit --><dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.3.RELEASE</version></dependency> 2、配置 ,写配置文件
<!--步骤1、配置链接工厂--><bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <property name="host" value="${mq.address}"/> <property name="port" value="${mq.port}"/> <property name="password" value="${mq.pwd}"/> <property name="username" value="${mq.user}"/> <property name="publisherConfirms" value="true"/> <property name="publisherReturns" value="true"/> <property name="virtualHost" value="${mq.vhost}"/> <property name="requestedHeartBeat" value="50"/></bean><!--步骤2、创建rabbitTemplate 消息模板--><bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"> <!--构造方法需要链接信息--> <constructor-arg ref="connectionFactory"/> <!--配置交换机--> <property name="exchange" value="${mq.exchange}"/> <!--配置路由键--> <property name="routingKey" value="${mq.routingKey}"/> <!--配置队列--> <property name="queue" value="${mq.queue}"/> <!--配置消息转换--> <property name="messageConverter" ref="serializerMessageConverter"/> <property name="confirmCallback" ref="rabbitTemplateConfig" /> <property name="returnCallback" ref="rabbitTemplateConfig" /> <property name="mandatory" value="true" /></bean><bean id="rabbitTemplateConfig" class="mq.RabbitTemplateConfig"/><!--注入消息转换器--><bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"/><!--引入元素文件--><bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="properties"> <bean class="org.springframework.beans.factory.config.PropertiesFactoryBean"> <property name="locations"> <list> <value>classpath:conf/value.properties</value> </list> </property> <property name="fileEncoding" value="UTF-8"/> </bean> </property></bean><!--申明消费者--><bean id="rmqConsumer" class="mq.RmqConsumer" /><bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="rmqConsumer" /> <property name="defaultListenerMethod" value="rmqConsumeMessage"/> <property name="messageConverter" ref="serializerMessageConverter"/></bean><!--注册监听--><bean id="listener" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="queueNames" value="ceshiQueues,ceshi1,ceshi2"/> <property name="messageListener" ref="messageListenerAdapter"/> <property name="acknowledgeMode" value="MANUAL"/></bean> 这个是我关于rabbitMQ 所用的配置,下面记录一下具体的作用。(1、)配置链接 通过配置链接工厂从而链接到rabbitMQ
<!--步骤1、配置链接工厂--><bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <property name="host" value="${mq.address}"/>//链接的地址 127.0.0.1 <property name="port" value="${mq.port}"/>//端口号 5627 <property name="password" value="${mq.pwd}"/> //密码 <property name="username" value="${mq.user}"/> //用户名 <property name="publisherConfirms" value="true"/> //是否开启提交到交换机的回调 <property name="publisherReturns" value="true"/> //是否开启发送到队列的错误回调 <property name="virtualHost" value="${mq.vhost}"/>// 虚拟机 <property name="requestedHeartBeat" value="50"/>//心跳时间(这个可删除,我不知道有什么用,以后有领悟再记录)</bean> 属性文件中的内容
mq.address=127.0.0.1mq.exchange=ceshimq.routingKey=ceshiRoutingmq.queue=ceshiQueuesmq.port=5672mq.user=***mq.pwd=t**an****mq.timeout=5000mq.vhost=testMQ 关于开启 Confirm 与 Return 的回调 还需要在模板 rabbitTemplate 中进行设置 <!--步骤2、创建rabbitTemplate 消息模板--><bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"> <!--构造方法需要链接信息--> <constructor-arg ref="connectionFactory"/> <!--配置交换机--> <property name="exchange" value="${mq.exchange}"/> <!--配置路由键--> <property name="routingKey" value="${mq.routingKey}"/> <!--配置队列--> <property name="queue" value="${mq.queue}"/> <!--配置消息转换--> <property name="messageConverter" ref="serializerMessageConverter"/> <property name="confirmCallback" ref="rabbitTemplateConfig" /> <property name="returnCallback" ref="rabbitTemplateConfig" /> <property name="mandatory" value="true" /></bean>
注册模板类的bean 类 org.springframework.amqp.rabbit.core.RabbitTemplate
在其构造方法中传入链接工厂的引用, 如上 代码 重点看 下面这几行配置
<property name="confirmCallback" ref="rabbitTemplateConfig" /> <property name="returnCallback" ref="rabbitTemplateConfig" /> <property name="mandatory" value="true" />
这个就是上面提到的 回调,<property name="mandatory" value="true" /> 这个是一定要的 ,删除了会导致 returnCallback 不起效 ,下面贴上实现类代码
package mq; import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.support.CorrelationData; /** * @author tia * @date 2019/5/2910:45 */public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback { /** * 是否成功发送到交换器 * 成功、失败都会回调 * @param correlationData * @param b * @param s */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("消息唯一标识:"+correlationData); System.out.println("确认结果:"+b); System.out.println("失败原因:"+s); } /** * 是否成功发送到队列(需要设置mandatory 为true) * 失败回调 * @param message * @param i * @param s * @param s1 * @param s2 */ @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("消息主体:"+message); System.out.println("消息主体:"+i); System.out.println("描述:"+s); System.out.println("交换器:"+s1); System.out.println("路由键:"+s2); }}
偷了个懒,把两个回调放在了一起 implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback 这两个实现是一定要的。这两个方法的作用,就是对消息进行重新发送,或是记录没有发送出去的消息,等等,看个人安排了。 在我的配置中是没有关于 队列的创建,交换器的创建,虚拟机的创建、绑定等的内容, 这些都在RabbitMQ 的后台完成了 图个简单。 到这里,就可以向mq发送消息了。我写的一个例子:
package mq; import com.alibaba.fastjson.JSON;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.support.CorrelationData;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import po.Message; import javax.annotation.Resource;import java.io.UnsupportedEncodingException;import java.util.Date; @RestController@RequestMapping(value = "/mq",produces = "text/html;charset=UTF-8")public class RmqProducer { private static final Logger LOGGER = LoggerFactory.getLogger(RmqProducer.class); @Resource private RabbitTemplate rabbitTemplate; /** * 发送信息 */ public void sendMessage(String queueKey,Object msg) { try { // 发送信息 rabbitTemplate.convertAndSend(queueKey,"1"); } catch (Exception e) { LOGGER.error("rmq消费者任务处理出现异常", e); } } @RequestMapping("/sendMessage") public void sendActiveCount(String activeMap) throws UnsupportedEncodingException { Message message=new Message(); message.setFrom(1234566l); message.setTo(754964641l); message.setText("你妹妹好漂亮"); message.setDate(new Date()); message.setFromName("你妹妹"); String s = JSON.toJSONString(message); for (int i = 0; i <100 ; i++) { rabbitTemplate.convertAndSend("ceshi","ceshi1Routing",s,new CorrelationData("你妹妹"+i)); } }}
主要的内容就是这个方法 rabbitTemplate.convertAndSend("ceshi","ceshi1Routing",s,new CorrelationData("你妹妹"+i)); 哪儿都能发送。 再看看 消费者 怎么弄,可是花了我大量的时间 去弄这个。
<!--申明消费者--><bean id="rmqConsumer" class="mq.RmqConsumer" /><bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="rmqConsumer" /> <property name="defaultListenerMethod" value="rmqConsumeMessage"/> <property name="messageConverter" ref="serializerMessageConverter"/></bean><!--注册监听--><bean id="listener" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="queueNames" value="ceshiQueues,ceshi1,ceshi2"/> <property name="messageListener" ref="messageListenerAdapter"/> <property name="acknowledgeMode" value="MANUAL"/></bean>
这个监听是一定要有的,或许你可以使用注解来干掉他。看到这个了吗? <property name="defaultListenerMethod" value="rmqConsumeMessage"/> 这个东西就是说默认去执行你 <constructor-arg ref="rmqConsumer" /> 这个类的 这个 方法的。不过也有其他的弊端就是 通道的问题还有就是 如果实现了 implements ChannelAwareMessageListener 就不起效了。看代码:
package mq; import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.rabbitmq.client.Channel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;; public class RmqConsumer implements ChannelAwareMessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(RmqConsumer.class); int i=0; @Override public void onMessage(Message message, Channel channel) throws Exception { try{ Object ddd=null; JSONObject jsonObject=JSONObject.parseObject(new String(message.getBody(),"utf-8")); po.Message message1 = JSON.toJavaObject(jsonObject, po.Message.class); System.out.println(message1.toString()); if(i++%10==0) System.out.println(ddd.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }catch (Exception e){ channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); channel.basicPublish("ceshi","ceshi1Routing",true,true,null ,message.getBody()); System.out.println(e.getMessage()); } }}
这个里面没有配置里提到的方法,他被我吃了。因为他不生效了。再说这个通道的问题 channel ,我这儿 消费者方法是不能抛出错误的,会停掉,所以只能处理, <property name="acknowledgeMode" value="MANUAL"/> 者个配置是在配置是否手动确认的。
MANUAL 手动确认 AUTO 自动确认(默认值) 如果开启自动确认,那么 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); 与 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 将会 报错,也就是说,他不需要手动确认的代码存在。它会默认所有的方法都进行 成功确认,这个真的很无奈。 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 成功确认 他有两个参数 ,消息Tag 与是否批量确认。如果true 则批量确认 tag值小于该值的所有信息将被成功确认。 message.getMessageProperties().getDeliveryTag() 消息的Tag如果你开启了手动确认,但并没有确认,那么你的消息就会处于未确认状态,就像这样 Unacked 100 ,Total 100, 发送100条消息,都没有确认。那rabbitMQ不会把它删除,一直堆积在内存中,后果,就看你怎么处理了.....
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); 失败确认 这个方法有三个参数。 消息Tag 、是否批量确认、和 是否重新回到队列。前两个参数跟 成功确认相同, 最后一个如果为true 将重新回到队列顶端注意 是队列顶端,下一次消费者就会调用返回队列的消息。如果这条消息有错误,那就意味着,程序会一直进行 失败确认 返回队列 ,死循环 。所以 看这个 channel.basicPublish("ceshi","ceshi1Routing",true,true,null ,message.getBody()); 发送消息,它会把消息发送到队列的末尾,这样最后执行,就可以避免不消费其他正确的消息了。
原文地址:https://www.cnblogs.com/hxz-nl/p/10945613.html
时间: 2024-11-02 18:52:18