Thrift全面介绍

官网:http://thrift.apache.org

简介

Thrift是一个软件框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 这些编程语言间无缝结合的、高效的服务。

Thrift最初由facebook开发用做系统内各语言之间的RPC通信 。

2007年由facebook贡献到apache基金 ,08年5月进入apache孵化器 。

支持多种语言之间的RPC方式的通信:php语言client可以构造一个对象,调用相应的服务方法来调用java语言的服务 ,跨越语言的C/S RPC调用 。

Thrift允许定义一个简单的定义文件中的数据类型和服务接口,以作为输入文件,编译器生成代码用来方便地生成RPC客户端和服务器通信的无缝跨编程语言。

一个小实例

以UserService为例,描述一下使用thrift的方式,以及其原理。

service.thrift

  1. struct User {
  2. 1:i64 id,
  3. 2:string name,
  4. 3:i64 timestamp,
  5. 4:bool vip
  6. }
  7. service UserService {
  8. User getById(1:i64 id)
  9. }

你可以将自己的Java服务通过".thrift"文件描述出来,并提供给服务消费端,那么消费端即可以生成自己的API文件。Thrift框架目前已经支持大部分主流的语言。需要注意,因为Thrift考虑到struct/service定义需要兼容多种语言的”风格",所以它只支持一些基本的数据类型(比如i32,i64,string等),以及service定义的方法不能重名,即使参数列表不同(并不是所有的语言都能像JAVA一样支持重载)。

生成API文件

 

首先下载和安装thrift客户端,比如在windows平台下,下载thrift.exe。不过此处需要提醒,不同的thrift客户端版本生成的API可能不兼容。本例使用thrift-0.9.0.exe,通过"--gen"指定生成API所适配的语言。本实例为生成java客户端API。

Java代码  

  1. //windows平台下,将API文件输出在service目录下(此目录需要存在)
  2. > thrift.exe --gen java -o service service.thrift

需要明确的是:Thrift和其他RPC框架不同,thrift在生成的API文件中,已经描述了"调用过程"(即硬编码),而不是像其他RPC那样在运行时(runtime)动态解析方法调用或者参数。

UserService实现类

Java代码  

  1. public class UserServiceImpl implements UserService.Iface {
  2. @Override
  3. public User getById(long id){
  4. System.out.println("invoke...id:" + id);
  5. return new User();//for test
  6. }
  7. }

实现类,需要放在Thrift server端。

 

原理简析

 

User.java: thrift生成API的能力还是非常的有限,比如在struct中只能使用简单的数据类型(不支持Date,Collection<?>等),不过我们能从User中看出,它生成的类实现了"Serializable"接口和"TBase”接口。其中Serializable接口表明这个类的实例是需要序列化之后在网络中传输的,为了不干扰Java本身的序列化和反序列化机制,它还重写了readObject和writeObject方法,不过这对thrift本身并没有帮助。

TBase接口是thrift序列化和反序列化时使用的,它的两个核心方法:read和write。在上述的thrift文件中,struct定义的每个属性都有一个序号,比如1:id,那么thrift在序列化时,将会根据序号的顺序依次将属性的"名称 + 值"写入inputStream中,反序列化也是如此。(具体参见read和write的实现)。

Java代码  

  1. //read方法逐个读取字段,按照"索引",最终将"struct"对象封装完毕.
  2. //write方法也非常类似,按照"索引"顺序逐个输出到流中.
  3. while (true){
  4. schemeField = iprot.readFieldBegin();
  5. if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
  6. break;
  7. }
  8. switch (schemeField.id) {
  9. case 1: // ID
  10. if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
  11. struct.id = iprot.readI32();
  12. struct.setIdIsSet(true);
  13. } else {
  14. org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
  15. }
  16. break;
  17. case 2: // NAME
  18. ..
  19. }
  20. }

因为thrift的序列化和反序列化实例数据时,是根据"属性序号"进行,这可以保证数据在inputstream和outputstream中顺序是严格的,此外每个struct中"序号"不能重复,但是可以不需要从"1"开始.如果"序号"有重复,将导致无法生成API文件.这一点也要求API开发者,如果更改了thrift文件中的struct定义,需要重新生成客户端API,否则服务将无法继续使用(可能报错,也可能数据错误).thrift序列化/反序列化的过程和JAVA自带的序列化机制不同,它将不会携带额外的class结构,此外thrift这种序列化机制更加适合网络传输,而且性能更加高效.

UserService.Client:  在生成的UserService中,有个Client静态类,这个类就是一个典型的代理类,此类已经实现了UserService的所有方法。开发者需要使用Client类中的API方法与Thrift server端交互,它将负责与Thrift server的Socket链接中,发送请求和接收响应。

需要注意的时,每次Client方法调用,都会在一个Socket链接中进行。这就意味着,在使用Client消费服务之前,需要和Thrift server建立有效的TCP链接。(稍后代码示例)

 

1) 发送请求:

Java代码  

  1. //参见:TServiceClient
  2. //API方法调用时,发送请求数据流
  3. protected void sendBase(String methodName, TBase args) throws TException {
  4. oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));//首先写入"方法名称"和"seqid_"
  5. args.write(oprot_);//序列化参数
  6. oprot_.writeMessageEnd();
  7. oprot_.getTransport().flush();
  8. }
  9. protected void receiveBase(TBase result, String methodName) throws TException {
  10. TMessage msg = iprot_.readMessageBegin();//如果执行有异常
  11. if (msg.type == TMessageType.EXCEPTION) {
  12. TApplicationException x = TApplicationException.read(iprot_);
  13. iprot_.readMessageEnd();
  14. throw x;
  15. }//检测seqid是否一致
  16. if (msg.seqid != seqid_) {
  17. throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
  18. }
  19. result.read(iprot_);//反序列化
  20. iprot_.readMessageEnd();
  21. }

Thrift提供了简单的容错方式:每次方法调用,都会在Client端标记一个seqid,这是一个自增的本地ID,在TCP请求时将此seqid追加到流中,同时Server端响应时,也将此seqid原样返回过来;这样客户端就可以根据此值用来判断"请求--响应"是对应的,如果出现乱序,将会导致此请求以异常的方式结束。

2) 响应

Java代码  

  1. //参考: TBaseProcessor.java
  2. @Override
  3. public boolean process(TProtocol in, TProtocol out) throws TException {
  4. TMessage msg = in.readMessageBegin();
  5. ProcessFunction fn = processMap.get(msg.name);//根据方法名,查找"内部类"
  6. if (fn == null) {
  7. TProtocolUtil.skip(in, TType.STRUCT);
  8. in.readMessageEnd();
  9. TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: ‘"+msg.name+"‘");
  10. out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
  11. x.write(out);//序列化响应结果,直接输出
  12. out.writeMessageEnd();
  13. out.getTransport().flush();
  14. return true;
  15. }
  16. fn.process(msg.seqid, in, out, iface);
  17. return true;
  18. }

thrift生成的UserService.Processor类,就是server端用来处理请求过程的"代理类";server端从socket中读取请求需要调用的"方法名" +参数列表,并交付给Processor类处理;和其他的RPC调用不同的时,thrift并没有使用类似于"反射机制"的方式来调用方法,而是将UserService的每个方法生成一个"内部类":

Java代码  

  1. public static class getById<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getById_args> {
  2. public getById() {
  3. super("getById");//其中getById为标识符
  4. }
  5. public getById_args getEmptyArgsInstance() {
  6. return new getById_args();
  7. }
  8. protected boolean isOneway() {
  9. return false;
  10. }
  11. //实际处理方法
  12. public getById_result getResult(I iface, getById_args args) throws org.apache.thrift.TException {
  13. getById_result result = new getById_result();
  14. result.success = iface.getById(args.id);
  15. return result;
  16. }
  17. }

这个"内部类",将会在Processor初始化的时候,放入到一个map中,此后即可以通过"方法名"查找,然后调用其"getResult"方法了.

Java代码  

  1. public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
  2. public Processor(I iface) {
  3. super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
  4. }
  5. protected Processor(I iface, Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
  6. super(iface, getProcessMap(processMap));
  7. }
  8. private static <I extends Iface> Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
  9. //放入map
  10. processMap.put("getById", new getById());
  11. return processMap;
  12. }
  13. ....
  14. }

3) Server端Socket管理和执行策略

Java代码  

  1. TThreadPoolServer
  2. public void serve() {
  3. try {
  4. //启动服务
  5. serverTransport_.listen();
  6. } catch (TTransportException ttx) {
  7. LOGGER.error("Error occurred during listening.", ttx);
  8. return;
  9. }
  10. // Run the preServe event
  11. if (eventHandler_ != null) {
  12. eventHandler_.preServe();
  13. }
  14. stopped_ = false;
  15. setServing(true);
  16. //循环,直到被关闭
  17. while (!stopped_) {
  18. int failureCount = 0;
  19. try {
  20. //accept客户端Socket链接,
  21. //对于每个新链接,将会封装成runnable,并提交给线程或者线程池中运行.
  22. TTransport client = serverTransport_.accept();
  23. WorkerProcess wp = new WorkerProcess(client);
  24. executorService_.execute(wp);
  25. } catch (TTransportException ttx) {
  26. if (!stopped_) {
  27. ++failureCount;
  28. LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
  29. }
  30. }
  31. }
  32. //....
  33. }

Thrift Server端,设计思路也非常的直接。当前Service server启动之后,将会以阻塞的方式侦听Socket链接(代码参考TThreadPoolServer),每建立一个Socket链接,都会将此Socket经过封装之后,放入线程池中,本质上也是一个Socket链接对应一个Worker Thread。这个Thread只会处理此Socket中的所有数据请求,直到Socket关闭。

Java代码  

  1. //参考:WorkerProcess
  2. while (true) {
  3. if (eventHandler != null) {
  4. eventHandler.processContext(connectionContext, inputTransport, outputTransport);
  5. }
  6. if(stopped_ || !processor.process(inputProtocol, outputProtocol)) {
  7. break;
  8. }
  9. }

当有Socket链接不是很多的时候,TThreadPoolServer并不会有太大的性能问题,可以通过指定ThreadPool中线程的个数进行简单的调优。如果Socket链接很多,我们只能使用TThreadedSelectorServer来做支撑,TThreadedSelectorServer内部基于NIO模式,具有异步的特性,可以极大的提升server端的并发能力;不过在绝大多数情况下,在thrift中使用"异步"似乎不太容易让人接受,毕竟这意味着Client端需要阻塞,并且在高并发环境中这个阻塞时间是不可控的。但SelecorServer确实可以有效的提升Server的并发能力,而且在一定程度上可以提升吞吐能力,这或许是我们优化Thrift Server比较可靠的方式之一。

Client端代码示例

Java代码  

  1. public class UserServiceClient {
  2. public void startClient() {
  3. TTransport transport;
  4. try {
  5. transport = new TSocket("localhost", 1234);
  6. TProtocol protocol = new TBinaryProtocol(transport);
  7. UserService.Client client = new UserService.Client(protocol);
  8. transport.open();
  9. User user = client.getById(1000);
  10. ////
  11. transport.close();
  12. } catch (TTransportException e) {
  13. e.printStackTrace();
  14. } catch (TException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. }

Server端代码示例

Java代码  

  1. public class Server {
  2. public void startServer() {
  3. try {
  4. TServerSocket serverTransport = new TServerSocket(1234);
  5. UserService.Processor process = new Processor(new UserServiceImpl());
  6. Factory portFactory = new TBinaryProtocol.Factory(true, true);
  7. Args args = new Args(serverTransport);
  8. args.processor(process);
  9. args.protocolFactory(portFactory);
  10. TServer server = new TThreadPoolServer(args);
  11. server.serve();
  12. } catch (TTransportException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. }

到这里,你就会发现,一个service,需要server端启动一个ServerSocket,如果你有很多service,那么你需要让这些service尽可能的分布在不同的物理server上,否则一个物理server上运行太多的ServerSocket进程并不是一件让人愉快的事情.。或者你让几个service整合成一个。

问题总没有想象的那么简单,其实service被拆分的粒度越细,越容易被部署和扩展,对于负载均衡就更加有利。如何让一个service分布式部署,稍后再继续分享。

总结

1) Thrift文件定义struct和service API,此文件可以被其他语言生成API文件或者类文件。

2) 使用Thrift客户端生成API文件。

3) Java服务端(即服务提供端),实现service功能。

4) 服务端将server发布成一个Thrift server:即将service嵌入到一个ServerSocket中。

5) 客户端启动Socket,并和Thrift server建立TCP连接,并使用Client代理类操作远程接口。

 

服务端开发

Thrift服务server端,其实就是一个ServerSocket线程+处理器,当Thrift-client端建立链接之后,处理器负责解析socket流信息,并根据其指定的"方法名"+参数列表,来调用"服务实现类”的方法,并将执行结果(或者异常)写入到socket中。

一个server,就需要创建一个ServerSocket,并侦听本地的一个端口,这种情况对分布式部署,有一些额外的要求:client端需要知道一个"service"被部署在了那些server上。

设计思路:

1) 每个server内部采用threadPool的方式,来提升并发能力.

2) 当server启动成功后,向zookeeper注册服务节点,此后client端就可以"感知到"服务的状态

3) 通过spring的方式,配置thrift-server服务类.

其中x注册是可选选项

1.pom.xml

 

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework</groupId>
  4. <artifactId>spring-context</artifactId>
  5. <version>3.0.7.RELEASE</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.zookeeper</groupId>
  9. <artifactId>zookeeper</artifactId>
  10. <version>3.4.5</version>
  11. <!--<exclusions>-->
  12. <!--<exclusion>-->
  13. <!--<groupId>log4j</groupId>-->
  14. <!--<artifactId>log4j</artifactId>-->
  15. <!--</exclusion>-->
  16. <!--</exclusions>-->
  17. </dependency>
  18. <!--
  19. <dependency>
  20. <groupId>com.101tec</groupId>
  21. <artifactId>zkclient</artifactId>
  22. <version>0.4</version>
  23. </dependency>
  24. -->
  25. <dependency>
  26. <groupId>org.apache.thrift</groupId>
  27. <artifactId>libthrift</artifactId>
  28. <version>0.9.1</version>
  29. </dependency>
  30. <dependency>
  31. <groupId>org.apache.curator</groupId>
  32. <artifactId>curator-recipes</artifactId>
  33. <version>2.3.0</version>
  34. </dependency>
  35. <dependency>
  36. <groupId>commons-pool</groupId>
  37. <artifactId>commons-pool</artifactId>
  38. <version>1.6</version>
  39. </dependency>
  40. </dependencies>

本实例,使用了apache-curator作为zookeeper客户端.

2. spring-thrift-server.xml

 

  1. <!-- zookeeper -->
  2. <bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">
  3. <property name="connectString" value="127.0.0.1:2181"></property>
  4. <property name="namespace" value="demo/thrift-service"></property>
  5. </bean>
  6. <bean id="sericeAddressReporter" class="com.demo.thrift.support.impl.DynamicAddressReporter" destroy-method="close">
  7. <property name="zookeeper" ref="thriftZookeeper"></property>
  8. </bean>
  9. <bean id="userService" class="com.demo.service.UserServiceImpl"/>
  10. <bean class="com.demo.thrift.ThriftServiceServerFactory" destroy-method="close">
  11. <property name="service" ref="userService"></property>
  12. <property name="configPath" value="UserServiceImpl"></property>
  13. <property name="port" value="9090"></property>
  14. <property name="addressReporter" ref="sericeAddressReporter"></property>
  15. </bean>

3. ThriftServiceServerFactory.java

 

此类严格上说并不是一个工厂类,它的主要作用就是封装指定的"service" ,然后启动一个server的过程,其中"service"属性表示服务的实现类,addressReporter表示当server启动成功后,需要指定的操作(比如,向zookeeper发送service的IP信息).

究竟当前server的ip地址是多少,在不同的设计中,有所不同,比如:有些管理员喜欢将本机的IP地址写入到os下的某个文件中,如果上层应用需要获取可靠的IP信息,就需要读取这个文件...你可以实现自己的ThriftServerIpTransfer来获取当前server的IP.

为了减少xml中的配置信息,在factory中,使用了反射机制来构建"Processor"类.

  1. public class ThriftServiceServerFactory implements InitializingBean {
  2. private Integer port;
  3. private Integer priority = 1;// default
  4. private Object service;// serice实现类
  5. private ThriftServerIpTransfer ipTransfer;
  6. private ThriftServerAddressReporter addressReporter;
  7. private ServerThread serverThread;
  8. private String configPath;
  9. public void setService(Object service) {
  10. this.service = service;
  11. }
  12. public void setPriority(Integer priority) {
  13. this.priority = priority;
  14. }
  15. public void setPort(Integer port) {
  16. this.port = port;
  17. }
  18. public void setIpTransfer(ThriftServerIpTransfer ipTransfer) {
  19. this.ipTransfer = ipTransfer;
  20. }
  21. public void setAddressReporter(ThriftServerAddressReporter addressReporter) {
  22. this.addressReporter = addressReporter;
  23. }
  24. public void setConfigPath(String configPath) {
  25. this.configPath = configPath;
  26. }
  27. @Override
  28. public void afterPropertiesSet() throws Exception {
  29. if (ipTransfer == null) {
  30. ipTransfer = new LocalNetworkIpTransfer();
  31. }
  32. String ip = ipTransfer.getIp();
  33. if (ip == null) {
  34. throw new NullPointerException("cant find server ip...");
  35. }
  36. String hostname = ip + ":" + port + ":" + priority;
  37. Class serviceClass = service.getClass();
  38. ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
  39. Class<?>[] interfaces = serviceClass.getInterfaces();
  40. if (interfaces.length == 0) {
  41. throw new IllegalClassFormatException("service-class should implements Iface");
  42. }
  43. // reflect,load "Processor";
  44. Processor processor = null;
  45. for (Class clazz : interfaces) {
  46. String cname = clazz.getSimpleName();
  47. if (!cname.equals("Iface")) {
  48. continue;
  49. }
  50. String pname = clazz.getEnclosingClass().getName() + "$Processor";
  51. try {
  52. Class pclass = classLoader.loadClass(pname);
  53. if (!pclass.isAssignableFrom(Processor.class)) {
  54. continue;
  55. }
  56. Constructor constructor = pclass.getConstructor(clazz);
  57. processor = (Processor) constructor.newInstance(service);
  58. break;
  59. } catch (Exception e) {
  60. //
  61. }
  62. }
  63. if (processor == null) {
  64. throw new IllegalClassFormatException("service-class should implements Iface");
  65. }
  66. //需要单独的线程,因为serve方法是阻塞的.
  67. serverThread = new ServerThread(processor, port);
  68. serverThread.start();
  69. // report
  70. if (addressReporter != null) {
  71. addressReporter.report(configPath, hostname);
  72. }
  73. }
  74. class ServerThread extends Thread {
  75. private TServer server;
  76. ServerThread(Processor processor, int port) throws Exception {
  77. TServerSocket serverTransport = new TServerSocket(port);
  78. Factory portFactory = new TBinaryProtocol.Factory(true, true);
  79. Args args = new Args(serverTransport);
  80. args.processor(processor);
  81. args.protocolFactory(portFactory);
  82. server = new TThreadPoolServer(args);
  83. }
  84. @Override
  85. public void run(){
  86. try{
  87. server.serve();
  88. }catch(Exception e){
  89. //
  90. }
  91. }
  92. public void stopServer(){
  93. server.stop();
  94. }
  95. }
  96. public void close() {
  97. serverThread.stopServer();
  98. }
  99. }

4. DynamicAddressReporter.java

 

在ThriftServiceServerFactory中,有个可选的属性:addressReporter, DynamicAddressReporter提供了向zookeeper注册service信息的能力,当server启动正常后,把server的IP + port发送到zookeeper中;那么此后服务消费client,就可以从zookeeper中获取server列表,并与它们建立链接(池).这样client端只需要关注zookeeper的节点名称即可,不需要配置大量的ip+port.

  1. public class DynamicAddressReporter implements ThriftServerAddressReporter {
  2. private CuratorFramework zookeeper;
  3. public DynamicAddressReporter(){}
  4. public DynamicAddressReporter(CuratorFramework zookeeper){
  5. this.zookeeper = zookeeper;
  6. }
  7. public void setZookeeper(CuratorFramework zookeeper) {
  8. this.zookeeper = zookeeper;
  9. }
  10. @Override
  11. public void report(String service, String address) throws Exception {
  12. if(zookeeper.getState() == CuratorFrameworkState.LATENT){
  13. zookeeper.start();
  14. zookeeper.newNamespaceAwareEnsurePath(service);
  15. }
  16. zookeeper.create()
  17. .creatingParentsIfNeeded()
  18. .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
  19. .forPath(service +"/i_",address.getBytes("utf-8"));
  20. }
  21. public void close(){
  22. zookeeper.close();
  23. }
  24. }

5. 测试类

  1. public class ServiceMain {
  2. /**
  3. * @param args
  4. */
  5. public static void main(String[] args) {
  6. try {
  7. ApplicationContext context = new ClassPathXmlApplicationContext("spring-thrift-server.xml");
  8. Thread.sleep(3000000);
  9. } catch (Exception e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. }

客户端开发

 

Thrift-client作为服务消费端,由于thrift使用socket通讯,因此它需要面对几个问题:

1) client端需要知道server端的IP+port,如果是分布式部署,还需要知道所有server的IP+port列表。

2) client为了提升性能,不可能只使用一个socket来处理并发请求,当然也不能每个请求都创建一个socket;我们需要使用连接池方案。

3) 对于java开发工程师而言,基于spring配置thrift服务,可以提供很多的便利。

4) 基于zookeeper配置管理,那么client端就不需要"硬编码"的配置server的ip+port,可以使用zookeeper来推送每个service的服务地址。

5) 因为thrift-client端不使用连接池的话,将不能有效的提高并发能力,本文重点描述看如何使用thrift-client连接池。

1. pom.xml

 

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework</groupId>
  4. <artifactId>spring-context</artifactId>
  5. <version>3.0.7.RELEASE</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.zookeeper</groupId>
  9. <artifactId>zookeeper</artifactId>
  10. <version>3.4.5</version>
  11. <!--<exclusions>-->
  12. <!--<exclusion>-->
  13. <!--<groupId>log4j</groupId>-->
  14. <!--<artifactId>log4j</artifactId>-->
  15. <!--</exclusion>-->
  16. <!--</exclusions>-->
  17. </dependency>
  18. <!--
  19. <dependency>
  20. <groupId>com.101tec</groupId>
  21. <artifactId>zkclient</artifactId>
  22. <version>0.4</version>
  23. </dependency>
  24. -->
  25. <dependency>
  26. <groupId>org.apache.thrift</groupId>
  27. <artifactId>libthrift</artifactId>
  28. <version>0.9.1</version>
  29. </dependency>
  30. <dependency>
  31. <groupId>org.apache.curator</groupId>
  32. <artifactId>curator-recipes</artifactId>
  33. <version>2.3.0</version>
  34. </dependency>
  35. <dependency>
  36. <groupId>commons-pool</groupId>
  37. <artifactId>commons-pool</artifactId>
  38. <version>1.6</version>
  39. </dependency>
  40. </dependencies>

2. spring-thrift-client.xml

其中zookeeper作为可选项,开发者也可以通过制定serverAddress的方式指定server的地址.

  1. <!-- fixedAddress -->
  2. <!--
  3. <bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory">
  4. <property name="service" value="com.demo.service.UserService"></property>
  5. <property name="serverAddress" value="127.0.0.1:9090:2"></property>
  6. <property name="maxActive" value="5"></property>
  7. <property name="idleTime" value="10000"></property>
  8. </bean>
  9. -->
  10. <!-- zookeeper -->
  11. <bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">
  12. <property name="connectString" value="127.0.0.1:2181"></property>
  13. <property name="namespace" value="demo/thrift-service"></property>
  14. </bean>
  15. <bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory" destroy-method="close">
  16. <property name="service" value="com.demo.service.UserService"></property>
  17. <property name="maxActive" value="5"></property>
  18. <property name="idleTime" value="1800000"></property>
  19. <property name="addressProvider">
  20. <bean class="com.demo.thrift.support.impl.DynamicAddressProvider">
  21. <property name="configPath" value="UserServiceImpl"></property>
  22. <property name="zookeeper" ref="thriftZookeeper"></property>
  23. </bean>
  24. </property>
  25. </bean>

3. ThriftServiceClientProxyFactory.java

因为我们要在client端使用连接池方案,那么就需要对client的方法调用过程,进行代理,这个类,就是维护了一个"Client"代理类,并在方法调用时,从"对象池"中取出一个"Client"对象,并在方法实际调用结束后归还给"对象池".

  1. @SuppressWarnings("rawtypes")
  2. public class ThriftServiceClientProxyFactory implements FactoryBean,InitializingBean {
  3. private String service;
  4. private String serverAddress;
  5. private Integer maxActive = 32;//最大活跃连接数
  6. ////ms,default 3 min,链接空闲时间
  7. //-1,关闭空闲检测
  8. private Integer idleTime = 180000;
  9. private ThriftServerAddressProvider addressProvider;
  10. private Object proxyClient;
  11. public void setMaxActive(Integer maxActive) {
  12. this.maxActive = maxActive;
  13. }
  14. public void setIdleTime(Integer idleTime) {
  15. this.idleTime = idleTime;
  16. }
  17. public void setService(String service) {
  18. this.service = service;
  19. }
  20. public void setServerAddress(String serverAddress) {
  21. this.serverAddress = serverAddress;
  22. }
  23. public void setAddressProvider(ThriftServerAddressProvider addressProvider) {
  24. this.addressProvider = addressProvider;
  25. }
  26. private Class objectClass;
  27. private GenericObjectPool<TServiceClient> pool;
  28. private PoolOperationCallBack callback = new PoolOperationCallBack() {
  29. @Override
  30. public void make(TServiceClient client) {
  31. System.out.println("create");
  32. }
  33. @Override
  34. public void destroy(TServiceClient client) {
  35. System.out.println("destroy");
  36. }
  37. };
  38. @Override
  39. public void afterPropertiesSet() throws Exception {
  40. if(serverAddress != null){
  41. addressProvider = new FixedAddressProvider(serverAddress);
  42. }
  43. ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
  44. //加载Iface接口
  45. objectClass = classLoader.loadClass(service + "$Iface");
  46. //加载Client.Factory类
  47. Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>)classLoader.loadClass(service + "$Client$Factory");
  48. TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
  49. ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(addressProvider, clientFactory,callback);
  50. GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
  51. poolConfig.maxActive = maxActive;
  52. poolConfig.minIdle = 0;
  53. poolConfig.minEvictableIdleTimeMillis = idleTime;
  54. poolConfig.timeBetweenEvictionRunsMillis = idleTime/2L;
  55. pool = new GenericObjectPool<TServiceClient>(clientPool,poolConfig);
  56. proxyClient = Proxy.newProxyInstance(classLoader,new Class[]{objectClass},new InvocationHandler() {
  57. @Override
  58. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  59. //
  60. TServiceClient client = pool.borrowObject();
  61. try{
  62. return method.invoke(client, args);
  63. }catch(Exception e){
  64. throw e;
  65. }finally{
  66. pool.returnObject(client);
  67. }
  68. }
  69. });
  70. }
  71. @Override
  72. public Object getObject() throws Exception {
  73. return proxyClient;
  74. }
  75. @Override
  76. public Class<?> getObjectType() {
  77. return objectClass;
  78. }
  79. @Override
  80. public boolean isSingleton() {
  81. return true;  //To change body of implemented methods use File | Settings | File Templates.
  82. }
  83. public void close(){
  84. if(addressProvider != null){
  85. addressProvider.close();
  86. }
  87. }
  88. }

4. ThriftClientPoolFactory.java

"Client"对象池,对象池中是已经实例化的Client对象,Client对象负责与Thrift server通信.

  1. /**
  2. * 连接池,thrift-client for spring
  3. */
  4. public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient>{
  5. private final ThriftServerAddressProvider addressProvider;
  6. private final TServiceClientFactory<TServiceClient> clientFactory;
  7. private PoolOperationCallBack callback;
  8. protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
  9. this.addressProvider = addressProvider;
  10. this.clientFactory = clientFactory;
  11. }
  12. protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory,PoolOperationCallBack callback) throws Exception {
  13. this.addressProvider = addressProvider;
  14. this.clientFactory = clientFactory;
  15. this.callback = callback;
  16. }
  17. @Override
  18. public TServiceClient makeObject() throws Exception {
  19. InetSocketAddress address = addressProvider.selector();
  20. TSocket tsocket = new TSocket(address.getHostName(),address.getPort());
  21. TProtocol protocol = new TBinaryProtocol(tsocket);
  22. TServiceClient client = this.clientFactory.getClient(protocol);
  23. tsocket.open();
  24. if(callback != null){
  25. try{
  26. callback.make(client);
  27. }catch(Exception e){
  28. //
  29. }
  30. }
  31. return client;
  32. }
  33. public void destroyObject(TServiceClient client) throws Exception {
  34. if(callback != null){
  35. try{
  36. callback.destroy(client);
  37. }catch(Exception e){
  38. //
  39. }
  40. }
  41. TTransport pin = client.getInputProtocol().getTransport();
  42. pin.close();
  43. }
  44. public boolean validateObject(TServiceClient client) {
  45. TTransport pin = client.getInputProtocol().getTransport();
  46. return pin.isOpen();
  47. }
  48. static interface PoolOperationCallBack {
  49. //销毁client之前执行
  50. void destroy(TServiceClient client);
  51. //创建成功是执行
  52. void make(TServiceClient client);
  53. }
  54. }

5. DynamicAddressProvider.java

将zookeeper作为server地址的提供者,这样客户端就不需要再配置文件中指定一堆ip + port,而且当server服务有更新时,也不需要client端重新配置.

  1. /**
  2. * 可以动态获取address地址,方案设计参考
  3. * 1) 可以间歇性的调用一个web-service来获取地址
  4. * 2) 可以使用事件监听机制,被动的接收消息,来获取最新的地址(比如基于MQ,nio等)
  5. * 3) 可以基于zookeeper-watcher机制,获取最新地址
  6. * <p/>
  7. * 本实例,使用zookeeper作为"config"中心,使用apache-curator方法库来简化zookeeper开发
  8. * 如下实现,仅供参考
  9. */
  10. public class DynamicAddressProvider implements ThriftServerAddressProvider, InitializingBean {
  11. private String configPath;
  12. private PathChildrenCache cachedPath;
  13. private CuratorFramework zookeeper;
  14. //用来保存当前provider所接触过的地址记录
  15. //当zookeeper集群故障时,可以使用trace中地址,作为"备份"
  16. private Set<String> trace = new HashSet<String>();
  17. private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();
  18. private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();
  19. private Object lock = new Object();
  20. private static final Integer DEFAULT_PRIORITY = 1;
  21. public void setConfigPath(String configPath) {
  22. this.configPath = configPath;
  23. }
  24. public void setZookeeper(CuratorFramework zookeeper) {
  25. this.zookeeper = zookeeper;
  26. }
  27. @Override
  28. public void afterPropertiesSet() throws Exception {
  29. //如果zk尚未启动,则启动
  30. if(zookeeper.getState() == CuratorFrameworkState.LATENT){
  31. zookeeper.start();
  32. }
  33. buildPathChildrenCache(zookeeper, configPath, true);
  34. cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
  35. }
  36. private void buildPathChildrenCache(CuratorFramework client, String path, Boolean cacheData) throws Exception {
  37. cachedPath = new PathChildrenCache(client, path, cacheData);
  38. cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
  39. @Override
  40. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
  41. PathChildrenCacheEvent.Type eventType = event.getType();
  42. switch (eventType) {
  43. //                    case CONNECTION_RECONNECTED:
  44. //
  45. //                        break;
  46. case CONNECTION_SUSPENDED:
  47. case CONNECTION_LOST:
  48. System.out.println("Connection error,waiting...");
  49. return;
  50. default:
  51. //
  52. }
  53. //任何节点的时机数据变动,都会rebuild,此处为一个"简单的"做法.
  54. cachedPath.rebuild();
  55. rebuild();
  56. }
  57. protected void rebuild() throws Exception {
  58. List<ChildData> children = cachedPath.getCurrentData();
  59. if (children == null || children.isEmpty()) {
  60. //有可能所有的thrift server都与zookeeper断开了链接
  61. //但是,有可能,thrift client与thrift server之间的网络是良好的
  62. //因此此处是否需要清空container,是需要多方面考虑的.
  63. container.clear();
  64. System.out.println("thrift server-cluster error....");
  65. return;
  66. }
  67. List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
  68. for (ChildData data : children) {
  69. String address = new String(data.getData(), "utf-8");
  70. current.addAll(transfer(address));
  71. trace.add(address);
  72. }
  73. Collections.shuffle(current);
  74. synchronized (lock) {
  75. container.clear();
  76. container.addAll(current);
  77. inner.clear();
  78. inner.addAll(current);
  79. }
  80. }
  81. });
  82. }
  83. private List<InetSocketAddress> transfer(String address){
  84. String[] hostname = address.split(":");
  85. Integer priority = DEFAULT_PRIORITY;
  86. if (hostname.length == 3) {
  87. priority = Integer.valueOf(hostname[2]);
  88. }
  89. String ip = hostname[0];
  90. Integer port = Integer.valueOf(hostname[1]);
  91. List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
  92. for (int i = 0; i < priority; i++) {
  93. result.add(new InetSocketAddress(ip, port));
  94. }
  95. return result;
  96. }
  97. @Override
  98. public List<InetSocketAddress> getAll() {
  99. return Collections.unmodifiableList(container);
  100. }
  101. @Override
  102. public synchronized InetSocketAddress selector() {
  103. if (inner.isEmpty()) {
  104. if(!container.isEmpty()){
  105. inner.addAll(container);
  106. }else if(!trace.isEmpty()){
  107. synchronized (lock) {
  108. for(String hostname : trace){
  109. container.addAll(transfer(hostname));
  110. }
  111. Collections.shuffle(container);
  112. inner.addAll(container);
  113. }
  114. }
  115. }
  116. return inner.poll();//null
  117. }
  118. @Override
  119. public void close() {
  120. try {
  121. cachedPath.close();
  122. zookeeper.close();
  123. } catch (Exception e) {
  124. //
  125. }
  126. }
  127. }

到此为止,我们的Thrift基本上就可以顺利运行起来了。

使用ZooKeeper构建集群

通用办法是使用apache-curator组件来支持thrift连接zk提供集群服务并且推送服务信息变化。

其他高级话题

对于Thrift服务化的改造,主要是客户端,可以从如下几个方面进行:

1.服务端的服务注册,客户端自动发现,无需手工修改配置,这里我们使用zookeeper,但由于zookeeper本身提供的客户端使用较为复杂,因此采用curator-recipes工具类进行处理服务的注册与发现。

2.客户端使用连接池对服务调用进行管理,提升性能,这里我们使用Apache Commons项目commons-pool,可以大大减少代码的复杂度。

3.关于Failover/LoadBalance,由于zookeeper的watcher,当服务端不可用是及时通知客户端,并移除不可用的服务节点,而LoadBalance有很多算法,这里我们采用随机加权方式,也是常有的负载算法,至于其他的算法介绍参考:常见的负载均衡的基本算法

4.使thrift服务的注册和发现可以基于spring配置,可以提供很多的便利。

5.其他的改造如:

1)通过动态代理实现client和server端的交互细节透明化,让用户只需通过服务方提供的接口进行访问

2)Thrift通过两种方式调用服务Client和Iface

  1. // *) Client API 调用
  2. (EchoService.Client)client.echo("hello lilei");  ---(1)
  3. // *) Service 接口 调用
  4. (EchoService.Iface)service.echo("hello lilei");  ---(2)

Client API的方式, 不推荐, 我们推荐Service接口的方式(服务化)。

下面我们来一一实现:

一、pom.xml引入依赖jar包

  1. <dependency>
  2. <groupId>org.apache.thrift</groupId>
  3. <artifactId>libthrift</artifactId>
  4. <version>0.9.2</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>commons-pool</groupId>
  8. <artifactId>commons-pool</artifactId>
  9. <version>1.6</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework</groupId>
  13. <artifactId>spring-context</artifactId>
  14. <version>4.0.9.RELEASE</version>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.apache.zookeeper</groupId>
  18. <artifactId>zookeeper</artifactId>
  19. <version>3.4.6</version>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.apache.curator</groupId>
  23. <artifactId>curator-recipes</artifactId>
  24. <version>2.7.1</version>
  25. </dependency>

二、使用zookeeper管理服务节点配置

RPC服务往平台化的方向发展, 会屏蔽掉更多的服务细节(服务的IP地址集群, 集群的扩容和迁移), 只暴露服务接口. 这部分的演化, 使得server端和client端完全的解耦合. 两者的交互通过ConfigServer(MetaServer)的中介角色来搭线。

注: 该图源自dubbo的官网
这边借助Zookeeper来扮演该角色, server扮演发布者的角色, 而client扮演订阅者的角色.

Zookeeper是分布式应用协作服务. 它实现了paxos的一致性算法, 在命名管理/配置推送/数据同步/主从切换方面扮演重要的角色。 其数据组织类似文件系统的目录结构:

每个节点被称为znode, 为znode节点依据其特性, 又可以分为如下类型:
  1). PERSISTENT: 永久节点
  2). EPHEMERAL: 临时节点, 会随session(client disconnect)的消失而消失
  3). PERSISTENT_SEQUENTIAL: 永久节点, 其节点的名字编号是单调递增的
  4). EPHEMERAL_SEQUENTIAL: 临时节点, 其节点的名字编号是单调递增的
  注: 临时节点不能成为父节点
  Watcher观察模式, client可以注册对节点的状态/内容变更的事件回调机制. 其Event事件的两类属性需要关注下:
  1). KeeperState: Disconnected,SyncConnected,Expired
  2). EventType: None,NodeCreated,NodeDeleted,NodeDataChanged,NodeChildrenChanged
RPC服务端:
  作为具体业务服务的RPC服务发布方, 对其自身的服务描述由以下元素构成.
  1). namespace: 命名空间,来区分不同应用 
  2). service: 服务接口, 采用发布方的类全名来表示
  3). version: 版本号
  借鉴了Maven的GAV坐标系, 三维坐标系更符合服务平台化的大环境. 
  *) 数据模型的设计
  具体RPC服务的注册路径为: /rpc/{namespace}/{service}/{version}, 该路径上的节点都是永久节点
  RPC服务集群节点的注册路径为: /rpc/{namespace}/{service}/{version}/{ip:port:weight}, 末尾的节点是临时节点.

1.定义Zookeeper的客户端的管理

ZookeeperFactory.java

package cn.slimsmart.thrift.rpc.zookeeper;

  1. import org.apache.curator.framework.CuratorFramework;
  2. import org.apache.curator.framework.CuratorFrameworkFactory;
  3. import org.apache.curator.retry.ExponentialBackoffRetry;
  4. import org.springframework.beans.factory.FactoryBean;
  5. import org.springframework.util.StringUtils;
  6. /**
  7. * 获取zookeeper客户端链接
  8. */
  9. public class ZookeeperFactory implements FactoryBean<CuratorFramework> {
  10. private String zkHosts;
  11. // session超时
  12. private int sessionTimeout = 30000;
  13. private int connectionTimeout = 30000;
  14. // 共享一个zk链接
  15. private boolean singleton = true;
  16. // 全局path前缀,常用来区分不同的应用
  17. private String namespace;
  18. private final static String ROOT = "rpc";
  19. private CuratorFramework zkClient;
  20. public void setZkHosts(String zkHosts) {
  21. this.zkHosts = zkHosts;
  22. }
  23. public void setSessionTimeout(int sessionTimeout) {
  24. this.sessionTimeout = sessionTimeout;
  25. }
  26. public void setConnectionTimeout(int connectionTimeout) {
  27. this.connectionTimeout = connectionTimeout;
  28. }
  29. public void setSingleton(boolean singleton) {
  30. this.singleton = singleton;
  31. }
  32. public void setNamespace(String namespace) {
  33. this.namespace = namespace;
  34. }
  35. public void setZkClient(CuratorFramework zkClient) {
  36. this.zkClient = zkClient;
  37. }
  38. @Override
  39. public CuratorFramework getObject() throws Exception {
  40. if (singleton) {
  41. if (zkClient == null) {
  42. zkClient = create();
  43. zkClient.start();
  44. }
  45. return zkClient;
  46. }
  47. return create();
  48. }
  49. @Override
  50. public Class<?> getObjectType() {
  51. return CuratorFramework.class;
  52. }
  53. @Override
  54. public boolean isSingleton() {
  55. return singleton;
  56. }
  57. public CuratorFramework create() throws Exception {
  58. if (StringUtils.isEmpty(namespace)) {
  59. namespace = ROOT;
  60. } else {
  61. namespace = ROOT +"/"+ namespace;
  62. }
  63. return create(zkHosts, sessionTimeout, connectionTimeout, namespace);
  64. }
  65. public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
  66. CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
  67. return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(30000)
  68. .canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
  69. .defaultData(null).build();
  70. }
  71. public void close() {
  72. if (zkClient != null) {
  73. zkClient.close();
  74. }
  75. }
  76. }

2.服务端注册服务

由于服务端配置需要获取本机的IP地址,因此定义IP获取接口

ThriftServerIpResolve.java

  1. package cn.slimsmart.thrift.rpc.zookeeper;
  2. /**
  3. *
  4. * 解析thrift-server端IP地址,用于注册服务
  5. * 1) 可以从一个物理机器或者虚机的特殊文件中解析
  6. * 2) 可以获取指定网卡序号的Ip
  7. * 3) 其他
  8. */
  9. public interface ThriftServerIpResolve {
  10. String getServerIp() throws Exception;
  11. void reset();
  12. //当IP变更时,将会调用reset方法
  13. static interface IpRestCalllBack{
  14. public void rest(String newIp);
  15. }
  16. }

可以对该接口做不通的实现,下面我们基于网卡获取IP地址,也可以通过配置serverIp
ThriftServerIpLocalNetworkResolve.java

  1. package cn.slimsmart.thrift.rpc.zookeeper;
  2. import java.net.Inet6Address;
  3. import java.net.InetAddress;
  4. import java.net.NetworkInterface;
  5. import java.net.SocketException;
  6. import java.util.Enumeration;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. /**
  10. * 解析网卡Ip
  11. *
  12. */
  13. public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {
  14. private Logger logger = LoggerFactory.getLogger(getClass());
  15. //缓存
  16. private String serverIp;
  17. public void setServerIp(String serverIp) {
  18. this.serverIp = serverIp;
  19. }
  20. @Override
  21. public String getServerIp() {
  22. if (serverIp != null) {
  23. return serverIp;
  24. }
  25. // 一个主机有多个网络接口
  26. try {
  27. Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
  28. while (netInterfaces.hasMoreElements()) {
  29. NetworkInterface netInterface = netInterfaces.nextElement();
  30. // 每个网络接口,都会有多个"网络地址",比如一定会有lookback地址,会有siteLocal地址等.以及IPV4或者IPV6 .
  31. Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
  32. while (addresses.hasMoreElements()) {
  33. InetAddress address = addresses.nextElement();
  34. if(address instanceof Inet6Address){
  35. continue;
  36. }
  37. if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {
  38. serverIp = address.getHostAddress();
  39. logger.info("resolve server ip :"+ serverIp);
  40. continue;
  41. }
  42. }
  43. }
  44. } catch (SocketException e) {
  45. e.printStackTrace();
  46. }
  47. return serverIp;
  48. }
  49. @Override
  50. public void reset() {
  51. serverIp = null;
  52. }
  53. }

接下来我们定义发布服务接口,并实现将服务信息(服务接口、版本号,IP、port、weight)发布到zookeeper中。

ThriftServerAddressRegister.java

  1. package cn.slimsmart.thrift.rpc.zookeeper;
  2. /**
  3. * 发布服务地址及端口到服务注册中心,这里是zookeeper服务器
  4. */
  5. public interface ThriftServerAddressRegister {
  6. /**
  7. * 发布服务接口
  8. * @param service 服务接口名称,一个产品中不能重复
  9. * @param version 服务接口的版本号,默认1.0.0
  10. * @param address 服务发布的地址和端口
  11. */
  12. void register(String service,String version,String address);
  13. }

实现:ThriftServerAddressRegisterZookeeper.java

  1. package cn.slimsmart.thrift.rpc.zookeeper;
  2. import java.io.UnsupportedEncodingException;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.imps.CuratorFrameworkState;
  5. import org.apache.zookeeper.CreateMode;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.util.StringUtils;
  9. import cn.slimsmart.thrift.rpc.ThriftException;
  10. /**
  11. *  注册服务列表到Zookeeper
  12. */
  13. public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister{
  14. private Logger logger = LoggerFactory.getLogger(getClass());
  15. private CuratorFramework zkClient;
  16. public ThriftServerAddressRegisterZookeeper(){}
  17. public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient){
  18. this.zkClient = zkClient;
  19. }
  20. public void setZkClient(CuratorFramework zkClient) {
  21. this.zkClient = zkClient;
  22. }
  23. @Override
  24. public void register(String service, String version, String address) {
  25. if(zkClient.getState() == CuratorFrameworkState.LATENT){
  26. zkClient.start();
  27. }
  28. if(StringUtils.isEmpty(version)){
  29. version="1.0.0";
  30. }
  31. //临时节点
  32. try {
  33. zkClient.create()
  34. .creatingParentsIfNeeded()
  35. .withMode(CreateMode.EPHEMERAL)
  36. .forPath("/"+service+"/"+version+"/"+address);
  37. } catch (UnsupportedEncodingException e) {
  38. logger.error("register service address to zookeeper exception:{}",e);
  39. throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e);
  40. } catch (Exception e) {
  41. logger.error("register service address to zookeeper exception:{}",e);
  42. throw new ThriftException("register service address to zookeeper exception:{}", e);
  43. }
  44. }
  45. public void close(){
  46. zkClient.close();
  47. }
  48. }

3.客户端发现服务

定义获取服务地址接口

ThriftServerAddressProvider.java

  1. package cn.slimsmart.thrift.rpc.zookeeper;
  2. import java.net.InetSocketAddress;
  3. import java.util.List;
  4. /**
  5. * thrift server-service地址提供者,以便构建客户端连接池
  6. */
  7. public interface ThriftServerAddressProvider {
  8. //获取服务名称
  9. String getService();
  10. /**
  11. * 获取所有服务端地址
  12. * @return
  13. */
  14. List<InetSocketAddress> findServerAddressList();
  15. /**
  16. * 选取一个合适的address,可以随机获取等‘
  17. * 内部可以使用合适的算法.
  18. * @return
  19. */
  20. InetSocketAddress selector();
  21. void close();
  22. }

基于zookeeper服务地址自动发现实现:ThriftServerAddressProviderZookeeper.java

  1. package cn.slimsmart.thrift.rpc.zookeeper;
  2. import java.net.InetSocketAddress;
  3. import java.util.ArrayList;
  4. import java.util.Collections;
  5. import java.util.HashSet;
  6. import java.util.LinkedList;
  7. import java.util.List;
  8. import java.util.Queue;
  9. import java.util.Set;
  10. import org.apache.curator.framework.CuratorFramework;
  11. import org.apache.curator.framework.imps.CuratorFrameworkState;
  12. import org.apache.curator.framework.recipes.cache.ChildData;
  13. import org.apache.curator.framework.recipes.cache.PathChildrenCache;
  14. import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
  15. import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
  16. import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19. import org.springframework.beans.factory.InitializingBean;
  20. /**
  21. * 使用zookeeper作为"config"中心,使用apache-curator方法库来简化zookeeper开发
  22. */
  23. public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {
  24. private Logger logger = LoggerFactory.getLogger(getClass());
  25. // 注册服务
  26. private String service;
  27. // 服务版本号
  28. private String version = "1.0.0";
  29. private PathChildrenCache cachedPath;
  30. private CuratorFramework zkClient;
  31. // 用来保存当前provider所接触过的地址记录
  32. // 当zookeeper集群故障时,可以使用trace中地址,作为"备份"
  33. private Set<String> trace = new HashSet<String>();
  34. private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();
  35. private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();
  36. private Object lock = new Object();
  37. // 默认权重
  38. private static final Integer DEFAULT_WEIGHT = 1;
  39. public void setService(String service) {
  40. this.service = service;
  41. }
  42. public void setVersion(String version) {
  43. this.version = version;
  44. }
  45. public ThriftServerAddressProviderZookeeper() {
  46. }
  47. public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {
  48. this.zkClient = zkClient;
  49. }
  50. public void setZkClient(CuratorFramework zkClient) {
  51. this.zkClient = zkClient;
  52. }
  53. @Override
  54. public void afterPropertiesSet() throws Exception {
  55. // 如果zk尚未启动,则启动
  56. if (zkClient.getState() == CuratorFrameworkState.LATENT) {
  57. zkClient.start();
  58. }
  59. buildPathChildrenCache(zkClient, getServicePath(), true);
  60. cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
  61. }
  62. private String getServicePath(){
  63. return "/" + service + "/" + version;
  64. }
  65. private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {
  66. cachedPath = new PathChildrenCache(client, path, cacheData);
  67. cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
  68. @Override
  69. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
  70. PathChildrenCacheEvent.Type eventType = event.getType();
  71. switch (eventType) {
  72. case CONNECTION_RECONNECTED:
  73. logger.info("Connection is reconection.");
  74. break;
  75. case CONNECTION_SUSPENDED:
  76. logger.info("Connection is suspended.");
  77. break;
  78. case CONNECTION_LOST:
  79. logger.warn("Connection error,waiting...");
  80. return;
  81. default:
  82. //
  83. }
  84. // 任何节点的时机数据变动,都会rebuild,此处为一个"简单的"做法.
  85. cachedPath.rebuild();
  86. rebuild();
  87. }
  88. protected void rebuild() throws Exception {
  89. List<ChildData> children = cachedPath.getCurrentData();
  90. if (children == null || children.isEmpty()) {
  91. // 有可能所有的thrift server都与zookeeper断开了链接
  92. // 但是,有可能,thrift client与thrift server之间的网络是良好的
  93. // 因此此处是否需要清空container,是需要多方面考虑的.
  94. container.clear();
  95. logger.error("thrift server-cluster error....");
  96. return;
  97. }
  98. List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
  99. String path = null;
  100. for (ChildData data : children) {
  101. path = data.getPath();
  102. logger.debug("get path:"+path);
  103. path = path.substring(getServicePath().length()+1);
  104. logger.debug("get serviceAddress:"+path);
  105. String address = new String(path.getBytes(), "utf-8");
  106. current.addAll(transfer(address));
  107. trace.add(address);
  108. }
  109. Collections.shuffle(current);
  110. synchronized (lock) {
  111. container.clear();
  112. container.addAll(current);
  113. inner.clear();
  114. inner.addAll(current);
  115. }
  116. }
  117. });
  118. }
  119. private List<InetSocketAddress> transfer(String address) {
  120. String[] hostname = address.split(":");
  121. Integer weight = DEFAULT_WEIGHT;
  122. if (hostname.length == 3) {
  123. weight = Integer.valueOf(hostname[2]);
  124. }
  125. String ip = hostname[0];
  126. Integer port = Integer.valueOf(hostname[1]);
  127. List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
  128. // 根据优先级,将ip:port添加多次到地址集中,然后随机取地址实现负载
  129. for (int i = 0; i < weight; i++) {
  130. result.add(new InetSocketAddress(ip, port));
  131. }
  132. return result;
  133. }
  134. @Override
  135. public List<InetSocketAddress> findServerAddressList() {
  136. return Collections.unmodifiableList(container);
  137. }
  138. @Override
  139. public synchronized InetSocketAddress selector() {
  140. if (inner.isEmpty()) {
  141. if (!container.isEmpty()) {
  142. inner.addAll(container);
  143. } else if (!trace.isEmpty()) {
  144. synchronized (lock) {
  145. for (String hostname : trace) {
  146. container.addAll(transfer(hostname));
  147. }
  148. Collections.shuffle(container);
  149. inner.addAll(container);
  150. }
  151. }
  152. }
  153. return inner.poll();
  154. }
  155. @Override
  156. public void close() {
  157. try {
  158. cachedPath.close();
  159. zkClient.close();
  160. } catch (Exception e) {
  161. }
  162. }
  163. @Override
  164. public String getService() {
  165. return service;
  166. }
  167. }

对此接口还做了一种实现,通过配置获取服务地址,参考附件:FixedAddressProvider.java

三、服务端服务注册实现

ThriftServiceServerFactory.java

  1. package cn.slimsmart.thrift.rpc;
  2. import java.lang.instrument.IllegalClassFormatException;
  3. import java.lang.reflect.Constructor;
  4. import org.apache.thrift.TProcessor;
  5. import org.apache.thrift.TProcessorFactory;
  6. import org.apache.thrift.protocol.TBinaryProtocol;
  7. import org.apache.thrift.server.TServer;
  8. import org.apache.thrift.server.TThreadedSelectorServer;
  9. import org.apache.thrift.transport.TFramedTransport;
  10. import org.apache.thrift.transport.TNonblockingServerSocket;
  11. import org.springframework.beans.factory.InitializingBean;
  12. import org.springframework.util.StringUtils;
  13. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegister;
  14. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpLocalNetworkResolve;
  15. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpResolve;
  16. /**
  17. * 服务端注册服务工厂
  18. */
  19. public class ThriftServiceServerFactory implements InitializingBean {
  20. // 服务注册本机端口
  21. private Integer port = 8299;
  22. // 优先级
  23. private Integer weight = 1;// default
  24. // 服务实现类
  25. private Object service;// serice实现类
  26. //服务版本号
  27. private String version;
  28. // 解析本机IP
  29. private ThriftServerIpResolve thriftServerIpResolve;
  30. //服务注册
  31. private ThriftServerAddressRegister thriftServerAddressRegister;
  32. private ServerThread serverThread;
  33. public void setPort(Integer port) {
  34. this.port = port;
  35. }
  36. public void setWeight(Integer weight) {
  37. this.weight = weight;
  38. }
  39. public void setService(Object service) {
  40. this.service = service;
  41. }
  42. public void setVersion(String version) {
  43. this.version = version;
  44. }
  45. public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {
  46. this.thriftServerIpResolve = thriftServerIpResolve;
  47. }
  48. public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
  49. this.thriftServerAddressRegister = thriftServerAddressRegister;
  50. }
  51. @Override
  52. public void afterPropertiesSet() throws Exception {
  53. if (thriftServerIpResolve == null) {
  54. thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
  55. }
  56. String serverIP = thriftServerIpResolve.getServerIp();
  57. if (StringUtils.isEmpty(serverIP)) {
  58. throw new ThriftException("cant find server ip...");
  59. }
  60. String hostname = serverIP + ":" + port + ":" + weight;
  61. Class<?> serviceClass = service.getClass();
  62. // 获取实现类接口
  63. Class<?>[] interfaces = serviceClass.getInterfaces();
  64. if (interfaces.length == 0) {
  65. throw new IllegalClassFormatException("service-class should implements Iface");
  66. }
  67. // reflect,load "Processor";
  68. TProcessor processor = null;
  69. String serviceName = null;
  70. for (Class<?> clazz : interfaces) {
  71. String cname = clazz.getSimpleName();
  72. if (!cname.equals("Iface")) {
  73. continue;
  74. }
  75. serviceName = clazz.getEnclosingClass().getName();
  76. String pname = serviceName + "$Processor";
  77. try {
  78. ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
  79. Class<?> pclass = classLoader.loadClass(pname);
  80. if (!TProcessor.class.isAssignableFrom(pclass)) {
  81. continue;
  82. }
  83. Constructor<?> constructor = pclass.getConstructor(clazz);
  84. processor = (TProcessor) constructor.newInstance(service);
  85. break;
  86. } catch (Exception e) {
  87. //
  88. }
  89. }
  90. if (processor == null) {
  91. throw new IllegalClassFormatException("service-class should implements Iface");
  92. }
  93. //需要单独的线程,因为serve方法是阻塞的.
  94. serverThread = new ServerThread(processor, port);
  95. serverThread.start();
  96. // 注册服务
  97. if (thriftServerAddressRegister != null) {
  98. thriftServerAddressRegister.register(serviceName, version, hostname);
  99. }
  100. }
  101. class ServerThread extends Thread {
  102. private TServer server;
  103. ServerThread(TProcessor processor, int port) throws Exception {
  104. TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
  105. TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
  106. TProcessorFactory processorFactory = new TProcessorFactory(processor);
  107. tArgs.processorFactory(processorFactory);
  108. tArgs.transportFactory(new TFramedTransport.Factory());
  109. tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));
  110. server = new TThreadedSelectorServer(tArgs);
  111. }
  112. @Override
  113. public void run(){
  114. try{
  115. //启动服务
  116. server.serve();
  117. }catch(Exception e){
  118. //
  119. }
  120. }
  121. public void stopServer(){
  122. server.stop();
  123. }
  124. }
  125. public void close() {
  126. serverThread.stopServer();
  127. }
  128. }

四、客户端获取服务代理及连接池实现
客户端连接池实现:ThriftClientPoolFactory.java

  1. package cn.slimsmart.thrift.rpc;
  2. import java.net.InetSocketAddress;
  3. import org.apache.commons.pool.BasePoolableObjectFactory;
  4. import org.apache.thrift.TServiceClient;
  5. import org.apache.thrift.TServiceClientFactory;
  6. import org.apache.thrift.protocol.TBinaryProtocol;
  7. import org.apache.thrift.protocol.TProtocol;
  8. import org.apache.thrift.transport.TFramedTransport;
  9. import org.apache.thrift.transport.TSocket;
  10. import org.apache.thrift.transport.TTransport;
  11. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;
  12. /**
  13. * 连接池,thrift-client for spring
  14. */
  15. public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {
  16. private final ThriftServerAddressProvider serverAddressProvider;
  17. private final TServiceClientFactory<TServiceClient> clientFactory;
  18. private PoolOperationCallBack callback;
  19. protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
  20. this.serverAddressProvider = addressProvider;
  21. this.clientFactory = clientFactory;
  22. }
  23. protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory,
  24. PoolOperationCallBack callback) throws Exception {
  25. this.serverAddressProvider = addressProvider;
  26. this.clientFactory = clientFactory;
  27. this.callback = callback;
  28. }
  29. static interface PoolOperationCallBack {
  30. // 销毁client之前执行
  31. void destroy(TServiceClient client);
  32. // 创建成功是执行
  33. void make(TServiceClient client);
  34. }
  35. public void destroyObject(TServiceClient client) throws Exception {
  36. if (callback != null) {
  37. try {
  38. callback.destroy(client);
  39. } catch (Exception e) {
  40. //
  41. }
  42. }
  43. TTransport pin = client.getInputProtocol().getTransport();
  44. pin.close();
  45. }
  46. public boolean validateObject(TServiceClient client) {
  47. TTransport pin = client.getInputProtocol().getTransport();
  48. return pin.isOpen();
  49. }
  50. @Override
  51. public TServiceClient makeObject() throws Exception {
  52. InetSocketAddress address = serverAddressProvider.selector();
  53. TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
  54. TTransport transport = new TFramedTransport(tsocket);
  55. TProtocol protocol = new TBinaryProtocol(transport);
  56. TServiceClient client = this.clientFactory.getClient(protocol);
  57. transport.open();
  58. if (callback != null) {
  59. try {
  60. callback.make(client);
  61. } catch (Exception e) {
  62. //
  63. }
  64. }
  65. return client;
  66. }
  67. }

客户端服务代理工厂实现:ThriftServiceClientProxyFactory.java

  1. package cn.slimsmart.thrift.rpc;
  2. import java.lang.reflect.InvocationHandler;
  3. import java.lang.reflect.Method;
  4. import java.lang.reflect.Proxy;
  5. import org.apache.commons.pool.impl.GenericObjectPool;
  6. import org.apache.thrift.TServiceClient;
  7. import org.apache.thrift.TServiceClientFactory;
  8. import org.springframework.beans.factory.FactoryBean;
  9. import org.springframework.beans.factory.InitializingBean;
  10. import cn.slimsmart.thrift.rpc.ThriftClientPoolFactory.PoolOperationCallBack;
  11. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;
  12. /**
  13. * 客户端代理
  14. */
  15. @SuppressWarnings({ "unchecked", "rawtypes" })
  16. public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {
  17. private Integer maxActive = 32;// 最大活跃连接数
  18. // ms,default 3 min,链接空闲时间
  19. // -1,关闭空闲检测
  20. private Integer idleTime = 180000;
  21. private ThriftServerAddressProvider serverAddressProvider;
  22. private Object proxyClient;
  23. private Class<?> objectClass;
  24. private GenericObjectPool<TServiceClient> pool;
  25. private PoolOperationCallBack callback = new PoolOperationCallBack() {
  26. @Override
  27. public void make(TServiceClient client) {
  28. System.out.println("create");
  29. }
  30. @Override
  31. public void destroy(TServiceClient client) {
  32. System.out.println("destroy");
  33. }
  34. };
  35. public void setMaxActive(Integer maxActive) {
  36. this.maxActive = maxActive;
  37. }
  38. public void setIdleTime(Integer idleTime) {
  39. this.idleTime = idleTime;
  40. }
  41. public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {
  42. this.serverAddressProvider = serverAddressProvider;
  43. }
  44. @Override
  45. public void afterPropertiesSet() throws Exception {
  46. ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
  47. // 加载Iface接口
  48. objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
  49. // 加载Client.Factory类
  50. Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
  51. TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
  52. ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
  53. GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
  54. poolConfig.maxActive = maxActive;
  55. poolConfig.minIdle = 0;
  56. poolConfig.minEvictableIdleTimeMillis = idleTime;
  57. poolConfig.timeBetweenEvictionRunsMillis = idleTime / 2L;
  58. pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
  59. proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
  60. @Override
  61. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  62. //
  63. TServiceClient client = pool.borrowObject();
  64. try {
  65. return method.invoke(client, args);
  66. } catch (Exception e) {
  67. throw e;
  68. } finally {
  69. pool.returnObject(client);
  70. }
  71. }
  72. });
  73. }
  74. @Override
  75. public Object getObject() throws Exception {
  76. return proxyClient;
  77. }
  78. @Override
  79. public Class<?> getObjectType() {
  80. return objectClass;
  81. }
  82. @Override
  83. public boolean isSingleton() {
  84. return true;
  85. }
  86. public void close() {
  87. if (serverAddressProvider != null) {
  88. serverAddressProvider.close();
  89. }
  90. }
  91. }

下面我们看一下服务端和客户端的配置;

服务端spring-context-thrift-server.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
  4. xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
  5. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
  6. http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
  7. http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
  8. http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
  9. default-lazy-init="false">
  10. <!-- zookeeper -->
  11. <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
  12. destroy-method="close">
  13. <property name="zkHosts"
  14. value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
  15. <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
  16. <property name="connectionTimeout" value="3000" />
  17. <property name="sessionTimeout" value="3000" />
  18. <property name="singleton" value="true" />
  19. </bean>
  20. <bean id="sericeAddressRegister"
  21. class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegisterZookeeper"
  22. destroy-method="close">
  23. <property name="zkClient" ref="thriftZookeeper" />
  24. </bean>
  25. <bean id="echoSerivceImpl" class="cn.slimsmart.thrift.rpc.demo.EchoSerivceImpl" />
  26. <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
  27. destroy-method="close">
  28. <property name="service" ref="echoSerivceImpl" />
  29. <property name="port" value="9000" />
  30. <property name="version" value="1.0.0" />
  31. <property name="weight" value="1" />
  32. <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
  33. </bean>
  34. <bean id="echoSerivce1" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
  35. destroy-method="close">
  36. <property name="service" ref="echoSerivceImpl" />
  37. <property name="port" value="9001" />
  38. <property name="version" value="1.0.0" />
  39. <property name="weight" value="1" />
  40. <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
  41. </bean>
  42. <bean id="echoSerivce2" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
  43. destroy-method="close">
  44. <property name="service" ref="echoSerivceImpl" />
  45. <property name="port" value="9002" />
  46. <property name="version" value="1.0.0" />
  47. <property name="weight" value="1" />
  48. <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
  49. </bean>
  50. </beans>

客户端:spring-context-thrift-client.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
  4. xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
  5. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
  6. http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
  7. http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
  8. http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
  9. default-lazy-init="false">
  10. <!-- fixedAddress -->
  11. <!--
  12. <bean id="fixedAddressProvider" class="cn.slimsmart.thrift.rpc.zookeeper.FixedAddressProvider">
  13. <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
  14. <property name="serverAddress" value="192.168.36.215:9001:1,192.168.36.215:9002:2,192.168.36.215:9003:3" />
  15. </bean>
  16. <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory">
  17. <property name="maxActive" value="5" />
  18. <property name="idleTime" value="10000" />
  19. <property name="serverAddressProvider" ref="fixedAddressProvider" />
  20. </bean>
  21. -->
  22. <!-- zookeeper   -->
  23. <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
  24. destroy-method="close">
  25. <property name="zkHosts"
  26. value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
  27. <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
  28. <property name="connectionTimeout" value="3000" />
  29. <property name="sessionTimeout" value="3000" />
  30. <property name="singleton" value="true" />
  31. </bean>
  32. <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory" destroy-method="close">
  33. <property name="maxActive" value="5" />
  34. <property name="idleTime" value="1800000" />
  35. <property name="serverAddressProvider">
  36. <bean class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper">
  37. <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
  38. <property name="version" value="1.0.0" />
  39. <property name="zkClient" ref="thriftZookeeper" />
  40. </bean>
  41. </property>
  42. </bean>
  43. </beans>

运行服务端后,我们可以看见zookeeper注册了多个服务地址。

问题

安装文档:http://thrift.apache.org/docs/BuildingFromSource

遇到一个bison版本过低的问题:http://blog.csdn.net/qq910894904/article/details/41132779

参考资料

Thrift原理简析(Java)-http://shift-alt-ctrl.iteye.com/blog/1987416

Thrift-http://www.cnblogs.com/mumuxinfei/tag/thrift/

Thrift与ZooKeeper-http://blog.csdn.net/zhu_tianwei/article/details/44115667

Thrift专题-http://blog.csdn.net/column/details/slimina-thrift.html

Thrift服务端与Spring-http://shift-alt-ctrl.iteye.com/blog/1990026

Thrift客户端与Spring-http://shift-alt-ctrl.iteye.com/blog/1990030

时间: 2024-08-24 23:52:11

Thrift全面介绍的相关文章

thrift使用介绍

写的ppt,不再摘抄出来了,直接上图. thrift使用介绍

Thrift安装介绍

一.简介 1.语言库要求 因为thrift支持多语言.所以编译thrift源代码的过程中,会用到该语言的一些类库.如c++的boost.java的jdk等. 那么,在安装thrift过程中,须要对各种语言安装哪些类库和工具呢,官方对此有具体的介绍: 所需语言库和工具 C++ :Boost 1.33.1+ (必选),libevent (可选,用来创建非堵塞server) ,zlib (可选) Java :Java 1.5+ (必选),Apache Ant (必选),Apache Ivy(必选),A

Thrift架构介绍

Thrift是一个跨语言的服务部署框架,最初由Facebook于2007年开发,2008年进入Apache开源项目.Thrift通过一个中间语言(IDL, 接口定义语言)来定义RPC的接口和数据类型,然后通过一个编译器生成不同语言的代码(目前支持C++,Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk和OCaml),并由生成的代码负责RPC协议层和传输层的实现. 架构 Thrift实际上是实现了C/S模式,通

Thrift框架介绍

1.前言 Thrift是一个跨语言的服务部署框架,最初由Facebook于2007年开发,2008年进入Apache开源项目.Thrift通过一个中间语言(IDL, 接口定义语言)来定义RPC的接口和数据类型,然后通过一个编译器生成不同语言的代码(目前支持C++,Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk和OCaml),并由生成的代码负责RPC协议层和传输层的实现. 2.架构 Thrift实际上是实现了

thrift 安装介绍

一.About  thrift            thrift是一种可伸缩的跨语言服务的发展软件框架.它结合了功能强大的软件堆栈的代码生成引擎,以建设服务,工作效率和无缝地与C + +,C#,Java,Python和PHP和Ruby结合.thrift是facebook开发的,我们现在把它作为开源软件使用.thrift允许你定义一个简单的定义文件中的数据类型和服务接口.以作为输入文件,编译器生成代码用来方便地生成RPC客户端和服务器通信的无缝跨编程语言(来自百度百科).      >>>

使用Thrift RPC编写程序(服务端和客户端)

1. Thrift类介绍 Thrift代码包(位于thrift-0.6.1/lib/cpp/src)有以下几个目录: concurrency:并发和时钟管理方面的库processor:Processor相关类protocal:Protocal相关类transport:transport相关类server:server相关类 1.1 Transport类(how is transmitted?)负责数据传输,有以下几个可用类:TFileTransport:文件(日志)传输类,允许client将文件

RPC通信框架——RCF介绍(替换COM)

阅读目录 RPC通信框架 为什么选择RCF 简单的性能测试 参考资料 总结 现有的软件中用了大量的COM接口,导致无法跨平台,当然由于与Windows结合的太紧密,还有很多无法跨平台的地方.那么为了实现跨平台,支持Linux系统,以及后续的分布式,首要任务是去除COM接口. 在对大量框架进行调研后,决定使用RCF替换COM接口. 回到顶部 RPC通信框架 CORBA ICE Thrift zeromq dbus RCF YAMI4 TAO 回到顶部 为什么选择RCF 经过各项对比,认为: RCF

RPC通信框架&mdash;&mdash;RCF介绍

现有的软件中用了大量的COM接口,导致无法跨平台,当然由于与Windows结合的太紧密,还有很多无法跨平台的地方.那么为了实现跨平台,支持Linux系统,以及后续的分布式,首要任务是去除COM接口. 在对大量框架进行调研后,决定使用RCF替换COM接口. RPC通信框架 CORBA ICE Thrift zeromq dbus RCF YAMI4 TAO 为什么选择RCF 经过各项对比,认为: RCF的使用方式与现有的COM接口方式非常类似,在开发上可以更快速.更容易的替换COM,并且可以少犯错

Thrift --- 支持双向通信

[问题] Thrift采用了C/S模型,不支持双向通信:client只能远程调用server端的RPC接口,但client端则没有RPC供server端调用,这意味着,client端能够主动与server端通信,但server端不能主动与client端通信而只能被动地对client端的请求作出应答.这种RPC模式在某些应用中存在缺陷,比如:有些应用,在大部分情况下,client端会主动向server端发请求或者向server端发送数据,而在少部分情况下,server端也需要主动向client发送