RabbitMQ学习(六):远程结果调用

场景:我们需要在传输消息时得到结果

客服端在发送请求时会发送回调队列,服务端处理事情完成后会将结果返回到回调队列中,在增加关联标志关联每个请求和服务返回

客户端代码:


public class RPCClient {

private final static String RPC_Queue_Name
= "rpc_queue";

public static void main(String[]
args) throws
IOException,
TimeoutException, InterruptedException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection =
factory.newConnection();

Channel channel =
connection.createChannel();

//声明队列

channel.queueDeclare(RPC_Queue_Name, false, false, false, null);

//为每一个客户端获取一个随机的回调队列

String
replyQueueName = channel.queueDeclare().getQueue();

//为每一个客户端创建一个消费者(用于监听回调队列,获取结果)

QueueingConsumer
consumer = new QueueingConsumer(channel);

//消费者与队列关联

channel.basicConsume(replyQueueName,
true, consumer);

String response = null;

String corrId = java.util.UUID.randomUUID().toString();

//设置replyTocorrelationId属性值

AMQP.BasicProperties
props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();

//发送消息到rpc_queue队列

channel.basicPublish("", RPC_Queue_Name, props, "8".getBytes());

while (true) {

QueueingConsumer.Delivery
delivery = consumer.nextDelivery();

if (delivery.getProperties().getCorrelationId().equals(corrId))
{

response = new String(delivery.getBody(),"UTF-8");

break;

}

}

System.out.println( "fib(8) is
"
+ response);

}

}

服务端代码:

public class RPCServer {
    private final static String RPC_Queue_Name = "rpc_queue";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();      channel.queueDeclare(RPC_Queue_Name,false,false,false,null);
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_Queue_Name, false, consumer);
        System.out.println(" [x] Awaiting RPC requests");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            //获取请求中的correlationId属性值,并将其设置到结果消息的correlationId属性中
            BasicProperties props = delivery.getProperties();
            AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
            //获取回调队列名字
            String callQueueName = props.getReplyTo();

            String message = new String(delivery.getBody(),"UTF-8");

            System.out.println(" [.] fib(" + message + ")");

            //获取结果
            String response = "" + fib(Integer.parseInt(message));
            //先发送回调结果
            channel.basicPublish("", callQueueName, replyProps,response.getBytes());
            //后手动发送消息反馈
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

    private static int fib(int i)
    {
        if(i==0) return 0;
        if (i==1) return 1;
        return fib(i-1) +fib(i-2);
    }
}
时间: 2024-11-05 12:20:16

RabbitMQ学习(六):远程结果调用的相关文章

Android学习之远程绑定调用service

http://blog.csdn.net/q1234456gggg_jkjg/article/details/8479070 远程绑定调用service主要是用来不同进程的信息共享.就比如服务器和客户端,在服务器端设置好一个service提供方法或信息,然后客户端可以直 接调用服务器端service提供方法或信息.这里有个前提是客户端必须有和服务器端一份一样的AIDL,然后服务器端在客户端使用的系统上有注册过(也 就是安装运行过一次),之后客户端就可以远程绑定调用服务器端的service了. 具

RabbitMQ九:远程过程调用RPC

定义 RPC(Remote Procedure Call Protocol)--远程过程调用协议:它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据.在OSI网络通信模型中,RPC跨越了传输层和应用层.RPC使得开发包括网络分布式多程序在内的应用程序更加容易. PRC采用客户端/服务端模式,请求程序就是一个客户机,而服务提供就是一个服务器.首先,客户机调用进程发送一个有进程参数的调用信息到服务

RabbitMQ学习之:(六)Direct Exchange (转贴+我的评论)

From: http://lostechies.com/derekgreer/2012/04/02/rabbitmq-for-windows-direct-exchanges/ RabbitMQ for Windows: Direct Exchanges Posted by Derek Greer on April 2, 2012 This is the fifth installment to the series: RabbitMQ for Windows.  In thelast inst

RabbitMQ(六)远程连接

RabbitMQ(六)远程连接 默认情况下,rabbitmq使用`guest`来连接本地(localhost)的server,当需要远程连接时,就会失效. "guest" user can only connect via localhost 官方文档:http://www.rabbitmq.com/access-control.html 如果必须使用`guest`用户来进行远程登录,需要修改配置 [{rabbit, [{loopback_users, []}]}]. (1)那么首先需

dubbo_远程同步调用原理

Dubbo缺省协议采用单一长连接和NIO异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况. Dubbo缺省协议,使用基于mina1.1.7+hessian3.2.1的tbremoting交互. 连接个数:单连接 连接方式:长连接 传输协议:TCP 传输方式:NIO异步传输 序列化:Hessian二进制序列化 适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用dubbo协议传输大文件或超大字符串

RabbitMQ(六) ——RPC

RabbitMQ(六) --RPC (转载请附上本文链接--linhxx) 一.概述 RabbitMQ的RPC模式,支持生产者和消费者不在同一个系统中,即允许远程调用的情况.通常,消费者作为服务端,放置在远程的系统中,提供接口,生产者调用接口,并发送消息. RPC模式如下图所示: RPC模式是一种远程调用的模式,因为需要http请求,因此速度比系统内部调用慢.而且rpc模式下,通常不易区分哪些是来自外部的请求,哪些是内部的请求,导致整体速度较慢.因此,不能滥用rpc模式. 二.回调队列(Call

Rabbitmq学习(一) Rabbitmq初探

Rabbitmq学习(一) Rabbitmq初探 理论 定义 消息队列:在消息的传输过程中保存消息的的容器. 这是一个较为经典的消费-生产者模型,说起来比较抽象,打个比方:A线程需要给B线程发送消息(A.B线程不一定是在同一台机器上的),A线程先把消息发送到消息队列服务器上,然后B线程去读取或是订阅消息服务器上消息队列中的消息,线程A和B之间并没有进行直接通信.MQ服务器在中间起到中继的作用. 适用的应用场景 比较适合异步传输,这里解释一下什么是异步和同步. 异步:发送方不关心消息有没有发送成功

C#多线程学习(六) 互斥对象

C#多线程学习(六) 互斥对象 如何控制好多个线程相互之间的联系,不产生冲突和重复,这需要用到互斥对象,即:System.Threading 命名空间中的 Mutex 类. 我们可以把Mutex看作一个出租车,乘客看作线程.乘客首先等车,然后上车,最后下车.当一个乘客在车上时,其他乘客就只有等他下车以后才可以上车.而线程与Mutex对象的关系也正是如此,线程使用Mutex.WaitOne()方法等待Mutex对象被释放,如果它等待的Mutex对象被释放了,它就自动拥有这个对象,直到它调用Mute

JBPM学习(六):详解流程图

概念: 流程图的组成: a. 活动 Activity / 节点 Node b. 流转 Transition / 连线(单向箭头) c. 事件 1.流转(Transition) a) 一般情况一个活动中可以指定一个或多个Transition i. 开始活动(Start)中只能有一个Transition. ii. 结束活动(End)中没有Transition. iii. 其他活动中有一条或多条Transition b) 如果Transition只有一个,则可以不指定名称(名称是null):如果有多个

Cmdlet开发与学习(六)

之前的内容主要是关于cmdlet开发的,下面要将的内容,是关于在应用程序中集成PowerShell引擎. 运行空间和管道       Runspace类是PowerShell引擎API的重要组成部分,Runspace实例代表一个PowerShell执行引擎实例,其中包含自己的一系列变量,驱动器映射,函数等.这些资源统称为运行空间的"会话状态". 创建并调用Pipeline类的实例,我们就可以在运行空间中使用命令行.Pipeline类实例代表PowerShell命令行对象,其中包含各种命