tars framework 源码解读(三) servant部分章节。客户端部分。完整的tars调用流程详解

一般tars客户端使用方式:

我们用客户端进行tars rpc调用时候,一般如下面这样写:

方式一、
//直连方式
TC_Endpoint ep;

AdminFPrx pAdminPrx; //服务管理代理

string sAdminPrx = "[email protected]"+_serverObjectPtr->getLocalEndpoint().toString();

pAdminPrx = Application::getCommunicator()->stringToProxy<AdminFPrx>(sAdminPrx);

sResult = pAdminPrx->notify(_msg);

方式二、

PatchPrx proxy = Application::getCommunicator()->stringToProxy<PatchPrx>(_patchRequest.patchobj);

proxy->tars_timeout(60000);

方式三、

NotifyPrx pNotifyPrx = Application::getCommunicator()->stringToProxy<NotifyPrx>(ServerConfig::Notify);

if (pNotifyPrx && sResult != "")

{

  pNotifyPrx->async_reportServer(NULL, sServerId, "", sResult);

}

这里只是列举了几种,还有更多的写法。其实都是分成两步:

1由Application::getCommunicator()->stringToProxy(strObjName)得到一个proxyPtr。

2再用proxyPtr,同步,或者异步等各种rpc调用对应的方法.

其实上面的调用,也等同于:

"[email protected] -h ip1 -t 60000 -p port1;tcp -h ip2 -t 60000 -p port1"

可以根据servantname去直连对应的机器

那么核心流程就是stringToProxy和proxyPtr里面的实现。

获取Proxy的实现

stringToProxy的伪代码实现是:

stringToProxy(const string& objectName,const string& setName="")
{
  return ServantProxyFactory->getServantProxy(objectName,setName);
}

getServantProxy的实现伪代码是:
getServantProxy(const string& name,const string& setName)
{
if _servantProxy.find(name+":"+setName) return findvalue;
ObjectProxy ** ppObjectProxy = new ObjectProxy * [_comm->getClientThreadNum()];
for(size_t i = 0; i < _comm->getClientThreadNum(); ++i)
{
  ppObjectProxy[i] = _comm->getCommunicatorEpoll(i)->getObjectProxy(name, setName);
}
ServantPrx sp = new ServantProxy(_comm, ppObjectProxy, _comm->getClientThreadNum());

//设置同步调用超时3ss。异步调用超时5s。连接超时1.5s
sp->tars_timeout(_comm->getProperty("sync-invoke-timeout", "3000"));
sp->tars_async_timeout(_comm->getProperty("async-invoke-timeout", "5000"));
sp->tars_connect_timeout(_comm->getProperty("connect-timeout", "1500"));
_servantProxy[name+":"+setName]=sp;
return sp;
}

上面代码说白了就是建立一个ServantProxy.并且他上面会带getClientThreadNum个数的ObjectProxy.一个ObjectProxy对应一个线程。然后缓存起来。

那么就有个问题getClientThreadNum和sync-invoke-timeout这些东东是配在哪的呢?

其实是在#define CONFIG_ROOT_PATH "/tars/application/client" 下面的对应内容.

getClientThreadNum值是netthread字段。默认是1。也就是基本上大部分tars服务一般就一个客户端线程.

ServantProxy部分内容查看 《ServantProxy部分模块》

rpc调用的实现

每个.tars文件最终都会被tars parser编译生成一个.h及.hpp文件,单就interface部分而已,会生成带对应interfacename字符的一些类:

客户端用:

PrxCallback类(普通异步回调处理类),

PrxCallbackPromise类(promise异步回调处理类),

CoroPrxCallback类(协程异步回调处理类),

客户端Proxy实现类。上面3个类的处理流程都会在这个类的异步调用中用到

服务端用:

Servant类。包括 下面几个部分:

目标接口的纯虚函数。

以及目标接口的被调用回复结果async_response函数(里面自动生成代码 实现了对响应结果进行统计)。

onDispatch函数,此函数自动生成代码,实现了对发送过来的请求,根据请求TarsCurrentPtr中的接口名找到对应的函数(用的是一个vector来 switch case 函数在接口组中的序号,加快匹配速度);从TarsCurrentPtr中解包出对应结构体;执行刚才找到的函数,并将执行结果返回给结果参数

这几块类的具体实现的内容,参考下面AdminReg部分的介绍

比如AdminReg.tars中的interface AdminReg生成有:

//这3个是异步回调处理类

class AdminRegPrxCallback: public tars::ServantProxyCallback;

class AdminRegPrxCallbackPromise: public tars::ServantProxyCallback;

class AdminRegCoroPrxCallback: public AdminRegPrxCallback;

//下面是主要功能实现类

class AdminRegProxy : public tars::ServantProxy; 客户端类

class AdminReg : public tars::Servant; 服务端类

这一大坨代码很长很长。都是.tars文件自动生成的一套通用模板类。里面的具体实现介绍如下:

class AdminRegProxy : public tars::ServantProxy;类

给每个接口都实现了如下模式代码,详解看下面注释:

typedef map<string, string> TARS_CONTEXT;
//同步调用接口.
tars::Int32 addTaskReq(const tars::TaskReq & taskReq,const map<string, string> &context = TARS_CONTEXT(),map<string, string> * pResponseContext = NULL)
{
//将请求参数打包到TarsOutputStream<tars::BufferWriter>,其实是序列化
tars::TarsOutputStream<tars::BufferWriter> _os;

_os.write(taskReq, 1);
tars::ResponsePacket rep;
std::map<string, string> _mStatus;
//调用tars_invoke。将对应函数名和请求,返回等参数传入
tars_invoke(tars::TARSNORMAL,"addTaskReq", _os.getByteBuffer(), context, _mStatus, rep);
//返回请求context结果..具体作用后面再分析
if(pResponseContext)
{
*pResponseContext = rep.context;
}
//将返回结果从TarsOutputStream<tars::BufferWriter>反序列化出来
tars::TarsInputStream<tars::BufferReader> _is;
_is.setBuffer(rep.sBuffer);
tars::Int32 _ret;
_is.read(_ret, 0, true);
//返回调用返回值
return _ret;
}
//异步调用接口.
void async_addTaskReq(AdminRegPrxCallbackPtr callback,const tars::TaskReq &taskReq,const map<string, string>& context = TARS_CONTEXT())
{
//打包参数
tars::TarsOutputStream<tars::BufferWriter> _os;
_os.write(taskReq, 1);
std::map<string, string> _mStatus;
//调用tars_invoke_async.异步调用 接口函数,将参数传入
tars_invoke_async(tars::TARSNORMAL,"addTaskReq", _os.getByteBuffer(), context, _mStatus, callback);
}
//promise调用.
promise::Future< AdminRegPrxCallbackPromise::PromiseaddTaskReqPtr > promise_async_addTaskReq(const tars::TaskReq &taskReq,const map<string, string>& context)
{
promise::Promise< AdminRegPrxCallbackPromise::PromiseaddTaskReqPtr > promise;
AdminRegPrxCallbackPromisePtr callback = new AdminRegPrxCallbackPromise(promise);
tars::TarsOutputStream<tars::BufferWriter> _os;
_os.write(taskReq, 1);
std::map<string, string> _mStatus;
tars_invoke_async(tars::TARSNORMAL,"addTaskReq", _os.getByteBuffer(), context, _mStatus, callback);
return promise.getFuture();
}
//协程调用接口.
void coro_addTaskReq(AdminRegCoroPrxCallbackPtr callback,const tars::TaskReq &taskReq,const map<string, string>& context = TARS_CONTEXT())
{
tars::TarsOutputStream<tars::BufferWriter> _os;
_os.write(taskReq, 1);
std::map<string, string> _mStatus;
tars_invoke_async(tars::TARSNORMAL,"addTaskReq", _os.getByteBuffer(), context, _mStatus, callback, true);

}

看上面的实现方式。虽然看起来有4种方式,但实际应该只算是两种:

通过 tars_invoke同步调用 或 tars_invoke_async 异步调用实现。只是异步调用时候,回调处理的模式分三种类,AdminRegPrxCallback,AdminRegPrxCallbackPromise,AdminRegCoroPrxCallback

另外还包括:

tars_hash,tars_consistent_hash,tars_set_timeout这几个默认接口

class AdminRegPrxCallback: public tars::ServantProxyCallback;

class AdminRegPrxCallbackPromise: public tars::ServantProxyCallback;

class AdminRegCoroPrxCallback: public AdminRegPrxCallback;

这几个类都实现了每个函数的默认回调处理函数和异常处理函数,并且也实现了OnDispatch。

类似于上面Servant类的OnDispatch处理流程。

如果结果报错,将对应回调的异常处理函数执行下;结果正确,把返回结果反序列化出来,执行下前面生成的回调处理函数即可。3种异步处理类都类似

其中 tars的作者大佬 更推荐用协程方式处理异步调用,这样使用方便很多。所以亲,基本上你用的时候,看同步调用,普通异步调用及协程调用就可。

ServantProxy::invoke的具体实现流程

在ServantProxy类中,tars_invoke与tars_invoke_async其实都只是个中间函数,其实都是将参数打包到ReqMessage msg,然后传给invoke.重点流程都是在invoke实现

在invoke流程中:

1、先读取ServantProxyThreadData线程共享数据pSptd.并将对应 染色,哈希方式,超时配置 设置给msg参数. ServantProxyThreadData类的实现细节,参照

2、根据pSptd,调用selectNetThreadInfo选取一个ObjectProxy和ReqInfoQueue。具体实现参考ServantProxy::selectNetThreadInfo部分章节.

3、如果当前objProxy是set调用模式,更改msg中参数成objProxy中对应set

4、若是同步调用。。若是pSptd->_sched协程有值,则设置msg中协程为pSptd->_sched;若非协程,创建一个ReqMonitor并赋值给msg中pMonitor(这个好像就是一个lock)。

5、若是异步调用并且也是协程调用:错误检测;设置msg中协程为pSptd->_sched

6、往ReqInfoQueue队尾插入msg并调用pObjProxy->getCommunicatorEpoll()->notify(序号,ReqInfoQueue)通知epoll去消费;如果队列满了插入msg失败,打log,还是会通知epoll去消费队此队列,并抛出异常

7、如果是异步调用,流程已完成,返回;

如果是同步调用,阻塞在此等结果返回。如果是协程模式,yield阻塞在此;如果是普通阻塞,pMonitor->wait()在此等待直到网络线程通知过来。

退出阻塞状态时,检查退出原因。正常退出返回;超时抛出超时异常,异常抛出对应异常。

而CommunicatorEpoll->notify()中的代码也简单。如果CommunicatorEpoll中的对应序号的请求通知NotifyInfo是有效的,则将传入的ReqInfoQueue用epoll的EPOLL_CTL_ADD传入EPOLLIN事件插入epoll那边队列;如果NotifyInfo是无效的,则直接调用EPOLL_CTL_MOD传入一个EPOLLIN事件。

至此,关键流程就走到CommunicatorEpoll那边的流程了。

CommunicatorEpoll中对应的调用流程:

CommunicatorEpoll类的成员描述和参数细节,可看《Communicator通信器相关部分》中CommunicatorEpoll部分的内容。此处只介绍上面消息包到此处后的处理流程。

CommunicatorEpoll::run().这是线程循环总流程.伪代码如下:

while(){
try{
//epoll.wait()
int num = _ep.wait(iTimeout);
//先处理epoll的网络事件
for (int i = 0; i < num; ++i){
const epoll_event& ev = _ep.get(i);
handle((FDInfo*)ev.data.u64, ev.events);
}
//处理超时请求
doTimeout();
//数据上报
doStat();
}
catch (...){
...
}
}

无论是ServantProxy::invoke()中投递到epoll中要发送的内容,还是收到服务器那边给的返回,都会在此epoll队列中等待处理.很明显,handle()函数很重要。

CommunicatorEpoll::handle()的伪代码如下:

try{
//队列有消息通知过来。这类消息就是对ServantProxy::invoke()中投递过来消息的处理环节
if(FDInfo::ET_C_NOTIFY == pFDInfo->iType)
{
ReqInfoQueue * pInfoQueue=(ReqInfoQueue*)pFDInfo->p;
ReqMessage * msg = NULL;
try
{
while(pInfoQueue->pop_front(msg))
{
//线程退出
if(ReqMessage::THREAD_EXIT == msg->eType)
{
_ep.del(_notify[pFDInfo->iSeq].notify.getfd(),(long long)&_notify[pFDInfo->iSeq].stFDInfo, EPOLLIN);
delete pInfoQueue;
清 _notify[pFDInfo->iSeq]相关操作..完全看不懂 这么设置有啥好的
....
return;
}
try
{
//真正处理
msg->pObjectProxy->invoke(msg);
}
catch(...)
{
...
}
}
}
catch{
.....
}
}
else {
Transceiver *pTransceiver = (Transceiver*)pFDInfo->p;
//先收包
if (events & EPOLLIN)
{
try
{
handleInputImp(pTransceiver);
}
catch....
}
//发包
if (events & EPOLLOUT)
{
try
{
handleOutputImp(pTransceiver);
}
catch....
}
//连接出错 直接关闭连接
if(events & EPOLLERR)
{
try
{
pTransceiver->close();
}
catch...
}
}
}
catch{
....
}

tars代码中,这些重要环节全部用try-catch 挨个环节包起来,这样哪步报错,出啥错,都一清二楚.

上面流程中,收包发包后续再分析,先看下pObjectProxy->invoke(msg)流程。

ObjectProxy::invoke流程的实现

伪代码如下:

//选择一个远程服务的Adapter来调用
AdapterProxy * pAdapterProxy = NULL;
//根据请求策略从可用的服务列表选择一个服务节点
bool bFirst = _endpointManger->selectAdapterProxy(msg, pAdapterProxy);
//这段是啥意思呢?其实是初始化的时候是无效的数据
//只有请求过主控或者从文件缓存加载的数据才是有效数据,这里判断是否请求过主控
if(bFirst)
{
//未请求过主控,无效数据.
//则把数据缓存在obj _reqTimeoutQueue里面
bool bRet = _reqTimeoutQueue.push(msg,msg->request.iTimeout+msg->iBeginTime);
assert(bRet);
return;
}
//未选到AdapterProxy
if(!pAdapterProxy)
{
msg->response.iRet = TARSADAPTERNULL;
doInvokeException(msg);
return ;
}
msg->adapter = pAdapterProxy;
pAdapterProxy->invoke(msg);

selectAdapterProxy很重要,是关键流程。一般我们rpc调用,被调服务端可能会有很多台机器,具体选择哪一台(通过哈希?还是轮询?还是set? 都是通过selectAdapterProxy来实现),这块内容,看《selectAdapterProxy实现流程》章节详细介绍。

另外这里还有个函数叫,ObjectProxy::doInvoke,跟ObjectProxy::invoke大体一致。其实这个函数就是上面selectAdapterProxy返回为true时。这个东东其实是,第一次请求(或者此servant信息被清除掉时){完全看不懂系列,此处貌似有问题,重看代码,感觉是每次ObjectProxy::invoke都会跑到此处并refreshReg(),这个就有点奇怪了。}这个servant时,并没有建立好与此Servant后面提供功能的服务器相关的内容,那么就要先去registry请求调用servant相关信息,才能找到servant提供功能的服务器上。那么去registry请求数据,这个过程是阻塞式的,不可能一直在此等着。所以_reqTimeoutQueue.push先把消息扔队列里,等请求结果返回后QueryEpBase::doEndpoints(),再回ObjectProxy::doInvoke,这里执行一下该有的发送的流程(跟ObjectProxy::invoke处理一致)。

如果未选到AdapterProxy,先给msg->response.iRet = TARSADAPTERNULL 表示"客户端选路为空,服务不存在或者所有服务down掉了";再走到ObjectProxy::doInvokeException().

这个函数的实现,简单来说就是同步的发通知给msg->pMonitor之前的wait解除;异步的调用之前msg上设置的回调函数onDispatch;协程的调用msg->sched->put(msg->iCoroId)。反正就是对应通知的结果已搞完的意思。

如果选择好正确的pAdapterProxy之后,调用pAdapterProxy->invoke(msg)。

这个函数的流程:

1、自身未发链表长度检查,长度跟communicatorEpoll中的nosendqueuelimit一样,默认是1000。过了就丢掉本次请求,并执行finishInvoke()跟上面doInvokeException差不多流程的代码通知本次调用结果。

2、调用_objectProxy->getProxyProtocol().requestFunc。此函数是ObjectProxy章节介绍中的协议解析器设置的 请求发包前 协议的一些处理函数,默认requestFunc=ProxyProtocol::tarsRequest()。如果你有定制协议,此处就会走到你定制协议的压请求包流程。

3、如果超时队列中有数据未发送完,则走下面失败流程;队列空,立即执行Transceiver::sendRequest()发送(这个东东就是tcp执行tcp的,udp执行udp的了send),如果发送失败,则走下面失败流程,成功,删掉msg指针,返回成功。注意这里真正执行发送时候,数据过长会缓存到发送TcpTransceiver的_sendBuffer中。在下次发送之前,或者是下次epoll的EPOLLOUT写请求时候,会再次发送,清空此发送队列。这个不是重点,细节就不讲了

4、第3步如果失败,走到这里,把这个消息塞入超时队列。如果插入队列失败,那估计是过载了,调用finishInvoke()。返回失败码

5、到这里 还剩下2个问题没弄清楚:

1]超时队列中数据 后面会怎么处理.

超时队列中有两种数据,一种是 未发送成功的数据,其包含在未发送数据列表中;一种是 已发送数据。

对于未发送数据。这里有个很坑的地方,我看了很久才看懂。在发送成功时候往超时列表里塞数据时,这个超时列表的未发送数据队列是不会把这个数据加进未发送标记的,只有发送失败的才会加入未发送标记队列,然后被AdapterProxy::doInvoke中会循环去读取超时列表中的未发送数据,调用Transceiver发送。(第一遍看代码很容易认为此次写的有问题,发送成功的也往这个队列里塞再发一次,其实是不会的)。

对于已发送数据。因为Transceiver::connect的时候,会把操作的fd设置到CommunicatorEpoll对应的epoll中。所以如果服务端请求结果回来了,首先原有套接字会收到一个EPOLLIN,再调用到CommunicatorEpoll::handleInputImp,这里最终会调用到Transceiver::doResponse。

这里会执行readv(tcp)或者recv(udp)收数据;再调用前面设的ProxyProtocol.responseFunc解包函数将数据内容读取到一个list<ResponsePacket>中;再调用finishInvoke(ResponsePacket)轮询处理每个ResponsePacket,找到对应的ReqMessage,将请求结果状态,及结果设定好。后面流程跟前面讲过的finishInvoke(msg)一致了。

另外,对这个超时队列的处理,还有个超时流程,在AdapterProxy::doTimeout()中会把超时没处理的数据,不管是否发送成功过的,都移除掉,并置异常标志,再调用finishInvoke()。

2]epoll中的发包事件EPOLLOUT 从哪而来.

EPOLLOUT 发包事件,在链接创建成功时会设置epoll监听此事件,除此之外。没看到啥rpc调用的逻辑会跑到这里来。。有点怪呀。完全没看懂系列。

至此,客户端请求发送rpc的流程,基本上走通了。

原文地址:https://www.cnblogs.com/yylingyao/p/12198131.html

时间: 2024-11-07 04:55:36

tars framework 源码解读(三) servant部分章节。客户端部分。完整的tars调用流程详解的相关文章

tars framework 源码解读(四) servant部分章节。客户端部分。selectAdapterProxy实现流程 选取合适的AdapterProxy

这块内容是个关键性流程. tars调用在调用层,一般只会简单的 PatchPrx proxy = Application::getCommunicator()->stringToProxy<PatchPrx>(_patchRequest.patchobj); proxy->timeout(60000); proxy只是一个ServantProxy,他只是作为一个本地与服务器组中某个服务器名的几个服务进程打交道的代表,这些具体干活的服务进程,会分别部署在几个不同的服务器物理机中,一般

tars framework 源码解读(五) servant部分章节。客户端部分。客户端部分的一些类及其配置细节

ServantProxy类 很有意思的是,服务端的实现叫Servant,对应客户端连接器的名就叫ServantProxy,顾名思义,是客户端连接的代理. selectNetThreadInfo的实现: 1获取ServantProxyThreadData pSptd; 2如果pSptd还没初始化过,先new本Servant ObjectProxyNum个ReqInfoQueue队列并赋值给pSptd->_reqQueue,这个ObjectProxyNum正好也是Servant的ObjectProx

tars framework 源码解读(一) 总述

tars framework源码 其实分两块. 一块是tarscpp 这块的代码跟外面的tarscpp貌似是一毛一样的.这块代码实现了一个服务的基础lib 一块是framework基础服务代码实现.这些服务撑起来整个tars系统的框架功能.直接用Introduction.zh.md中的图来说明一下吧. 整体架构的拓扑图主要分为2个部分:服务节点与公共框架节点. 服务节点: 服务节点可以认为是服务所实际运行的一个具体的操作系统实例,可以是物理主机或者虚拟主机.云主机.随着服务的种类扩展和规模扩大,

tars framework 源码解读(二) libservant部分源码的简介

还是直接用官方原图解说 服务端:可以理解成对外公开的接口 被调用时候响应流程 的底层封装 (响应端) NetThread: 收发包,连接管理,多线程(可配置),采用epoll ET触发实现,支持tcp/udp: BindAdapter: 绑定端口类,用于管理Servant对应的绑定端口的信息操作: ServantHandle:业务线程类,根据对象名分派Servant的对象和接口调用: AdminServant: 管理端口的对象: ServantImp: 继承Servant的业务处理基类(Serv

Django Rest Framework源码剖析(三)-----频率控制

一.简介 承接上篇文章Django Rest Framework源码剖析(二)-----权限,当服务的接口被频繁调用,导致资源紧张怎么办呢?当然或许有很多解决办法,比如:负载均衡.提高服务器配置.通过代理限制访问频率等,但是django rest framework自身就提供了访问频率的控制,可以从代码本身做控制. 二.频率控制内部原理概述 django rest framework 中频率控制基本原理基于访问次数和时间,通过计算实现,当然我们也可以自己定义频率控制方法.基本原理如下: 启用频率

.NET源码保护控件VMProtect免费下载及使用教程脱壳等功能详解

原文来自VMProtect龙博方案网www.fanganwang.com VMProtect是一款全新的软件保护工具.与其它大部分的保护程序不同,VMProtect可修改程序的源代码.VMProtect可将被保护文件中的部分代码转化到在虚拟机(以下称作VM)上运行的程序(以下称作bytecode)中.您同样可把VM想象为具备命令系统的虚拟处理器,该命令系统与Intel 8086处理器所使用的完全不同.例如,VM没有负责比较2个操作数的命令,也没有有条件与无条件的移转等.就象您现在看到的,黑客必须

jQuery源码解读三选择器

直接上jQuery源码截取代码 // Map over jQuery in case of overwrite _jQuery = window.jQuery, // Map over the $ in case of overwrite _$ = window.$, // Define a local copy of jQuery 可以看出创建了jQuery.fn.init这样的一个函数返回给$,这样就可以使用$实例了 jQuery = function (selector, context)

mybatis源码解读(三)——数据源的配置

在mybatis-configuration.xml 文件中,我们进行了如下的配置: <!-- 可以配置多个运行环境,但是每个 SqlSessionFactory 实例只能选择一个运行环境常用: 一.development:开发模式 二.work:工作模式 --> <environments default="development"> <!--id属性必须和上面的default一样 --> <environment id="deve

Bert源码解读(三)之绘制流程图

一.Bert所用Transformer的架构图 二.BertModel流程图 原文地址:https://www.cnblogs.com/gczr/p/12393925.html