rabbitMQ学习笔记(七) RPC 远程过程调用

当客户端想要调用服务器的某个方法来完成某项功能时,就可以使用rabbitMQ支持的PRC服务。

其实RPC服务与普通的收发消息的区别不大, RPC的过程其实就是

客户端向服务端定义好的Queue发送消息,其中携带的消息就应该是服务端将要调用的方法的参数 ,并使用Propertis告诉服务端将结果返回到指定的Queue。

示例:

 1 package com.zf.rabbitmq07;
 2
 3 import java.io.IOException;
 4
 5 import com.rabbitmq.client.AMQP.BasicProperties;
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 import com.rabbitmq.client.ConsumerCancelledException;
10 import com.rabbitmq.client.QueueingConsumer;
11 import com.rabbitmq.client.QueueingConsumer.Delivery;
12 import com.rabbitmq.client.ShutdownSignalException;
13
14 public class RPCServer {
15
16     public static final String RPC_QUEUE_NAME = "rpc_queue";
17
18     public static String sayHello(String name){
19         return "hello " + name ;
20     }
21
22     public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
23
24         ConnectionFactory connFac = new ConnectionFactory() ;
25         connFac.setHost("localhost");
26
27         Connection conn = connFac.newConnection() ;
28
29         Channel channel = conn.createChannel() ;
30
31         channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null) ;
32
33         QueueingConsumer consumer = new QueueingConsumer(channel);
34
35         channel.basicConsume(RPC_QUEUE_NAME, false , consumer) ;
36
37         while(true){
38             System.out.println("服务端等待接收消息..");
39             Delivery deliver = consumer.nextDelivery() ;
40             System.out.println("服务端成功收到消息..");
41             BasicProperties props =  deliver.getProperties() ;
42
43             String message = new String(deliver.getBody() , "UTF-8") ;
44
45             String responseMessage = sayHello(message) ;
46
47             BasicProperties responseProps = new BasicProperties.Builder()
48             .correlationId(props.getCorrelationId())
49             .build() ;
50
51             //将结果返回到客户端Queue
52             channel.basicPublish("", props.getReplyTo() , responseProps , responseMessage.getBytes("UTF-8") ) ;
53
54             //向客户端确认消息
55             channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
56             System.out.println("服务端返回消息完成..");
57         }
58
59     }
60
61 }
 1 package com.zf.rabbitmq07;
 2
 3 import java.io.IOException;
 4 import java.util.UUID;
 5
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 import com.rabbitmq.client.ConsumerCancelledException;
10 import com.rabbitmq.client.QueueingConsumer;
11 import com.rabbitmq.client.AMQP.BasicProperties;
12 import com.rabbitmq.client.QueueingConsumer.Delivery;
13 import com.rabbitmq.client.ShutdownSignalException;
14
15 public class RPCClient {
16
17     public static final String RPC_QUEUE_NAME = "rpc_queue";
18
19     public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
20
21         ConnectionFactory connFac = new ConnectionFactory() ;
22         connFac.setHost("localhost");
23         Connection conn = connFac.newConnection() ;
24         Channel channel = conn.createChannel() ;
25
26         //响应QueueName ,服务端将会把要返回的信息发送到该Queue
27         String responseQueue = channel.queueDeclare().getQueue() ;
28
29         String correlationId = UUID.randomUUID().toString() ;
30
31         BasicProperties props = new BasicProperties.Builder()
32         .replyTo(responseQueue)
33         .correlationId(correlationId)
34         .build();
35
36         String message = "is_zhoufeng";
37         channel.basicPublish( "" , RPC_QUEUE_NAME , props ,  message.getBytes("UTF-8"));
38
39         QueueingConsumer consumer = new QueueingConsumer(channel)    ;
40
41         channel.basicConsume( responseQueue , consumer) ;
42
43         while(true){
44
45             Delivery delivery = consumer.nextDelivery() ;
46
47             if(delivery.getProperties().getCorrelationId().equals(correlationId)){
48                 String result = new String(delivery.getBody()) ;
49                 System.out.println(result);
50             }
51
52         }
53     }
54
55 }
时间: 2024-10-17 22:29:34

rabbitMQ学习笔记(七) RPC 远程过程调用的相关文章

Java学习笔记之RMI远程方法调用

RMI 应用通常有两个分开的程序组成,一个服务端程序和一个客户端程序.一个典型的服务端程序创建一些远程对象,使得对这些远程对象的引用可以被访问,等待客户端调用这些远程对象提供的方法.一个典型的客户端程序获取远程引用,指向一个或者多个服务端上的远程对象,然后调用这些远程对象所提供的方法.通常我们称这为分布式对象应用程序. 3.1 RMI的工作方式 分布式对象应用程序需要做的事情: l 查找(定位)远程对象. 应用程序可以使用各种不同的机制取得远程对象的引用.比如应用程序可以通过 RMI 提供的简单

Lua学习笔记(七):迭代器与泛型for

1.迭代器与闭包 迭代器是一种支持指针类型的结构,它可以遍历集合的每一个元素.在Lua中我们常常使用函数来描述迭代器,每次调用该函数就返回集合的下一个元素. 迭代器需要保留上一次成功调用的状态和下一次成功调用的状态,也就是他知道来自于哪里和将要前往哪里.闭包提供的机制可以很容易实现这个任务.记住:闭包是一个内部函数,它可以访问一个或者多个外部函数的外部局部变量.每次闭包的成功调用后这些外部局部变量都保存他们的值(状态).当然如果要创建一个闭包必须要创建其外部局部变量.所以一个典型的闭包的结构包含

Swift学习笔记七:闭包

闭包可以 捕获 和存储其所在上下文中任意常量和变量的引用. Swift 会为您管理在 捕获 过程中涉及到的内存操作. 在 函数 章节中介绍的全局和嵌套函数实际上也是特殊的闭包,闭包采取如下三种形式之一: 1. 全局函数是一个有名字但不会捕获任何值的闭包 2. 嵌套函数是一个有名字并可以捕获其封闭函数域内值的闭包 3. 闭包表达式是一个可以捕获其上下文中变量或常量值的没有名字的闭包 一.闭包表达式 闭包函数类似于Objective-C中的block.下面我们用事实说话: let counts =

Linux System Programming 学习笔记(七) 线程

1. Threading is the creation and management of multiple units of execution within a single process 二进制文件是驻留在存储介质上,已被编译成操作系统可以使用,准备执行但没有正运行的休眠程序 进程是操作系统对 正在执行中的二进制文件的抽象:已加载的二进制.虚拟内存.内核资源 线程是进程内的执行单元 processes are running binaries, threads are the smal

马哥学习笔记七——LAMP编译安装之MYSQL

1.准备数据存放的文件系统 新建一个逻辑卷,并将其挂载至特定目录即可.这里不再给出过程. 这里假设其逻辑卷的挂载目录为/mydata,而后需要创建/mydata/data目录做为mysql数据的存放目录. 2.新建用户以安全方式运行进程: # groupadd -r mysql # useradd -g mysql -r -s /sbin/nologin -M -d /mydata/data mysql # chown -R mysql:mysql /mydata/data 3.安装并初始化my

学习笔记之卸载远程目标进程中的DLL模块(转)

学习笔记之卸载远程目标进程中的DLL模块 (2007-07-23 23:51:02) 转载▼ 学习笔记之卸载远程目标进程中的DLL模块2007/7/231.首先得把DLL模块中的线程结束使用CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD,0);创建系统线程的快照然后用Thread32First()和Thread32Next()遍历系统中所有线程.将遍历到的线程保存到THREADENTRY32结构,然后判断结构中的th32OwnerProcessID成员是否与

swift学习笔记(七)自动引用计数

与Object-c一样,swift使用自动引用计数来跟踪并管理应用使用的内存.当实例不再被使用时,及retainCount=0时,会自动释放是理所占用的内存空间. 注:引用计数仅适用于类的实例,因为struct和enumeration属于值类型,也就不牵涉引用,所以其存储和管理方式并不是引用计数. 当一个实例被初始化时,系统会自动分配一定的内存空间,用于管理属性和方法.当实例对象不再被使用时,其内存空间被收回. swift中的引用类型分为三种,即Strong强引用,weak弱引用和无主引用unw

RabbitMQ学习笔记五:RabbitMQ之优先级消息队列

RabbitMQ优先级队列注意点: 1.只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效 2.RabbitMQ3.5以后才支持优先级队列 代码在博客:RabbitMQ学习笔记三:Java实现RabbitMQ之与Spring集成 最后面有下载地址,只是做了少许改变,改变的代码如下: 消费者 spring-config.xml(还需要增加一个QueueListener监听器,代码就不复制到这里了,可以参考项目中的其他监听器) <!-- =========================

oracle学习笔记 SQL语句执行过程剖析讲课

oracle学习笔记 SQL语句执行过程剖析讲课 这节课通过讲述一条SQL语句进入数据库 和其在数据库中的整个的执行过程 把数据库里面的体系结构串一下. 让大家再进一步了解oracle数据库里面的各个进程.存储结构以及内存结构的关联关系. 首先来讲整个体系中有客户端.实例和数据库 数据库里有三类文件 控制文件ctl.数据文件dbf.日志文件log 实例中SGA有六大池子 第一大内存区shared pool即共享池 第二大内存区buffer cache 第三块是redo log 我们主要讲上面的三