rabbitMQ的简单实例——amqp协议带数据回写机制

rabbitMQ是一种高性能的消息队列,支持或者说它实现了AMQP协议(advanced message queue protocol高级消息队列协议)。

下面简单讲一讲一个小例子。我们首先要部署好rabbitMQ,然后实现一个生产者—消费者,生产者向rabbit中发布一个消息,消费者去rabbit取这个消息,在正确收到这个消息后,消费者会通过返回队列回写通知生产者自己收到了消息。

windows下部署rabbit非常简单,先安装erlang运行时,然后安装rabbitMQ安装文件即可,都是exe的,很简单。然后找到rabbit的sbin目录里的bat即可启动rabbitMQ。

下面是producer—consumer代码:

package com.hzfi.rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;

public class Producer {
    private final static String QUEUE_NAME = "myQueue"; //上送队列

    public static void main(String[] args) throws IOException, TimeoutException{
        String replyQueueName = null;   //返回队列名

        ConnectionFactory connFactory = null;
        Connection conn = null;
        Channel channel = null;
        try{
        connFactory = new ConnectionFactory();
        connFactory.setHost("localhost");
        conn = connFactory.newConnection();
        channel = conn.createChannel();
        //返回队列
        replyQueueName = channel.queueDeclare().getQueue();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);

        String corrId = java.util.UUID.randomUUID().toString(); //用来表示返回队列结果的id,唯一
        BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
        String msg = "[email protected]";
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicPublish("", QUEUE_NAME, props, msg.getBytes());
        System.out.println("producer has published: \"" + msg + "\"");

        while(true){
            Thread.sleep(1000);
            Delivery delivery = consumer.nextDelivery();
            System.out.println("from server reply:" + new String(delivery.getBody()));
        }
        }catch(IOException ioe){
            ioe.printStackTrace();
        }catch(TimeoutException toe){
            toe.printStackTrace();
        } catch (ShutdownSignalException e) {
            e.printStackTrace();
        } catch (ConsumerCancelledException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally{
            if(channel!=null)   channel.close();
            if(conn!=null)  conn.close();
        }
    }
}
package com.hzfi.rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Consumer {
    private final static String QUEUE_NAME = "myQueue";
    public static void main(String[] args) throws IOException, TimeoutException{
        ConnectionFactory connFactory = null;
        Connection conn = null;
        Channel channel = null;
        try{
        connFactory = new ConnectionFactory();
        connFactory.setHost("localhost");
        conn = connFactory.newConnection();
        channel = conn.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("listening for event message...");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while(true){
            Thread.sleep(1000);
            Delivery delivery = consumer.nextDelivery();
            BasicProperties props = delivery.getProperties();
            BasicProperties reply_props = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
            String msg = new String(delivery.getBody(),"utf-8");
            System.out.println("receive msg:" + msg);
            String retMsg = "ok, give you reply:" + new String(msg.getBytes(),"utf-8");
            System.out.println("Consumer中的返回队列名" + props.getReplyTo());
            channel.basicPublish( "", props.getReplyTo(), reply_props, retMsg.getBytes());
        }
        }catch(IOException ioe){
            ioe.printStackTrace();
        }catch(TimeoutException toe){
            toe.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally{
            if(channel!=null)   channel.close();
            if(conn!=null)  conn.close();
        }
    }
}
时间: 2024-10-20 12:17:48

rabbitMQ的简单实例——amqp协议带数据回写机制的相关文章

嵌套JSON数据自动回写HTML网页

本文介绍解析来自MongoDB数据库的JSON嵌套字符串,按HTML界面元素自定义属性展现数据内容的解决方案及技术实现代码. HTML网页定义 <html> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <head></head> <body> <h1>表单展示数据回写测试 JavaScript</h

Paxos协议超级详细解释+简单实例

转载自:  https://blog.csdn.net/cnh294141800/article/details/53768464 Paxos协议超级详细解释+简单实例 Basic-Paxos算法(可以先看后面的实际例子再看前面的具体介绍部分) Paxos算法的目的 Paxos算法的目的是为了解决分布式环境下一致性的问题. 多个节点并发操纵数据,如何保证在读写过程中数据的一致性,并且解决方案要能适应分布式环境下的不可靠性(系统如何就一个值达到统一) Paxos的两个组件 Proposer 提议发

AMQP协议与RabbitMQ

什么是AMQP? 在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,AMQP就是这样的一种协议,消息发送与接受的双方遵守这个协议可以实现异步通讯.这个协议约定了消息的格式和工作方式. AMQP 中包含的主要元素 生产者(Producer):向Exchange发布消息的应用. 消费者(Consumer):从消息队列queue中消费消息的应用. 消息队列

原生ajax调用数据简单实例代码

原生ajax调用数据简单实例代码:由于jQuery的盛行,现在使用较多的是jQuery封装好了的ajax,因为解决了浏览器兼容性问题,这对程序员来说就等于去掉了一个心头大患,但并非原生ajax就销声匿迹,并且本人感觉还是对原生的ajax有所了解的好,下面就是一段ajax数据调用的实例代码,非常的简单,初学者可以参考一下.代码如下:一.兼容浏览器部分: function xmlHttpR() { var xmlhttp; if(window.XMLHttpRequest) { xmlhttp=ne

RabbitMQ与AMQP协议详解

1. 消息队列的历史 了解一件事情的来龙去脉,将不会对它感到神秘.让我们来看看消息队列(Message Queue)这项技术的发展历史. Message Queue的需求由来已久,80年代最早在金融交易中,高盛等公司采用Teknekron公司的产品,当时的Message queuing软件叫做:the information bus(TIB). TIB被电信和通讯公司采用,路透社收购了Teknekron公司.之后,IBM开发了MQSeries,微软开发了Microsoft Message Que

一:AMQP协议标准简单介绍

一:AMQP协议?--->AMQP 是 Advanced Message Queuing Protocol,即高级消息队列协议.和前面罗列的技术不同,AMQP 是一个标准化的消息中间件协议--->她的理想是让不同语言,不同系统的应用互相通信,并提供一个简单统一的模型和编程接口.这样,人们就可以采用各种语言和平台来实现自己的应用,当需要和其他系统通信时,只要承认 AMQP 协议即可. 二:AMQP协议的形象化例子?--->世界各地的人们由于地理和历史的原因,使用着各种不同的语言,相互交流十

RabbitMQ之AMQP协议

一. 协议 AMQP协议是分层的,类似于OSI或TCP/IP协议分层.从图中可以看出分三层: 1. Model Layer:规范服务器端Broker的行为. 2. Session Layer:定义客户端 与服务器端Broker的Context. 3. Transport Layer:传输二进制数据流. 二. 模型 AMQP服务器Broker主要由Exchange和Message Queue组成,主要功能是Message的路由Routing和缓存Buffering. Exchange接受Produ

【MySQL】存储过程、游标、循环简单实例

有时候仅凭 sql 语句可能达不到想要的数据操作目的,有可能需要写一些方法体,通过循环判断等操作最终达到目的.那么在数据库里实现这种方法体就需要存储过程了,个人觉得一个带注释的简单实例可以简单粗暴地解决大部分问题,当然要深入学习了解的话还是要看教程文档了,话不多说,上码: [sql] view plain copy create procedure my_procedure() -- 创建存储过程 begin -- 开始存储过程 declare my_id varchar(32); -- 自定义

jQuery Datatable 实用简单实例

目标: 使用jQuery Datatable构造数据列表,并且增加或者隐藏相应的列,已达到数据显示要求.同时,jQuery Datatable强大的功能支持:排序,分页,搜索等. Query Datatable能良好支持数据完全加载到本地后构建数据列表,排序.分页.搜索等功能就会自带,不需要我们去关心,在此主要说明通过后台动态的加载数据,已达到在大数据面前提高效率的效果. 1. 通过后台进行分页 2. 通过后台进行排序 3. 通过后台进行搜索 具体使用方法: 1. 首先构建我们需要的数据列表,以