Rabbit mq订阅方式获取消息并可设置持久化

Rabbit 通过方式获取消息:订阅方式其实是向queue注册consumer,通过rpc向queue server发送注册consumer的消息,rabbitMQ Server在收到消息后,根据消息的内容类型判断这是一个订阅消息,这样当MQ 中queue有消息时,会自动把消息通过该socket(长连接)通道发送出去。

可以通过

channel.basicQos(1); 

设置RabbitMQ调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完并且已经对刚才处理的消息进行确认之后, 才发送下一条消息,防止消费者太过于忙碌。如下图所示:

整理代码如下:

Produce

public class RabbitMQProduce {
	public static void main(String[] args) throws IOException, InterruptedException {
		ConnectionFactory factory =new ConnectionFactory();
		String routingKey="test";
		String exchange="test";
	    factory.setHost("localhost");
	    Connection conn = factory.newConnection();
	    Channel channel =conn.createChannel();

	    //发送消息
	    for(int i=0;i<8000;i++){
	    	if(i%5==0){
	    		Thread.sleep(200);
	    	}
	    	byte[] messageBodyBytes =(i+"").getBytes();
	    	//如果将队列设置为持久化之后,还需要将消息也设为可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
	    	//也就是将队列设置为持久化之后,还需要将发送的消息也要设置为持久化才能保证队列和消息一直存在
	    	//消费者在声明时也要做持久化声明
		    channel.basicPublish(exchange, routingKey, null, messageBodyBytes);
		    System.out.println("发送.."+i);
	    }
	    channel.close();
	    conn.close();
	}
}

Customer

public class RabbitMqCustomer {
	private static ConnectionFactory factory;
	private static String QueryName="test";
	private static Connection conn;
	private static Channel channel;
	private static String exchange="test";
	private static String routingKey="test";
	public static void main(String[] args) throws Exception {
		start();
		/**
		 * 采用订阅的方式获取消息
		 */
        channel.basicConsume(QueryName, false, new DefaultConsumer(channel){
        	@Override
        	public void handleShutdownSignal(String consumerTag,
        			ShutdownSignalException sig) {
        	System.out.println("==="+consumerTag+"====="+sig.getMessage());
        	boolean isOpenConnect = conn!=null&&conn.isOpen();
        	boolean isOpenChannel = channel != null && channel.isOpen();
        	while(!isOpenChannel||!isOpenConnect){
        		try {
        			System.out.println("连接失败重连接....");
					start();
					Thread.sleep(3000);
				} catch (Exception e) {
					e.printStackTrace();
				}
        	  }
        	}

        	@Override
        	public void handleDelivery(String consumerTag, Envelope envelope,
        			BasicProperties properties, byte[] body) throws IOException {
        		//消息序号
        		long deliveryTag = envelope.getDeliveryTag();
        		String mes = new String(body,"UTF-8");
        		System.out.println("接受到消息:"+mes);
        		//确认收到,消息回执
        		channel.basicAck(deliveryTag, true);
        	}
        });
	}

	public static  void start() throws IOException {
		factory = new ConnectionFactory();
		factory.setHost("localhost");
		factory.setUsername("test");
		factory.setPassword("test");
		conn = factory.newConnection();
		channel = conn.createChannel();
		channel.exchangeDeclare(exchange, "topic");
		channel.queueDeclare(QueryName, false, false, false, null);//声明消息队列,且为可持久化的
		channel.queueBind(QueryName, exchange, routingKey);
		channel.basicQos(1); //消息分发处理
	}
}

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-28 14:32:04

Rabbit mq订阅方式获取消息并可设置持久化的相关文章

在 Windows 上安装Rabbit MQ 指南

转载自张善友博客园:http://www.cnblogs.com/shanyou/p/4067250.html rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统.他遵循Mozilla Public License开源协议.采用 Erlang 实现的工业级的消息队列(MQ)服务器. RabbitMQ的官方站:http://www.rabbitmq.com/        AMQP(高级消息队列协议) 是一个异步消息传递所使用的应用层协议规范,作为线路层协议,而不是API(

Rabbit MQ 入门指南

rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统.他遵循Mozilla Public License开源协议.采用 Erlang 实现的工业级的消息队列(MQ)服务器. RabbitMQ的官方站:http://www.rabbitmq.com/        AMQP(高级消息队列协议) 是一个异步消息传递所使用的应用层协议规范,作为线路层协议,而不是API(例如JMS),AMQP 客户端能够无视消息的来源任意发送和接受信息.AMQP的原始用途只是为金融界提供一个可以彼

Rabbit MQ和Spring Boot的整合

消息服务 背景:有时需与其它系统集成来完成相关业务功能,原始的做法是程序内部相互调用,除此之外,还可用消息服务中间件来进行业务处理,使用消息服务中间件处理业务能够提升系统的异步通信和扩展解耦的能力,个人有点面向切面的意思. 一.为什么要使用消息服务? 因为它有很多好处,能解决很多问题: 1.异步处理 2.流量消峰 3.提高效率和可靠性 二.RabbitMQ消息中间件的原理和工作模式 RabbitMQ消息中间件的原理: 1.消息发布者P向RabbitMQ代理(Broker)指定虚拟主机服务器发送消

JMS学习(八)-ActiveMQ Consumer 使用 push 还是 pull 获取消息

ActiveMQ是一个消息中间件,对于消费者而言有两种方式从消息中间件获取消息: ①Push方式:由消息中间件主动地将消息推送给消费者:②Pull方式:由消费者主动向消息中间件拉取消息.看一段官网对Push方式的解释: To be able to achieve high performance it is important to stream messages to consumers as fast as possible so that the consumer always has a

Spring Boot:使用Rabbit MQ消息队列

综合概述 消息队列 消息队列就是一个消息的链表,可以把消息看作一个记录,具有特定的格式以及特定的优先级.对消息队列有写权限的进程可以向消息队列中按照一定的规则添加新消息,对消息队列有读权限的进程则可以从消息队列中读走消息,而消息队列就是在消息的传输过程中保存消息的容器,你可以简单的把消息队列理解为类似快递柜,快递员(消息发布者)往快递柜(消息队列)投递物件(消息),接受者(消息订阅者)从快递柜(消息队列)接收物件(消息),当然消息队列往往还包含一些特定的消息传递和接收机制. 消息队列作为分布式系

RabbitMQ Consumer获取消息的两种方式(poll,subscribe)解析

以下转自:http://blog.csdn.net/yangbutao/article/details/10395599 rabbitMQ中consumer通过建立到queue的连接,创建channel对象,通过channel通道获取message, Consumer可以声明式的以API轮询poll的方式主动从queue的获取消息,也可以通过订阅的方式被动的从Queue中消费消息, 最近翻阅了基于java的客户端的相关源码,简单做个分析. 编程模型伪代码如下: ConnectionFactory

消息推送 - 微信公众平台订阅用户获取最佳途径

微信公众平台统计功能中用户增长页增加增长来源统计后,我们可以知道用户获取途径大致有①搜索公众号名称②搜索微信号③图文消息右上角菜单④名片分享⑤其他,老贼也在第一时间为我们总结了目前微信公众平台用户获取最佳途径还是通过消息推送,另一个就是互推. 首先是正常运营状态,即指不做任何推广,单纯靠消息推送获取新用户,在这种条件下获取用户最多的途径竟然是……其他!而在这个里面据我观察主要是通过图文消息标题下蓝字关注进来的,也就是说一方面用户通过这种方式点击关注公众号的习惯已经养成,另一方面很多公众账号在头图

RABBIT MQ 消息队列的工作模式

1.RABBIT MQ的工作模式https://blog.csdn.net/fysuccess/article/details/70265889 2.ACTIVE MQ的订阅模式3.https://blog.csdn.net/qq_26504875/article/details/51802316 原文地址:http://blog.51cto.com/a1liujin/2094130

rabbit mq 基础流程(转)

从AMQP协议可以看出,MessageQueue.Exchange和Binding构成了AMQP协议的核心,下面我们就围绕这三个主要组件    从应用使用的角度全面的介绍如何利用Rabbit MQ构建消息队列以及使用过程中的注意事项. 1. 声明MessageQueue 在Rabbit MQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue.这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确: a)消费者是无法订阅或者获取不存在的Me