spring 集成rabbitmq

mq.properties

mq.host=主机ip
mq.username=admin
mq.password=admin123
mq.port=5672
mq.queue.vip=test-queue
mq.exchange=test-exchange
mq.vhost=test

spring-rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

<!-- RabbitMQ start -->
<!-- 连接配置 -->
<rabbit:connection-factory id="mqConnectionFactory" virtual-host="${mq.vhost}"
host="${mq.host}" username="${mq.username}" password="${mq.password}"
port="${mq.port}" />

<rabbit:admin connection-factory="mqConnectionFactory" />

<!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
<bean id="jsonMessageConverter" class="com.mq.util.FastJsonMessageConverter"></bean>

<!-- 消息队列客户端 -->
<rabbit:template id="amqpTemplate" exchange="${mq.exchange}"
connection-factory="mqConnectionFactory" message-converter="jsonMessageConverter" />

<!-- queue 队列声明 -->
<!-- durable 是否持久化 exclusive 仅创建者可以使用的私有队列,断开后自动删除 auto-delete 当所有消费端连接断开后,是否自动删除队列 -->
<rabbit:queue id="my_queue_vip" name="${mq.queue.vip}"
durable="true" auto-delete="false" exclusive="false" />

<!-- 交换机定义 -->
<!-- 交换机:一个交换机可以绑定多个队列,一个队列也可以绑定到多个交换机上。 如果没有队列绑定到交换机上,则发送到该交换机上的信息则会丢失。
direct模式:消息与一个特定的路由器完全匹配,才会转发 topic模式:按规则转发消息,最灵活 -->
<rabbit:direct-exchange name="${mq.exchange}"
durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="my_queue_vip" key="vip_key"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>

<!-- 配置监听 消费者 -->
<rabbit:listener-container
connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queues="my_queue" ref="rabbitmqService" />
</rabbit:listener-container>

</beans>

FastJsonMessageConverter

package com.pptv.ucm.mq.util;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.MessageConversionException;

import com.alibaba.fastjson.JSON;

/**
 * @类名:FastJsonMessageConverter .
 * @描述: *****  .
 * @作者: yakunMeng .
 * @创建时间: 2017年8月11日 上午10:08:00 .
 * @版本号: V1.0 .
 */
public class FastJsonMessageConverter extends AbstractJsonMessageConverter {

    private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);

    private static ClassMapper classMapper = new DefaultClassMapper();

    public FastJsonMessageConverter() {
        super();
    }

    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        byte[] bytes = null;
        try {
            String jsonString = JSON.toJSONString(object);
            bytes = jsonString.getBytes(getDefaultCharset());
        } catch (IOException e) {
            throw new MessageConversionException("Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(getDefaultCharset());
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        classMapper.fromClass(object.getClass(), messageProperties);
        return new Message(bytes, messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        Object content = null;
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
            String contentType = properties.getContentType();
            if (contentType != null && contentType.contains("json")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = getDefaultCharset();
                }
                try {
                    Class<?> targetClass = getClassMapper().toClass(message.getMessageProperties());
                    content = convertBytesToObject(message.getBody(), encoding, targetClass);
                } catch (IOException e) {
                    throw new MessageConversionException("Failed to convert Message content", e);
                }
            } else {
                log.warn("Could not convert incoming message with content-type [" + contentType + "]");
            }
        }
        if (content == null) {
            content = message.getBody();
        }
        return content;
    }

    private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz)
            throws UnsupportedEncodingException {
        String contentAsString = new String(body, encoding);
        return JSON.parseObject(contentAsString, clazz);
    }
}

RabbitmqService

package com.pptv.ucm.service.impl;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

import com.pptv.ucm.common.util.ImportExecl;
import com.pptv.ucm.common.util.StringUtil;

import net.sf.json.JSONObject;

/**
 * rabbitmq监听消息,消费者
 *
 * @author st
 *
 */
public class RabbitmqService implements MessageListener {
    private static Logger log = LoggerFactory.getLogger(RabbitmqService.class);

    public void onMessage(Message message) {
        log.info("消息消费者 = " + message);
        String content = null;
        try {
            content = new String(message.getBody(), "utf-8");
            if (StringUtil.isNotBlank(content)) {
                JSONObject object = JSONObject.fromObject(content.toString());
                String batch_code = !object.has("batch_code") ? "" : object.getString("batch_code");
                String file_url = !object.has("file_url") ? "" : object.getString("file_url");
                log.info("batch_code=" + batch_code + "file_url=" + file_url);
                String localExcelPath = ImportExecl.getDiskPath(file_url);
                System.out.println(localExcelPath);
                ImportExecl poi = new ImportExecl();
                List<List<String>> list = poi.read(localExcelPath);
                if (list != null) {
                    for (int i = 0; i < list.size(); i++) {
                        System.out.print("第" + (i) + "行");
                        List<String> cellList = list.get(i);
                        for (int j = 0; j < cellList.size(); j++) {
                            System.out.print(" " + cellList.get(j));
                        }
                        System.out.println();
                    }
                }
                new File(localExcelPath).delete();//删除本地文件
            }
        } catch (Exception e) {
            log.info("报错了e= " + e.getMessage());
            e.printStackTrace();
        }
        log.info(content);
    }

}

MQProducerImpl

package com.pptv.ucm.service.impl;

import javax.annotation.Resource;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.pptv.ucm.service.IMQProducer;

@Service
public class MQProducerImpl implements IMQProducer {
    @Value(value = "${mq.queue}")
    private String queueId;

    @Value(value = "${mq.exchange}")
    private String mqExchange;

    @Value(value = "${mq.patt}")
    private String mqPatt;

    @Resource
    private AmqpTemplate amqpTemplate;

    public void sendQueue(Object object) {
        // convertAndSend 将Java对象转换为消息发送至匹配key的交换机中Exchange
        amqpTemplate.convertAndSend(mqExchange, mqPatt, object);
    }
}
时间: 2024-10-12 21:16:25

spring 集成rabbitmq的相关文章

rabbitMQ第五篇:Spring集成RabbitMQ

前面几篇讲解了如何使用rabbitMq,这一篇主要讲解spring集成rabbitmq. 首先引入配置文件org.springframework.amqp,如下 <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.6.0.RELEASE</version> <

RabbitMQ第四篇:Spring集成RabbitMQ

前面几篇讲解了如何使用rabbitMq,这一篇主要讲解spring集成rabbitmq. 首先引入配置文件org.springframework.amqp,如下 <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.6.0.RELEASE</version> <

spring集成RabbitMQ配置文件详解(生产者和消费者)

1,首先引入配置文件org.springframework.amqp,如下: <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.1.RELEASE</version> </dependency> 2,准备工作:安装好rabbitmq,并在项目中增

Spring集成rabbitmq

一.AMQP(Advanced Message Queuing Protocol) 提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,是面向消息的中间件设计.基于此协议的客户端与消息中间件可以传递消息,并不受客户端/中间件类型.开发语言等条件的限制. RabbitMQ是基于Erlang Virtual Runtime的AMQP的实现.因此可以跨语言的实现消息传递.(TODO:实现javascript与java的消息交互) 二.一些概念 1.AMQP是异步消息协议,关于消息

RabbitMQ及Spring集成

部分转载自https://blog.csdn.net/whoamiyang/article/details/54954780 1.背景 RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现. 2.应用场景 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种1.串行的方式;2.并行的方式 (1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端. 这有一个问题是,

RabbitMQ安装和使用(和Spring集成)

一.安装Rabbit MQ Rabbit MQ 是建立在强大的Erlang OTP平台上,因此安装Rabbit MQ的前提是安装Erlang.通过下面两个连接下载安装3.2.3 版本: 下载并安装 Eralng OTP For Windows (vR16B03) 运行安装 Rabbit MQ Server Windows Installer (v3.2.3) 具体操作步骤参考:在 Windows 上安装Rabbit MQ 指南 本人遇到的问题 当安装RabbitMQ后,使用rabbitmqctl

spring boot Rabbitmq集成,延时消息队列实现

本篇主要记录Spring boot 集成Rabbitmq,分为两部分, 第一部分为创建普通消息队列, 第二部分为延时消息队列实现: spring boot提供对mq消息队列支持amqp相关包,引入即可: [html] view plain copy <!-- rabbit mq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-

消息中间件系列四:RabbitMQ与Spring集成

一.RabbitMQ与Spring集成  准备工作: 分别新建名为RabbitMQSpringProducer和RabbitMQSpringConsumer的maven web工程 在pom.xml文件里面引入如下依赖: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocati

Spring Boot系列——7步集成RabbitMQ

RabbitMQ是一种我们经常使用的消息中间件,通过RabbitMQ可以帮助我们实现异步.削峰的目的. 今天这篇,我们来看看Spring Boot是如何集成RabbitMQ,发送消息和消费消息的.同时我们介绍下死信队列. 集成RabbitMQ 集成RabbitMQ只需要如下几步即可 1.添加maven依赖 <!--rabbitmq--> <dependency> ? ? <groupId>org.springframework.boot</groupId>