UDT协议实现分析——发送窗口大小及发送速率的调整

UDT主要通过在数据收发的过程中进行精细的控制来实现对于网络带宽更加有效的利用,并使网络中数据传输的速率尽可能快。

如我们前面在分析数据发送的控制中看到的,对于正常的顺序packet发送,发送控制主要在于两个方面,一是发送窗口的大小,也就是某个时刻已经发送但未得到相应的packet的最大个数,这一点主要由拥塞窗口大小m_dCongestionWindow和滑动窗口大小m_iFlowWindowSize来描述,发送窗口大小为两者中较小的那一个;二是控制两个数据包发送的时间间隔,也就是包的发送速率,这一点则主要用数据包发送时间间隔m_ullInterval来描述。所有的发送控制机制主要通过影响这几个变量来控制发送过程。

发送窗口大小对于数据packet发送过程的影响比较直接,在CUDT::packData(CPacket& packet, uint64_t& ts)中,会检查最后被ACK的数据packet的SeqNo m_iSndLastAck到最近发送的数据packet的SeqNo m_iSndCurrSeqNo的offset,若offset大于窗口大小,就不再发送数据。

发送送率的控制则略微复杂一点。在CUDT::packData(CPacket& packet, uint64_t& ts)中会计算下一个数据packet发送的理想的时间点,记录在m_ullTargetTime中用于track及调整后续数据packet的发送时间,并会将该时间值返回给调用者CSndUList::pop(),CSndUList::pop()则会在将CUDT重新插入发送列表时更新CUDT的CSNode m_pSNode的时间戳字段,并根据新的时间戳来讲CUDT放在CSndUList的CUDT堆的适当位置上。

这里就来更细致地看一下UDT中发送窗口大小及发送速率的调整。

滑动窗口大小m_iFlowWindowSize

对于m_iFlowWindowSize,搜遍UDT的整个code,可以看到,主要在这样的几个地方会去更新它:

1. 数据接收端和数据发送端在建立连接的过程中会通过HandShake消息协商确认该值。

CUDT::connect(const CPacket& response):

m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize;

和CUDT::connect(const sockaddr* peer, CHandShake* hs):

// exchange info for maximum flow window size
    m_iFlowWindowSize = hs->m_iFlightFlagSize;

CUDT::connect(const sockaddr* serv_addr)中连接握手请求m_ConnReq的m_iFlightFlagSize:

    m_ConnReq.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize) ? m_iRcvBufSize : m_iFlightFlagSize;

在CUDT::CUDT()中会初始化m_iFlightFlagSize和m_iRcvBufSize:

    m_iFlightFlagSize = 25600;
    m_iSndBufSize = 8192;
    m_iRcvBufSize = 8192;  //Rcv buffer MUST NOT be bigger than Flight Flag size

m_iFlightFlagSize和m_iRcvBufSize这两个选项还可以通过UDT::setsockopt()进行设置(src/core.cpp):

void CUDT::setOpt(UDTOpt optName, const void* optval, int) {
    if (m_bBroken || m_bClosing)
        throw CUDTException(2, 1, 0);

    CGuard cg(m_ConnectionLock);
    CGuard sendguard(m_SendLock);
    CGuard recvguard(m_RecvLock);

    switch (optName) {

        case UDT_FC:
            if (m_bConnecting || m_bConnected)
                throw CUDTException(5, 2, 0);

            if (*(int*) optval < 1)
                throw CUDTException(5, 3);

            // Mimimum recv flight flag size is 32 packets
            if (*(int*) optval > 32)
                m_iFlightFlagSize = *(int*) optval;
            else
                m_iFlightFlagSize = 32;

            break;

        case UDT_RCVBUF:
            if (m_bOpened)
                throw CUDTException(5, 1, 0);

            if (*(int*) optval <= 0)
                throw CUDTException(5, 3, 0);

            // Mimimum recv buffer size is 32 packets
            if (*(int*) optval > (m_iMSS - 28) * 32)
                m_iRcvBufSize = *(int*) optval / (m_iMSS - 28);
            else
                m_iRcvBufSize = 32;

            // recv buffer MUST not be greater than FC size
            if (m_iRcvBufSize > m_iFlightFlagSize)
                m_iRcvBufSize = m_iFlightFlagSize;

            break;

2. 数据接收端发送的“light” ACK消息减小m_iFlowWindowSize。

如CUDT::processCtrl(CPacket& ctrlpkt)中这样的一段code:

            // process a lite ACK
            if (4 == ctrlpkt.getLength()) {
                ack = *(int32_t *) ctrlpkt.m_pcData;
                if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0) {
                    m_iFlowWindowSize -= CSeqNo::seqoff(m_iSndLastAck, ack);
                    m_iSndLastAck = ack;
                }

                break;
            }

如我们前面在 UDT数据收发的可靠性保障 的ACK部分看到的那样,“light” ACK消息通常是由于在某一小段时间内突然到达了大量的数据packet,才会发送的,这通常表明发送端发送数据过快过多了,因而“light” ACK消息会减小数据发送端的滑动窗口大小m_iFlowWindowSize。

3. 数据接收端发送的常规ACK消息直接设置发送端的滑动窗口大小m_iFlowWindowSize。

如CUDT::processCtrl(CPacket& ctrlpkt)中这样的一段code:

            if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0) {
                // Update Flow Window Size, must update before and together with m_iSndLastAck
                m_iFlowWindowSize = *((int32_t *) ctrlpkt.m_pcData + 3);
                m_iSndLastAck = ack;
            }

而数据接收端则主要根据它自己的接收缓冲区的可用大小来设置,如CUDT::sendCtrl()中的这段code:

                data[3] = m_pRcvBuffer->getAvailBufSize();
                // a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock
                if (data[3] < 2)
                    data[3] = 2;

                if (currt

更新m_iFlowWindowSize的地方基本上就是这3个。

m_iFlowWindowSize除了会在CUDT::packData()中被用来决定发送窗口的大小之外,它的初始值还决定着发送丢失列表的大小,如CUDT::connect()中:

        // after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space.
        m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
        m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);

及拥塞控制器最大的拥塞窗口的大小,如CUDT::connect()中:

    m_pCC->setMaxCWndSize(m_iFlowWindowSize);

UDT中与滑动窗口大小相关的内容基本上就是这些。

数据包发送时间间隔m_ullInterval

对于m_ullInterval,在UDT中,则有如下的的几个地方会去更新它:

1. 数据接收端和数据发送端在建立连接过程中的CUDT::connect(const CPacket& response)和CUDT::connect(const sockaddr* peer, CHandShake* hs)中会根据拥塞控制器的m_dPktSndPeriod的初始值计算该值:

    m_ullInterval = (uint64_t) (m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);

UDT默认的拥塞控制器是将m_dPktSndPeriod初始化为1.0的(src/ccc.cpp):

CCC::CCC()
        : m_iSYNInterval(CUDT::m_iSYNInterval),
          m_dPktSndPeriod(1.0),
......

void CUDTCC::init() {
......

    m_dCWndSize = 16;
    m_dPktSndPeriod = 1;
}

2. 将拥塞控制器中的计算成果同步进CUDT中时会更新m_ullInterval。可以看下CUDT::CCUpdate():

void CUDT::CCUpdate() {
    m_ullInterval = (uint64_t) (m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
    m_dCongestionWindow = m_pCC->m_dCWndSize;

    if (m_llMaxBW <= 0)
        return;
    const double minSP = 1000000.0 / (double(m_llMaxBW) / m_iMSS) * m_ullCPUFrequency;
    if (m_ullInterval < minSP)
        m_ullInterval = minSP;
}

这里同样是根据拥塞控制器的m_dPktSndPeriod值计算该值,计算方法也与前一种相同。但m_ullInterval的实际取值会受限与最大数据传输率m_llMaxBW的值。最大数据传输率m_llMaxBW默认为无效值-1,CUDT::CUDT()中:

    m_llMaxBW = -1;

但可以通过UDT::setsockopt()进行设置(src/core.cpp):

        case UDT_MAXBW:
            m_llMaxBW = *(int64_t*) optval;
            break;

3. 数据接收端反馈的DelayWarning控制消息增加m_ullInterval。在CUDT::processCtrl()中可以看到对于DelayWarning消息的处理:

        case 4:  //100 - Delay Warning
            // One way packet delay is increasing, so decrease the sending rate
            m_ullInterval = (uint64_t) ceil(m_ullInterval * 1.125);
            m_iLastDecSeq = m_iSndCurrSeqNo;

            break;

CUDT::sendCtrl()中发送DelayWarning的过程:

        case 4:  //100 - Congestion Warning
            ctrlpkt.pack(pkttype);
            ctrlpkt.m_iID = m_PeerID;
            m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);

            CTimer::rdtsc(m_ullLastWarningTime);

            break;

这个过程看上去蛮直接的。但在4.0版本的UDT中,DelayWarning/CongestiongWarning消息实际上不再被用到了。在CUDT::processCtrl()中处理ACK2的case中可以看到被注释掉的这行code:

            //if increasing delay detected...
            //   sendCtrl(4);

只能缅怀曾经在发挥作用的

DelayWarning
/CongestiongWarning消息
了。

如此看来,则数据包发送时间间隔m_ullInterval似乎总是由拥塞控制器的m_dPktSndPeriod在决定了。

拥塞控制器

UDT使用拥塞控制器来追踪网络数据传输过程中发生的事件,如接收到了ACK,发生了超时等,并根据这些事件产生的时机及其它的一些基本设置进行计算,用计算结果来控制ACK消息发送的频率,发送窗口的大小,及数据包的发送频率等。

拥塞控制器虚拟工厂CCCVirtualFactory

UDT的拥塞控制器机制中,不直接创建拥塞控制器,而是通过拥塞控制器工厂来创建。可以看一下拥塞控制器工厂接口CCCVirtualFactory及默认的拥塞控制器工厂实现CCCFactory的定义:

class CCCVirtualFactory {
 public:
    virtual ~CCCVirtualFactory() {
    }

    virtual CCC* create() = 0;
    virtual CCCVirtualFactory* clone() = 0;
};

template<class T>
class CCCFactory : public CCCVirtualFactory {
 public:
    virtual ~CCCFactory() {
    }

    virtual CCC* create() {
        return new T;
    }
    virtual CCCVirtualFactory* clone() {
        return new CCCFactory<T> ;
    }
};

CCCFactory主要会被用于创建默认拥塞控制器CUDTCC(src/core.cpp):

CUDT::CUDT() {
......
    m_pCCFactory = new CCCFactory<CUDTCC>;

要实现自定义的拥塞控制策略,也是需要实现自己的拥塞控制器虚拟工厂接口CCCVirtualFactory,令自定义的拥塞控制器虚拟工厂创建自定义的拥塞控制器,并将自定义的拥塞控制器虚拟工厂设置给UDT。在CUDT::setOpt()中可见:

        case UDT_CC:
            if (m_bConnecting || m_bConnected)
                throw CUDTException(5, 1, 0);
            if (NULL != m_pCCFactory)
                delete m_pCCFactory;
            m_pCCFactory = ((CCCVirtualFactory *) optval)->clone();

            break;

顺便提一下,在CUDT::getOpt()中获取UDT_CC选项的值则是直接获取的拥塞控制器而不是拥塞控制器虚拟工厂:

void CUDT::getOpt(UDTOpt optName, void* optval, int& optlen) {
    CGuard cg(m_ConnectionLock);
。。。。。。

        case UDT_CC:
            if (!m_bOpened)
                throw CUDTException(5, 5, 0);
            *(CCC**) optval = m_pCC;
            optlen = sizeof(CCC*);

            break;

UDT中拥塞控制器虚拟工厂的内容基本上就是这些了。

拥塞控制器CCC

这里再来看一下拥塞控制CCC的定义(src/ccc.h):

class UDT_API CCC {
    friend class CUDT;

 public:
    CCC();
    virtual ~CCC();

 private:
    CCC(const CCC&);
    CCC& operator=(const CCC&) {
        return *this;
    }

 public:

    // Functionality:
    //    Callback function to be called (only) at the start of a UDT connection.
    //    note that this is different from CCC(), which is always called.
    // Parameters:
    //    None.
    // Returned value:
    //    None.

    virtual void init() {
    }

    // Functionality:
    //    Callback function to be called when a UDT connection is closed.
    // Parameters:
    //    None.
    // Returned value:
    //    None.

    virtual void close() {
    }

    // Functionality:
    //    Callback function to be called when an ACK packet is received.
    // Parameters:
    //    0) [in] ackno: the data sequence number acknowledged by this ACK.
    // Returned value:
    //    None.

    virtual void onACK(int32_t) {
    }

    // Functionality:
    //    Callback function to be called when a loss report is received.
    // Parameters:
    //    0) [in] losslist: list of sequence number of packets, in the format describled in packet.cpp.
    //    1) [in] size: length of the loss list.
    // Returned value:
    //    None.

    virtual void onLoss(const int32_t*, int) {
    }

    // Functionality:
    //    Callback function to be called when a timeout event occurs.
    // Parameters:
    //    None.
    // Returned value:
    //    None.

    virtual void onTimeout() {
    }

    // Functionality:
    //    Callback function to be called when a data is sent.
    // Parameters:
    //    0) [in] seqno: the data sequence number.
    //    1) [in] size: the payload size.
    // Returned value:
    //    None.

    virtual void onPktSent(const CPacket*) {
    }

    // Functionality:
    //    Callback function to be called when a data is received.
    // Parameters:
    //    0) [in] seqno: the data sequence number.
    //    1) [in] size: the payload size.
    // Returned value:
    //    None.

    virtual void onPktReceived(const CPacket*) {
    }

    // Functionality:
    //    Callback function to Process a user defined packet.
    // Parameters:
    //    0) [in] pkt: the user defined packet.
    // Returned value:
    //    None.

    virtual void processCustomMsg(const CPacket*) {
    }

 protected:

    // Functionality:
    //    Set periodical acknowldging and the ACK period.
    // Parameters:
    //    0) [in] msINT: the period to send an ACK.
    // Returned value:
    //    None.

    void setACKTimer(int msINT);

    // Functionality:
    //    Set packet-based acknowldging and the number of packets to send an ACK.
    // Parameters:
    //    0) [in] pktINT: the number of packets to send an ACK.
    // Returned value:
    //    None.

    void setACKInterval(int pktINT);

    // Functionality:
    //    Set RTO value.
    // Parameters:
    //    0) [in] msRTO: RTO in macroseconds.
    // Returned value:
    //    None.

    void setRTO(int usRTO);

    // Functionality:
    //    Send a user defined control packet.
    // Parameters:
    //    0) [in] pkt: user defined packet.
    // Returned value:
    //    None.

    void sendCustomMsg(CPacket& pkt) const;

    // Functionality:
    //    retrieve performance information.
    // Parameters:
    //    None.
    // Returned value:
    //    Pointer to a performance info structure.

    const CPerfMon* getPerfInfo();

    // Functionality:
    //    Set user defined parameters.
    // Parameters:
    //    0) [in] param: the paramters in one buffer.
    //    1) [in] size: the size of the buffer.
    // Returned value:
    //    None.

    void setUserParam(const char* param, int size);

 private:
    void setMSS(int mss);
    void setMaxCWndSize(int cwnd);
    void setBandwidth(int bw);
    void setSndCurrSeqNo(int32_t seqno);
    void setRcvRate(int rcvrate);
    void setRTT(int rtt);

 protected:
    const int32_t& m_iSYNInterval;	// UDT constant parameter, SYN

    double m_dPktSndPeriod;              // Packet sending period, in microseconds
    double m_dCWndSize;                  // Congestion window size, in packets

    int m_iBandwidth;			// estimated bandwidth, packets per second
    double m_dMaxCWndSize;               // maximum cwnd size, in packets

    int m_iMSS;				// Maximum Packet Size, including all packet headers
    int32_t m_iSndCurrSeqNo;		// current maximum seq no sent out
    int m_iRcvRate;			// packet arrive rate at receiver side, packets per second
    int m_iRTT;				// current estimated RTT, microsecond

    char* m_pcParam;			// user defined parameter
    int m_iPSize;			// size of m_pcParam

 private:
    UDTSOCKET m_UDT;                     // The UDT entity that this congestion control algorithm is bound to

    int m_iACKPeriod;                    // Periodical timer to send an ACK, in milliseconds
    int m_iACKInterval;                  // How many packets to send one ACK, in packets

    bool m_bUserDefinedRTO;              // if the RTO value is defined by users
    int m_iRTO;                          // RTO value, microseconds

    CPerfMon m_PerfInfo;                 // protocol statistics information
};

然后是CCC的实现(src/ccc.cpp):

CCC::CCC()
        : m_iSYNInterval(CUDT::m_iSYNInterval),
          m_dPktSndPeriod(1.0),
          m_dCWndSize(16.0),
          m_iBandwidth(),
          m_dMaxCWndSize(),
          m_iMSS(),
          m_iSndCurrSeqNo(),
          m_iRcvRate(),
          m_iRTT(),
          m_pcParam(NULL),
          m_iPSize(0),
          m_UDT(),
          m_iACKPeriod(0),
          m_iACKInterval(0),
          m_bUserDefinedRTO(false),
          m_iRTO(-1),
          m_PerfInfo() {
}

CCC::~CCC() {
    delete[] m_pcParam;
}

void CCC::setACKTimer(int msINT) {
    m_iACKPeriod = msINT > m_iSYNInterval ? m_iSYNInterval : msINT;
}

void CCC::setACKInterval(int pktINT) {
    m_iACKInterval = pktINT;
}

void CCC::setRTO(int usRTO) {
    m_bUserDefinedRTO = true;
    m_iRTO = usRTO;
}

void CCC::sendCustomMsg(CPacket& pkt) const {
    CUDT* u = CUDT::getUDTHandle(m_UDT);

    if (NULL != u) {
        pkt.m_iID = u->m_PeerID;
        u->m_pSndQueue->sendto(u->m_pPeerAddr, pkt);
    }
}

const CPerfMon* CCC::getPerfInfo() {
    try {
        CUDT* u = CUDT::getUDTHandle(m_UDT);
        if (NULL != u)
            u->sample(&m_PerfInfo, false);
    } catch (...) {
        return NULL;
    }

    return &m_PerfInfo;
}

void CCC::setMSS(int mss) {
    m_iMSS = mss;
}

void CCC::setBandwidth(int bw) {
    m_iBandwidth = bw;
}

void CCC::setSndCurrSeqNo(int32_t seqno) {
    m_iSndCurrSeqNo = seqno;
}

void CCC::setRcvRate(int rcvrate) {
    m_iRcvRate = rcvrate;
}

void CCC::setMaxCWndSize(int cwnd) {
    m_dMaxCWndSize = cwnd;
}

void CCC::setRTT(int rtt) {
    m_iRTT = rtt;
}

void CCC::setUserParam(const char* param, int size) {
    delete[] m_pcParam;
    m_pcParam = new char[size];
    memcpy(m_pcParam, param, size);
    m_iPSize = size;
}

然后是UDT中默认的CCC CUDTCC定义:

class CUDTCC : public CCC {
 public:
    CUDTCC();

 public:
    virtual void init();
    virtual void onACK(int32_t);
    virtual void onLoss(const int32_t*, int);
    virtual void onTimeout();

 private:
    int m_iRCInterval;			// UDT Rate control interval
    uint64_t m_LastRCTime;		// last rate increase time
    bool m_bSlowStart;			// if in slow start phase
    int32_t m_iLastAck;			// last ACKed seq no
    bool m_bLoss;			// if loss happened since last rate increase
    int32_t m_iLastDecSeq;		// max pkt seq no sent out when last decrease happened
    double m_dLastDecPeriod;		// value of pktsndperiod when last decrease happened
    int m_iNAKCount;                     // NAK counter
    int m_iDecRandom;                    // random threshold on decrease by number of loss events
    int m_iAvgNAKNum;                    // average number of NAKs per congestion
    int m_iDecCount;			// number of decreases in a congestion epoch
};

然后是UDT中默认的CCC CUDTCC的实现:

CUDTCC::CUDTCC()
        : m_iRCInterval(),
          m_LastRCTime(),
          m_bSlowStart(),
          m_iLastAck(),
          m_bLoss(),
          m_iLastDecSeq(),
          m_dLastDecPeriod(),
          m_iNAKCount(),
          m_iDecRandom(),
          m_iAvgNAKNum(),
          m_iDecCount() {
}

void CUDTCC::init() {
    m_iRCInterval = m_iSYNInterval;
    m_LastRCTime = CTimer::getTime();
    setACKTimer(m_iRCInterval);

    m_bSlowStart = true;
    m_iLastAck = m_iSndCurrSeqNo;
    m_bLoss = false;
    m_iLastDecSeq = CSeqNo::decseq(m_iLastAck);
    m_dLastDecPeriod = 1;
    m_iAvgNAKNum = 0;
    m_iNAKCount = 0;
    m_iDecRandom = 1;

    m_dCWndSize = 16;
    m_dPktSndPeriod = 1;
}

void CUDTCC::onACK(int32_t ack) {
    int64_t B = 0;
    double inc = 0;
    // Note: 1/24/2012
    // The minimum increase parameter is increased from "1.0 / m_iMSS" to 0.01
    // because the original was too small and caused sending rate to stay at low level
    // for long time.
    const double min_inc = 0.01;

    uint64_t currtime = CTimer::getTime();
    if (currtime - m_LastRCTime < (uint64_t) m_iRCInterval)
        return;

    m_LastRCTime = currtime;

    if (m_bSlowStart) {
        m_dCWndSize += CSeqNo::seqlen(m_iLastAck, ack);
        m_iLastAck = ack;

        if (m_dCWndSize > m_dMaxCWndSize) {
            m_bSlowStart = false;
            if (m_iRcvRate > 0)
                m_dPktSndPeriod = 1000000.0 / m_iRcvRate;
            else
                m_dPktSndPeriod = (m_iRTT + m_iRCInterval) / m_dCWndSize;
        }
    } else
        m_dCWndSize = m_iRcvRate / 1000000.0 * (m_iRTT + m_iRCInterval) + 16;

    // During Slow Start, no rate increase
    if (m_bSlowStart)
        return;

    if (m_bLoss) {
        m_bLoss = false;
        return;
    }

    B = (int64_t) (m_iBandwidth - 1000000.0 / m_dPktSndPeriod);
    if ((m_dPktSndPeriod > m_dLastDecPeriod) && ((m_iBandwidth / 9) < B))
        B = m_iBandwidth / 9;
    if (B <= 0)
        inc = min_inc;
    else {
        // inc = max(10 ^ ceil(log10( B * MSS * 8 ) * Beta / MSS, 1/MSS)
        // Beta = 1.5 * 10^(-6)

        inc = pow(10.0, ceil(log10(B * m_iMSS * 8.0))) * 0.0000015 / m_iMSS;

        if (inc < min_inc)
            inc = min_inc;
    }

    m_dPktSndPeriod = (m_dPktSndPeriod * m_iRCInterval) / (m_dPktSndPeriod * inc + m_iRCInterval);
}

void CUDTCC::onLoss(const int32_t* losslist, int) {
    //Slow Start stopped, if it hasn‘t yet
    if (m_bSlowStart) {
        m_bSlowStart = false;
        if (m_iRcvRate > 0) {
            // Set the sending rate to the receiving rate.
            m_dPktSndPeriod = 1000000.0 / m_iRcvRate;
            return;
        }
        // If no receiving rate is observed, we have to compute the sending
        // rate according to the current window size, and decrease it
        // using the method below.
        m_dPktSndPeriod = m_dCWndSize / (m_iRTT + m_iRCInterval);
    }

    m_bLoss = true;

    if (CSeqNo::seqcmp(losslist[0] & 0x7FFFFFFF, m_iLastDecSeq) > 0) {
        m_dLastDecPeriod = m_dPktSndPeriod;
        m_dPktSndPeriod = ceil(m_dPktSndPeriod * 1.125);

        m_iAvgNAKNum = (int) ceil(m_iAvgNAKNum * 0.875 + m_iNAKCount * 0.125);
        m_iNAKCount = 1;
        m_iDecCount = 1;

        m_iLastDecSeq = m_iSndCurrSeqNo;

        // remove global synchronization using randomization
        srand(m_iLastDecSeq);
        m_iDecRandom = (int) ceil(m_iAvgNAKNum * (double(rand()) / RAND_MAX));
        if (m_iDecRandom < 1)
            m_iDecRandom = 1;
    } else if ((m_iDecCount++ < 5) && (0 == (++m_iNAKCount % m_iDecRandom))) {
        // 0.875^5 = 0.51, rate should not be decreased by more than half within a congestion period
        m_dPktSndPeriod = ceil(m_dPktSndPeriod * 1.125);
        m_iLastDecSeq = m_iSndCurrSeqNo;
    }
}

void CUDTCC::onTimeout() {
    if (m_bSlowStart) {
        m_bSlowStart = false;
        if (m_iRcvRate > 0)
            m_dPktSndPeriod = 1000000.0 / m_iRcvRate;
        else
            m_dPktSndPeriod = m_dCWndSize / (m_iRTT + m_iRCInterval);
    } else {
        /*
         m_dLastDecPeriod = m_dPktSndPeriod;
         m_dPktSndPeriod = ceil(m_dPktSndPeriod * 2);
         m_iLastDecSeq = m_iLastAck;
         */
    }
}

具体的算法这里就不再仔细厘清了。

拥塞控制器就像是一个加工长一样,接收一些事件,经过自己的处理,输出一些数据来控制数据的收发过程。拥塞控制器输出的数据主要有:

1. 包发送周期m_dPktSndPeriod用于控制数据包的发送周期。

2. 拥塞控制窗口大小m_dCWndSize用于控制发送窗口的大小。

3. ACK发送周期m_iACKPeriod和ACK发送间隔m_iACKInterval用于控制ACK包发送的频率。

4. m_bUserDefinedRTO和m_iRTO用于控制超时时间。

如我们前面看到的m_dPktSndPeriod和m_dCWndSize会在CUDT::CCUpdate()中同步给CUDT。

Done。

时间: 2024-10-29 19:09:39

UDT协议实现分析——发送窗口大小及发送速率的调整的相关文章

UDT协议实现分析——数据的发送

连接建立起来之后,我们就可以通过UDT Socket进行数据的收发了.先来看用来发送数据的几个函数.UDT提供了如下的几个函数用于不同目的下的数据发送: UDT_API int send(UDTSOCKET u, const char* buf, int len, int flags); UDT_API int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false); UDT_API i

UDT协议实现分析——数据的接收

看了UDT中数据发送的部分之后,我们转换一个角度,来看一下接收端发生的故事. 如我们前面在 UDT协议实现分析--连接的建立 一文中看到的那样,CUDT在connect()的后半场,会通过调用CRcvQueue::removeConnector()把它自己从它的CChannel的接收队列CRcvQueue的m_pRendezvousQueue队列中移除出去,以表示连接已成功建立,后面不再通过m_pRendezvousQueue接收连接相关消息,并通过调用CRcvQueue::setNewEntr

UDT协议实现分析——UDT Socket的创建

UDT API的用法 在分析 连接的建立过程 之前,先来看一下UDT API的用法.在UDT网络中,通常要有一个UDT Server监听在某台机器的某个UDP端口上,等待客户端的连接:有一个或多个客户端连接UDT Server:UDT Server接收到来自客户端的连接请求后,创建另外一个单独的UDT Socket用于与该客户端进行通信. 先来看一下UDT Server的简单的实现,UDT的开发者已经提供了一些demo程序可供参考,位于app/目录下. #include <unistd.h>

UDT协议实现分析——连接的建立

UDT Server在执行UDT::listen()之后,就可以接受其它节点的连接请求了.这里我们研究一下UDT连接建立的过程. 连接的发起 来看连接的发起方.如前面我们看到的那样,UDT Client创建一个Socket,可以将该Socket绑定到某个端口,也可以不绑定,然后就可以调用UDT::connect()将这个Socket连接到UDT Server了.来看UDT::connect()的定义(src/api.cpp): int CUDTUnited::connect(const UDTS

UDT协议实现分析——数据发送控制

在前文中,我们有看到,数据发送的过程,大体是发送者CUDT将要发送的数据放进它的CSndBuffer m_pSndBuffer,并将它自己添加进它的CSndQueue m_pSndQueue的CSndUList m_pSndUList的堆里,后面CSndQueue m_pSndQueue的worker线程会通过CSndUList::pop()从CSndUList m_pSndUList的堆顶CUDT中获取一个要发送的包来发送,包的获取主要是通过CUDT::packData()来完成,而这个函数正

UDT协议实现分析——UDT数据收发的可靠性保障

不管是数据的发送还是数据的接收,大体的流程我们基本上是都理了一下,还分析了数据收发过程中用的数据结构,接下来就看一些UDT中数据收发更精细的一些控制. UDT数据收发的可靠性保障 来看一下UDT中数据收发的可靠性保障. 接收包丢失列表CRcvLossList 先来看一下CRcvLossList的定义: class CRcvLossList { public: CRcvLossList(int size = 1024); ~CRcvLossList(); // Functionality: //

UDT协议实现分析——UDT初始化和销毁

UDT协议是一个用于在告诉Internet上传输大量数据的基于UDP的可靠传输协议. 我们可以将UDT协议的实现看作一个比较复杂的状态机.更准确的说,是一个主状态机,外加多个子状态机.主状态机是指协议实现中全局唯一.全局共享的状态与数据结构,主要对应于CUDTUnited类.子状态机则是对于一次UDT连接或一个Listening的UDT Server的抽象,是UDT自己创建的Socket抽象,一个与系统socket相似但又不同的概念,主要对应于CUDTSocket和CUDT类.UDT的Socke

UDT协议实现分析——bind、listen与accept

UDT Server启动之后,基于UDT协议的UDP数据可靠传输才成为可能,因而接下来分析与UDT Server有关的几个主要API的实现,来了解下UDT Server是如何listening在特定UDP端口上的.主要有UDT::bind(),UDT::listen()和UDT::accept()等几个函数. bind过程 通常UDT Server在创建UDT Socket之后,首先就要调用UDT::bind(),与一个特定的本地UDP端口地址进行绑定,以便可以在希望的端口上监听.这里来看一下U

UDT协议实现分析总结

UDT的整体结构 UDT Socket是UDT中的核心,同时它也是一座桥梁,它将UDT的使用者应用程序与内部实现部分对于数据结构的管理.网络数据的传输连接起来. 应用程序通过它将数据放进发送缓冲待发送,或者借由它来获取从网络接收数据.而与网络进行交互的部分,则从它那里拿到要发送的数据进行发送,或者在收到packet时将packet dispatch给它. UDT的数据接收部分框架: UDT的数据发送部分框架: UDT的一些问题 1. 接口的合理性. 分析了UDT的这许多code,给人的感觉就是,