基于zookeeper、连接池、Failover/LoadBalance等改造Thrift 服务化

对于Thrift服务化的改造,主要是客户端,可以从如下几个方面进行:

1.服务端的服务注册,客户端自动发现,无需手工修改配置,这里我们使用zookeeper,但由于zookeeper本身提供的客户端使用较为复杂,因此采用curator-recipes工具类进行处理服务的注册与发现。

2.客户端使用连接池对服务调用进行管理,提升性能,这里我们使用Apache Commons项目commons-pool,可以大大减少代码的复杂度。

3.关于Failover/LoadBalance,由于zookeeper的watcher,当服务端不可用是及时通知客户端,并移除不可用的服务节点,而LoadBalance有很多算法,这里我们采用随机加权方式,也是常有的负载算法,至于其他的算法介绍参考:常见的负载均衡的基本算法

4.使thrift服务的注册和发现可以基于spring配置,可以提供很多的便利。

5.其他的改造如:

1)通过动态代理实现client和server端的交互细节透明化,让用户只需通过服务方提供的接口进行访问

2)Thrift通过两种方式调用服务Client和Iface

[java] view plain copy print?

  1. // *) Client API 调用
  2. (EchoService.Client)client.echo("hello lilei");  ---(1)
  3. // *) Service 接口 调用
  4. (EchoService.Iface)service.echo("hello lilei");  ---(2)
// *) Client API 调用
(EchoService.Client)client.echo("hello lilei");  ---(1)
// *) Service 接口 调用
(EchoService.Iface)service.echo("hello lilei");  ---(2)

Client API的方式, 不推荐, 我们推荐Service接口的方式(服务化)。

下面我们来一一实现:

一、pom.xml引入依赖jar包

[html] view plain copy print?

  1. <dependency>
  2. <groupId>org.apache.thrift</groupId>
  3. <artifactId>libthrift</artifactId>
  4. <version>0.9.2</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>commons-pool</groupId>
  8. <artifactId>commons-pool</artifactId>
  9. <version>1.6</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework</groupId>
  13. <artifactId>spring-context</artifactId>
  14. <version>4.0.9.RELEASE</version>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.apache.zookeeper</groupId>
  18. <artifactId>zookeeper</artifactId>
  19. <version>3.4.6</version>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.apache.curator</groupId>
  23. <artifactId>curator-recipes</artifactId>
  24. <version>2.7.1</version>
  25. </dependency>
<dependency>
			<groupId>org.apache.thrift</groupId>
			<artifactId>libthrift</artifactId>
			<version>0.9.2</version>
		</dependency>
		<dependency>
			<groupId>commons-pool</groupId>
			<artifactId>commons-pool</artifactId>
			<version>1.6</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>4.0.9.RELEASE</version>
		</dependency>

		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.4.6</version>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-recipes</artifactId>
			<version>2.7.1</version>
		</dependency>

二、使用zookeeper管理服务节点配置

RPC服务往平台化的方向发展, 会屏蔽掉更多的服务细节(服务的IP地址集群, 集群的扩容和迁移), 只暴露服务接口. 这部分的演化, 使得server端和client端完全的解耦合. 两者的交互通过ConfigServer(MetaServer)的中介角色来搭线。

注: 该图源自dubbo的官网
这边借助Zookeeper来扮演该角色, server扮演发布者的角色, 而client扮演订阅者的角色.

Zookeeper是分布式应用协作服务. 它实现了paxos的一致性算法, 在命名管理/配置推送/数据同步/主从切换方面扮演重要的角色。 其数据组织类似文件系统的目录结构:

每个节点被称为znode, 为znode节点依据其特性, 又可以分为如下类型:
  1). PERSISTENT: 永久节点
  2). EPHEMERAL: 临时节点, 会随session(client disconnect)的消失而消失
  3). PERSISTENT_SEQUENTIAL: 永久节点, 其节点的名字编号是单调递增的
  4). EPHEMERAL_SEQUENTIAL: 临时节点, 其节点的名字编号是单调递增的
  注: 临时节点不能成为父节点
  Watcher观察模式, client可以注册对节点的状态/内容变更的事件回调机制. 其Event事件的两类属性需要关注下:
  1). KeeperState: Disconnected,SyncConnected,Expired
  2). EventType: None,NodeCreated,NodeDeleted,NodeDataChanged,NodeChildrenChanged
RPC服务端:
  作为具体业务服务的RPC服务发布方, 对其自身的服务描述由以下元素构成.
  1). namespace: 命名空间,来区分不同应用 
  2). service: 服务接口, 采用发布方的类全名来表示
  3). version: 版本号
  借鉴了Maven的GAV坐标系, 三维坐标系更符合服务平台化的大环境.
  *) 数据模型的设计
  具体RPC服务的注册路径为: /rpc/{namespace}/{service}/{version}, 该路径上的节点都是永久节点
  RPC服务集群节点的注册路径为: /rpc/{namespace}/{service}/{version}/{ip:port:weight}, 末尾的节点是临时节点.

1.定义Zookeeper的客户端的管理

ZookeeperFactory.java

[java] view plain copy print?

  1. package cn.slimsmart.thrift.rpc.zookeeper;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.retry.ExponentialBackoffRetry;
  5. import org.springframework.beans.factory.FactoryBean;
  6. import org.springframework.util.StringUtils;
  7. /**
  8. * 获取zookeeper客户端链接
  9. */
  10. public class ZookeeperFactory implements FactoryBean<CuratorFramework> {
  11. private String zkHosts;
  12. // session超时
  13. private int sessionTimeout = 30000;
  14. private int connectionTimeout = 30000;
  15. // 共享一个zk链接
  16. private boolean singleton = true;
  17. // 全局path前缀,常用来区分不同的应用
  18. private String namespace;
  19. private final static String ROOT = "rpc";
  20. private CuratorFramework zkClient;
  21. public void setZkHosts(String zkHosts) {
  22. this.zkHosts = zkHosts;
  23. }
  24. public void setSessionTimeout(int sessionTimeout) {
  25. this.sessionTimeout = sessionTimeout;
  26. }
  27. public void setConnectionTimeout(int connectionTimeout) {
  28. this.connectionTimeout = connectionTimeout;
  29. }
  30. public void setSingleton(boolean singleton) {
  31. this.singleton = singleton;
  32. }
  33. public void setNamespace(String namespace) {
  34. this.namespace = namespace;
  35. }
  36. public void setZkClient(CuratorFramework zkClient) {
  37. this.zkClient = zkClient;
  38. }
  39. @Override
  40. public CuratorFramework getObject() throws Exception {
  41. if (singleton) {
  42. if (zkClient == null) {
  43. zkClient = create();
  44. zkClient.start();
  45. }
  46. return zkClient;
  47. }
  48. return create();
  49. }
  50. @Override
  51. public Class<?> getObjectType() {
  52. return CuratorFramework.class;
  53. }
  54. @Override
  55. public boolean isSingleton() {
  56. return singleton;
  57. }
  58. public CuratorFramework create() throws Exception {
  59. if (StringUtils.isEmpty(namespace)) {
  60. namespace = ROOT;
  61. } else {
  62. namespace = ROOT +"/"+ namespace;
  63. }
  64. return create(zkHosts, sessionTimeout, connectionTimeout, namespace);
  65. }
  66. public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
  67. CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
  68. return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(30000)
  69. .canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
  70. .defaultData(null).build();
  71. }
  72. public void close() {
  73. if (zkClient != null) {
  74. zkClient.close();
  75. }
  76. }
  77. }
package cn.slimsmart.thrift.rpc.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.util.StringUtils;

/**
 * 获取zookeeper客户端链接
 */
public class ZookeeperFactory implements FactoryBean<CuratorFramework> {

	private String zkHosts;
	// session超时
	private int sessionTimeout = 30000;
	private int connectionTimeout = 30000;

	// 共享一个zk链接
	private boolean singleton = true;

	// 全局path前缀,常用来区分不同的应用
	private String namespace;

	private final static String ROOT = "rpc";

	private CuratorFramework zkClient;

	public void setZkHosts(String zkHosts) {
		this.zkHosts = zkHosts;
	}

	public void setSessionTimeout(int sessionTimeout) {
		this.sessionTimeout = sessionTimeout;
	}

	public void setConnectionTimeout(int connectionTimeout) {
		this.connectionTimeout = connectionTimeout;
	}

	public void setSingleton(boolean singleton) {
		this.singleton = singleton;
	}

	public void setNamespace(String namespace) {
		this.namespace = namespace;
	}

	public void setZkClient(CuratorFramework zkClient) {
		this.zkClient = zkClient;
	}

	@Override
	public CuratorFramework getObject() throws Exception {
		if (singleton) {
			if (zkClient == null) {
				zkClient = create();
				zkClient.start();
			}
			return zkClient;
		}
		return create();
	}

	@Override
	public Class<?> getObjectType() {
		return CuratorFramework.class;
	}

	@Override
	public boolean isSingleton() {
		return singleton;
	}

	public CuratorFramework create() throws Exception {
		if (StringUtils.isEmpty(namespace)) {
			namespace = ROOT;
		} else {
			namespace = ROOT +"/"+ namespace;
		}
		return create(zkHosts, sessionTimeout, connectionTimeout, namespace);
	}

	public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
		CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
		return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(30000)
				.canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
				.defaultData(null).build();
	}

	public void close() {
		if (zkClient != null) {
			zkClient.close();
		}
	}
}

2.服务端注册服务

由于服务端配置需要获取本机的IP地址,因此定义IP获取接口

ThriftServerIpResolve.java

[java] view plain copy print?

  1. package cn.slimsmart.thrift.rpc.zookeeper;
  2. /**
  3. *
  4. * 解析thrift-server端IP地址,用于注册服务
  5. * 1) 可以从一个物理机器或者虚机的特殊文件中解析
  6. * 2) 可以获取指定网卡序号的Ip
  7. * 3) 其他
  8. */
  9. public interface ThriftServerIpResolve {
  10. String getServerIp() throws Exception;
  11. void reset();
  12. //当IP变更时,将会调用reset方法
  13. static interface IpRestCalllBack{
  14. public void rest(String newIp);
  15. }
  16. }
package cn.slimsmart.thrift.rpc.zookeeper;

/**
 *
 * 解析thrift-server端IP地址,用于注册服务
 * 1) 可以从一个物理机器或者虚机的特殊文件中解析
 * 2) 可以获取指定网卡序号的Ip
 * 3) 其他
 */
public interface ThriftServerIpResolve {

	String getServerIp() throws Exception;

	void reset();

	//当IP变更时,将会调用reset方法
	static interface IpRestCalllBack{
		public void rest(String newIp);
	}
}

可以对该接口做不通的实现,下面我们基于网卡获取IP地址,也可以通过配置serverIp
ThriftServerIpLocalNetworkResolve.java

[java] view plain copy print?

  1. package cn.slimsmart.thrift.rpc.zookeeper;
  2. import java.net.Inet6Address;
  3. import java.net.InetAddress;
  4. import java.net.NetworkInterface;
  5. import java.net.SocketException;
  6. import java.util.Enumeration;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. /**
  10. * 解析网卡Ip
  11. *
  12. */
  13. public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {
  14. private Logger logger = LoggerFactory.getLogger(getClass());
  15. //缓存
  16. private String serverIp;
  17. public void setServerIp(String serverIp) {
  18. this.serverIp = serverIp;
  19. }
  20. @Override
  21. public String getServerIp() {
  22. if (serverIp != null) {
  23. return serverIp;
  24. }
  25. // 一个主机有多个网络接口
  26. try {
  27. Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
  28. while (netInterfaces.hasMoreElements()) {
  29. NetworkInterface netInterface = netInterfaces.nextElement();
  30. // 每个网络接口,都会有多个"网络地址",比如一定会有lookback地址,会有siteLocal地址等.以及IPV4或者IPV6 .
  31. Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
  32. while (addresses.hasMoreElements()) {
  33. InetAddress address = addresses.nextElement();
  34. if(address instanceof Inet6Address){
  35. continue;
  36. }
  37. if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {
  38. serverIp = address.getHostAddress();
  39. logger.info("resolve server ip :"+ serverIp);
  40. continue;
  41. }
  42. }
  43. }
  44. } catch (SocketException e) {
  45. e.printStackTrace();
  46. }
  47. return serverIp;
  48. }
  49. @Override
  50. public void reset() {
  51. serverIp = null;
  52. }
  53. }
package cn.slimsmart.thrift.rpc.zookeeper;

import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 解析网卡Ip
 *
 */
public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {

	private Logger logger = LoggerFactory.getLogger(getClass());

	//缓存
	private String serverIp;

	public void setServerIp(String serverIp) {
		this.serverIp = serverIp;
	}

	@Override
	public String getServerIp() {
		if (serverIp != null) {
			return serverIp;
		}
		// 一个主机有多个网络接口
		try {
			Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
			while (netInterfaces.hasMoreElements()) {
				NetworkInterface netInterface = netInterfaces.nextElement();
				// 每个网络接口,都会有多个"网络地址",比如一定会有lookback地址,会有siteLocal地址等.以及IPV4或者IPV6 .
				Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
				while (addresses.hasMoreElements()) {
					InetAddress address = addresses.nextElement();
					if(address instanceof Inet6Address){
						continue;
					}
					if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {
						serverIp = address.getHostAddress();
						logger.info("resolve server ip :"+ serverIp);
						continue;
					}
				}
			}
		} catch (SocketException e) {
			e.printStackTrace();
		}
		return serverIp;
	}

	@Override
	public void reset() {
		serverIp = null;
	}
}

接下来我们定义发布服务接口,并实现将服务信息(服务接口、版本号,IP、port、weight)发布到zookeeper中。
ThriftServerAddressRegister.java

[java] view plain copy print?

  1. package cn.slimsmart.thrift.rpc.zookeeper;
  2. /**
  3. * 发布服务地址及端口到服务注册中心,这里是zookeeper服务器
  4. */
  5. public interface ThriftServerAddressRegister {
  6. /**
  7. * 发布服务接口
  8. * @param service 服务接口名称,一个产品中不能重复
  9. * @param version 服务接口的版本号,默认1.0.0
  10. * @param address 服务发布的地址和端口
  11. */
  12. void register(String service,String version,String address);
  13. }
package cn.slimsmart.thrift.rpc.zookeeper;

/**
 * 发布服务地址及端口到服务注册中心,这里是zookeeper服务器
 */
public interface ThriftServerAddressRegister {
	/**
	 * 发布服务接口
	 * @param service 服务接口名称,一个产品中不能重复
	 * @param version 服务接口的版本号,默认1.0.0
	 * @param address 服务发布的地址和端口
	 */
	void register(String service,String version,String address);
}

实现:ThriftServerAddressRegisterZookeeper.java

[java] view plain copy print?

  1. package cn.slimsmart.thrift.rpc.zookeeper;
  2. import java.io.UnsupportedEncodingException;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.imps.CuratorFrameworkState;
  5. import org.apache.zookeeper.CreateMode;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.util.StringUtils;
  9. import cn.slimsmart.thrift.rpc.ThriftException;
  10. /**
  11. *  注册服务列表到Zookeeper
  12. */
  13. public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister{
  14. private Logger logger = LoggerFactory.getLogger(getClass());
  15. private CuratorFramework zkClient;
  16. public ThriftServerAddressRegisterZookeeper(){}
  17. public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient){
  18. this.zkClient = zkClient;
  19. }
  20. public void setZkClient(CuratorFramework zkClient) {
  21. this.zkClient = zkClient;
  22. }
  23. @Override
  24. public void register(String service, String version, String address) {
  25. if(zkClient.getState() == CuratorFrameworkState.LATENT){
  26. zkClient.start();
  27. }
  28. if(StringUtils.isEmpty(version)){
  29. version="1.0.0";
  30. }
  31. //临时节点
  32. try {
  33. zkClient.create()
  34. .creatingParentsIfNeeded()
  35. .withMode(CreateMode.EPHEMERAL)
  36. .forPath("/"+service+"/"+version+"/"+address);
  37. } catch (UnsupportedEncodingException e) {
  38. logger.error("register service address to zookeeper exception:{}",e);
  39. throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e);
  40. } catch (Exception e) {
  41. logger.error("register service address to zookeeper exception:{}",e);
  42. throw new ThriftException("register service address to zookeeper exception:{}", e);
  43. }
  44. }
  45. public void close(){
  46. zkClient.close();
  47. }
  48. }
package cn.slimsmart.thrift.rpc.zookeeper;

import java.io.UnsupportedEncodingException;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import cn.slimsmart.thrift.rpc.ThriftException;

/**
 *  注册服务列表到Zookeeper
 */
public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister{

	private Logger logger = LoggerFactory.getLogger(getClass());

	private CuratorFramework zkClient;

	public ThriftServerAddressRegisterZookeeper(){}

	public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient){
		this.zkClient = zkClient;
	}

	public void setZkClient(CuratorFramework zkClient) {
		this.zkClient = zkClient;
	}

	@Override
	public void register(String service, String version, String address) {
		if(zkClient.getState() == CuratorFrameworkState.LATENT){
			zkClient.start();
		}
		if(StringUtils.isEmpty(version)){
			version="1.0.0";
		}
		//临时节点
		try {
			zkClient.create()
				.creatingParentsIfNeeded()
				.withMode(CreateMode.EPHEMERAL)
				.forPath("/"+service+"/"+version+"/"+address);
		} catch (UnsupportedEncodingException e) {
			logger.error("register service address to zookeeper exception:{}",e);
			throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e);
		} catch (Exception e) {
			logger.error("register service address to zookeeper exception:{}",e);
			throw new ThriftException("register service address to zookeeper exception:{}", e);
		}
	}

	public void close(){
		zkClient.close();
	}
}

3.客户端发现服务

定义获取服务地址接口

ThriftServerAddressProvider.java

[java] view plain copy print?

  1. package cn.slimsmart.thrift.rpc.zookeeper;
  2. import java.net.InetSocketAddress;
  3. import java.util.List;
  4. /**
  5. * thrift server-service地址提供者,以便构建客户端连接池
  6. */
  7. public interface ThriftServerAddressProvider {
  8. //获取服务名称
  9. String getService();
  10. /**
  11. * 获取所有服务端地址
  12. * @return
  13. */
  14. List<InetSocketAddress> findServerAddressList();
  15. /**
  16. * 选取一个合适的address,可以随机获取等‘
  17. * 内部可以使用合适的算法.
  18. * @return
  19. */
  20. InetSocketAddress selector();
  21. void close();
  22. }
package cn.slimsmart.thrift.rpc.zookeeper;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * thrift server-service地址提供者,以便构建客户端连接池
 */
public interface ThriftServerAddressProvider {

	//获取服务名称
	String getService();

	/**
	 * 获取所有服务端地址
	 * @return
	 */
    List<InetSocketAddress> findServerAddressList();

    /**
     * 选取一个合适的address,可以随机获取等‘
     * 内部可以使用合适的算法.
     * @return
     */
    InetSocketAddress selector();

    void close();
}

基于zookeeper服务地址自动发现实现:ThriftServerAddressProviderZookeeper.java

[java] view plain copy print?

  1. package cn.slimsmart.thrift.rpc.zookeeper;
  2. import java.net.InetSocketAddress;
  3. import java.util.ArrayList;
  4. import java.util.Collections;
  5. import java.util.HashSet;
  6. import java.util.LinkedList;
  7. import java.util.List;
  8. import java.util.Queue;
  9. import java.util.Set;
  10. import org.apache.curator.framework.CuratorFramework;
  11. import org.apache.curator.framework.imps.CuratorFrameworkState;
  12. import org.apache.curator.framework.recipes.cache.ChildData;
  13. import org.apache.curator.framework.recipes.cache.PathChildrenCache;
  14. import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
  15. import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
  16. import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19. import org.springframework.beans.factory.InitializingBean;
  20. /**
  21. * 使用zookeeper作为"config"中心,使用apache-curator方法库来简化zookeeper开发
  22. */
  23. public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {
  24. private Logger logger = LoggerFactory.getLogger(getClass());
  25. // 注册服务
  26. private String service;
  27. // 服务版本号
  28. private String version = "1.0.0";
  29. private PathChildrenCache cachedPath;
  30. private CuratorFramework zkClient;
  31. // 用来保存当前provider所接触过的地址记录
  32. // 当zookeeper集群故障时,可以使用trace中地址,作为"备份"
  33. private Set<String> trace = new HashSet<String>();
  34. private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();
  35. private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();
  36. private Object lock = new Object();
  37. // 默认权重
  38. private static final Integer DEFAULT_WEIGHT = 1;
  39. public void setService(String service) {
  40. this.service = service;
  41. }
  42. public void setVersion(String version) {
  43. this.version = version;
  44. }
  45. public ThriftServerAddressProviderZookeeper() {
  46. }
  47. public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {
  48. this.zkClient = zkClient;
  49. }
  50. public void setZkClient(CuratorFramework zkClient) {
  51. this.zkClient = zkClient;
  52. }
  53. @Override
  54. public void afterPropertiesSet() throws Exception {
  55. // 如果zk尚未启动,则启动
  56. if (zkClient.getState() == CuratorFrameworkState.LATENT) {
  57. zkClient.start();
  58. }
  59. buildPathChildrenCache(zkClient, getServicePath(), true);
  60. cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
  61. }
  62. private String getServicePath(){
  63. return "/" + service + "/" + version;
  64. }
  65. private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {
  66. cachedPath = new PathChildrenCache(client, path, cacheData);
  67. cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
  68. @Override
  69. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
  70. PathChildrenCacheEvent.Type eventType = event.getType();
  71. switch (eventType) {
  72. case CONNECTION_RECONNECTED:
  73. logger.info("Connection is reconection.");
  74. break;
  75. case CONNECTION_SUSPENDED:
  76. logger.info("Connection is suspended.");
  77. break;
  78. case CONNECTION_LOST:
  79. logger.warn("Connection error,waiting...");
  80. return;
  81. default:
  82. //
  83. }
  84. // 任何节点的时机数据变动,都会rebuild,此处为一个"简单的"做法.
  85. cachedPath.rebuild();
  86. rebuild();
  87. }
  88. protected void rebuild() throws Exception {
  89. List<ChildData> children = cachedPath.getCurrentData();
  90. if (children == null || children.isEmpty()) {
  91. // 有可能所有的thrift server都与zookeeper断开了链接
  92. // 但是,有可能,thrift client与thrift server之间的网络是良好的
  93. // 因此此处是否需要清空container,是需要多方面考虑的.
  94. container.clear();
  95. logger.error("thrift server-cluster error....");
  96. return;
  97. }
  98. List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
  99. String path = null;
  100. for (ChildData data : children) {
  101. path = data.getPath();
  102. logger.debug("get path:"+path);
  103. path = path.substring(getServicePath().length()+1);
  104. logger.debug("get serviceAddress:"+path);
  105. String address = new String(path.getBytes(), "utf-8");
  106. current.addAll(transfer(address));
  107. trace.add(address);
  108. }
  109. Collections.shuffle(current);
  110. synchronized (lock) {
  111. container.clear();
  112. container.addAll(current);
  113. inner.clear();
  114. inner.addAll(current);
  115. }
  116. }
  117. });
  118. }
  119. private List<InetSocketAddress> transfer(String address) {
  120. String[] hostname = address.split(":");
  121. Integer weight = DEFAULT_WEIGHT;
  122. if (hostname.length == 3) {
  123. weight = Integer.valueOf(hostname[2]);
  124. }
  125. String ip = hostname[0];
  126. Integer port = Integer.valueOf(hostname[1]);
  127. List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
  128. // 根据优先级,将ip:port添加多次到地址集中,然后随机取地址实现负载
  129. for (int i = 0; i < weight; i++) {
  130. result.add(new InetSocketAddress(ip, port));
  131. }
  132. return result;
  133. }
  134. @Override
  135. public List<InetSocketAddress> findServerAddressList() {
  136. return Collections.unmodifiableList(container);
  137. }
  138. @Override
  139. public synchronized InetSocketAddress selector() {
  140. if (inner.isEmpty()) {
  141. if (!container.isEmpty()) {
  142. inner.addAll(container);
  143. } else if (!trace.isEmpty()) {
  144. synchronized (lock) {
  145. for (String hostname : trace) {
  146. container.addAll(transfer(hostname));
  147. }
  148. Collections.shuffle(container);
  149. inner.addAll(container);
  150. }
  151. }
  152. }
  153. return inner.poll();
  154. }
  155. @Override
  156. public void close() {
  157. try {
  158. cachedPath.close();
  159. zkClient.close();
  160. } catch (Exception e) {
  161. }
  162. }
  163. @Override
  164. public String getService() {
  165. return service;
  166. }
  167. }
package cn.slimsmart.thrift.rpc.zookeeper;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

/**
 * 使用zookeeper作为"config"中心,使用apache-curator方法库来简化zookeeper开发
 */
public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {

	private Logger logger = LoggerFactory.getLogger(getClass());

	// 注册服务
	private String service;
	// 服务版本号
	private String version = "1.0.0";

	private PathChildrenCache cachedPath;

	private CuratorFramework zkClient;

	// 用来保存当前provider所接触过的地址记录
	// 当zookeeper集群故障时,可以使用trace中地址,作为"备份"
	private Set<String> trace = new HashSet<String>();

	private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();

	private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();

	private Object lock = new Object();

	// 默认权重
	private static final Integer DEFAULT_WEIGHT = 1;

	public void setService(String service) {
		this.service = service;
	}

	public void setVersion(String version) {
		this.version = version;
	}

	public ThriftServerAddressProviderZookeeper() {
	}

	public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {
		this.zkClient = zkClient;
	}

	public void setZkClient(CuratorFramework zkClient) {
		this.zkClient = zkClient;
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		// 如果zk尚未启动,则启动
		if (zkClient.getState() == CuratorFrameworkState.LATENT) {
			zkClient.start();
		}
		buildPathChildrenCache(zkClient, getServicePath(), true);
		cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
	}

	private String getServicePath(){
		return "/" + service + "/" + version;
	}
	private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {
		cachedPath = new PathChildrenCache(client, path, cacheData);
		cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
			@Override
			public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
				PathChildrenCacheEvent.Type eventType = event.getType();
				switch (eventType) {
				case CONNECTION_RECONNECTED:
					logger.info("Connection is reconection.");
					break;
				case CONNECTION_SUSPENDED:
					logger.info("Connection is suspended.");
					break;
				case CONNECTION_LOST:
					logger.warn("Connection error,waiting...");
					return;
				default:
					//
				}
				// 任何节点的时机数据变动,都会rebuild,此处为一个"简单的"做法.
				cachedPath.rebuild();
				rebuild();
			}

			protected void rebuild() throws Exception {
				List<ChildData> children = cachedPath.getCurrentData();
				if (children == null || children.isEmpty()) {
					// 有可能所有的thrift server都与zookeeper断开了链接
					// 但是,有可能,thrift client与thrift server之间的网络是良好的
					// 因此此处是否需要清空container,是需要多方面考虑的.
					container.clear();
					logger.error("thrift server-cluster error....");
					return;
				}
				List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
				String path = null;
				for (ChildData data : children) {
					path = data.getPath();
					logger.debug("get path:"+path);
					path = path.substring(getServicePath().length()+1);
					logger.debug("get serviceAddress:"+path);
					String address = new String(path.getBytes(), "utf-8");
					current.addAll(transfer(address));
					trace.add(address);
				}
				Collections.shuffle(current);
				synchronized (lock) {
					container.clear();
					container.addAll(current);
					inner.clear();
					inner.addAll(current);

				}
			}
		});
	}

	private List<InetSocketAddress> transfer(String address) {
		String[] hostname = address.split(":");
		Integer weight = DEFAULT_WEIGHT;
		if (hostname.length == 3) {
			weight = Integer.valueOf(hostname[2]);
		}
		String ip = hostname[0];
		Integer port = Integer.valueOf(hostname[1]);
		List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
		// 根据优先级,将ip:port添加多次到地址集中,然后随机取地址实现负载
		for (int i = 0; i < weight; i++) {
			result.add(new InetSocketAddress(ip, port));
		}
		return result;
	}

	@Override
	public List<InetSocketAddress> findServerAddressList() {
		return Collections.unmodifiableList(container);
	}

	@Override
	public synchronized InetSocketAddress selector() {
		if (inner.isEmpty()) {
			if (!container.isEmpty()) {
				inner.addAll(container);
			} else if (!trace.isEmpty()) {
				synchronized (lock) {
					for (String hostname : trace) {
						container.addAll(transfer(hostname));
					}
					Collections.shuffle(container);
					inner.addAll(container);
				}
			}
		}
		return inner.poll();
	}

	@Override
	public void close() {
		try {
            cachedPath.close();
            zkClient.close();
        } catch (Exception e) {
        }
	}

	@Override
	public String getService() {
		return service;
	}

}

对此接口还做了一种实现,通过配置获取服务地址,参考附件:FixedAddressProvider.java

三、服务端服务注册实现

ThriftServiceServerFactory.java

[java] view plain copy print?

  1. package cn.slimsmart.thrift.rpc;
  2. import java.lang.instrument.IllegalClassFormatException;
  3. import java.lang.reflect.Constructor;
  4. import org.apache.thrift.TProcessor;
  5. import org.apache.thrift.TProcessorFactory;
  6. import org.apache.thrift.protocol.TBinaryProtocol;
  7. import org.apache.thrift.server.TServer;
  8. import org.apache.thrift.server.TThreadedSelectorServer;
  9. import org.apache.thrift.transport.TFramedTransport;
  10. import org.apache.thrift.transport.TNonblockingServerSocket;
  11. import org.springframework.beans.factory.InitializingBean;
  12. import org.springframework.util.StringUtils;
  13. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegister;
  14. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpLocalNetworkResolve;
  15. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpResolve;
  16. /**
  17. * 服务端注册服务工厂
  18. */
  19. public class ThriftServiceServerFactory implements InitializingBean {
  20. // 服务注册本机端口
  21. private Integer port = 8299;
  22. // 优先级
  23. private Integer weight = 1;// default
  24. // 服务实现类
  25. private Object service;// serice实现类
  26. //服务版本号
  27. private String version;
  28. // 解析本机IP
  29. private ThriftServerIpResolve thriftServerIpResolve;
  30. //服务注册
  31. private ThriftServerAddressRegister thriftServerAddressRegister;
  32. private ServerThread serverThread;
  33. public void setPort(Integer port) {
  34. this.port = port;
  35. }
  36. public void setWeight(Integer weight) {
  37. this.weight = weight;
  38. }
  39. public void setService(Object service) {
  40. this.service = service;
  41. }
  42. public void setVersion(String version) {
  43. this.version = version;
  44. }
  45. public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {
  46. this.thriftServerIpResolve = thriftServerIpResolve;
  47. }
  48. public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
  49. this.thriftServerAddressRegister = thriftServerAddressRegister;
  50. }
  51. @Override
  52. public void afterPropertiesSet() throws Exception {
  53. if (thriftServerIpResolve == null) {
  54. thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
  55. }
  56. String serverIP = thriftServerIpResolve.getServerIp();
  57. if (StringUtils.isEmpty(serverIP)) {
  58. throw new ThriftException("cant find server ip...");
  59. }
  60. String hostname = serverIP + ":" + port + ":" + weight;
  61. Class<?> serviceClass = service.getClass();
  62. // 获取实现类接口
  63. Class<?>[] interfaces = serviceClass.getInterfaces();
  64. if (interfaces.length == 0) {
  65. throw new IllegalClassFormatException("service-class should implements Iface");
  66. }
  67. // reflect,load "Processor";
  68. TProcessor processor = null;
  69. String serviceName = null;
  70. for (Class<?> clazz : interfaces) {
  71. String cname = clazz.getSimpleName();
  72. if (!cname.equals("Iface")) {
  73. continue;
  74. }
  75. serviceName = clazz.getEnclosingClass().getName();
  76. String pname = serviceName + "$Processor";
  77. try {
  78. ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
  79. Class<?> pclass = classLoader.loadClass(pname);
  80. if (!TProcessor.class.isAssignableFrom(pclass)) {
  81. continue;
  82. }
  83. Constructor<?> constructor = pclass.getConstructor(clazz);
  84. processor = (TProcessor) constructor.newInstance(service);
  85. break;
  86. } catch (Exception e) {
  87. //
  88. }
  89. }
  90. if (processor == null) {
  91. throw new IllegalClassFormatException("service-class should implements Iface");
  92. }
  93. //需要单独的线程,因为serve方法是阻塞的.
  94. serverThread = new ServerThread(processor, port);
  95. serverThread.start();
  96. // 注册服务
  97. if (thriftServerAddressRegister != null) {
  98. thriftServerAddressRegister.register(serviceName, version, hostname);
  99. }
  100. }
  101. class ServerThread extends Thread {
  102. private TServer server;
  103. ServerThread(TProcessor processor, int port) throws Exception {
  104. TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
  105. TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
  106. TProcessorFactory processorFactory = new TProcessorFactory(processor);
  107. tArgs.processorFactory(processorFactory);
  108. tArgs.transportFactory(new TFramedTransport.Factory());
  109. tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));
  110. server = new TThreadedSelectorServer(tArgs);
  111. }
  112. @Override
  113. public void run(){
  114. try{
  115. //启动服务
  116. server.serve();
  117. }catch(Exception e){
  118. //
  119. }
  120. }
  121. public void stopServer(){
  122. server.stop();
  123. }
  124. }
  125. public void close() {
  126. serverThread.stopServer();
  127. }
  128. }
package cn.slimsmart.thrift.rpc;

import java.lang.instrument.IllegalClassFormatException;
import java.lang.reflect.Constructor;

import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.StringUtils;

import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegister;
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpLocalNetworkResolve;
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpResolve;

/**
 * 服务端注册服务工厂
 */
public class ThriftServiceServerFactory implements InitializingBean {
	// 服务注册本机端口
	private Integer port = 8299;
	// 优先级
	private Integer weight = 1;// default
	// 服务实现类
	private Object service;// serice实现类
	//服务版本号
	private String version;
	// 解析本机IP
	private ThriftServerIpResolve thriftServerIpResolve;
	//服务注册
	private ThriftServerAddressRegister thriftServerAddressRegister;

	private ServerThread serverThread;

	public void setPort(Integer port) {
		this.port = port;
	}

	public void setWeight(Integer weight) {
		this.weight = weight;
	}

	public void setService(Object service) {
		this.service = service;
	}

	public void setVersion(String version) {
		this.version = version;
	}

	public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {
		this.thriftServerIpResolve = thriftServerIpResolve;
	}

	public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
		this.thriftServerAddressRegister = thriftServerAddressRegister;
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		if (thriftServerIpResolve == null) {
			thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
		}
		String serverIP = thriftServerIpResolve.getServerIp();
		if (StringUtils.isEmpty(serverIP)) {
			throw new ThriftException("cant find server ip...");
		}

		String hostname = serverIP + ":" + port + ":" + weight;
		Class<?> serviceClass = service.getClass();
		// 获取实现类接口
		Class<?>[] interfaces = serviceClass.getInterfaces();
		if (interfaces.length == 0) {
			throw new IllegalClassFormatException("service-class should implements Iface");
		}
		// reflect,load "Processor";
		TProcessor processor = null;
		String serviceName = null;
		for (Class<?> clazz : interfaces) {
			String cname = clazz.getSimpleName();
			if (!cname.equals("Iface")) {
				continue;
			}
			serviceName = clazz.getEnclosingClass().getName();
			String pname = serviceName + "$Processor";
			try {
				ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
				Class<?> pclass = classLoader.loadClass(pname);
				if (!TProcessor.class.isAssignableFrom(pclass)) {
					continue;
				}
				Constructor<?> constructor = pclass.getConstructor(clazz);
				processor = (TProcessor) constructor.newInstance(service);
				break;
			} catch (Exception e) {
				//
			}
		}
		if (processor == null) {
			throw new IllegalClassFormatException("service-class should implements Iface");
		}
		//需要单独的线程,因为serve方法是阻塞的.
		serverThread = new ServerThread(processor, port);
		serverThread.start();
		// 注册服务
		if (thriftServerAddressRegister != null) {
			thriftServerAddressRegister.register(serviceName, version, hostname);
		}

	}
	class ServerThread extends Thread {
		private TServer server;
		ServerThread(TProcessor processor, int port) throws Exception {
			TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
			TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
			TProcessorFactory processorFactory = new TProcessorFactory(processor);
			tArgs.processorFactory(processorFactory);
			tArgs.transportFactory(new TFramedTransport.Factory());
			tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));
			server = new TThreadedSelectorServer(tArgs);
		}

		@Override
		public void run(){
			try{
				//启动服务
				server.serve();
			}catch(Exception e){
				//
			}
		}

		public void stopServer(){
			server.stop();
		}
	}

	public void close() {
		serverThread.stopServer();
	}
}

四、客户端获取服务代理及连接池实现
客户端连接池实现:ThriftClientPoolFactory.java

[java] view plain copy print?

  1. package cn.slimsmart.thrift.rpc;
  2. import java.net.InetSocketAddress;
  3. import org.apache.commons.pool.BasePoolableObjectFactory;
  4. import org.apache.thrift.TServiceClient;
  5. import org.apache.thrift.TServiceClientFactory;
  6. import org.apache.thrift.protocol.TBinaryProtocol;
  7. import org.apache.thrift.protocol.TProtocol;
  8. import org.apache.thrift.transport.TFramedTransport;
  9. import org.apache.thrift.transport.TSocket;
  10. import org.apache.thrift.transport.TTransport;
  11. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;
  12. /**
  13. * 连接池,thrift-client for spring
  14. */
  15. public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {
  16. private final ThriftServerAddressProvider serverAddressProvider;
  17. private final TServiceClientFactory<TServiceClient> clientFactory;
  18. private PoolOperationCallBack callback;
  19. protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
  20. this.serverAddressProvider = addressProvider;
  21. this.clientFactory = clientFactory;
  22. }
  23. protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory,
  24. PoolOperationCallBack callback) throws Exception {
  25. this.serverAddressProvider = addressProvider;
  26. this.clientFactory = clientFactory;
  27. this.callback = callback;
  28. }
  29. static interface PoolOperationCallBack {
  30. // 销毁client之前执行
  31. void destroy(TServiceClient client);
  32. // 创建成功是执行
  33. void make(TServiceClient client);
  34. }
  35. public void destroyObject(TServiceClient client) throws Exception {
  36. if (callback != null) {
  37. try {
  38. callback.destroy(client);
  39. } catch (Exception e) {
  40. //
  41. }
  42. }
  43. TTransport pin = client.getInputProtocol().getTransport();
  44. pin.close();
  45. }
  46. public boolean validateObject(TServiceClient client) {
  47. TTransport pin = client.getInputProtocol().getTransport();
  48. return pin.isOpen();
  49. }
  50. @Override
  51. public TServiceClient makeObject() throws Exception {
  52. InetSocketAddress address = serverAddressProvider.selector();
  53. TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
  54. TTransport transport = new TFramedTransport(tsocket);
  55. TProtocol protocol = new TBinaryProtocol(transport);
  56. TServiceClient client = this.clientFactory.getClient(protocol);
  57. transport.open();
  58. if (callback != null) {
  59. try {
  60. callback.make(client);
  61. } catch (Exception e) {
  62. //
  63. }
  64. }
  65. return client;
  66. }
  67. }
package cn.slimsmart.thrift.rpc;

import java.net.InetSocketAddress;

import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;

/**
 * 连接池,thrift-client for spring
 */
public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {

	private final ThriftServerAddressProvider serverAddressProvider;
	private final TServiceClientFactory<TServiceClient> clientFactory;
	private PoolOperationCallBack callback;

	protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
		this.serverAddressProvider = addressProvider;
		this.clientFactory = clientFactory;
	}

	protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory,
			PoolOperationCallBack callback) throws Exception {
		this.serverAddressProvider = addressProvider;
		this.clientFactory = clientFactory;
		this.callback = callback;
	}

	static interface PoolOperationCallBack {
		// 销毁client之前执行
		void destroy(TServiceClient client);

		// 创建成功是执行
		void make(TServiceClient client);
	}

	public void destroyObject(TServiceClient client) throws Exception {
		if (callback != null) {
			try {
				callback.destroy(client);
			} catch (Exception e) {
				//
			}
		}
		TTransport pin = client.getInputProtocol().getTransport();
		pin.close();
	}

	public boolean validateObject(TServiceClient client) {
		TTransport pin = client.getInputProtocol().getTransport();
		return pin.isOpen();
	}

	@Override
	public TServiceClient makeObject() throws Exception {
		InetSocketAddress address = serverAddressProvider.selector();
		TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
		TTransport transport = new TFramedTransport(tsocket);
		TProtocol protocol = new TBinaryProtocol(transport);
		TServiceClient client = this.clientFactory.getClient(protocol);
		transport.open();
		if (callback != null) {
			try {
				callback.make(client);
			} catch (Exception e) {
				//
			}
		}
		return client;
	}

}

客户端服务代理工厂实现:ThriftServiceClientProxyFactory.java

[java] view plain copy print?

  1. package cn.slimsmart.thrift.rpc;
  2. import java.lang.reflect.InvocationHandler;
  3. import java.lang.reflect.Method;
  4. import java.lang.reflect.Proxy;
  5. import org.apache.commons.pool.impl.GenericObjectPool;
  6. import org.apache.thrift.TServiceClient;
  7. import org.apache.thrift.TServiceClientFactory;
  8. import org.springframework.beans.factory.FactoryBean;
  9. import org.springframework.beans.factory.InitializingBean;
  10. import cn.slimsmart.thrift.rpc.ThriftClientPoolFactory.PoolOperationCallBack;
  11. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;
  12. /**
  13. * 客户端代理
  14. */
  15. @SuppressWarnings({ "unchecked", "rawtypes" })
  16. public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {
  17. private Integer maxActive = 32;// 最大活跃连接数
  18. // ms,default 3 min,链接空闲时间
  19. // -1,关闭空闲检测
  20. private Integer idleTime = 180000;
  21. private ThriftServerAddressProvider serverAddressProvider;
  22. private Object proxyClient;
  23. private Class<?> objectClass;
  24. private GenericObjectPool<TServiceClient> pool;
  25. private PoolOperationCallBack callback = new PoolOperationCallBack() {
  26. @Override
  27. public void make(TServiceClient client) {
  28. System.out.println("create");
  29. }
  30. @Override
  31. public void destroy(TServiceClient client) {
  32. System.out.println("destroy");
  33. }
  34. };
  35. public void setMaxActive(Integer maxActive) {
  36. this.maxActive = maxActive;
  37. }
  38. public void setIdleTime(Integer idleTime) {
  39. this.idleTime = idleTime;
  40. }
  41. public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {
  42. this.serverAddressProvider = serverAddressProvider;
  43. }
  44. @Override
  45. public void afterPropertiesSet() throws Exception {
  46. ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
  47. // 加载Iface接口
  48. objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
  49. // 加载Client.Factory类
  50. Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
  51. TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
  52. ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
  53. GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
  54. poolConfig.maxActive = maxActive;
  55. poolConfig.minIdle = 0;
  56. poolConfig.minEvictableIdleTimeMillis = idleTime;
  57. poolConfig.timeBetweenEvictionRunsMillis = idleTime / 2L;
  58. pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
  59. proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
  60. @Override
  61. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  62. //
  63. TServiceClient client = pool.borrowObject();
  64. try {
  65. return method.invoke(client, args);
  66. } catch (Exception e) {
  67. throw e;
  68. } finally {
  69. pool.returnObject(client);
  70. }
  71. }
  72. });
  73. }
  74. @Override
  75. public Object getObject() throws Exception {
  76. return proxyClient;
  77. }
  78. @Override
  79. public Class<?> getObjectType() {
  80. return objectClass;
  81. }
  82. @Override
  83. public boolean isSingleton() {
  84. return true;
  85. }
  86. public void close() {
  87. if (serverAddressProvider != null) {
  88. serverAddressProvider.close();
  89. }
  90. }
  91. }
package cn.slimsmart.thrift.rpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;

import cn.slimsmart.thrift.rpc.ThriftClientPoolFactory.PoolOperationCallBack;
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;

/**
 * 客户端代理
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {

	private Integer maxActive = 32;// 最大活跃连接数

	// ms,default 3 min,链接空闲时间
	// -1,关闭空闲检测
	private Integer idleTime = 180000;
	private ThriftServerAddressProvider serverAddressProvider;

	private Object proxyClient;
	private Class<?> objectClass;

	private GenericObjectPool<TServiceClient> pool;

	private PoolOperationCallBack callback = new PoolOperationCallBack() {
		@Override
		public void make(TServiceClient client) {
			System.out.println("create");
		}

		@Override
		public void destroy(TServiceClient client) {
			System.out.println("destroy");
		}
	};

	public void setMaxActive(Integer maxActive) {
		this.maxActive = maxActive;
	}

	public void setIdleTime(Integer idleTime) {
		this.idleTime = idleTime;
	}

	public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {
		this.serverAddressProvider = serverAddressProvider;
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
		// 加载Iface接口
		objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
		// 加载Client.Factory类
		Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
		TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
		ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
		GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
		poolConfig.maxActive = maxActive;
		poolConfig.minIdle = 0;
		poolConfig.minEvictableIdleTimeMillis = idleTime;
		poolConfig.timeBetweenEvictionRunsMillis = idleTime / 2L;
		pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
		proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
			@Override
			public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
				//
				TServiceClient client = pool.borrowObject();
				try {
					return method.invoke(client, args);
				} catch (Exception e) {
					throw e;
				} finally {
					pool.returnObject(client);
				}
			}
		});
	}

	@Override
	public Object getObject() throws Exception {
		return proxyClient;
	}

	@Override
	public Class<?> getObjectType() {
		return objectClass;
	}

	@Override
	public boolean isSingleton() {
		return true;
	}

	public void close() {
		if (serverAddressProvider != null) {
			serverAddressProvider.close();
		}
	}
}

下面我们看一下服务端和客户端的配置;

服务端spring-context-thrift-server.xml

[html] view plain copy print?

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
  4. xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
  5. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
  6. http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
  7. http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
  8. http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
  9. default-lazy-init="false">
  10. <!-- zookeeper -->
  11. <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
  12. destroy-method="close">
  13. <property name="zkHosts"
  14. value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
  15. <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
  16. <property name="connectionTimeout" value="3000" />
  17. <property name="sessionTimeout" value="3000" />
  18. <property name="singleton" value="true" />
  19. </bean>
  20. <bean id="sericeAddressRegister"
  21. class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegisterZookeeper"
  22. destroy-method="close">
  23. <property name="zkClient" ref="thriftZookeeper" />
  24. </bean>
  25. <bean id="echoSerivceImpl" class="cn.slimsmart.thrift.rpc.demo.EchoSerivceImpl" />
  26. <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
  27. destroy-method="close">
  28. <property name="service" ref="echoSerivceImpl" />
  29. <property name="port" value="9000" />
  30. <property name="version" value="1.0.0" />
  31. <property name="weight" value="1" />
  32. <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
  33. </bean>
  34. <bean id="echoSerivce1" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
  35. destroy-method="close">
  36. <property name="service" ref="echoSerivceImpl" />
  37. <property name="port" value="9001" />
  38. <property name="version" value="1.0.0" />
  39. <property name="weight" value="1" />
  40. <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
  41. </bean>
  42. <bean id="echoSerivce2" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
  43. destroy-method="close">
  44. <property name="service" ref="echoSerivceImpl" />
  45. <property name="port" value="9002" />
  46. <property name="version" value="1.0.0" />
  47. <property name="weight" value="1" />
  48. <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
  49. </bean>
  50. </beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
				http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
				http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
				http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
	default-lazy-init="false">

	<!-- zookeeper -->
	<bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
		destroy-method="close">
		<property name="zkHosts"
			value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
		<property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
		<property name="connectionTimeout" value="3000" />
		<property name="sessionTimeout" value="3000" />
		<property name="singleton" value="true" />
	</bean>
	<bean id="sericeAddressRegister"
		class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegisterZookeeper"
		destroy-method="close">
		<property name="zkClient" ref="thriftZookeeper" />
	</bean>
	<bean id="echoSerivceImpl" class="cn.slimsmart.thrift.rpc.demo.EchoSerivceImpl" />

	<bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
		destroy-method="close">
		<property name="service" ref="echoSerivceImpl" />
		<property name="port" value="9000" />
		<property name="version" value="1.0.0" />
		<property name="weight" value="1" />
		<property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
	</bean>

	<bean id="echoSerivce1" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
		destroy-method="close">
		<property name="service" ref="echoSerivceImpl" />
		<property name="port" value="9001" />
		<property name="version" value="1.0.0" />
		<property name="weight" value="1" />
		<property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
	</bean>

	<bean id="echoSerivce2" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
		destroy-method="close">
		<property name="service" ref="echoSerivceImpl" />
		<property name="port" value="9002" />
		<property name="version" value="1.0.0" />
		<property name="weight" value="1" />
		<property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
	</bean>
</beans>

客户端:spring-context-thrift-client.xml

[html] view plain copy print?

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
  4. xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
  5. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
  6. http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
  7. http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
  8. http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
  9. default-lazy-init="false">
  10. <!-- fixedAddress -->
  11. <!--
  12. <bean id="fixedAddressProvider" class="cn.slimsmart.thrift.rpc.zookeeper.FixedAddressProvider">
  13. <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
  14. <property name="serverAddress" value="192.168.36.215:9001:1,192.168.36.215:9002:2,192.168.36.215:9003:3" />
  15. </bean>
  16. <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory">
  17. <property name="maxActive" value="5" />
  18. <property name="idleTime" value="10000" />
  19. <property name="serverAddressProvider" ref="fixedAddressProvider" />
  20. </bean>
  21. -->
  22. <!-- zookeeper   -->
  23. <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
  24. destroy-method="close">
  25. <property name="zkHosts"
  26. value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
  27. <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
  28. <property name="connectionTimeout" value="3000" />
  29. <property name="sessionTimeout" value="3000" />
  30. <property name="singleton" value="true" />
  31. </bean>
  32. <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory" destroy-method="close">
  33. <property name="maxActive" value="5" />
  34. <property name="idleTime" value="1800000" />
  35. <property name="serverAddressProvider">
  36. <bean class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper">
  37. <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
  38. <property name="version" value="1.0.0" />
  39. <property name="zkClient" ref="thriftZookeeper" />
  40. </bean>
  41. </property>
  42. </bean>
  43. </beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
				http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
				http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
				http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
	default-lazy-init="false">

	<!-- fixedAddress -->
	<!--
	<bean id="fixedAddressProvider" class="cn.slimsmart.thrift.rpc.zookeeper.FixedAddressProvider">
		 <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
         <property name="serverAddress" value="192.168.36.215:9001:1,192.168.36.215:9002:2,192.168.36.215:9003:3" />
	</bean>
    <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory">
        <property name="maxActive" value="5" />
        <property name="idleTime" value="10000" />
        <property name="serverAddressProvider" ref="fixedAddressProvider" />
    </bean>
   -->
    <!-- zookeeper   -->
    <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
		destroy-method="close">
		<property name="zkHosts"
			value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
		<property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
		<property name="connectionTimeout" value="3000" />
		<property name="sessionTimeout" value="3000" />
		<property name="singleton" value="true" />
	</bean>
    <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory" destroy-method="close">
        <property name="maxActive" value="5" />
        <property name="idleTime" value="1800000" />
        <property name="serverAddressProvider">
        	<bean class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper">
        		<property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
        		<property name="version" value="1.0.0" />
        		<property name="zkClient" ref="thriftZookeeper" />
        	</bean>
        </property>
    </bean>
</beans>

运行服务端后,我们可以看见zookeeper注册了多个服务地址。


详细实例这里就不详述了,请参考实例代码:https://github.com/slimina/thrift-zookeeper-rpc

关于Thrift设计优化文档:

Thrift RPC服务框架日志的优化

时间: 2024-10-13 21:10:38

基于zookeeper、连接池、Failover/LoadBalance等改造Thrift 服务化的相关文章

[转载] 基于zookeeper、连接池、Failover/LoadBalance等改造Thrift 服务化

转载自http://blog.csdn.net/zhu_tianwei/article/details/44115667 http://blog.csdn.net/column/details/slimina-thrift.html 对于Thrift服务化的改造,主要是客户端,可以从如下几个方面进行: 1.服务端的服务注册,客户端自动发现,无需手工修改配置,这里我们使用zookeeper,但由于zookeeper本身提供的客户端使用较为复杂,因此采用curator-recipes工具类进行处理服

支持并发的http客户端(基于tcp连接池以及netty)

闲来无事,将以前自己写的一个库放出来吧.. 有的时候会有这样子的需求: (1)服务器A通过HTTP协议来访问服务器B (2)服务器A可能会并发的像B发送很多HTTP请求 类似于上述的需求,可能并不常见...因为在业务中确实遇到了这样子的场景,所以就自己动手开发了一个库... 实现原理: (1)底层IO通过netty搞 (2)维护一个tcp的长连接池,这样子就不用每次发送请求还要建立一个tcp连接了... 下面直接来看怎么用吧: (1)最常规的用法,向www.baidu.com发送100次get请

Thrift 个人实战--Thrift 服务化 Client的改造

前言: Thrift作为Facebook开源的RPC框架, 通过IDL中间语言, 并借助代码生成引擎生成各种主流语言的rpc框架服务端/客户端代码. 不过Thrift的实现, 简单使用离实际生产环境还是有一定距离, 本系列将对Thrift作代码解读和框架扩充, 使得它更加贴近生产环境. 本文主要讲解thrift的服务化改造, 这边侧重于阐述对client(服务调用方)的改造和设计思想. 基础概念: 传统对client的优化, 主要是Client Manager化, 优化方式包括引入连接池, 支持

基于连接池监控组件druid实现的监控用户在线状态

原文:基于连接池监控组件druid实现的监控用户在线状态 源代码下载地址:http://www.zuidaima.com/share/1550463574248448.htm 我也来分享个代码吧. 最近在做监控用户在线状态,处理客户端用户非正常意外退出(如直接关机,突然停电等)的情况遇到了点问题,找了好多方法都感觉不是很好. 昨天看到一个新的连接池管理工具感觉很不错~应该说是目前最好的---druid 这里不多做介绍,给大家个连接,想学习的可以去看看,开源的.http://code.alibab

HttpClient 基于连接池的使用

场景:调用外部系统接口的http请求 要求: 1:可能是http请求,也可能是https请求 2:需要加入连接池的概念,不能每次发起请求都新建一个连接(每次连接握手三次,效率太低) 准备使用httpclient 4.5的版本 HTTPClient的特性 1. 基于标准.纯净的Java语言.实现了Http1.0和Http1.1 2. 以可扩展的面向对象的结构实现了Http全部的方法(GET, POST, PUT, DELETE, HEAD, OPTIONS, and TRACE). 3. 支持HT

基于线程池和连接池的Http请求

背景:最新项目需求调用http接口,所以打算使用最新的httpClient客户端写一个工具类,写好了以后在实际应用过程中遇到了一些问题,因为数据量还算 大,每次处理大概要处理600-700次请求,平均算下来大概需要20分钟,这个速度虽然是跑在定时任务中的,但是也是不能忍受的,所以有了这个博客. 1.首先想到的解决办法就是多线程发请求了,但是这个有坑,最后会在结果处说明. 2.代码方面如下 ExecutorService executor = Executors.newFixedThreadPoo

通过dubbo暴露接口调用方法,及基于zookeeper的dubbo涉及配置文件【转】

现在很流行的Dubbo很多朋友都听说过吧,最近我也在看这方面的东西,分享先我的心得笔记. 先说说我们团队要做的项目框架,很简单重在实现基于zookeeper的dubbo注册. 框架:springmvc+spring+zookeeper+dubbo 项目分三层,model存放数据,view页面展示.controller下面具体逻辑实现.通过dubbo消费方和供应方注册,供应方给消费方暴露接口,供消费方调用. 工程部署需要配置文件有: applicationContext-dubbo.xml {--

【甘道夫】HBase连接池 -- HTablePool被Deprecated之后

说明: 最近两天在调研HBase的连接池,有了一些收获,特此记录下来. 本文先将官方文档(http://hbase.apache.org/book.html)9.3.1.1节翻译,方便大家阅读,然后查阅了关键类HConnectionManager的Developer API(http://hbase.apache.org/devapidocs/index.html) 做了一些总结. 最后介绍一些阅读0.96.0.98及最新源码的精彩发现. 欢迎转载,请注明来源: http://blog.csdn

ActiveMQ 基于zookeeper的主从(levelDB Master/Slave)搭建以及Spring-boot下使用

0:说明 ActiveMQ 5.9.0新推出的主从实现,基于zookeeper来选举出一个master,其他节点自动作为slave实时同步消息.因为有实时同步数据的slave的存在,master不用担心数据丢失,所以leveldb会优先采用内存存储消息,异步同步到磁盘,所以该方式的activeMQ读写性能最好因为选举机制要超过半数,所以最少需要3台节点,才能实现高可用.如果集群是两台则master失效后slave会不起作用,所以集群至少三台.此种方式仅实现主备功能,避免单点故障,没有负载均衡功能