spring XD结合RabbitMQ方法介绍

1,Rabbitmq工作流程

消息生产者(即下图中的ClientA,ClientB)生产message并交给交换机Exchange,Exchange将消息按照RoutingKey分发到相应的队列Queue中,而我们的消费者(即下图中的Client1,Client2,Client3)需要做的事情就是监听队列Queue的事件,当有新的message到达的时候,做出相应的处理。

Rabbitmq结构图;

2,Exchange交换类型介绍

具体来说,Exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为“abc”,那么客户端提交的消息,只有设置了key为“abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号“#”匹配一个或多个词,符号“*”匹配正好一个词。例如“abc.#”匹配“abc.def.ghi”,“abc.*”只匹配“abc.def”。还有一种不需要key的,叫做“fanout”交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列;我们应该使用的就是“fanout”交换机。

3,“注册”新消费者的流程

在我们“注册”新的消费者的时候,首先要做的事情就是明确连接的host地址(ip)和Exchange的name,然后声明queue的时候将其绑定到对应name的Exchange上即可。

4,spring XD与Rabbitmq的结合方法

在xd-shell中执行stream create --name RABBITMQ_NAME --definition "http | rabbit --exchange= EXCHANGE_NAME" –deploy

其中,替换RABBITMQ_NAME为rabbitmq的名字,EXCHANGE_NAME替换为Exchange的名字,并与消费者绑定的Exchange的名字保持一致;

5,测试方法:

curl -X POST -d ‘helloworld!!!‘ http://host_ip:host_port

将host_ip和host_post设置为Rabbitmq所在主机ip和端口,启动消费者程序,如果能够收到message “helloworld!!!”则可证明成功!

6,消费者示例代码:

package main.java.rabbitmq;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

public class Recv {

private static final String EXCHANGE_NAME = "bot";

public static void main(String[] argv) throws Exception {

//创建连接连接到Rabbitmq

ConnectionFactory factory = new ConnectionFactory();

//设置Rabbitmq所在主机ip或者主机名

factory.setHost("localhost");

//创建一个连接;

Connection connection = factory.newConnection();

//创建一个频道

Channel channel = connection.createChannel();

//频道的exchange声明

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

//获取队列名称

String queueName = channel.queueDeclare().getQueue();

System.out.println("queueName [" + queueName + "]");

//绑定队列与exchange

channel.queueBind(queueName, EXCHANGE_NAME, "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

//配置好获取消息的方式

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicConsume(queueName, true, consumer);

while(true){

//获取消息,如果没有消息,这一步将会一直阻塞

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println("Received [" + message + "]"); }

}

}

时间: 2024-08-13 06:19:48

spring XD结合RabbitMQ方法介绍的相关文章

简单介绍下怎么在spring中使用RabbitMQ

这篇文章主要介绍了简单了解如何在spring中使用RabbitMQ,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 常见的消息中间件产品: (1)ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现. (2)RabbitMQ AMQP协议的领导实现,支持多种场景.淘宝的MySQL集群内部有使用它进行通讯,Open

Spring Boot 实现 RabbitMQ 延迟消费和延迟重试队列

本文主要摘录自:详细介绍Spring Boot + RabbitMQ实现延迟队列 并增加了自己的一些理解,记录下来,以便日后查阅. 项目源码: spring-boot-rabbitmq-delay-queue 实现 stream-rabbitmq-delay-queue 实现 背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列.而一般的队列,消息一旦入队了之后就会被消费者马上消费. 延迟队列能做什么?延迟队列多用于需要延迟工作的场景.最常见的是以下两种场景: 延迟消费

Spring XD 1.1 M2 and 1.0.3 released---support kafka

官方地址:http://spring.io/blog/2014/12/23/spring-xd-1-1-m2-and-1-0-3-released On behalf of the Spring XD team, I am very pleased to announce the second milestone release of Spring XD 1.1 and the 1.0.3 maintenance release. Download Links:* 1.1.0.M2 RELEAS

Spring官方文档翻译——15.1 介绍Spring Web MVC框架

Part V. The Web 文档的这一部分介绍了Spring框架对展现层的支持(尤其是基于web的展现层) Spring拥有自己的web框架--Spring Web MVC,在前两章中会有介绍.剩下的章节则用来介绍Spring和其他web技术的集成,比如Struts和JSF(这里只提两个). 本节以对Spring MVC portlet框架的介绍结尾. 第十五章--Web MVC框架(Web MVC framework) 第十六章--视图技术(View technologie) 第十七章--

Spring 2.0 的AOP介绍及其通知类型

Spring 2.0的AOP 在Spring 2.0中最激动人心的增强之一是关于Spring AOP,它变得更加便于使用而且更加强大,主要是通过复杂而成熟的AspectJ语言的支持功能来实现,而同时保留纯的基于代理的Java运行时.Spring 2.0的AOP提供给我们一种新的思考程序结构的方法,能够解决很多纯OOP无法解决的问题--让我们能够在一个模块中实现某些需求,而不是以发散的方式实现.Spring 2.0允许用户选择使用基于模式或@AspectJ注解的方式来自定义切面.这两种风格都支持所

Spring XD简介:大数据应用的运行时环境

简介 Spring XD(eXtreme Data,极限数据)是Pivotal的大数据产品.它结合了Spring Boot和Grails,组成Spring IO平台的执行部分.尽管Spring XD利用了大量现存的Spring项目,但它是一种运行时环境,而不是一个类库或者框架,它包含带有服务器的bin目录,你可以通过命令行启动并与之交互.运行时可以运行在开发机上.客户端自己的服务器上.AWS EC2上或者Cloud Foundry上. Spring XD中的关键组件是管理和容器服务器(Admin

Spring Boot之RabbitMQ

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用. 消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将 RocketMQ 捐献给了 Apache,当然了今天的主角还是讲 RabbitMQ.消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的.在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式

Spring Boot (十三): Spring Boot 整合 RabbitMQ

1. 前言 RabbitMQ 是一个消息队列,说到消息队列,大家可能多多少少有听过,它主要的功能是用来实现应用服务的异步与解耦,同时也能起到削峰填谷.消息分发的作用. 消息队列在比较主要的一个作用是用来做应用服务的解耦,消息从消息的生产者传递到消息队列,消费者从消息队列中获取消息并进行消费,生产者不需要管是谁在消费消息,消费者也无需关注消息是由谁来生产的.在分布式的系统中,消息队列也会被用在其他地方,比如分布式事务的支持,代表如阿里开源的 RocketMQ . 当然,我们本篇文章的主角还是 Ra

消息队列 - Spring Boot 对rabbitmq批量处理数据的支持

消息队列 - Spring Boot 对rabbitmq批量处理数据的支持 一丶前言 在生产中,存在一些场景,需要对数据进行批量操作.如,可以先将数据存放到redis,然后将数据进行批量写进数据库.但是使用redis,不得不面对一个数据容易丢失的问题.也可以考虑使用消息队列进行替换,在数据持久化,数据不丢失方面,消息队列确实比redis好一点,毕竟设计不一样.是不是使用消息队列,就一定好呢?不是的,首先使用消息队列,不能确保数据百分百不丢失,(如果要做到百分百不丢失,设计上就会比较复杂),除此之