RabbitMq 实战(一)

RabbitMq消息消费者服务 

开发工具Idea和Spring boot来开发的。

消息消费目前只是一个简单的Demo,后续会处理成更智能一些。

首先配置文件类,RabbitMqConfig,里面配置一些用户名和密码嗨哟队列信息。

package com.basic.rabbitmq.consumer.config;

import com.basic.rabbitmq.consumer.listener.HandleMessageListenerAdapter;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

/**
 * Rabbitmq配置类
 * Created by sdc on 2017/7/4.
 */
@Configuration
@ComponentScan(basePackages = {"com.basic"})
@PropertySource(value = {"classpath:application.properties"})
public class RabbitMqConfig {

    @Autowired
    private Environment env;

    /**
     * 构建connectionfactory
     * @return
     * @throws Exception
     */
    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(env.getProperty("spring.rabbitmq.host"));
        connectionFactory.setPort(Integer.valueOf("5672".trim()));
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername(env.getProperty("spring.rabbitmq.username"));
        connectionFactory.setPassword(env.getProperty("spring.rabbitmq.password"));
        return connectionFactory;
    }

    /**
     * CachingConnectionFactory
     * @return
     * @throws Exception
     */
    @Bean
    public CachingConnectionFactory cachingConnectionFactory() throws Exception {
        return new CachingConnectionFactory(connectionFactory());
    }

    /**
     * RabbitTemplate,类似于jdbctemplate一样的工具类
     * @return
     * @throws Exception
     */
    @Bean
    public RabbitTemplate rabbitTemplate() throws  Exception {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory());
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

    @Bean
    public AmqpAdmin amqpAdmin() throws  Exception {
        return new RabbitAdmin(cachingConnectionFactory());
    }

    @Bean
    public SimpleMessageListenerContainer listenerContainer(
            @Qualifier("handleMessageListenerAdapter") HandleMessageListenerAdapter handleMessageListenerAdapter) throws Exception {
        //队列名字
        String queueName = env.getProperty("emial.server.queue").trim();

        //单一的消息监听容器
        SimpleMessageListenerContainer simpleMessageListenerContainer =
                new SimpleMessageListenerContainer(cachingConnectionFactory());
        simpleMessageListenerContainer.setQueueNames(queueName);
        simpleMessageListenerContainer.setMessageListener(handleMessageListenerAdapter);
        //手动设置 ACK,就是成功消费信息了,就设置一下这个,rabbitmq就从此队列里删除这条信息了。
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        return simpleMessageListenerContainer;
    }

}

我这里配置了一个SimpleMessageListenerContainer,这个Bean,用来监听队列里的消息的。

具体的

package com.basic.rabbitmq.consumer.listener;

        import com.rabbitmq.client.Channel;
        import org.springframework.amqp.core.Message;
        import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.context.annotation.ComponentScan;
        import org.springframework.mail.MailMessage;
        import org.springframework.mail.javamail.JavaMailSender;
        import org.springframework.stereotype.Component;

        import javax.annotation.Resource;

/**
 * 监听消息的处理适配器
 * Created by sdc on 2017/7/10.
 */
@Component("handleMessageListenerAdapter")
public class HandleMessageListenerAdapter extends MessageListenerAdapter {

//    @Resource
//    private JavaMailSender mailSender;

    /**
     * 这块和activemq那个监听器差不多,都是监听信息,也都是onMessage方法。
     * @param message
     * @param channel
     * @throws Exception
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String messageDetail = new String(message.getBody()); //消息体
        System.out.println("消息消费:" + messageDetail);

        // 手动ACK
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

还有一些配制文件,请看

http://10103778.blog.51cto.com/10093778/1945756

这个博客,就可以看到具体的配制了。

启动这个项目,就可以从队列消费消息了。消费者还是比较简单的,对应到相应的队列就可以处理了消息了。

时间: 2024-08-03 15:32:26

RabbitMq 实战(一)的相关文章

rabbitMQ实战(一)---------使用pika库实现hello world

rabbitMQ实战(一)---------使用pika库实现hello world 2016-05-18 23:29 本站整理 浏览(267) pika是RabbitMQ团队编写的官方Python AMQP库.需要先安装pika:pip3 install pika有较详细的注释,就不再详细说明了生产者代码:hello_world_producer.py: import pika,sys #connect to the rabbitmq,use the default vhost credent

RabbitMQ实战:理解消息通信

本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记. 前段时间总结完了「深入浅出MyBatis」系列,对MyBatis有了更全面和深入的了解,在掘金社区也收到了一些博友的喜欢,很高兴.另外,短暂的陪产假就要结束了,小宝也二周了,下周二就要投入工作了,希望自己尽快调整过来,加油努力. 从本篇开始总结「RabbitMQ实战」系列的阅读笔记,RabbitMQ是一个开源的消息代理和队列服务器,可以通过基本协议在完全不同的应用之间共享数据,可以将作业排队以便让分布式服务进行处理. 本篇

RabbitMQ实战:消息通信模式和最佳实践

本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记. 通过前2篇的介绍,了解了消息通信的主要元素和交互过程,以及如何运行和管理RabbitMQ,这篇将站在开发模式的角度理解「面向消息通信」带来的好处,以及在各种场景下的最佳实践. 通过介绍,你会了解到: 面向消息通信的好处 发后即忘模型 用RabbitMQ实现RPC 面向消息通信的好处 主要从异步状态思维.处理能力扩展性.集成复杂度方面,说明面向消息通信的好处. 异步状态思维 当将消息通信集成到应用程序时,开发模式将从同步模型

RabbitMQ实战:可用性分析和实现

本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记. 上一篇介绍了各种场景下的最佳实践,大部分场景可以使用「发后即忘」的模式,不需要响应,如果需要响应,可以使用RabbitMQ的RPC模型. RabbitMQ以异步的方式解耦系统间的关系,调用者将业务请求发送到Rabbit服务器,就可以返回了,Rabbit会确保请求被正确处理,即使遇到网络异常.Rabbit服务器崩溃.整个机房断电等特殊场景,针对这些场景,Rabbit提供了各种机制确保其可用性. 本篇通过总结可能出现的特殊场景

RabbitMQ实战:界面管理和监控

本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记. 上一篇总结了可能出现的异常场景,并对RabbitMQ提供的可用性保证进行了分析,在出现服务器宕机后,仍然可以正常服务.另外,需要尽快恢复异常的服务器,重新加入集群,推送未消费的消息,通过监控可第一时间接收到错误并进行处理. 另外,我们想主动了解消息堆积和消费的情况,以及服务器节点的压力,RabbitMQ提供了几种方式便捷.直观的了解,包括Web管理插件.REST API.rabbitmqadmin脚本. 通过介绍,你会了解

Java SpringBoot集成RabbitMq实战和总结

目录 交换器.队列.绑定的声明 关于消息序列化 同一个队列多消费类型 注解将消息和消息头注入消费者方法 关于消费者确认 关于发送者确认模式 消费消息.死信队列和RetryTemplate RPC模式的消息(不常用) 关于消费模型 关于RabbitMq客户端的线程模型 在公司里一直在用RabbitMQ,由于api已经封装的很简单,关于RabbitMQ本身还有封装的实现没有了解,最近在看RabbitMQ实战这本书,结合网上的一些例子和spring文档,实现了RabbitMQ和spring的集成,对着

RabbitMQ实战应用技巧

1. RabbitMQ实战应用技巧 1.1. 前言 由于项目原因,之后会和RabbitMQ比较多的打交道,所以让我们来好好整理下RabbitMQ的应用实战技巧,尽量避免日后的采坑 1.2. 概述 RabbitMQ有几个重要的概念:虚拟主机,交换机,队列和绑定 虚拟主机:一个虚拟主机持有一组交换机.队列和绑定,我们可以从虚拟主机层面的颗粒度进行权限控制 交换机:Exchange用于转发消息,它并不存储消息,如果没有Queue队列绑定到Exchange,它会直接丢弃掉生产者发来的数据. 交换机还有个

《RabbitMQ 实战》 -- 网易云课堂

安装 RabbitMQ解决的问题     实例 RabbitMQ通信信道 交换器类型 以上 原文地址:https://www.cnblogs.com/LearnFromNow/p/10011556.html

rabbitMq实战使用

只做下工作记录,比较重要的几个属性: concurrency:一个生产者可以同时由多少个消费者消费,这个一般根据你的机器性能来进行配置 prefetch:允许为每个consumer指定最大的unacked messages数目.要是对实时性要求很高的话,prefetch应该设置成1,concurrency的值调高点 队列中Ready状态和Unacknowledged状态的消息数,分别指的是等待投递给消费者的消息数和已经投递给消费者但是未收到ack信号的消息数. 注意配置超时重连机制,防止死机 s