一般tars客户端使用方式:
我们用客户端进行tars rpc调用时候,一般如下面这样写:
方式一、 //直连方式 TC_Endpoint ep; AdminFPrx pAdminPrx; //服务管理代理 string sAdminPrx = "[email protected]"+_serverObjectPtr->getLocalEndpoint().toString(); pAdminPrx = Application::getCommunicator()->stringToProxy<AdminFPrx>(sAdminPrx); sResult = pAdminPrx->notify(_msg); 方式二、 PatchPrx proxy = Application::getCommunicator()->stringToProxy<PatchPrx>(_patchRequest.patchobj); proxy->tars_timeout(60000); 方式三、 NotifyPrx pNotifyPrx = Application::getCommunicator()->stringToProxy<NotifyPrx>(ServerConfig::Notify); if (pNotifyPrx && sResult != "") { pNotifyPrx->async_reportServer(NULL, sServerId, "", sResult); }
这里只是列举了几种,还有更多的写法。其实都是分成两步:
1由Application::getCommunicator()->stringToProxy(strObjName)得到一个proxyPtr。
2再用proxyPtr,同步,或者异步等各种rpc调用对应的方法.
其实上面的调用,也等同于:
"[email protected] -h ip1 -t 60000 -p port1;tcp -h ip2 -t 60000 -p port1"
可以根据servantname去直连对应的机器
那么核心流程就是stringToProxy和proxyPtr里面的实现。
获取Proxy的实现
stringToProxy的伪代码实现是: stringToProxy(const string& objectName,const string& setName="") { return ServantProxyFactory->getServantProxy(objectName,setName); } getServantProxy的实现伪代码是: getServantProxy(const string& name,const string& setName) { if _servantProxy.find(name+":"+setName) return findvalue; ObjectProxy ** ppObjectProxy = new ObjectProxy * [_comm->getClientThreadNum()]; for(size_t i = 0; i < _comm->getClientThreadNum(); ++i) { ppObjectProxy[i] = _comm->getCommunicatorEpoll(i)->getObjectProxy(name, setName); } ServantPrx sp = new ServantProxy(_comm, ppObjectProxy, _comm->getClientThreadNum()); //设置同步调用超时3ss。异步调用超时5s。连接超时1.5s sp->tars_timeout(_comm->getProperty("sync-invoke-timeout", "3000")); sp->tars_async_timeout(_comm->getProperty("async-invoke-timeout", "5000")); sp->tars_connect_timeout(_comm->getProperty("connect-timeout", "1500")); _servantProxy[name+":"+setName]=sp; return sp; }
上面代码说白了就是建立一个ServantProxy.并且他上面会带getClientThreadNum个数的ObjectProxy.一个ObjectProxy对应一个线程。然后缓存起来。
那么就有个问题getClientThreadNum和sync-invoke-timeout这些东东是配在哪的呢?
其实是在#define CONFIG_ROOT_PATH "/tars/application/client" 下面的对应内容.
getClientThreadNum值是netthread字段。默认是1。也就是基本上大部分tars服务一般就一个客户端线程.
ServantProxy部分内容查看 《ServantProxy部分模块》
rpc调用的实现
每个.tars文件最终都会被tars parser编译生成一个.h及.hpp文件,单就interface部分而已,会生成带对应interfacename字符的一些类:
客户端用:
PrxCallback类(普通异步回调处理类),
PrxCallbackPromise类(promise异步回调处理类),
CoroPrxCallback类(协程异步回调处理类),
客户端Proxy实现类。上面3个类的处理流程都会在这个类的异步调用中用到
服务端用:
Servant类。包括 下面几个部分:
目标接口的纯虚函数。
以及目标接口的被调用回复结果async_response函数(里面自动生成代码 实现了对响应结果进行统计)。
onDispatch函数,此函数自动生成代码,实现了对发送过来的请求,根据请求TarsCurrentPtr中的接口名找到对应的函数(用的是一个vector来 switch case 函数在接口组中的序号,加快匹配速度);从TarsCurrentPtr中解包出对应结构体;执行刚才找到的函数,并将执行结果返回给结果参数
这几块类的具体实现的内容,参考下面AdminReg部分的介绍
比如AdminReg.tars中的interface AdminReg生成有:
//这3个是异步回调处理类
class AdminRegPrxCallback: public tars::ServantProxyCallback;
class AdminRegPrxCallbackPromise: public tars::ServantProxyCallback;
class AdminRegCoroPrxCallback: public AdminRegPrxCallback;
//下面是主要功能实现类
class AdminRegProxy : public tars::ServantProxy; 客户端类
class AdminReg : public tars::Servant; 服务端类
这一大坨代码很长很长。都是.tars文件自动生成的一套通用模板类。里面的具体实现介绍如下:
class AdminRegProxy : public tars::ServantProxy;类
给每个接口都实现了如下模式代码,详解看下面注释:
typedef map<string, string> TARS_CONTEXT; //同步调用接口. tars::Int32 addTaskReq(const tars::TaskReq & taskReq,const map<string, string> &context = TARS_CONTEXT(),map<string, string> * pResponseContext = NULL) { //将请求参数打包到TarsOutputStream<tars::BufferWriter>,其实是序列化 tars::TarsOutputStream<tars::BufferWriter> _os; _os.write(taskReq, 1); tars::ResponsePacket rep; std::map<string, string> _mStatus; //调用tars_invoke。将对应函数名和请求,返回等参数传入 tars_invoke(tars::TARSNORMAL,"addTaskReq", _os.getByteBuffer(), context, _mStatus, rep); //返回请求context结果..具体作用后面再分析 if(pResponseContext) { *pResponseContext = rep.context; } //将返回结果从TarsOutputStream<tars::BufferWriter>反序列化出来 tars::TarsInputStream<tars::BufferReader> _is; _is.setBuffer(rep.sBuffer); tars::Int32 _ret; _is.read(_ret, 0, true); //返回调用返回值 return _ret; } //异步调用接口. void async_addTaskReq(AdminRegPrxCallbackPtr callback,const tars::TaskReq &taskReq,const map<string, string>& context = TARS_CONTEXT()) { //打包参数 tars::TarsOutputStream<tars::BufferWriter> _os; _os.write(taskReq, 1); std::map<string, string> _mStatus; //调用tars_invoke_async.异步调用 接口函数,将参数传入 tars_invoke_async(tars::TARSNORMAL,"addTaskReq", _os.getByteBuffer(), context, _mStatus, callback); } //promise调用. promise::Future< AdminRegPrxCallbackPromise::PromiseaddTaskReqPtr > promise_async_addTaskReq(const tars::TaskReq &taskReq,const map<string, string>& context) { promise::Promise< AdminRegPrxCallbackPromise::PromiseaddTaskReqPtr > promise; AdminRegPrxCallbackPromisePtr callback = new AdminRegPrxCallbackPromise(promise); tars::TarsOutputStream<tars::BufferWriter> _os; _os.write(taskReq, 1); std::map<string, string> _mStatus; tars_invoke_async(tars::TARSNORMAL,"addTaskReq", _os.getByteBuffer(), context, _mStatus, callback); return promise.getFuture(); } //协程调用接口. void coro_addTaskReq(AdminRegCoroPrxCallbackPtr callback,const tars::TaskReq &taskReq,const map<string, string>& context = TARS_CONTEXT()) { tars::TarsOutputStream<tars::BufferWriter> _os; _os.write(taskReq, 1); std::map<string, string> _mStatus; tars_invoke_async(tars::TARSNORMAL,"addTaskReq", _os.getByteBuffer(), context, _mStatus, callback, true); }
看上面的实现方式。虽然看起来有4种方式,但实际应该只算是两种:
通过 tars_invoke同步调用 或 tars_invoke_async 异步调用实现。只是异步调用时候,回调处理的模式分三种类,AdminRegPrxCallback,AdminRegPrxCallbackPromise,AdminRegCoroPrxCallback
另外还包括:
tars_hash,tars_consistent_hash,tars_set_timeout这几个默认接口
class AdminRegPrxCallback: public tars::ServantProxyCallback;
class AdminRegPrxCallbackPromise: public tars::ServantProxyCallback;
class AdminRegCoroPrxCallback: public AdminRegPrxCallback;
这几个类都实现了每个函数的默认回调处理函数和异常处理函数,并且也实现了OnDispatch。
类似于上面Servant类的OnDispatch处理流程。
如果结果报错,将对应回调的异常处理函数执行下;结果正确,把返回结果反序列化出来,执行下前面生成的回调处理函数即可。3种异步处理类都类似
其中 tars的作者大佬 更推荐用协程方式处理异步调用,这样使用方便很多。所以亲,基本上你用的时候,看同步调用,普通异步调用及协程调用就可。
ServantProxy::invoke的具体实现流程
在ServantProxy类中,tars_invoke与tars_invoke_async其实都只是个中间函数,其实都是将参数打包到ReqMessage msg,然后传给invoke.重点流程都是在invoke实现
在invoke流程中:
1、先读取ServantProxyThreadData线程共享数据pSptd.并将对应 染色,哈希方式,超时配置 设置给msg参数. ServantProxyThreadData类的实现细节,参照
2、根据pSptd,调用selectNetThreadInfo选取一个ObjectProxy和ReqInfoQueue。具体实现参考ServantProxy::selectNetThreadInfo部分章节.
3、如果当前objProxy是set调用模式,更改msg中参数成objProxy中对应set
4、若是同步调用。。若是pSptd->_sched协程有值,则设置msg中协程为pSptd->_sched;若非协程,创建一个ReqMonitor并赋值给msg中pMonitor(这个好像就是一个lock)。
5、若是异步调用并且也是协程调用:错误检测;设置msg中协程为pSptd->_sched
6、往ReqInfoQueue队尾插入msg并调用pObjProxy->getCommunicatorEpoll()->notify(序号,ReqInfoQueue)通知epoll去消费;如果队列满了插入msg失败,打log,还是会通知epoll去消费队此队列,并抛出异常
7、如果是异步调用,流程已完成,返回;
如果是同步调用,阻塞在此等结果返回。如果是协程模式,yield阻塞在此;如果是普通阻塞,pMonitor->wait()在此等待直到网络线程通知过来。
退出阻塞状态时,检查退出原因。正常退出返回;超时抛出超时异常,异常抛出对应异常。
而CommunicatorEpoll->notify()中的代码也简单。如果CommunicatorEpoll中的对应序号的请求通知NotifyInfo是有效的,则将传入的ReqInfoQueue用epoll的EPOLL_CTL_ADD传入EPOLLIN事件插入epoll那边队列;如果NotifyInfo是无效的,则直接调用EPOLL_CTL_MOD传入一个EPOLLIN事件。
至此,关键流程就走到CommunicatorEpoll那边的流程了。
CommunicatorEpoll中对应的调用流程:
CommunicatorEpoll类的成员描述和参数细节,可看《Communicator通信器相关部分》中CommunicatorEpoll部分的内容。此处只介绍上面消息包到此处后的处理流程。
CommunicatorEpoll::run().这是线程循环总流程.伪代码如下:
while(){ try{ //epoll.wait() int num = _ep.wait(iTimeout); //先处理epoll的网络事件 for (int i = 0; i < num; ++i){ const epoll_event& ev = _ep.get(i); handle((FDInfo*)ev.data.u64, ev.events); } //处理超时请求 doTimeout(); //数据上报 doStat(); } catch (...){ ... } }
无论是ServantProxy::invoke()中投递到epoll中要发送的内容,还是收到服务器那边给的返回,都会在此epoll队列中等待处理.很明显,handle()函数很重要。
CommunicatorEpoll::handle()的伪代码如下:
try{ //队列有消息通知过来。这类消息就是对ServantProxy::invoke()中投递过来消息的处理环节 if(FDInfo::ET_C_NOTIFY == pFDInfo->iType) { ReqInfoQueue * pInfoQueue=(ReqInfoQueue*)pFDInfo->p; ReqMessage * msg = NULL; try { while(pInfoQueue->pop_front(msg)) { //线程退出 if(ReqMessage::THREAD_EXIT == msg->eType) { _ep.del(_notify[pFDInfo->iSeq].notify.getfd(),(long long)&_notify[pFDInfo->iSeq].stFDInfo, EPOLLIN); delete pInfoQueue; 清 _notify[pFDInfo->iSeq]相关操作..完全看不懂 这么设置有啥好的 .... return; } try { //真正处理 msg->pObjectProxy->invoke(msg); } catch(...) { ... } } } catch{ ..... } } else { Transceiver *pTransceiver = (Transceiver*)pFDInfo->p; //先收包 if (events & EPOLLIN) { try { handleInputImp(pTransceiver); } catch.... } //发包 if (events & EPOLLOUT) { try { handleOutputImp(pTransceiver); } catch.... } //连接出错 直接关闭连接 if(events & EPOLLERR) { try { pTransceiver->close(); } catch... } } } catch{ .... }
tars代码中,这些重要环节全部用try-catch 挨个环节包起来,这样哪步报错,出啥错,都一清二楚.
上面流程中,收包发包后续再分析,先看下pObjectProxy->invoke(msg)流程。
ObjectProxy::invoke流程的实现
伪代码如下:
//选择一个远程服务的Adapter来调用 AdapterProxy * pAdapterProxy = NULL; //根据请求策略从可用的服务列表选择一个服务节点 bool bFirst = _endpointManger->selectAdapterProxy(msg, pAdapterProxy); //这段是啥意思呢?其实是初始化的时候是无效的数据 //只有请求过主控或者从文件缓存加载的数据才是有效数据,这里判断是否请求过主控 if(bFirst) { //未请求过主控,无效数据. //则把数据缓存在obj _reqTimeoutQueue里面 bool bRet = _reqTimeoutQueue.push(msg,msg->request.iTimeout+msg->iBeginTime); assert(bRet); return; } //未选到AdapterProxy if(!pAdapterProxy) { msg->response.iRet = TARSADAPTERNULL; doInvokeException(msg); return ; } msg->adapter = pAdapterProxy; pAdapterProxy->invoke(msg);
selectAdapterProxy很重要,是关键流程。一般我们rpc调用,被调服务端可能会有很多台机器,具体选择哪一台(通过哈希?还是轮询?还是set? 都是通过selectAdapterProxy来实现),这块内容,看《selectAdapterProxy实现流程》章节详细介绍。
另外这里还有个函数叫,ObjectProxy::doInvoke,跟ObjectProxy::invoke大体一致。其实这个函数就是上面selectAdapterProxy返回为true时。这个东东其实是,第一次请求(或者此servant信息被清除掉时){完全看不懂系列,此处貌似有问题,重看代码,感觉是每次ObjectProxy::invoke都会跑到此处并refreshReg(),这个就有点奇怪了。}这个servant时,并没有建立好与此Servant后面提供功能的服务器相关的内容,那么就要先去registry请求调用servant相关信息,才能找到servant提供功能的服务器上。那么去registry请求数据,这个过程是阻塞式的,不可能一直在此等着。所以_reqTimeoutQueue.push先把消息扔队列里,等请求结果返回后QueryEpBase::doEndpoints(),再回ObjectProxy::doInvoke,这里执行一下该有的发送的流程(跟ObjectProxy::invoke处理一致)。
如果未选到AdapterProxy,先给msg->response.iRet = TARSADAPTERNULL 表示"客户端选路为空,服务不存在或者所有服务down掉了";再走到ObjectProxy::doInvokeException().
这个函数的实现,简单来说就是同步的发通知给msg->pMonitor之前的wait解除;异步的调用之前msg上设置的回调函数onDispatch;协程的调用msg->sched->put(msg->iCoroId)。反正就是对应通知的结果已搞完的意思。
如果选择好正确的pAdapterProxy之后,调用pAdapterProxy->invoke(msg)。
这个函数的流程:
1、自身未发链表长度检查,长度跟communicatorEpoll中的nosendqueuelimit一样,默认是1000。过了就丢掉本次请求,并执行finishInvoke()跟上面doInvokeException差不多流程的代码通知本次调用结果。
2、调用_objectProxy->getProxyProtocol().requestFunc。此函数是ObjectProxy章节介绍中的协议解析器设置的 请求发包前 协议的一些处理函数,默认requestFunc=ProxyProtocol::tarsRequest()。如果你有定制协议,此处就会走到你定制协议的压请求包流程。
3、如果超时队列中有数据未发送完,则走下面失败流程;队列空,立即执行Transceiver::sendRequest()发送(这个东东就是tcp执行tcp的,udp执行udp的了send),如果发送失败,则走下面失败流程,成功,删掉msg指针,返回成功。注意这里真正执行发送时候,数据过长会缓存到发送TcpTransceiver的_sendBuffer中。在下次发送之前,或者是下次epoll的EPOLLOUT写请求时候,会再次发送,清空此发送队列。这个不是重点,细节就不讲了
4、第3步如果失败,走到这里,把这个消息塞入超时队列。如果插入队列失败,那估计是过载了,调用finishInvoke()。返回失败码
5、到这里 还剩下2个问题没弄清楚:
1]超时队列中数据 后面会怎么处理.
超时队列中有两种数据,一种是 未发送成功的数据,其包含在未发送数据列表中;一种是 已发送数据。
对于未发送数据。这里有个很坑的地方,我看了很久才看懂。在发送成功时候往超时列表里塞数据时,这个超时列表的未发送数据队列是不会把这个数据加进未发送标记的,只有发送失败的才会加入未发送标记队列,然后被AdapterProxy::doInvoke中会循环去读取超时列表中的未发送数据,调用Transceiver发送。(第一遍看代码很容易认为此次写的有问题,发送成功的也往这个队列里塞再发一次,其实是不会的)。
对于已发送数据。因为Transceiver::connect的时候,会把操作的fd设置到CommunicatorEpoll对应的epoll中。所以如果服务端请求结果回来了,首先原有套接字会收到一个EPOLLIN,再调用到CommunicatorEpoll::handleInputImp,这里最终会调用到Transceiver::doResponse。
这里会执行readv(tcp)或者recv(udp)收数据;再调用前面设的ProxyProtocol.responseFunc解包函数将数据内容读取到一个list<ResponsePacket>中;再调用finishInvoke(ResponsePacket)轮询处理每个ResponsePacket,找到对应的ReqMessage,将请求结果状态,及结果设定好。后面流程跟前面讲过的finishInvoke(msg)一致了。
另外,对这个超时队列的处理,还有个超时流程,在AdapterProxy::doTimeout()中会把超时没处理的数据,不管是否发送成功过的,都移除掉,并置异常标志,再调用finishInvoke()。
2]epoll中的发包事件EPOLLOUT 从哪而来.
EPOLLOUT 发包事件,在链接创建成功时会设置epoll监听此事件,除此之外。没看到啥rpc调用的逻辑会跑到这里来。。有点怪呀。完全没看懂系列。
至此,客户端请求发送rpc的流程,基本上走通了。
原文地址:https://www.cnblogs.com/yylingyao/p/12198131.html