UDT makes more effective use of network bandwidth, and pushes the data transfer rate as high as possible, mainly through fine-grained control of the sending and receiving process.
As we saw earlier when analyzing send-side control, for normal in-order packet sending the control acts in two ways. First is the size of the send window, i.e. the maximum number of packets that may be sent but not yet acknowledged at any moment; this is described by the congestion window size m_dCongestionWindow and the flow window size m_iFlowWindowSize, the effective send window being the smaller of the two. Second is the time interval between sending two packets, i.e. the packet sending rate, which is described by the inter-packet send interval m_ullInterval. All of UDT's sending control mechanisms work by influencing these variables.
The effect of the send window size on packet sending is fairly direct: in CUDT::packData(CPacket& packet, uint64_t& ts), UDT computes the offset from the SeqNo of the last ACKed data packet, m_iSndLastAck, to the SeqNo of the most recently sent data packet, m_iSndCurrSeqNo; if the offset exceeds the window size, no more data is sent.
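As a rough illustration of that check, here is a hypothetical, simplified sketch (not the actual CUDT::packData() code; it uses plain integer subtraction where real UDT uses CSeqNo::seqoff to handle sequence-number wraparound):

```cpp
#include <algorithm>
#include <cstdint>

// Decide whether another data packet may be sent, given the two window
// limits described above. Names mirror UDT's, but this is only a sketch.
inline bool canSendNext(int32_t sndLastAck, int32_t sndCurrSeqNo,
                        double congestionWindow, int flowWindowSize) {
    // Effective send window = min(congestion window, flow window).
    int cwnd = static_cast<int>(std::min(congestionWindow,
                                         static_cast<double>(flowWindowSize)));
    // Packets in flight: offset from last ACKed seq to latest sent seq.
    int offset = sndCurrSeqNo - sndLastAck;
    return offset < cwnd;
}
```

With the default initial congestion window of 16 packets, at most 16 packets can be in flight regardless of how large the flow window is.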
Controlling the sending rate is a bit more involved. CUDT::packData(CPacket& packet, uint64_t& ts) computes the ideal time point for sending the next data packet, records it in m_ullTargetTime so that the timing of subsequent packets can be tracked and adjusted, and returns that time to its caller CSndUList::pop(). CSndUList::pop() then updates the timestamp field of the CUDT's CSNode m_pSNode when reinserting the CUDT into the send list, and places the CUDT at the appropriate position in CSndUList's heap of CUDTs according to the new timestamp.
Let's now take a closer look at how UDT adjusts the send window size and the sending rate.
Flow window size m_iFlowWindowSize
Searching the entire UDT codebase for m_iFlowWindowSize shows that it is updated in only a few places:
1. The data receiver and data sender negotiate the value via HandShake messages while establishing the connection.
CUDT::connect(const CPacket& response):
m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize;
and in CUDT::connect(const sockaddr* peer, CHandShake* hs):
// exchange info for maximum flow window size
m_iFlowWindowSize = hs->m_iFlightFlagSize;
The m_iFlightFlagSize of the connection handshake request m_ConnReq is set in CUDT::connect(const sockaddr* serv_addr):
m_ConnReq.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize) ? m_iRcvBufSize : m_iFlightFlagSize;
m_iFlightFlagSize and m_iRcvBufSize are initialized in CUDT::CUDT():
m_iFlightFlagSize = 25600;
m_iSndBufSize = 8192;
m_iRcvBufSize = 8192;   //Rcv buffer MUST NOT be bigger than Flight Flag size
Both m_iFlightFlagSize and m_iRcvBufSize can also be set via UDT::setsockopt() (src/core.cpp):
void CUDT::setOpt(UDTOpt optName, const void* optval, int)
{
    if (m_bBroken || m_bClosing)
        throw CUDTException(2, 1, 0);

    CGuard cg(m_ConnectionLock);
    CGuard sendguard(m_SendLock);
    CGuard recvguard(m_RecvLock);

    switch (optName)
    {
    case UDT_FC:
        if (m_bConnecting || m_bConnected)
            throw CUDTException(5, 2, 0);
        if (*(int*) optval < 1)
            throw CUDTException(5, 3);
        // Mimimum recv flight flag size is 32 packets
        if (*(int*) optval > 32)
            m_iFlightFlagSize = *(int*) optval;
        else
            m_iFlightFlagSize = 32;
        break;

    case UDT_RCVBUF:
        if (m_bOpened)
            throw CUDTException(5, 1, 0);
        if (*(int*) optval <= 0)
            throw CUDTException(5, 3, 0);
        // Mimimum recv buffer size is 32 packets
        if (*(int*) optval > (m_iMSS - 28) * 32)
            m_iRcvBufSize = *(int*) optval / (m_iMSS - 28);
        else
            m_iRcvBufSize = 32;
        // recv buffer MUST not be greater than FC size
        if (m_iRcvBufSize > m_iFlightFlagSize)
            m_iRcvBufSize = m_iFlightFlagSize;
        break;
2. "Light" ACK messages sent by the data receiver decrease m_iFlowWindowSize.
See this code in CUDT::processCtrl(CPacket& ctrlpkt):
// process a lite ACK
if (4 == ctrlpkt.getLength())
{
    ack = *(int32_t *) ctrlpkt.m_pcData;
    if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0)
    {
        m_iFlowWindowSize -= CSeqNo::seqoff(m_iSndLastAck, ack);
        m_iSndLastAck = ack;
    }

    break;
}
As we saw in the ACK section of "Reliability guarantees in UDT data transfer", a "light" ACK is normally sent only when a large burst of data packets arrives within a short period of time, which usually indicates that the sender is sending too much, too fast. The "light" ACK therefore shrinks the sender's flow window m_iFlowWindowSize.
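The window arithmetic above can be isolated into a hypothetical simplified sketch (plain integer comparison and subtraction stand in for the wraparound-safe CSeqNo::seqcmp and CSeqNo::seqoff used by real UDT):

```cpp
#include <cstdint>

// A light ACK carries only the acknowledged sequence number, so the sender
// shrinks its flow window by the number of packets the ACK covers, instead
// of reading a fresh window size out of the packet as a regular ACK does.
inline void onLightAck(int32_t ack, int32_t& sndLastAck, int& flowWindowSize) {
    if (ack >= sndLastAck) {                 // CSeqNo::seqcmp in real UDT
        flowWindowSize -= ack - sndLastAck;  // CSeqNo::seqoff in real UDT
        sndLastAck = ack;
    }
}
```

For example, a light ACK covering 10 newly arrived packets shrinks a window of 50 down to 40.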
3. Regular ACK messages sent by the data receiver directly set the sender's flow window size m_iFlowWindowSize.
Again in CUDT::processCtrl(CPacket& ctrlpkt):
if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0)
{
    // Update Flow Window Size, must update before and together with m_iSndLastAck
    m_iFlowWindowSize = *((int32_t *) ctrlpkt.m_pcData + 3);
    m_iSndLastAck = ack;
}
The receiver chooses this value mainly based on the available space in its own receive buffer, as in this code in CUDT::sendCtrl():
data[3] = m_pRcvBuffer->getAvailBufSize();
// a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock
if (data[3] < 2)
    data[3] = 2;
if (currt
These are essentially the only three places where m_iFlowWindowSize is updated.
Besides being used in CUDT::packData() to determine the send window size, the initial value of m_iFlowWindowSize also determines the size of the sender's loss list, as in CUDT::connect():
// after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space.
m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
as well as the congestion controller's maximum congestion window size, also in CUDT::connect():
m_pCC->setMaxCWndSize(m_iFlowWindowSize);
That covers essentially everything related to the flow window size in UDT.
Packet send interval m_ullInterval
m_ullInterval, in turn, is updated in the following places in UDT:
1. During connection setup, CUDT::connect(const CPacket& response) and CUDT::connect(const sockaddr* peer, CHandShake* hs) compute it from the initial value of the congestion controller's m_dPktSndPeriod:
m_ullInterval = (uint64_t) (m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
UDT's default congestion controller initializes m_dPktSndPeriod to 1.0 (src/ccc.cpp):
CCC::CCC() :
    m_iSYNInterval(CUDT::m_iSYNInterval),
    m_dPktSndPeriod(1.0),
    ......

void CUDTCC::init()
{
    ......
    m_dCWndSize = 16;
    m_dPktSndPeriod = 1;
}
2. m_ullInterval is updated when the congestion controller's results are synchronized into CUDT. See CUDT::CCUpdate():
void CUDT::CCUpdate()
{
    m_ullInterval = (uint64_t) (m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
    m_dCongestionWindow = m_pCC->m_dCWndSize;

    if (m_llMaxBW <= 0)
        return;
    const double minSP = 1000000.0 / (double(m_llMaxBW) / m_iMSS) * m_ullCPUFrequency;
    if (m_ullInterval < minSP)
        m_ullInterval = minSP;
}
Here the value is again computed from the congestion controller's m_dPktSndPeriod, with the same formula as before. The actual value of m_ullInterval, however, is bounded below by what the maximum data transfer rate m_llMaxBW allows. m_llMaxBW defaults to the invalid value -1, in CUDT::CUDT():
m_llMaxBW = -1;
but it can be set via UDT::setsockopt() (src/core.cpp):
case UDT_MAXBW: m_llMaxBW = *(int64_t*) optval; break;
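The effect of the clamp in CCUpdate() can be sketched in isolation. This is a hypothetical helper, not UDT code; cpuFrequency converts microseconds to rdtsc ticks and is taken as 1 here for readability, maxBW is in bytes per second and mss in bytes per packet:

```cpp
#include <cstdint>

// With a configured maximum bandwidth, the packet interval can never drop
// below the interval implied by that bandwidth: maxBW / mss packets per
// second means at least 1e6 / (maxBW / mss) microseconds between packets.
inline uint64_t clampInterval(uint64_t interval, int64_t maxBW, int mss,
                              uint64_t cpuFrequency) {
    if (maxBW <= 0)                      // -1 means "no limit" (the default)
        return interval;
    double minSP = 1000000.0 / (double(maxBW) / mss) * cpuFrequency;
    return interval < (uint64_t) minSP ? (uint64_t) minSP : interval;
}
```

For instance, a 1.5 MB/s cap with a 1500-byte MSS is 1000 packets per second, so the interval is floored at 1000 microseconds.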
3. DelayWarning control messages fed back by the data receiver increase m_ullInterval. The handling of the DelayWarning message can be seen in CUDT::processCtrl():
case 4: //100 - Delay Warning
    // One way packet delay is increasing, so decrease the sending rate
    m_ullInterval = (uint64_t) ceil(m_ullInterval * 1.125);
    m_iLastDecSeq = m_iSndCurrSeqNo;
    break;
and the sending of the DelayWarning in CUDT::sendCtrl():
case 4: //100 - Congestion Warning
    ctrlpkt.pack(pkttype);
    ctrlpkt.m_iID = m_PeerID;
    m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);

    CTimer::rdtsc(m_ullLastWarningTime);
    break;
This mechanism looks straightforward enough, but in UDT version 4 the DelayWarning/CongestionWarning message is in fact no longer used. In the ACK2 case of CUDT::processCtrl() you can see the commented-out line:
//if increasing delay detected... // sendCtrl(4);
All we can do is reminisce about the DelayWarning/CongestionWarning messages that once played their part.
So it appears that the packet send interval m_ullInterval is effectively always determined by the congestion controller's m_dPktSndPeriod.
The congestion controller
UDT uses a congestion controller to track events that occur during data transfer, such as the arrival of an ACK or a timeout. Based on when these events occur, together with a few basic settings, it performs its calculations and uses the results to control the frequency of ACK messages, the size of the send window, and the packet sending rate.
The congestion controller virtual factory CCCVirtualFactory
UDT never creates a congestion controller directly; it creates one through a congestion controller factory. Here are the factory interface CCCVirtualFactory and the default factory implementation CCCFactory:
class CCCVirtualFactory
{
public:
    virtual ~CCCVirtualFactory() { }

    virtual CCC* create() = 0;
    virtual CCCVirtualFactory* clone() = 0;
};

template<class T>
class CCCFactory : public CCCVirtualFactory
{
public:
    virtual ~CCCFactory() { }

    virtual CCC* create()
    {
        return new T;
    }

    virtual CCCVirtualFactory* clone()
    {
        return new CCCFactory<T>;
    }
};
CCCFactory is mainly used to create the default congestion controller CUDTCC (src/core.cpp):
CUDT::CUDT()
{
    ......
    m_pCCFactory = new CCCFactory<CUDTCC>;
To implement a custom congestion control strategy, you likewise implement your own factory against the CCCVirtualFactory interface, have it create your custom congestion controller, and hand the factory to UDT. As seen in CUDT::setOpt():
case UDT_CC:
    if (m_bConnecting || m_bConnected)
        throw CUDTException(5, 1, 0);
    if (NULL != m_pCCFactory)
        delete m_pCCFactory;
    m_pCCFactory = ((CCCVirtualFactory *) optval)->clone();
    break;
Incidentally, reading the UDT_CC option in CUDT::getOpt() returns the congestion controller itself rather than the factory:
void CUDT::getOpt(UDTOpt optName, void* optval, int& optlen)
{
    CGuard cg(m_ConnectionLock);
    ......
    case UDT_CC:
        if (!m_bOpened)
            throw CUDTException(5, 5, 0);
        *(CCC**) optval = m_pCC;
        optlen = sizeof(CCC*);
        break;
That is essentially all there is to the congestion controller virtual factory in UDT.
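The factory pattern can be exercised in isolation. The following is a self-contained re-creation with the UDT dependencies stripped out (the class names are borrowed from the real code, but the initialWindow() method and MyCC controller are purely illustrative): the user hands UDT a factory, clone() lets UDT keep its own copy, and create() builds one controller per connection.

```cpp
#include <memory>

// Minimal stand-in for UDT's CCC base class.
struct CCC {
    virtual ~CCC() { }
    virtual int initialWindow() const { return 16; }  // illustrative only
};

// Same shape as UDT's CCCVirtualFactory / CCCFactory<T> pair.
struct CCCVirtualFactory {
    virtual ~CCCVirtualFactory() { }
    virtual CCC* create() = 0;
    virtual CCCVirtualFactory* clone() = 0;
};

template <class T>
struct CCCFactory : CCCVirtualFactory {
    CCC* create() override { return new T; }
    CCCVirtualFactory* clone() override { return new CCCFactory<T>; }
};

// A hypothetical custom controller that starts with a larger window.
struct MyCC : CCC {
    int initialWindow() const override { return 32; }
};
```

Because setOpt() stores a clone() of the user's factory, UDT stays in control of the factory's lifetime while still producing the user's controller type.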
The congestion controller CCC
Now let's look at the definition of the congestion controller base class CCC (src/ccc.h):
class UDT_API CCC
{
friend class CUDT;

public:
    CCC();
    virtual ~CCC();

private:
    CCC(const CCC&);
    CCC& operator=(const CCC&) { return *this; }

public:
    // Callback function to be called (only) at the start of a UDT connection.
    // Note that this is different from CCC(), which is always called.
    virtual void init() { }

    // Callback function to be called when a UDT connection is closed.
    virtual void close() { }

    // Callback function to be called when an ACK packet is received.
    //    ackno: the data sequence number acknowledged by this ACK.
    virtual void onACK(int32_t) { }

    // Callback function to be called when a loss report is received.
    //    losslist: list of sequence numbers of lost packets, in the format described in packet.cpp.
    //    size: length of the loss list.
    virtual void onLoss(const int32_t*, int) { }

    // Callback function to be called when a timeout event occurs.
    virtual void onTimeout() { }

    // Callback function to be called when a data packet is sent.
    virtual void onPktSent(const CPacket*) { }

    // Callback function to be called when a data packet is received.
    virtual void onPktReceived(const CPacket*) { }

    // Callback function to process a user defined packet.
    virtual void processCustomMsg(const CPacket*) { }

protected:
    // Set periodical acknowledging and the ACK period (msINT: the period to send an ACK).
    void setACKTimer(int msINT);

    // Set packet-based acknowledging and the number of packets to send an ACK.
    void setACKInterval(int pktINT);

    // Set RTO value, in microseconds.
    void setRTO(int usRTO);

    // Send a user defined control packet.
    void sendCustomMsg(CPacket& pkt) const;

    // Retrieve performance information; returns a pointer to a performance info structure.
    const CPerfMon* getPerfInfo();

    // Set user defined parameters (param: the parameters in one buffer; size: the size of the buffer).
    void setUserParam(const char* param, int size);

private:
    void setMSS(int mss);
    void setMaxCWndSize(int cwnd);
    void setBandwidth(int bw);
    void setSndCurrSeqNo(int32_t seqno);
    void setRcvRate(int rcvrate);
    void setRTT(int rtt);

protected:
    const int32_t& m_iSYNInterval;   // UDT constant parameter, SYN
    double m_dPktSndPeriod;          // Packet sending period, in microseconds
    double m_dCWndSize;              // Congestion window size, in packets
    int m_iBandwidth;                // estimated bandwidth, packets per second
    double m_dMaxCWndSize;           // maximum cwnd size, in packets
    int m_iMSS;                      // Maximum Packet Size, including all packet headers
    int32_t m_iSndCurrSeqNo;         // current maximum seq no sent out
    int m_iRcvRate;                  // packet arrival rate at receiver side, packets per second
    int m_iRTT;                      // current estimated RTT, microseconds
    char* m_pcParam;                 // user defined parameter
    int m_iPSize;                    // size of m_pcParam

private:
    UDTSOCKET m_UDT;                 // The UDT entity that this congestion control algorithm is bound to
    int m_iACKPeriod;                // Periodical timer to send an ACK, in milliseconds
    int m_iACKInterval;              // How many packets to send one ACK, in packets
    bool m_bUserDefinedRTO;          // if the RTO value is defined by users
    int m_iRTO;                      // RTO value, microseconds
    CPerfMon m_PerfInfo;             // protocol statistics information
};
And the implementation of CCC (src/ccc.cpp):
CCC::CCC() :
    m_iSYNInterval(CUDT::m_iSYNInterval),
    m_dPktSndPeriod(1.0),
    m_dCWndSize(16.0),
    m_iBandwidth(),
    m_dMaxCWndSize(),
    m_iMSS(),
    m_iSndCurrSeqNo(),
    m_iRcvRate(),
    m_iRTT(),
    m_pcParam(NULL),
    m_iPSize(0),
    m_UDT(),
    m_iACKPeriod(0),
    m_iACKInterval(0),
    m_bUserDefinedRTO(false),
    m_iRTO(-1),
    m_PerfInfo()
{
}

CCC::~CCC()
{
    delete[] m_pcParam;
}

void CCC::setACKTimer(int msINT)
{
    m_iACKPeriod = msINT > m_iSYNInterval ? m_iSYNInterval : msINT;
}

void CCC::setACKInterval(int pktINT)
{
    m_iACKInterval = pktINT;
}

void CCC::setRTO(int usRTO)
{
    m_bUserDefinedRTO = true;
    m_iRTO = usRTO;
}

void CCC::sendCustomMsg(CPacket& pkt) const
{
    CUDT* u = CUDT::getUDTHandle(m_UDT);

    if (NULL != u)
    {
        pkt.m_iID = u->m_PeerID;
        u->m_pSndQueue->sendto(u->m_pPeerAddr, pkt);
    }
}

const CPerfMon* CCC::getPerfInfo()
{
    try
    {
        CUDT* u = CUDT::getUDTHandle(m_UDT);
        if (NULL != u)
            u->sample(&m_PerfInfo, false);
    }
    catch (...)
    {
        return NULL;
    }

    return &m_PerfInfo;
}

void CCC::setMSS(int mss) { m_iMSS = mss; }
void CCC::setBandwidth(int bw) { m_iBandwidth = bw; }
void CCC::setSndCurrSeqNo(int32_t seqno) { m_iSndCurrSeqNo = seqno; }
void CCC::setRcvRate(int rcvrate) { m_iRcvRate = rcvrate; }
void CCC::setMaxCWndSize(int cwnd) { m_dMaxCWndSize = cwnd; }
void CCC::setRTT(int rtt) { m_iRTT = rtt; }

void CCC::setUserParam(const char* param, int size)
{
    delete[] m_pcParam;
    m_pcParam = new char[size];
    memcpy(m_pcParam, param, size);
    m_iPSize = size;
}
Next, the definition of UDT's default CCC, CUDTCC:
class CUDTCC : public CCC
{
public:
    CUDTCC();

public:
    virtual void init();
    virtual void onACK(int32_t);
    virtual void onLoss(const int32_t*, int);
    virtual void onTimeout();

private:
    int m_iRCInterval;          // UDT Rate control interval
    uint64_t m_LastRCTime;      // last rate increase time
    bool m_bSlowStart;          // if in slow start phase
    int32_t m_iLastAck;         // last ACKed seq no
    bool m_bLoss;               // if loss happened since last rate increase
    int32_t m_iLastDecSeq;      // max pkt seq no sent out when last decrease happened
    double m_dLastDecPeriod;    // value of pktsndperiod when last decrease happened
    int m_iNAKCount;            // NAK counter
    int m_iDecRandom;           // random threshold on decrease by number of loss events
    int m_iAvgNAKNum;           // average number of NAKs per congestion
    int m_iDecCount;            // number of decreases in a congestion epoch
};
And the implementation of CUDTCC:
CUDTCC::CUDTCC() :
    m_iRCInterval(),
    m_LastRCTime(),
    m_bSlowStart(),
    m_iLastAck(),
    m_bLoss(),
    m_iLastDecSeq(),
    m_dLastDecPeriod(),
    m_iNAKCount(),
    m_iDecRandom(),
    m_iAvgNAKNum(),
    m_iDecCount()
{
}

void CUDTCC::init()
{
    m_iRCInterval = m_iSYNInterval;
    m_LastRCTime = CTimer::getTime();
    setACKTimer(m_iRCInterval);

    m_bSlowStart = true;
    m_iLastAck = m_iSndCurrSeqNo;
    m_bLoss = false;
    m_iLastDecSeq = CSeqNo::decseq(m_iLastAck);
    m_dLastDecPeriod = 1;
    m_iAvgNAKNum = 0;
    m_iNAKCount = 0;
    m_iDecRandom = 1;

    m_dCWndSize = 16;
    m_dPktSndPeriod = 1;
}

void CUDTCC::onACK(int32_t ack)
{
    int64_t B = 0;
    double inc = 0;
    // Note: 1/24/2012
    // The minimum increase parameter is increased from "1.0 / m_iMSS" to 0.01
    // because the original was too small and caused sending rate to stay at low level
    // for long time.
    const double min_inc = 0.01;

    uint64_t currtime = CTimer::getTime();
    if (currtime - m_LastRCTime < (uint64_t) m_iRCInterval)
        return;

    m_LastRCTime = currtime;

    if (m_bSlowStart)
    {
        m_dCWndSize += CSeqNo::seqlen(m_iLastAck, ack);
        m_iLastAck = ack;

        if (m_dCWndSize > m_dMaxCWndSize)
        {
            m_bSlowStart = false;
            if (m_iRcvRate > 0)
                m_dPktSndPeriod = 1000000.0 / m_iRcvRate;
            else
                m_dPktSndPeriod = (m_iRTT + m_iRCInterval) / m_dCWndSize;
        }
    }
    else
        m_dCWndSize = m_iRcvRate / 1000000.0 * (m_iRTT + m_iRCInterval) + 16;

    // During Slow Start, no rate increase
    if (m_bSlowStart)
        return;

    if (m_bLoss)
    {
        m_bLoss = false;
        return;
    }

    B = (int64_t) (m_iBandwidth - 1000000.0 / m_dPktSndPeriod);
    if ((m_dPktSndPeriod > m_dLastDecPeriod) && ((m_iBandwidth / 9) < B))
        B = m_iBandwidth / 9;
    if (B <= 0)
        inc = min_inc;
    else
    {
        // inc = max(10 ^ ceil(log10( B * MSS * 8 ) * Beta / MSS, 1/MSS)
        // Beta = 1.5 * 10^(-6)
        inc = pow(10.0, ceil(log10(B * m_iMSS * 8.0))) * 0.0000015 / m_iMSS;
        if (inc < min_inc)
            inc = min_inc;
    }

    m_dPktSndPeriod = (m_dPktSndPeriod * m_iRCInterval) / (m_dPktSndPeriod * inc + m_iRCInterval);
}

void CUDTCC::onLoss(const int32_t* losslist, int)
{
    // Slow Start stopped, if it hasn't yet
    if (m_bSlowStart)
    {
        m_bSlowStart = false;
        if (m_iRcvRate > 0)
        {
            // Set the sending rate to the receiving rate.
            m_dPktSndPeriod = 1000000.0 / m_iRcvRate;
            return;
        }
        // If no receiving rate is observed, we have to compute the sending
        // rate according to the current window size, and decrease it
        // using the method below.
        m_dPktSndPeriod = m_dCWndSize / (m_iRTT + m_iRCInterval);
    }

    m_bLoss = true;

    if (CSeqNo::seqcmp(losslist[0] & 0x7FFFFFFF, m_iLastDecSeq) > 0)
    {
        m_dLastDecPeriod = m_dPktSndPeriod;
        m_dPktSndPeriod = ceil(m_dPktSndPeriod * 1.125);

        m_iAvgNAKNum = (int) ceil(m_iAvgNAKNum * 0.875 + m_iNAKCount * 0.125);
        m_iNAKCount = 1;
        m_iDecCount = 1;

        m_iLastDecSeq = m_iSndCurrSeqNo;

        // remove global synchronization using randomization
        srand(m_iLastDecSeq);
        m_iDecRandom = (int) ceil(m_iAvgNAKNum * (double(rand()) / RAND_MAX));
        if (m_iDecRandom < 1)
            m_iDecRandom = 1;
    }
    else if ((m_iDecCount++ < 5) && (0 == (++m_iNAKCount % m_iDecRandom)))
    {
        // 0.875^5 = 0.51, rate should not be decreased by more than half within a congestion period
        m_dPktSndPeriod = ceil(m_dPktSndPeriod * 1.125);
        m_iLastDecSeq = m_iSndCurrSeqNo;
    }
}

void CUDTCC::onTimeout()
{
    if (m_bSlowStart)
    {
        m_bSlowStart = false;
        if (m_iRcvRate > 0)
            m_dPktSndPeriod = 1000000.0 / m_iRcvRate;
        else
            m_dPktSndPeriod = m_dCWndSize / (m_iRTT + m_iRCInterval);
    }
    else
    {
        /*
        m_dLastDecPeriod = m_dPktSndPeriod;
        m_dPktSndPeriod = ceil(m_dPktSndPeriod * 2);
        m_iLastDecSeq = m_iLastAck;
        */
    }
}
We won't dissect the algorithm in detail here.
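That said, the rate-increase step at the end of onACK() is worth condensing into a standalone sketch. This is a hypothetical helper, not the full algorithm: B is the estimated spare bandwidth in packets per second, synUs is the SYN interval in microseconds, and the returned value is the new sending period given that the sender wants to send inc more packets per SYN interval:

```cpp
#include <cmath>

// Condensed form of the increase step in CUDTCC::onACK().
inline double nextSndPeriod(double period, double B, int mss, int synUs) {
    const double min_inc = 0.01;  // minimum increase, in packets per SYN
    double inc;
    if (B <= 0)
        inc = min_inc;
    else {
        // inc = 10^ceil(log10(B * MSS * 8)) * Beta / MSS, with Beta = 1.5e-6
        inc = std::pow(10.0, std::ceil(std::log10(B * mss * 8.0))) * 0.0000015 / mss;
        if (inc < min_inc)
            inc = min_inc;
    }
    // "inc more packets per SYN" expressed as a shorter inter-packet period.
    return (period * synUs) / (period * inc + synUs);
}
```

The period can only shrink (the rate only increase) in this step; decreases happen separately in onLoss() and were once triggered by onTimeout() as well.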
The congestion controller works like a processing plant: it takes in events, does its own processing, and outputs values that steer the sending and receiving of data. Its main outputs are:
1. The packet sending period m_dPktSndPeriod, which controls how often data packets are sent.
2. The congestion window size m_dCWndSize, which bounds the send window.
3. The ACK period m_iACKPeriod and ACK interval m_iACKInterval, which control how often ACK packets are sent.
4. m_bUserDefinedRTO and m_iRTO, which control the timeout value.
As we saw above, m_dPktSndPeriod and m_dCWndSize are synchronized into CUDT in CUDT::CCUpdate().
Done.