UDT中的epoll

epoll 是为处理大量句柄而改进的poll,在UDT中也有支持。UDT使用了内核提供的epoll,主要是epoll_create,epoll_wait,epoll_ctl,UDT定义了CEPollDesc这个结构来管理epoll的描述符和套接字。

struct CEPollDesc

{

int m_iID;                                // epoll ID

std::set<UDTSOCKET> m_sUDTSocksOut;       // set of UDT sockets waiting for write events

std::set<UDTSOCKET> m_sUDTSocksIn;        // set of UDT sockets waiting for read events

std::set<UDTSOCKET> m_sUDTSocksEx;        // set of UDT sockets waiting for exceptions

int m_iLocalID;                           // local system epoll ID

std::set<SYSSOCKET> m_sLocals;            // set of local (non-UDT) descriptors

std::set<UDTSOCKET> m_sUDTWrites;         // UDT sockets ready for write

std::set<UDTSOCKET> m_sUDTReads;          // UDT sockets ready for read

std::set<UDTSOCKET> m_sUDTExcepts;        // UDT sockets with exceptions (connection broken, etc.)

};

特别要提醒的是,当对端socket连接中断的时候,也是在m_sUDTReads里的

UDT还实现了一个类来进行各项操作,实现的有

create():创建一个epoll,调用了epoll_create

add_usock():添加一个UDT套接字到epoll

add_ssock():添加一个系统套接字到epoll,调用了epoll_ctl

remove_usock():从epoll中移除一个UDT套接字

remove_ssock():从epoll中移除一个系统套接字,调用了epoll_ctl

wait():等待epoll事件或者超时,调用了epoll_wait

release():关闭并释放一个epoll

UDT里对epoll的调用是四段式的,比如add_usock这里,调用顺序是epoll_add_usock()->CUDT::epoll_add_usock()->s_UDTUnited.epoll_add_usock()->CEPoll::add_usock

int epoll_add_usock(int eid, UDTSOCKET u, const int* events)

{

return CUDT::epoll_add_usock(eid, u, events);

}

int CUDT::epoll_add_usock(const int eid, const UDTSOCKET u, const int* events)

{

try

{

return s_UDTUnited.epoll_add_usock(eid, u, events);

}

catch (CUDTException e)

{

s_UDTUnited.setError(new CUDTException(e));

return ERROR;

}

catch (...)

{

s_UDTUnited.setError(new CUDTException(-1, 0, 0));

return ERROR;

}

}

int CUDTUnited::epoll_add_usock(const int eid, const UDTSOCKET u, const int* events)

{

CUDTSocket* s = locate(u);

int ret = -1;

if (NULL != s)

{

ret = m_EPoll.add_usock(eid, u, events);

s->m_pUDT->addEPoll(eid);

}

else

{

throw CUDTException(5, 4);

}

return ret;

}

int CEPoll::add_usock(const int eid, const UDTSOCKET& u, const int* events)

{

CGuard pg(m_EPollLock);

map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);

if (p == m_mPolls.end())

throw CUDTException(5, 13);

if (!events || (*events & UDT_EPOLL_IN))       //UDT_EPOLL_IN 和UDT_EPOLL_OUT、UDT_EPOLL_ERROR分别是0x1, 0x4, 0x8,可以进行&运算

p->second.m_sUDTSocksIn.insert(u);

if (!events || (*events & UDT_EPOLL_OUT))

p->second.m_sUDTSocksOut.insert(u);

return 0;

}

UDT命名空间提供给应用程序调用接口,可成为API层,API层调用CUDT API,这一层主要做错误处理,捕捉下面两层抛出的错误保存起来交给应用程序使用,CUDT API调用CUDTUnited的实现,如果是UDT SOCKET的socket(),bind(),listen()等,到这层也就结束了,不过epoll要多个第四层,再调用CEPoll的实现。现在来看看CUDTUnited和CEPoll的实现。

CUDTSocket* s = locate(u);

调用CUDTUnited::locate(),根据SocketID,也就是UDT Socket handle在CUDTUnited的std::map<UDTSOCKET, CUDTSocket*> m_Sockets中找到对应的CUDTSocket结构,如果找不到,抛出错误,找到了就调用CEPoll的add_usock实现,根据传的event参数,将socket导入相应的队列。之后调用s->m_pUDT->addEPoll(eid)

void CUDT::addEPoll(const int eid)

{

CGuard::enterCS(s_UDTUnited.m_EPoll.m_EPollLock);                      //这种通过类来实现加锁解锁的,我第一次见,还挺方便。

m_sPollID.insert(eid);

CGuard::leaveCS(s_UDTUnited.m_EPoll.m_EPollLock);

if (!m_bConnected || m_bBroken || m_bClosing)

return;

if (((UDT_STREAM == m_iSockType) && (m_pRcvBuffer->getRcvDataSize() > 0)) ||

((UDT_DGRAM == m_iSockType) && (m_pRcvBuffer->getRcvMsgNum() > 0)))

{

s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, true);

}

if (m_iSndBufSize > m_pSndBuffer->getCurrBufSize())

{

s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);

}

}

这里已经开始更新m_sUDTWrites,m_sUDTReads,通过update_events(),如前所述,update_events()也是CEPoll的成员函数。

int CEPoll::update_events(const UDTSOCKET& uid, std::set<int>& eids, int events, bool enable)

{

CGuard pg(m_EPollLock);

map<int, CEPollDesc>::iterator p;

vector<int> lost;

for (set<int>::iterator i = eids.begin(); i != eids.end(); ++ i)

{

p = m_mPolls.find(*i);

if (p == m_mPolls.end())

{

lost.push_back(*i);

}

else

{

if ((events & UDT_EPOLL_IN) != 0)

update_epoll_sets(uid, p->second.m_sUDTSocksIn, p->second.m_sUDTReads, enable);   //UDT_EPOLL_IN 和UDT_EPOLL_OUT、UDT_EPOLL_ERROR分别是0x1, 0x4, 0x8,可以进行&运算

if ((events & UDT_EPOLL_OUT) != 0)

update_epoll_sets(uid, p->second.m_sUDTSocksOut, p->second.m_sUDTWrites, enable);

if ((events & UDT_EPOLL_ERR) != 0)

update_epoll_sets(uid, p->second.m_sUDTSocksEx, p->second.m_sUDTExcepts, enable);

}

}

for (vector<int>::iterator i = lost.begin(); i != lost.end(); ++ i)

eids.erase(*i);

return 0;

}

void update_epoll_sets(const UDTSOCKET& uid, const set<UDTSOCKET>& watch, set<UDTSOCKET>& result, bool enable)

{

if (enable && (watch.find(uid) != watch.end()))

{

result.insert(uid);

}

else if (!enable)

{

result.erase(uid);

}

}

最后说下wait函数的实现,一样是四段式,就跳过前面三段,直接看第四段

int CEPoll::wait(const int eid, set<UDTSOCKET>* readfds, set<UDTSOCKET>* writefds, int64_t msTimeOut, set<SYSSOCKET>* lrfds, set<SYSSOCKET>* lwfds)

{

// if all fields is NULL and waiting time is infinite, then this would be a deadlock   都空的的话,会死锁,抛出异常

if (!readfds && !writefds && !lrfds && lwfds && (msTimeOut < 0))

throw CUDTException(5, 3, 0);

// Clear these sets in case the app forget to do it.  清空

if (readfds) readfds->clear();

if (writefds) writefds->clear();

if (lrfds) lrfds->clear();

if (lwfds) lwfds->clear();

int total = 0;

int64_t entertime = CTimer::getTime();

while (true)

{

CGuard::enterCS(m_EPollLock);

map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);

if (p == m_mPolls.end())

{

CGuard::leaveCS(m_EPollLock);

throw CUDTException(5, 13);

}

if (p->second.m_sUDTSocksIn.empty() && p->second.m_sUDTSocksOut.empty() && p->second.m_sLocals.empty() && (msTimeOut < 0))

{

// no socket is being monitored, this may be a deadlock  都空的的话,会死锁,抛出异常

CGuard::leaveCS(m_EPollLock);

throw CUDTException(5, 3);

}

// Sockets with exceptions are returned to both read and write sets.          把m_sUDTReads和m_sUDTExcepts都读到readfds里

if ((NULL != readfds) && (!p->second.m_sUDTReads.empty() || !p->second.m_sUDTExcepts.empty()))

{

*readfds = p->second.m_sUDTReads;

for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)

readfds->insert(*i);

total += p->second.m_sUDTReads.size() + p->second.m_sUDTExcepts.size();

}

if ((NULL != writefds) && (!p->second.m_sUDTWrites.empty() || !p->second.m_sUDTExcepts.empty()))          //把m_sUDTWrites和m_sUDTExcepts都读到writefds里

{

*writefds = p->second.m_sUDTWrites;

for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)

writefds->insert(*i);

total += p->second.m_sUDTWrites.size() + p->second.m_sUDTExcepts.size();

}

if (lrfds || lwfds)     //读系统套接字

{

#ifdef LINUX

const int max_events = p->second.m_sLocals.size();

epoll_event ev[max_events];

int nfds = ::epoll_wait(p->second.m_iLocalID, ev, max_events, 0);

for (int i = 0; i < nfds; ++ i)

{

if ((NULL != lrfds) && (ev[i].events & EPOLLIN))

{

lrfds->insert(ev[i].data.fd);

++ total;

}

if ((NULL != lwfds) && (ev[i].events & EPOLLOUT))

{

lwfds->insert(ev[i].data.fd);

++ total;

}

}

#else

//currently "select" is used for all non-Linux platforms.

//faster approaches can be applied for specific systems in the future.

//"select" has a limitation on the number of sockets

fd_set readfds;

fd_set writefds;

FD_ZERO(&readfds);

FD_ZERO(&writefds);

for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)

{

if (lrfds)

FD_SET(*i, &readfds);

if (lwfds)

FD_SET(*i, &writefds);

}

timeval tv;

tv.tv_sec = 0;

tv.tv_usec = 0;

if (::select(0, &readfds, &writefds, NULL, &tv) > 0)

{

for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)

{

if (lrfds && FD_ISSET(*i, &readfds))

{

lrfds->insert(*i);

++ total;

}

if (lwfds && FD_ISSET(*i, &writefds))

{

lwfds->insert(*i);

++ total;

}

}

}

#endif

}

CGuard::leaveCS(m_EPollLock);

if (total > 0)

return total;

if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * 1000LL))

throw CUDTException(6, 3, 0);

CTimer::waitForEvent();

}

return 0;

}

时间: 2024-12-14 18:43:18

UDT中的epoll的相关文章

UDT中epoll对CLOSE状态的处理

epoll_wait()返回可用uid时,对uid取状态,本该是BROKEN的,却取到CLOSED,然而,不能像处理BROKEN事件那样处理CLOSED事件,这样移除不了CLOSED事件,于是epoll_wait不断返回该uid,就造成了死循环.跟踪代码至底层,寻找原因. int CUDTUnited::epoll_remove_usock(const int eid, const UDTSOCKET u) { int ret = m_EPoll.remove_usock(eid, u); CU

解决UDT中内存下不去的问题

使用UDT库,编写简单的网络通信程序,发现了一个问题,关闭一部分连接后,程序占用内存并没有变化. 比如先连接500个,再连接另500个,先关掉后面500个,程序占用内存降一半,再关掉500个,程序占用内存降到0.1.然而,如果先关掉前面500个,程序占用内存不会发生变化,只有等再关掉后面500个,程序内存才会降到0.1. 换个顺序就降不了,这很奇怪,很“玄学”. 跟踪代码至底层,该有的释放都有,这是为什么?灵机一动想到可能与linux内存管理机制有关,果不其然,linux中给程序分配堆内存后,当

五种网络IO模型以及多路复用IO中select/epoll对比

下面都是以网络读数据为例 [2阶段网络IO] 第一阶段:等待数据 wait for data 第二阶段:从内核复制数据到用户 copy data from kernel to user 下面是5种网络IO模型 [阻塞blocking IO] 两阶段全程阻塞 recvfrom -> [syscall -> wait -> copy ->] return OK [非阻塞nonblocking IO] 第一阶段是非阻塞的不断检查是否数据准备好,第二阶段阻塞读取数据 recvfrom -&

Python——在Python中如何使用Linux的epoll

在Python中如何使用Linux的epoll 目录 序言 阻塞socket编程示例 异步socket的好处以及Linux epoll 带epoll的异步socket编程示例 性能注意事项 源代码 序言 从2.6开始,Python包含了访问Linux epoll库的API.这篇文章用几个简单的python 3例子来展示下这个API.欢迎大家质疑和反馈. 阻塞socket编程示例 示例1用python3.0搭建了一个简单的服务:在8080端口监听HTTP请求,把它打印到控制台,并返回一个HTTP响

应用服务器中对JDK的epoll空转bug的处理

原文链接:应用服务器中对JDK的epoll空转bug的处理 前面讲到了epoll的一些机制,与select和poll等传统古老的IO多路复用机制的一些区别,这些区别实质可以总结为一句话, 就是epoll将重要的基于事件的fd集合放在了内核中来完成,因为内核是高效的,所以很多关于fd事件监听集合的操作也是高效的, 不方便的就是,因为在内核中,所以我们需要通过系统调用来调用关于fd操作集合,而不是直接自己攒一个. 如果在linux中,epoll在JDK6中还需要配置,在后续的版本中为JDK的NIO提

UDT协议实现分析——UDT Socket的创建

UDT API的用法 在分析 连接的建立过程 之前,先来看一下UDT API的用法.在UDT网络中,通常要有一个UDT Server监听在某台机器的某个UDP端口上,等待客户端的连接:有一个或多个客户端连接UDT Server:UDT Server接收到来自客户端的连接请求后,创建另外一个单独的UDT Socket用于与该客户端进行通信. 先来看一下UDT Server的简单的实现,UDT的开发者已经提供了一些demo程序可供参考,位于app/目录下. #include <unistd.h>

UDT协议实现分析——连接的建立

UDT Server在执行UDT::listen()之后,就可以接受其它节点的连接请求了.这里我们研究一下UDT连接建立的过程. 连接的发起 来看连接的发起方.如前面我们看到的那样,UDT Client创建一个Socket,可以将该Socket绑定到某个端口,也可以不绑定,然后就可以调用UDT::connect()将这个Socket连接到UDT Server了.来看UDT::connect()的定义(src/api.cpp): int CUDTUnited::connect(const UDTS

socket阻塞与非阻塞,同步与异步、I/O模型,select与poll、epoll比较

1. 概念理解 在进行网络编程时,我们常常见到同步(Sync)/异步(Async),阻塞(Block)/非阻塞(Unblock)四种调用方式: 同步/异步主要针对C端: 同步:      所谓同步,就是在c端发出一个功能调用时,在没有得到结果之前,该调用就不返回.也就是必须一件一件事做,等前一件做完了才能做下一件事. 例如普通B/S模式(同步):提交请求->等待服务器处理->处理完毕返回 这个期间客户端浏览器不能干任何事 异步:      异步的概念和同步相对.当c端一个异步过程调用发出后,调

基本I/O模型与Epoll简介

5种基本的I/O模型:1)阻塞I/O ;2)非阻塞I/O; 3)I/O复用(select和poll);4)信号驱动I/O(SIGIO);5)异步I/O(POSIX.1的aio_系列函数). 操作系统中一个输入操作一般有两个不同的阶段: 第一:等待数据准备好.第二:从内核到进程拷贝数据.对于一个sockt上的输入操作,第一步一般是等待数据到达网络,当分组到达时,它被拷贝到内核中的某个缓冲区,第二步是将数据从内核缓冲区拷贝到应用程序缓冲区. 一.           阻塞I/O模型 请求无法立即完成