rabbitMq创建和获取消息

package com.yunda.inter.preload.contextinit;

import net.sf.json.JSONObject;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.yunda.inter.shipmentAcceptor.bean.ShipmentData;
import com.yunda.inter.shipmentCheck.service.ShipmentCheckService;
import com.yunda.inter.sign.service.SignService;
import com.yunda.inter.util.CommUtil;
import com.yunda.inter.util.QueueUtil;
import com.yunda.inter.util.StringUtil;

/**
 * 启动预加载信息类
 *@author Administrator
 */
public class ContextLoaderSpringListener implements ApplicationListener<ContextRefreshedEvent>{

    private static Log logger = LogFactory.getLog(ContextLoaderSpringListener.class);
    @Autowired
    private ShipmentCheckService shipmentCheckService;

    //当spring容器初始化完成后就会执行该方法。
    public void onApplicationEvent(ContextRefreshedEvent event) {
        logger.debug("ConfigLoadListener init......");
        try {
            //创建一个频道
            Channel channel = QueueUtil.getConnection().createChannel();
            boolean durable = true;
            //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
            channel.queueDeclare(QueueUtil.getQueueName(), durable, false, false, null);

            //创建队列消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //指定消费队列
            //TODO:并发测试MQ,ack?
            channel.basicConsume(QueueUtil.getQueueName(), false/*打开应答机制*/, consumer);
            while (true) {
                //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                byte[] body = delivery.getBody();
                try {
                    String str=new String(body,"UTF-8");
                    JSONObject j = JSONObject.fromObject(str);
                    String shipmentId = j.getString("shipmentId");
                    String vehicleId = j.getString("vehicleId");
                    int planLineType = j.getInt("planLineType");

                    shipmentCheckService.check(shipmentId,vehicleId,planLineType);
                } catch (RuntimeException e) {
                    logger.error("货运单数据校验出现异常:", e);
                    logger.error("Source package:"+ CommUtil.getEncodeData(body));
                }
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        } catch (Exception e) {
            logger.error("货运单存储器出现异常:", e);
        }
    }

}
    private void storeInQueue(byte[] dst) throws IOException, TimeoutException {
        Channel channel = QueueUtil.getConnection().createChannel();
        channel.queueDeclare(QueueUtil.getQueueName(), /*持久存储*/false, false, false, null);
        channel.basicPublish("", QueueUtil.getQueueName(), null, dst);
        channel.close();
    }
时间: 2024-10-30 07:37:03

rabbitMq创建和获取消息的相关文章

RabbitMQ Consumer获取消息的两种方式(poll,subscribe)解析

以下转自:http://blog.csdn.net/yangbutao/article/details/10395599 rabbitMQ中consumer通过建立到queue的连接,创建channel对象,通过channel通道获取message, Consumer可以声明式的以API轮询poll的方式主动从queue的获取消息,也可以通过订阅的方式被动的从Queue中消费消息, 最近翻阅了基于java的客户端的相关源码,简单做个分析. 编程模型伪代码如下: ConnectionFactory

RabbitMQ介绍2 - 理解消息AMQP

理解消息AMQP通信.官方解释: http://www.rabbitmq.com/tutorials/amqp-concepts.html 概念:生产者producer,消费者consumer,队列queue,交换器exchange,路由键routing key,绑定键binding key. producer发布消息,消息经过交换器传播放入队列,消费者从队列中得到消息. ConnectionFactory, connection, channel信道.connectionFactory用来建立

rabbitMQ学习笔记(五) 消息路由

生产者会生产出很多消息 , 但是不同的消费者可能会有不同的需求,只需要接收指定的消息,其他的消息需要被过滤掉. 这时候就可以对消息进行过滤了. 在消费者端设置好需要接收的消息类型. 如果不使用默认的Exchange发送消息,而是使用我们自定定义的Exchange发送消息,那么下面这个方法的第二个参数就不是QueueName了,而是消息的类型. channel.basicPublish( exchangeName , messageType , null , msg.getBytes()); 示例

rabbitMQ学习笔记(三) 消息确认与公平调度消费者

从本节开始称Sender为生产者 , Recv为消费者   一.消息确认 为了确保消息一定被消费者处理,rabbitMQ提供了消息确认功能,就是在消费者处理完任务之后,就给服务器一个回馈,服务器就会将该消息删除,如果消费者超时不回馈,那么服务器将就将该消息重新发送给其他消费者 默认是开启的,在消费者端通过下面的方式开启消息确认,  首先将autoAck自动确认关闭,等我们的任务执行完成之后,手动的去确认,类似JDBC的autocommit一样 QueueingConsumer consumer

RabbitMQ实战:理解消息通信

本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记. 前段时间总结完了「深入浅出MyBatis」系列,对MyBatis有了更全面和深入的了解,在掘金社区也收到了一些博友的喜欢,很高兴.另外,短暂的陪产假就要结束了,小宝也二周了,下周二就要投入工作了,希望自己尽快调整过来,加油努力. 从本篇开始总结「RabbitMQ实战」系列的阅读笔记,RabbitMQ是一个开源的消息代理和队列服务器,可以通过基本协议在完全不同的应用之间共享数据,可以将作业排队以便让分布式服务进行处理. 本篇

【RabbitMQ】如何进行消息可靠投递【上篇】

说明 前几天,突然发生线上报警,钉钉连发了好几条消息,一看是RabbitMQ相关的消息,心头一紧,难道翻车了? [橙色报警]?应用[xxx]在[08-15?16:36:04]发生[错误日志异常],alertId=[xxx].由[org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620]触发. 应用xxx?可能原因如下 服务名为: ?异常为:org.springframework.amqp.rabbit.lis

Python实现RabbitMQ中6种消息模型

RabbitMQ与Redis对比 ? RabbitMQ是一种比较流行的消息中间件,之前我一直使用redis作为消息中间件,但是生产环境比较推荐RabbitMQ来替代Redis,所以我去查询了一些RabbitMQ的资料.相比于Redis,RabbitMQ优点很多,比如: 具有消息消费确认机制 队列,消息,都可以选择是否持久化,粒度更小.更灵活. 可以实现负载均衡 RabbitMQ应用场景 异步处理:比如用户注册时的确认邮件.短信等交由rabbitMQ进行异步处理 应用解耦:比如收发消息双方可以使用

Rabbit mq订阅方式获取消息并可设置持久化

Rabbit 通过方式获取消息:订阅方式其实是向queue注册consumer,通过rpc向queue server发送注册consumer的消息,rabbitMQ Server在收到消息后,根据消息的内容类型判断这是一个订阅消息,这样当MQ 中queue有消息时,会自动把消息通过该socket(长连接)通道发送出去. 可以通过 channel.basicQos(1); 设置RabbitMQ调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完并且已经对

运用Java获取消息摘要

消息摘要简单介绍 消息摘要算法的主要特征是加密过程不需要密钥,并且经过加密的数据无法被解密,只有输入相同的明文数据经过相同的消息摘要算法才能得到相同的密文.消息摘要算法不存在密钥的管理与分发问题,适合于分布式网络相同上使用.由于其加密计算的工作量相当可观,所以以前的这种算法通常只用于数据量有限的情况下的加密,例如计算机的口令就是用不可逆加密算法加密的.近年来,随着计算机相同性能的飞速改善,加密速度不再成为限制这种加密技术发展的桎梏,因而消息摘要算法应用的领域不断增加.现在,消息摘要算法主要应用在