Thrift Server总结

Server Server 框架流程:

(1)首先获取连接

  client = serverTransport_->accept();

(2)然后获取thransport和protocol

  inputTransport = inputTransportFactory_->getTransport(client);

  outputTransport = outputTransportFactory_->getTransport(client);

  inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);

  outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);

(3)接着获取processor

  shared_ptr<TProcessor> processor = getProcessor(inputProtocol,outputProtocol, client);

(4)最后处理请求

  processor->process(inputProtocol, outputProtocol,connectionContext);

TThreadedServer是多线程处理请求,来一个连接请求就开启一个线程处理它,处理完就退出线程。处理线程有2种类型:PthreadThreadBoostThread,前者是基posix函数,后者基于boost线程类,它们均通过对应的工厂类创建;还有个Task类,继承于Runnable,主要的处理请求流程在此实现。

TNonblockingServer

(1)采用libevent库的事件驱动框架

// Register the server event
  event_set(&serverEvent_,
            serverSocket_,
            EV_READ | EV_PERSIST,
            TNonblockingServer::eventHandler,
            this);

event_base_set(eventBase_, &serverEvent_);

event_add(&serverEvent_, 0);

  // Create an event to be notified when a task finishes

  event_set(&notificationEvent_,
            getNotificationRecvFD(),
            EV_READ | EV_PERSIST,
            TConnection::taskHandler,
            this);
  event_base_set(eventBase_, &notificationEvent_);
  event_add(&notificationEvent_, 0);

当为threadPoolProcessing mode时,主线程+工作线程架构,主线程负责接收新连接和监听网络数据,当有数据到达时,生成一个task放入队列,工作线程负责取出task,并执行task。这里还用到unix socket来通知主线程任务已完成和使用TMemoryBuffer来缓存读写数据。

connectionStack_是保存空闲TConnection对象的栈,有新连接就从栈里取出一个TConnection对象并初始化它,栈为空时才new一个TConnection对象。连接被close掉时会将TConnection对象交还给connectionStack_,即入栈,但connectionStack_的大小达到connectionStackLimit_时delete掉该TConnection对象。

当numActiveProcessors_ > maxActiveProcessors_ 或 activeConnections > maxConnections_时,server达到overloaded状态,其中numActiveProcessors_是指目前尚未处理的task数;

(5)server达到overloaded状态时,可能采取以下三种措施:
/// Overload condition actions.
enum TOverloadAction {
  T_OVERLOAD_NO_ACTION,        ///< Don‘t handle overload */
  T_OVERLOAD_CLOSE_ON_ACCEPT,  ///< Drop new connections immediately */
  T_OVERLOAD_DRAIN_TASK_QUEUE  ///< Drop one tasks from head of task queue */
};

有新连接被accept时,会先检查是否达到overloaded,但是会进行如下处理Overload的代码:
// If we‘re overloaded, take action here
if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
  nConnectionsDropped_++;
  nTotalConnectionsDropped_++;
  if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
    close(clientSocket);
    return;
  } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
    if (!drainPendingTask()) {
    // Nothing left to discard, so we drop connection instead.
    close(clientSocket);
    return;
    }
  }
}

drainPendingTask()函数:
bool TNonblockingServer::drainPendingTask() {
  if (threadManager_) {
    boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
    if (task) {
      TConnection* connection =
        static_cast<TConnection::Task*>(task.get())->getTConnection();
      assert(connection && connection->getServer()
             && connection->getState() == APP_WAIT_TASK);
      connection->forceClose();
      return true;
    }
  }
  return false;

}

} //可见,只drop one task

时间: 2024-07-29 10:23:56

Thrift Server总结的相关文章

spark sql thrift server

### create data ## cat /dev/urandom | head -1 | md5sum | head -c 8 ## echo "$(date +%s)"|sha256sum|base64|head -c 16;echo ## cat /dev/urandom | awk 'NR==1{print $0|"md5sum|base64|grep -Eo '^.{16}'";exit}' for i in {1..100000} do passwd

hive12启动报错org.apache.thrift.server.TThreadPoolServer.&lt;init&gt;(Lorg/apache/thrift/server/TThreadPoolServer$Args;)

执行如下命令启动hive服务:./bin/hive --service hiveserver,报如下错误: Starting Hive Thrift ServerException in thread "main" java.lang.NoSuchMethodError: org.apache.thrift.server.TThreadPoolServer.<init>(Lorg/apache/thrift/server/TThreadPoolServer$Args;)V 

创建Thrift Server和Thrift Client

1.创建Server package cn.horace.thrift.server; import cn.horace.thrift.idl.IUserService; import cn.horace.thrift.rpc.IUserServiceImpl; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apac

「Spark」Spark SQL Thrift Server运行方式

Spark SQL可以使用JDBC/ODBC或命令行接口充当分布式查询引擎.这种模式,用户或者应用程序可以直接与Spark SQL交互,以运行SQL查询,无需编写任何代码. Spark SQL提供两种方式来运行SQL: 通过运行Thrift Server 直接执行Spark SQL命令行 运行Thrift Server方式 1.先运行Hive metastore nohup hive --service metastore & 2.在 hdfs-site.xml 中添加以下配置 <prope

spark 启动thrift server 支持 jdbc连接

在 ./conf下 创建 hive-site.xml 添加: <configuration> <property> <name>hive.metastore.client.connect.retry.delay</name> <value>5</value> </property> <property> <name>hive.metastore.client.socket.timeout</n

SparkSQL使用之Thrift JDBC server

Thrift JDBC Server描述 Thrift JDBC Server使用的是HIVE0.12的HiveServer2实现.能够使用Spark或者hive0.12版本的beeline脚本与JDBC Server进行交互使用.Thrift JDBC Server默认监听端口是10000. 使用Thrift JDBC Server前需要注意: 1.将hive-site.xml配置文件拷贝到$SPARK_HOME/conf目录下: 2.需要在$SPARK_HOME/conf/spark-env

Thrift源码学习二——Server层

Thrift 提供了如图五种模式:TSimpleServer.TNonblockingServer.THsHaServer.TThreadPoolServer.TThreadSelectorServer ?? TSimpleServer.TThreadPoolServer 属于阻塞模型 TNonblockingServer.THsHaServer.TThreadedSelectorServer 属于非阻塞模型 TServer TServer 为抽象类 public static class Ar

Thrift使用实例

首先下载thrift.exe,和对应lib包.注意版本一定要一致. 否则编译会不识别出现错误. 可能会出现org.slf4j这个错误,那么你要把slf4j-api.jar下载下来引入到你的project中 namespace java com.nerd.thrift.service /** * */ service sayThriftService{ void say(); } 通过在命令行中转到 thrift-1.8.0.exe -gen java  sayThriftService 在磁盘目

Thrift实现C#调用Java开发步骤详解

转载请注明出处:jiq?钦's technical Blog Apache Thrift 是 Facebook 实现的一种高效的.支持多种编程语言的远程服务调用的框架. 类似的跨语言RPC框架还有ICE.Hessian.Protocol Buffer.Avro等. 1 下载Thrift 下载地址:http://thrift.apache.org/download thrift-0.9.3.exe         用于编译Thrift中间文件生成对应语言代码的工具 thrift-0.9.3.tar