rpc框架之 thrift连接池实现

接前一篇rpc框架之HA/负载均衡构架设计 继续,写了一个简单的thrift 连接池:

先做点准备工作:

package yjmyzz;

public class ServerInfo {

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    private String host;
    private int port;

    public ServerInfo(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public String toString() {
        return "host:" + host + ",port:" + port;
    }
}

上面这个类,用来封装服务端的基本信息,主机名+端口号,连接时需要用到。

package yjmyzz;

import org.apache.thrift.transport.TTransport;

import java.text.SimpleDateFormat;
import java.util.Date;

public class TransfortWrapper {

    private TTransport transport;

    /**
     * 是否正忙
     */
    private boolean isBusy = false;

    /**
     * 是否已经挂
     */
    private boolean isDead = false;

    /**
     * 最后使用时间
     */
    private Date lastUseTime;

    /**
     * 服务端Server主机名或IP
     */
    private String host;

    /**
     * 服务端Port
     */
    private int port;

    public TransfortWrapper(TTransport transport, String host, int port, boolean isOpen) {
        this.lastUseTime = new Date();
        this.transport = transport;
        this.host = host;
        this.port = port;
        if (isOpen) {
            try {
                transport.open();
            } catch (Exception e) {
                //e.printStackTrace();
                System.err.println(host + ":" + port + " " + e.getMessage());
                isDead = true;
            }
        }
    }

    public TransfortWrapper(TTransport transport, String host, int port) {
        this(transport, host, port, false);
    }

    public boolean isBusy() {
        return isBusy;
    }

    public void setIsBusy(boolean isBusy) {
        this.isBusy = isBusy;
    }

    public boolean isDead() {
        return isDead;
    }

    public void setIsDead(boolean isDead) {
        this.isDead = isDead;
    }

    public TTransport getTransport() {
        return transport;
    }

    public void setTransport(TTransport transport) {
        this.transport = transport;
    }

    /**
     * 当前transport是否可用
     *
     * @return
     */
    public boolean isAvailable() {
        return !isBusy && !isDead && transport.isOpen();
    }

    public Date getLastUseTime() {
        return lastUseTime;
    }

    public void setLastUseTime(Date lastUseTime) {
        this.lastUseTime = lastUseTime;
    }

    public String getHost() {
        return host;
    }

    public int getPort() {
        return port;
    }

    public String toString() {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return "hashCode:" + hashCode() + "," +
                host + ":" + port + ",isBusy:" + isBusy + ",isDead:" + isDead + ",isOpen:" +
                transport.isOpen() + ",isAvailable:" + isAvailable() + ",lastUseTime:" + format.format(lastUseTime);
    }
}

这是对TTransport的封装,主要增加了一些辅助信息,直接看代码注释即可。

下面才是连接池的主要内容:

package yjmyzz;

import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import java.util.Date;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Thrift连接池
 *
 * @author : 菩提树下的杨过(http://yjmyzz.cnblogs.com/)
 * @version : 0.1 BETA
 * @since : 2015-09-27(中秋)
 */
public class ThriftTransportPool {

    Semaphore access = null;
    TransfortWrapper[] pool = null;
    int poolSize = 1;//连接池大小
    int minSize = 1;//池中保持激活状态的最少连接个数
    int maxIdleSecond = 300;//最大空闲时间(秒),超过该时间的空闲时间的连接将被关闭
    int checkInvervalSecond = 60;//每隔多少秒,检测一次空闲连接(默认60秒)
    List<ServerInfo> serverInfos;
    boolean allowCheck = true;
    Thread checkTread = null;

    public int getCheckInvervalSecond() {
        return checkInvervalSecond;
    }

    public void setCheckInvervalSecond(int checkInvervalSecond) {
        this.checkInvervalSecond = checkInvervalSecond;
    }

    /**
     * 连接池构造函数
     *
     * @param poolSize            连接池大小
     * @param minSize             池中保持激活的最少连接数
     * @param maxIdleSecond       单个连接最大空闲时间,超过此值的连接将被断开
     * @param checkInvervalSecond 每隔多少秒检查一次空闲连接
     * @param serverList          服务器列表
     */
    public ThriftTransportPool(int poolSize, int minSize, int maxIdleSecond, int checkInvervalSecond, List<ServerInfo> serverList) {
        if (poolSize <= 0) {
            poolSize = 1;
        }
        if (minSize > poolSize) {
            minSize = poolSize;
        }
        if (minSize <= 0) {
            minSize = 0;
        }
        this.maxIdleSecond = maxIdleSecond;
        this.minSize = minSize;
        this.poolSize = poolSize;
        this.serverInfos = serverList;
        this.allowCheck = true;
        this.checkInvervalSecond = checkInvervalSecond;
        init();
        check();
    }

    /**
     * 连接池构造函数(默认最大空闲时间300秒)
     *
     * @param poolSize   连接池大小
     * @param minSize    池中保持激活的最少连接数
     * @param serverList 服务器列表
     */
    public ThriftTransportPool(int poolSize, int minSize, List<ServerInfo> serverList) {
        this(poolSize, minSize, 300, 60, serverList);
    }

    public ThriftTransportPool(int poolSize, List<ServerInfo> serverList) {
        this(poolSize, 1, 300, 60, serverList);
    }

    public ThriftTransportPool(List<ServerInfo> serverList) {
        this(serverList.size(), 1, 300, 60, serverList);
    }

    /**
     * 检查空闲连接
     */
    private void check() {
        checkTread =
                new Thread(new Runnable() {
                    public void run() {
                        while (allowCheck) {
                            //System.out.println("--------------");
                            System.out.println("开始检测空闲连接...");
                            for (int i = 0; i < pool.length; i++) {
                                //if (pool[i] == null) {
                                //    System.out.println("pool[" + i + "]为null");
                                //}
                                //if (pool[i].getTransport() == null) {
                                //    System.out.println("pool[" + i + "].getTransport()为null");
                                //}
                                if (pool[i].isAvailable() && pool[i].getLastUseTime() != null) {
                                    long idleTime = new Date().getTime() - pool[i].getLastUseTime().getTime();
                                    //超过空闲阀值的连接,主动断开,以减少资源消耗
                                    if (idleTime > maxIdleSecond * 1000) {
                                        if (getActiveCount() > minSize) {
                                            pool[i].getTransport().close();
                                            pool[i].setIsBusy(false);
                                            System.out.println(pool[i].hashCode() + "," + pool[i].getHost() + ":" + pool[i].getPort() + " 超过空闲时间阀值被断开!");
                                        }
                                    }
                                }
                            }
                            System.out.println("当前活动连接数:" + getActiveCount());
                            try {
                                Thread.sleep(checkInvervalSecond * 1000);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
        checkTread.start();
    }

    /**
     * 连接池初始化
     */
    private void init() {
        access = new Semaphore(poolSize);
        pool = new TransfortWrapper[poolSize];

        for (int i = 0; i < pool.length; i++) {
            int j = i % serverInfos.size();
            TSocket socket = new TSocket(serverInfos.get(j).getHost(),
                    serverInfos.get(j).getPort());
            if (i < minSize) {
                pool[i] = new TransfortWrapper(socket, serverInfos.get(j).getHost(), serverInfos.get(j).getPort(), true);
            } else {
                pool[i] = new TransfortWrapper(socket, serverInfos.get(j).getHost(), serverInfos.get(j).getPort());
            }
        }
    }

    /**
     * 从池中取一个可用连接
     * @return
     */
    public TTransport get() {
        try {
            if (access.tryAcquire(3, TimeUnit.SECONDS)) {
                synchronized (this) {
                    for (int i = 0; i < pool.length; i++) {
                        if (pool[i].isAvailable()) {
                            pool[i].setIsBusy(true);
                            pool[i].setLastUseTime(new Date());
                            return pool[i].getTransport();
                        }
                    }
                    //尝试激活更多连接
                    for (int i = 0; i < pool.length; i++) {
                        if (!pool[i].isBusy() && !pool[i].isDead()
                                && !pool[i].getTransport().isOpen()) {
                            try {
                                pool[i].getTransport().open();
                                pool[i].setIsBusy(true);
                                pool[i].setLastUseTime(new Date());
                                return pool[i].getTransport();
                            } catch (Exception e) {
                                //e.printStackTrace();
                                System.err.println(pool[i].getHost() + ":" + pool[i].getPort() + " " + e.getMessage());
                                pool[i].setIsDead(true);
                            }
                        }
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("can not get available client");

        }
        throw new RuntimeException("all client is too busy");
    }

    /**
     * 客户端调用完成后,必须手动调用此方法,将TTransport恢复为可用状态
     *
     * @param client
     */
    public void release(TTransport client) {
        boolean released = false;
        synchronized (this) {
            for (int i = 0; i < pool.length; i++) {
                if (client == pool[i].getTransport() && pool[i].isBusy()) {
                    pool[i].setIsBusy(false);
                    released = true;
                    break;
                }
            }
        }
        if (released) {
            access.release();
        }
    }

    public void destory() {
        if (pool != null) {
            for (int i = 0; i < pool.length; i++) {
                pool[i].getTransport().close();
            }
        }
        allowCheck = false;
        checkTread = null;
        System.out.print("连接池被销毁!");
    }

    /**
     * 获取当前已经激活的连接数
     *
     * @return
     */
    public int getActiveCount() {
        int result = 0;
        for (int i = 0; i < pool.length; i++) {
            if (!pool[i].isDead() && pool[i].getTransport().isOpen()) {
                result += 1;
            }
        }
        return result;
    }

    /**
     * 获取当前繁忙状态的连接数
     *
     * @return
     */
    public int getBusyCount() {
        int result = 0;
        for (int i = 0; i < pool.length; i++) {
            if (!pool[i].isDead() && pool[i].isBusy()) {
                result += 1;
            }
        }
        return result;
    }

    /**
     * 获取当前已"挂"掉连接数
     *
     * @return
     */
    public int getDeadCount() {
        int result = 0;
        for (int i = 0; i < pool.length; i++) {
            if (pool[i].isDead()) {
                result += 1;
            }
        }
        return result;
    }

    public String toString() {
        return "poolsize:" + pool.length +
                ",minSize:" + minSize +
                ",maxIdleSecond:" + maxIdleSecond +
                ",checkInvervalSecond:" + checkInvervalSecond +
                ",active:" + getActiveCount() +
                ",busy:" + getBusyCount() +
                ",dead:" + getDeadCount();
    }

    public String getWrapperInfo(TTransport client) {
        for (int i = 0; i < pool.length; i++) {
            if (pool[i].getTransport() == client) {
                return pool[i].toString();
            }
        }
        return "";
    }
}

主要思路:

1.构造器里,传入 连接池大小,最小连接数,连接最大空闲时间,空间连接检测时间间隔,服务端列表等基本信息

2.然后调用init方法进行初始化,初始化时把pool[]数组填满,不过在填充的时候,要根据minsize决定激活多少连接(换句话讲,连接实例都都建好了,只是连不连的问题),另外初始化的时候,还要考虑到某个服务器宕机的可能,如果服务端挂了,将对应的实例设置为isDead=true的状态

3.新开一个线程定时检查是否有空闲连接,如果空闲时间太长,主动断开,以节省开销。

4.get()方法从数组中捞一个可用的连接出来,取的时候要考虑到唤醒"沉睡"连接的情况,即如果当前池中只有2个活动连接,这时又来了请求,没有活动连接了,要从池中把断开的连接叫醒一个。

5.要控制并发控制,多个线程同时调用get()想从池中取可用连接时,可用Semaphore+Lock的机制来加以控制,可参考上一篇内容。

测试:

package yjmyzz;

import org.apache.thrift.transport.TSocket;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class PoolTest {

    public static void main(String[] args) throws Exception {

        //初始化一个连接池(poolsize=15,minsize=1,maxIdleSecond=5,checkInvervalSecond=10)
        final ThriftTransportPool pool = new ThriftTransportPool(15, 1, 5, 10, getServers());

        //模拟客户端调用
        createClients(pool);

        //等候清理空闲连接
        Thread.sleep(30000);

        //再模拟一批客户端,验证连接是否会重新增加
        createClients(pool);

        System.out.println("输入任意键退出...");
        System.in.read();
        //销毁连接池
        pool.destory();

    }

    private static void createClients(final ThriftTransportPool pool) throws Exception {

        //模拟5个client端
        int clientCount = 5;

        Thread thread[] = new Thread[clientCount];
        FutureTask<String> task[] = new FutureTask[clientCount];

        for (int i = 0; i < clientCount; i++) {
            task[i] = new FutureTask<String>(new Callable<String>() {
                public String call() throws Exception {
                    TSocket scoket = (TSocket) pool.get();//从池中取一个可用连接
                    //模拟调用RPC会持续一段时间
                    System.out.println(Thread.currentThread().getName() + " => " + pool.getWrapperInfo(scoket));
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    pool.release(scoket);//记得每次用完,要将连接释放(恢复可用状态)
                    return Thread.currentThread().getName() + " done.";
                }
            });
            thread[i] = new Thread(task[i], "Thread" + i);
        }

        //启用所有client线程
        for (int i = 0; i < clientCount; i++) {
            thread[i].start();
            Thread.sleep(10);
        }

        System.out.println("--------------");

        //等待所有client调用完成
        for (int i = 0; i < clientCount; i++) {
            System.out.println(task[i].get());
            System.out.println(pool);
            System.out.println("--------------");
            thread[i] = null;
        }
    }

    private static List<ServerInfo> getServers() {
        List<ServerInfo> servers = new ArrayList<ServerInfo>();
        servers.add(new ServerInfo("localhost", 9000));
        servers.add(new ServerInfo("localhost", 9001));
        servers.add(new ServerInfo("localhost", 1002));//这一个故意写错的,模拟服务器挂了,连接不上的情景
        return servers;
    }
}

输出:

****************************
开始检测空闲连接...
当前活动连接数:1
Thread1 => hashCode:919192718,localhost:9001,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:09:59
Thread0 => hashCode:1510835162,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:09:59
localhost:1002 java.net.ConnectException: Connection refused
Thread2 => hashCode:1466719669,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:00
Thread3 => hashCode:2080503518,localhost:9001,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:00
localhost:1002 java.net.ConnectException: Connection refused
Thread4 => hashCode:411724643,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:00
--------------
Thread0 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:3,dead:2
--------------
Thread1 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:3,dead:2
--------------
Thread2 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:2,dead:2
--------------
Thread3 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:1,dead:2
--------------
Thread4 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:0,dead:2
--------------
开始检测空闲连接...
1510835162,localhost:9000 超过空闲时间阀值被断开!
919192718,localhost:9001 超过空闲时间阀值被断开!
1466719669,localhost:9000 超过空闲时间阀值被断开!
2080503518,localhost:9001 超过空闲时间阀值被断开!
当前活动连接数:1
开始检测空闲连接...
当前活动连接数:1
开始检测空闲连接...
当前活动连接数:1
Thread0 => hashCode:411724643,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
Thread1 => hashCode:1510835162,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
Thread2 => hashCode:919192718,localhost:9001,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
Thread3 => hashCode:1466719669,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
Thread4 => hashCode:2080503518,localhost:9001,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
--------------
Thread0 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:4,dead:2
--------------
Thread1 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:3,dead:2
--------------
Thread2 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:2,dead:2
--------------
Thread3 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:1,dead:2
--------------
Thread4 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:0,dead:2
--------------
输入任意键退出...
q
连接池被销毁!

***********************

注意上面高亮颜色的部分,2080503518 连接创建后,后来被check方法主动检测到空闲断开,然后第二轮调用时,又重新激活。411724643 则幸免于难,一直战斗到最后。另外由于故意写错了一个server地址,池中始终有二个dead的实例。

值得改进的地方:

主要是公平性的问题,在初始化的时候,如果服务器有3台,而指定的连接池大小为4,目前的做法是,用4对3取模,所以第1、4个连接实例都是连接到服务器1,get取可用连接的时候也有类似情况,是按pool数组从前向后遍历的,捞到第1个可用的连接就完事了,这样永远是排在List前面的服务器压力会大一些,这样有点不太符合负载"均衡"的本意。

不过,这个问题也很好解决,有一个很简单有效的技巧,实际应用中,服务器列表是从zk上取回来的,取回来后,先对数组做随机排序,这样整体看来下,多个连接池总体的连接分布情况就比较平均了。

时间: 2024-10-11 18:07:13

rpc框架之 thrift连接池实现的相关文章

rpc框架之thrift教程1 - 安装 及 hello world

thrift是一个facebook开源的高效RPC框架,其主要特点是跨语言及二进制高效传输(当然,除了二进制,也支持json等常用序列化机制),官网地址:http://thrift.apache.org 跨语言通常有二种做法, 一是将其它语言转换成某种主流的通用语言,比如:delphi.net以前就是先将delphi转换成c#,然后再编译成IL,从而实现delphi在.net上的运行(好久没关注delphi了,不知道现在还是不是这种机制) 二是先定义一种规范文件(可以简单的理解为『母版』),然后

Spring Boot (三): ORM 框架 JPA 与连接池 Hikari

前面两篇文章我们介绍了如何快速创建一个 Spring Boot 工程<Spring Boot(一):快速开始>和在 Spring Boot 中如何使用模版引擎 Thymeleaf 渲染一个Web页面<Spring Boot (二):模版引擎 Thymeleaf 渲染 Web 页面>,本篇文章我们继续介绍在 Spring Boot 中如何使用数据库. 1. 概述 数据库方面我们选用 Mysql , Spring Boot 提供了直接使用 JDBC 的方式连接数据库,毕竟使用 JDBC

RPC框架之Thrift

目前流行的服务调用方式有很多种,例如基于SOAP消息格式的 Web Service,基于 JSON 消息格式的 RESTful 服务等.其中所用到的数据传输方式包括 XML,JSON 等,然而 XML 相对体积太大,传输效率低,JSON 体积较小,新颖,但还不够完善. 本文将介绍由 Facebook 开发的远程服务调用框架 Apache Thrift,它采用接口描述语言定义并创建服务,支持可扩展的跨语言服务开发,所包含的代码生成引擎可以在多种语言中,如 C++, Java, Python, PH

RPC框架之Thrift分析(转)

一.简介 1.Thrift是Facebook开发的跨语言的RPC服务框架.随后贡献给Apache开源组织.成为RPC服务的主流框架. 2.特点: 优点: 跨语言,支持java.c/c++.python等多种编程语言 IDL定义接口函数和数据类型 支持二进制传输,效率高 支持多种工作模型,单线程模型.线程池模型.非阻塞模型 缺点: 文档不多 各版本不兼容,升级不方便 二.分析 Thrift分为服务端(server)和客户端(Client)两个对应的部分.代码分层设计,分为Transport(传输层

代码生成器+shiro安全框架+SpringMVC+mybatis+连接池druid+HTML5

代码有持续更新(提供全部源码) 新增:1.代码生成器,将大大提高开发效率,增删改查的处理类,service层,mybatis的xml,SQL 脚本,   jsp页面 等重复低级的代码 将瞬间生成,从此不再当码农 2.数据库连接池  (阿里的 druid.Druid在监控.可扩展性.稳定性和性能方面都有明显的优势.) 3.加入安全框架 shiro  说明:SpringMVC+mybatis+shiro(oracle 和 mysql) HTML5 全新高大尚后台框架 bootstrap(可换皮肤)

Spring框架中获取连接池常用的四种方式

1:DBCP数据源 DBCP类包位于 /lib/jakarta-commons/commons-dbcp.jar,DBCP是一个依赖Jakarta commons-pool对象池机制的数据库连接池,所以在类路径下还必须包括/lib/jakarta-commons/commons-pool.jar.下面是使用DBCP配置oracle数据源的配置片断: <bean id="dataSource" class="org.apache.commons.dbcp.BasicDat

rpc框架之thrift教程2 - 基本概念

thrift的基本构架: 上图源自:http://jnb.ociweb.com/jnb/jnbJun2009.html 底层Underlying I/O以上的部分,都是由thrift编译器生成的代码,其中: Your Code 这是根据thrift文件中定义的dto及service接口方法 FooService.Client及FooService.Processer是thrift生成的用于客户端及服务端的标准代码 Foo.read/write 参数对象及结果对象在传输时,最终需要在client.

Apache thrift - 使用,内部实现及构建一个可扩展的RPC框架

本文首先介绍了什么是Apache Thrift,接着介绍了Thrift的安装部署及如何利用Thrift来实现一个简单的RPC应用,并简单的探究了一下Thrift的内部实现原理,最后给出一个基于Thrift的可扩展的分布式RPC调用框架,在中小型项目中是一个常见的SOA实践. Thrift介绍 Apache Thrift是Facebook 开发的远程服务调用框架,它采用接口描述语言(IDL)定义并创建服务,支持可扩展的跨语言服务开发,所包含的代码生成引擎可以在多种语言中,如 C++, Java,

微博轻量级RPC框架Motan正式开源:支撑千亿调用

支撑微博千亿调用的轻量级 RPC 框架 Motan 正式开源了,项目地址为https://github.com/weibocom/motan. 微博轻量级RPC框架Motan正式开源 Motan 是微博技术团队研发的基于 Java 的轻量级 RPC 框架,已在微博内部大规模应用多年,每天稳定支撑微博上亿次的内部调用.Motan 基于微博的高并发和高负载场景优化,成为一套简单.易用.高可用的 RPC 服务框架. Motan 功能特点:简单.易用.高可用 无侵入集成.简单易用,通过 Spring 配