UDT协议实现分析——数据发送控制

前文中,我们有看到,数据发送的过程,大体是发送者CUDT将要发送的数据放进它的CSndBuffer m_pSndBuffer,并将它自己添加进它的CSndQueue m_pSndQueue的CSndUList m_pSndUList的堆里,后面CSndQueue m_pSndQueue的worker线程会通过CSndUList::pop()从CSndUList m_pSndUList的堆顶CUDT中获取一个要发送的包来发送,包的获取主要是通过CUDT::packData()来完成,而这个函数正是UDT中包发送的执行中心。

CUDT::packData()

这里就来看一下CUDT::packData()的定义(src/core.cpp):

int CUDT::packData(CPacket& packet, uint64_t& ts) {
    int payload = 0;
    bool probe = false;

    uint64_t entertime;
    CTimer::rdtsc(entertime);

    if ((0 != m_ullTargetTime) && (entertime > m_ullTargetTime))
        m_ullTimeDiff += entertime - m_ullTargetTime;

    // Loss retransmission always has higher priority.
    if ((packet.m_iSeqNo = m_pSndLossList->getLostSeq()) >= 0) {
        // protect m_iSndLastDataAck from updating by ACK processing
        CGuard ackguard(m_AckLock);

        int offset = CSeqNo::seqoff(m_iSndLastDataAck, packet.m_iSeqNo);
        if (offset < 0)
            return 0;

        int msglen;

        payload = m_pSndBuffer->readData(&(packet.m_pcData), offset, packet.m_iMsgNo, msglen);

        if (-1 == payload) {
            int32_t seqpair[2];
            seqpair[0] = packet.m_iSeqNo;
            seqpair[1] = CSeqNo::incseq(seqpair[0], msglen);
            sendCtrl(7, &packet.m_iMsgNo, seqpair, 8);

            // only one msg drop request is necessary
            m_pSndLossList->remove(seqpair[1]);

            // skip all dropped packets
            if (CSeqNo::seqcmp(m_iSndCurrSeqNo, CSeqNo::incseq(seqpair[1])) < 0)
                m_iSndCurrSeqNo = CSeqNo::incseq(seqpair[1]);

            return 0;
        } else if (0 == payload)
            return 0;

        ++m_iTraceRetrans;
        ++m_iRetransTotal;
    } else {
        // If no loss, pack a new packet.

        // check congestion/flow window limit
        int cwnd = (m_iFlowWindowSize < (int) m_dCongestionWindow) ? m_iFlowWindowSize : (int) m_dCongestionWindow;
        if (cwnd >= CSeqNo::seqlen(m_iSndLastAck, CSeqNo::incseq(m_iSndCurrSeqNo))) {
            if (0 != (payload = m_pSndBuffer->readData(&(packet.m_pcData), packet.m_iMsgNo))) {
                m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo);
                m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);

                packet.m_iSeqNo = m_iSndCurrSeqNo;

                // every 16 (0xF) packets, a packet pair is sent
                if (0 == (packet.m_iSeqNo & 0xF))
                    probe = true;
            } else {
                m_ullTargetTime = 0;
                m_ullTimeDiff = 0;
                ts = 0;
                return 0;
            }
        } else {
            m_ullTargetTime = 0;
            m_ullTimeDiff = 0;
            ts = 0;
            return 0;
        }
    }

    packet.m_iTimeStamp = int(CTimer::getTime() - m_StartTime);
    packet.m_iID = m_PeerID;
    packet.setLength(payload);

    m_pCC->onPktSent(&packet);
    //m_pSndTimeWindow->onPktSent(packet.m_iTimeStamp);

    ++m_llTraceSent;
    ++m_llSentTotal;

    if (probe) {
        // sends out probing packet pair
        ts = entertime;
        probe = false;
    } else {
#ifndef NO_BUSY_WAITING
        ts = entertime + m_ullInterval;
#else
        if (m_ullTimeDiff >= m_ullInterval) {
            ts = entertime;
            m_ullTimeDiff -= m_ullInterval;
        } else {
            ts = entertime + m_ullInterval - m_ullTimeDiff;
            m_ullTimeDiff = 0;
        }
#endif
    }

    m_ullTargetTime = ts;

    return payload;
}

在这个函数中,处理了两大类packet的读取,一是丢失的packet,二是正常的顺序传输的包。来看一下这个函数具体的执行过程:

1. 读取当前的时间entertime。

2. 更新m_ullTimeDiff。在UDT中,包发送会有一个随着网络状况调整的一个发送周期,也就是m_ullInterval值。在每一次发送包时,都会根据m_ullInterval值计算下一次包发送的理想时间,并记录在m_ullTargetTime中。而m_ullTimeDiff则被用来记录当前的这次包发送想对于理想的发送时间的延滞值,这个值会被用于计算下一次包发送的理想时间。UDT正是通过这样的修正来尽可能使的包发送周期能够保持在m_ullInterval值附近。

3. 从丢失包列表m_pSndLossList中获取一个丢失的包的SeqNo,并赋值给packet.m_iSeqNo。这个丢失包列表中的包可能是来源于Timer,比如一个包超过了正常时间还没有得到响应,也有可能来源于发送端发回的NACK消息,后面会在来研究这个问题。

4. 前一步中获取的SeqNo大于等于0,这表明存在丢失了需要重传的包,则读取丢失的包的内容:

(1). 计算丢失的包的SeqNo与SndLastDataAck的差值offset。

(2). 检查前一步计算出来的offset值,若小于0,表明发送窗口已经滑过了,则直接返回,否则继续执行。

(3). 根据前面计算的offset值,通过m_pSndBuffer->readData()把数据读入packet中。packet的m_iMsgNo会被更新为packet的MsgNo,msglen也会在packet过期时被更新。

(4). m_pSndBuffer->readData()返回0,表明读取的packet的数据长度为0。这样的packet没有实际发送的必要,直接返回,无需进行后续的步骤。

(5). m_pSndBuffer->readData()返回-1,表明要读取的packet已经过期,同样没有发送这个数据包本身的必要。

但此时会发送一个DropMsgRequest给数据接收端。

然后将过期的packet从SndLossList中移除出去。

m_iSndCurrSeqNo的值为最近一次发送的packet的SeqNo,这里还会在必要的时候更新m_iSndCurrSeqNo,以跳过所有被丢弃的packets。必要指的是,m_iSndCurrSeqNo的值小于等于要丢弃的这个Msg的最后一个packet的SeqNo。这也就意味着,要丢弃的这个Msg直到过期被丢弃都没有发完。

这个地方有一点比较奇怪,简化来看,seqpair[0] == packet.m_iSeqNo,seqpair[1] == seqpair[0] + msglen,也就是说seqpair[1]的值为要被丢弃Msg的最后一个packet的SeqNo加1,但在判断是否要更新m_iSndCurrSeqNo时,却是拿m_iSndCurrSeqNo和(seqpair[1] + 1),也就是要被丢弃的Msg的最后一个packet的SeqNo加2在比较,而更新也是被设置为这个值。但实际上,将m_iSndCurrSeqNo设置为被丢弃Msg的最后一个packet的SeqNo已经可以跳过整个Msg的发送了,因为下次要用m_iSndCurrSeqNo来获得SeqNo,会先将这个值加1的。这个地方的逻辑疑似存在bug。

返回0,向调用者表明暂时没有数据要发送。

(6). m_pSndBuffer->readData()返回大于0的值,表明有一个丢失的包需要重新发送,则更新m_iTraceRetrans和m_iRetransTotal,这两个值分别表示一次trace重发的总次数,和此UDT Socket总的重发次数,两者的区别在于前者在被读取之后会被重置为0(CUDT::sample()),而后者则不会。此时需要继续执行后面的第6步。

5. 在第3步中读取的SeqNo小于0,表明没有丢失的packet。此时则:

(1). 根据m_iFlowWindowSize和m_dCongestionWindow的值计算cwnd发送窗口的大小。发送窗口大小取这两个值中较小的那个,默认情况下,前者为8192(来自于Handshke消息的m_iFlightFlagSize字段,而m_iFlightFlagSize则根据m_iRcvBufSize和m_iFlightFlagSize得出),后者为16(来自于CC的m_dCWndSize字段,在CUDTCC::init()中该值被初始化为16。)。

(2). 检查发送窗口是否已满。若已满,则将m_ullTargetTime和m_ullTimeDiff重置为0,将ts置为0,然后返回0,向调用者表明没有数据要发送。否则继续执行。

m_iSndLastAck的值为下一次Ack应该确认的packet的SeqNo,CSeqNo::seqlen()计算的是包含两个端点在内的区间的长度。此处对CSeqNo::seqlen()的调用被用来计算,下个packet发送之后,发送窗口中所有的packet的个数。

(3). 读取下一个需要发送的packet,并检查返回的payload值。若payload值为0,表明数据缓冲区中所有的数据都已经发送了,无需再进行实际的发送,则将m_ullTargetTime和m_ullTimeDiff重置为0,将ts置为0,返回0,向调用者表明没有数据要发送。否则继续执行。

(4). 主要是更新m_iSndCurrSeqNo,并设置packet的SeqNo字段。如果SeqNo为16的整数倍,还会设置probe为true。

6. 设置packet的m_iTimeStamp,m_iID,及数据长度。

7. 更新m_llTraceSent和m_llSentTotal,其中前者表示CUDT这次Trace的过程中发送的总的packet数量,这个值会在CUDT::sample()获取trace数据之后被重置为0,而后者则表示发送的总共的packet数量,不会在CUDT::sample()获取trace数据之后被重置。

8. 根据probe的值,更新ts值等。配 合CUDT::packData()的调用者CSndUList::pop()一起看,可知ts是理想中该CUDT下次发送数据的时间点。

probe设置为true,就是表明,当前的这个packet被发送结束之后立即发送下一个packet。即使probe的值不为true,也有可能要立即发送下一个packet,比如延滞时间已经超过了理想的发生周期。存在延滞时间,但该延滞时间又没有超出理想的发送周期的,则下个packet的发送时间具体本次packet的发送时间会小于理想的packet发送周期。

总之这里是希望能够保持packet以接近理想的速率发送。

9. 更新m_ullTargetTime为ts。ts这个下次发包的理想时间点还需要m_ullTargetTime进行记录。

10. 返回payload值,也就是读取的packet的大小。

这里顺便来看下CSeqNo的设计与实现。这个类被用来帮助进行与SeqNo有关的一些计算(src/common.h):

class CSeqNo {
 public:
    inline static int seqcmp(int32_t seq1, int32_t seq2) {
        return (abs(seq1 - seq2) < m_iSeqNoTH) ? (seq1 - seq2) : (seq2 - seq1);
    }

    inline static int seqlen(int32_t seq1, int32_t seq2) {
        return (seq1 <= seq2) ? (seq2 - seq1 + 1) : (seq2 - seq1 + m_iMaxSeqNo + 2);
    }

    inline static int seqoff(int32_t seq1, int32_t seq2) {
        if (abs(seq1 - seq2) < m_iSeqNoTH)
            return seq2 - seq1;

        if (seq1 < seq2)
            return seq2 - seq1 - m_iMaxSeqNo - 1;

        return seq2 - seq1 + m_iMaxSeqNo + 1;
    }

    inline static int32_t incseq(int32_t seq) {
        return (seq == m_iMaxSeqNo) ? 0 : seq + 1;
    }

    inline static int32_t decseq(int32_t seq) {
        return (seq == 0) ? m_iMaxSeqNo : seq - 1;
    }

    inline static int32_t incseq(int32_t seq, int32_t inc) {
        return (m_iMaxSeqNo - seq >= inc) ? seq + inc : seq - m_iMaxSeqNo + inc - 1;
    }

 public:
    static const int32_t m_iSeqNoTH;             // threshold for comparing seq. no.
    static const int32_t m_iMaxSeqNo;            // maximum sequence number used in UDT
};

连接发起端在执行CUDT::connect(const sockaddr* serv_addr)时,会计算一个随机的值作为m_iISN,也即是发送的首个数据packet的SeqNo,而在连接建立过程中,这个值会被同步给Peer端的Socket。

这里可以看到,UDT Packet的SeqNo是[0, 0x7FFFFFFF]区间中的一个值。每发送一个packet,m_iSndCurrSeqNo都会被递增。通过class CSeqNo可以看到这个递增的规则,即SeqNo超出0x7FFFFFFF时会被归0。也正是由于0是一个合法的SeqNo,在incseq(int32_t seq, int32_t inc)中,SeqNo超出最大值时的计算里能看到有额外的减1,在seqlen()里,SeqNo超出最大值时的计算里能看到加2。

由seqcmp()和seqoff()这两个函数可见,同一时刻同时有效的两个SeqNo seq1和seq2之间的距离不能超过m_iSeqNoTH 0x3FFFFFFF,若超过则表明一定有一个SeqNo越过了最大值0x7FFFFFFF,也即较小的那个值越过了最大值。

这里还可以再来看一下CSndBuffer::readData():

int CSndBuffer::readData(char** data, int32_t& msgno) {
    // No data to read
    if (m_pCurrBlock == m_pLastBlock)
        return 0;

    *data = m_pCurrBlock->m_pcData;
    int readlen = m_pCurrBlock->m_iLength;
    msgno = m_pCurrBlock->m_iMsgNo;

    m_pCurrBlock = m_pCurrBlock->m_pNext;

    return readlen;
}

int CSndBuffer::readData(char** data, const int offset, int32_t& msgno, int& msglen) {
    CGuard bufferguard(m_BufLock);

    Block* p = m_pFirstBlock;

    for (int i = 0; i < offset; ++i)
        p = p->m_pNext;

    if ((p->m_iTTL >= 0) && ((CTimer::getTime() - p->m_OriginTime) / 1000 > (uint64_t) p->m_iTTL)) {
        msgno = p->m_iMsgNo & 0x1FFFFFFF;

        msglen = 1;
        p = p->m_pNext;
        bool move = false;
        while (msgno == (p->m_iMsgNo & 0x1FFFFFFF)) {
            if (p == m_pCurrBlock)
                move = true;
            p = p->m_pNext;
            if (move)
                m_pCurrBlock = p;
            msglen++;
        }

        return -1;
    }

    *data = p->m_pcData;
    int readlen = p->m_iLength;
    msgno = p->m_iMsgNo;

    return readlen;
}

CSndBuffer::readData(char** data, int32_t& msgno)读取当前的Block。基本上就是读取Block,然后将指向当前Block的指针m_pCurrBlock向后移一个Block。

而CSndBuffer::readData(char** data, const int offset, int32_t& msgno, int& msglen)则是读取距未响应的Block中最旧一块Block offset个单位的Block。在这个函数中,首先是移动到要读取的目标Block,如果要读取的Block已过期,则使m_pCurrBlock跳过该packet所属的Msg的所有Packet,然后返回-1退出。目标Block没有过期,则读取Block后返回数据长度。

总结在CUDT::packData()中对发送过程的控制。

丢失的包具有最高的发送优先级,这也是发送可靠性的保障方法。所有丢失的packet都会被放进SndLossList,这个List中的包可能来源于超时未得到响应,也可能来源于消息接收端发回的NACK。

对于正常的顺序packet发送的控制主要在于两个方面,一是发送窗口的大小,也就是某个时刻已经发送但未得到相应的packet的最大个数,这一点主要由m_dCongestionWindow和m_iFlowWindowSize来表示;二是控制两个包发送的时间间隔,也就是包的发送速率,这一点则主要用m_ullInterval来表示。所有的发送控制机制主要通过影响这几个变量来控制发送过程。

UDT数据发送的可靠性保障

接着就来看一下UDT中数据发送的可靠性保障。

SndLossList

先来看一下CSndLossList这个数据结构。这个Class的定义如下(src/list.h):

class CSndLossList {
 public:
    CSndLossList(int size = 1024);
    ~CSndLossList();

    // Functionality:
    //    Insert a seq. no. into the sender loss list.
    // Parameters:
    //    0) [in] seqno1: sequence number starts.
    //    1) [in] seqno2: sequence number ends.
    // Returned value:
    //    number of packets that are not in the list previously.
    int insert(int32_t seqno1, int32_t seqno2);

    // Functionality:
    //    Remove ALL the seq. no. that are not greater than the parameter.
    // Parameters:
    //    0) [in] seqno: sequence number.
    // Returned value:
    //    None.
    void remove(int32_t seqno);

    // Functionality:
    //    Read the loss length.
    // Parameters:
    //    None.
    // Returned value:
    //    The length of the list.
    int getLossLength();

    // Functionality:
    //    Read the first (smallest) loss seq. no. in the list and remove it.
    // Parameters:
    //    None.
    // Returned value:
    //    The seq. no. or -1 if the list is empty.
    int32_t getLostSeq();

 private:
    int32_t* m_piData1;                  // sequence number starts
    int32_t* m_piData2;                  // seqnence number ends
    int* m_piNext;                       // next node in the list

    int m_iHead;                         // first node
    int m_iLength;                       // loss length
    int m_iSize;                         // size of the static array
    int m_iLastInsertPos;                // position of last insert node

    pthread_mutex_t m_ListLock;          // used to synchronize list operation

 private:
    CSndLossList(const CSndLossList&);
    CSndLossList& operator=(const CSndLossList&);
};

这是一个不可复制容器。提供的接口不是很多,配合注释,都没有太多难以理解的地方。这是一个用数组实现的链表。接着来看这个class的构造和析构(src/list.cpp):

CSndLossList::CSndLossList(int size)
        : m_piData1(NULL),
          m_piData2(NULL),
          m_piNext(NULL),
          m_iHead(-1),
          m_iLength(0),
          m_iSize(size),
          m_iLastInsertPos(-1),
          m_ListLock() {
    m_piData1 = new int32_t[m_iSize];
    m_piData2 = new int32_t[m_iSize];
    m_piNext = new int[m_iSize];

    // -1 means there is no data in the node
    for (int i = 0; i < size; ++i) {
        m_piData1[i] = -1;
        m_piData2[i] = -1;
    }

    // sender list needs mutex protection
#ifndef WIN32
    pthread_mutex_init(&m_ListLock, 0);
#else
    m_ListLock = CreateMutex(NULL, false, NULL);
#endif
}

CSndLossList::~CSndLossList() {
    delete[] m_piData1;
    delete[] m_piData2;
    delete[] m_piNext;

#ifndef WIN32
    pthread_mutex_destroy(&m_ListLock);
#else
    CloseHandle(m_ListLock);
#endif
}

CUDT::connect()中创建CSndLossList时,size值为m_iFlowWindowSize * 2,也即8192 × 2 == 16384。如果不用数组,而用常规一点的方法来实现的话,链表的节点定义可能是这样的:

struct Node {
    int32_t m_iStart;
    int32_t m_iEnd;
    Node *m_pNext;
};

然后来看这个class最关键的函数之一CSndLossList::insert()的定义:

int CSndLossList::insert(int32_t seqno1, int32_t seqno2) {
    CGuard listguard(m_ListLock);

    if (0 == m_iLength) {
        // insert data into an empty list

        m_iHead = 0;
        m_piData1[m_iHead] = seqno1;
        if (seqno2 != seqno1)
            m_piData2[m_iHead] = seqno2;

        m_piNext[m_iHead] = -1;
        m_iLastInsertPos = m_iHead;

        m_iLength += CSeqNo::seqlen(seqno1, seqno2);

        return m_iLength;
    }

    // otherwise find the position where the data can be inserted
    int origlen = m_iLength;
    int offset = CSeqNo::seqoff(m_piData1[m_iHead], seqno1);
    int loc = (m_iHead + offset + m_iSize) % m_iSize;

    if (offset < 0) {
        // Insert data prior to the head pointer

        m_piData1[loc] = seqno1;
        if (seqno2 != seqno1)
            m_piData2[loc] = seqno2;

        // new node becomes head
        m_piNext[loc] = m_iHead;
        m_iHead = loc;
        m_iLastInsertPos = loc;

        m_iLength += CSeqNo::seqlen(seqno1, seqno2);
    } else if (offset > 0) {
        if (seqno1 == m_piData1[loc]) {
            m_iLastInsertPos = loc;

            // first seqno is equivlent, compare the second
            if (-1 == m_piData2[loc]) {
                if (seqno2 != seqno1) {
                    m_iLength += CSeqNo::seqlen(seqno1, seqno2) - 1;
                    m_piData2[loc] = seqno2;
                }
            } else if (CSeqNo::seqcmp(seqno2, m_piData2[loc]) > 0) {
                // new seq pair is longer than old pair, e.g., insert [3, 7] to [3, 5], becomes [3, 7]
                m_iLength += CSeqNo::seqlen(m_piData2[loc], seqno2) - 1;
                m_piData2[loc] = seqno2;
            } else
                // Do nothing if it is already there
                return 0;
        } else {
            // searching the prior node
            int i;
            if ((-1 != m_iLastInsertPos) && (CSeqNo::seqcmp(m_piData1[m_iLastInsertPos], seqno1) < 0))
                i = m_iLastInsertPos;
            else
                i = m_iHead;

            while ((-1 != m_piNext[i]) && (CSeqNo::seqcmp(m_piData1[m_piNext[i]], seqno1) < 0))
                i = m_piNext[i];

            if ((-1 == m_piData2[i]) || (CSeqNo::seqcmp(m_piData2[i], seqno1) < 0)) {
                m_iLastInsertPos = loc;

                // no overlap, create new node
                m_piData1[loc] = seqno1;
                if (seqno2 != seqno1)
                    m_piData2[loc] = seqno2;

                m_piNext[loc] = m_piNext[i];
                m_piNext[i] = loc;

                m_iLength += CSeqNo::seqlen(seqno1, seqno2);
            } else {
                m_iLastInsertPos = i;

                // overlap, coalesce with prior node, insert(3, 7) to [2, 5], ... becomes [2, 7]
                if (CSeqNo::seqcmp(m_piData2[i], seqno2) < 0) {
                    m_iLength += CSeqNo::seqlen(m_piData2[i], seqno2) - 1;
                    m_piData2[i] = seqno2;

                    loc = i;
                } else
                    return 0;
            }
        }
    } else {
        m_iLastInsertPos = m_iHead;

        // insert to head node
        if (seqno2 != seqno1) {
            if (-1 == m_piData2[loc]) {
                m_iLength += CSeqNo::seqlen(seqno1, seqno2) - 1;
                m_piData2[loc] = seqno2;
            } else if (CSeqNo::seqcmp(seqno2, m_piData2[loc]) > 0) {
                m_iLength += CSeqNo::seqlen(m_piData2[loc], seqno2) - 1;
                m_piData2[loc] = seqno2;
            } else
                return 0;
        } else
            return 0;
    }

    // coalesce with next node. E.g., [3, 7], ..., [6, 9] becomes [3, 9]
    while ((-1 != m_piNext[loc]) && (-1 != m_piData2[loc])) {
        int i = m_piNext[loc];

        if (CSeqNo::seqcmp(m_piData1[i], CSeqNo::incseq(m_piData2[loc])) <= 0) {
            // coalesce if there is overlap
            if (-1 != m_piData2[i]) {
                if (CSeqNo::seqcmp(m_piData2[i], m_piData2[loc]) > 0) {
                    if (CSeqNo::seqcmp(m_piData2[loc], m_piData1[i]) >= 0)
                        m_iLength -= CSeqNo::seqlen(m_piData1[i], m_piData2[loc]);

                    m_piData2[loc] = m_piData2[i];
                } else
                    m_iLength -= CSeqNo::seqlen(m_piData1[i], m_piData2[i]);
            } else {
                if (m_piData1[i] == CSeqNo::incseq(m_piData2[loc]))
                    m_piData2[loc] = m_piData1[i];
                else
                    m_iLength--;
            }

            m_piData1[i] = -1;
            m_piData2[i] = -1;
            m_piNext[loc] = m_piNext[i];
        } else
            break;
    }

    return m_iLength - origlen;
}

1. 这个函数首先处理了最简单的向空链表中插入元素的case。

这种情况下,m_iHead被赋予0值。m_piData1[m_iHead]会被赋值为要插入的这段丢失packet范围的起始SeqNo。

如果起始SeqNo和结束SeqNo的值不同,m_piData2[m_iHead]还会被赋值为结束SeqNo;如果相同,则m_piData2[m_iHead]将仍然保持构造函数中初始化的-1,以表示这段丢失packet范围只有一个元素。

m_piNext[m_iHead]被赋值为-1以表示这是链表中的最后一个元素。m_iLastInsertPos用来记录上一次插入的位置,这里会被赋值为m_iHead。m_iLength表示CSndLossList中记录的丢失packet的总格数,这里会被设置为这段packet的长度。然后返回m_iLength。

可见m_iHead指向链表的头部。m_piData1,m_piData2和m_piNext这三个数组中相同位置的元素共同表示一个链表节点,它们分别表示一个丢失packet范围的起始SeqNo,结束SeqNo和该节点在链表中next节点的位置。

2. 链表中已经有元素了,则将m_iLength保存在origlen中。计算要插入的这段丢失packet的起始SeqNo与链表中原有的头节点的起始SeqNo字段的差值offset。然后计算要插入的这段丢失packet范围的的可能的位置loc,这个可能的位置主要由这段丢失packet范围的起始SeqNo与链表中原有的头节点的起始SeqNo字段的差值决定。

由loc的计算方法可见,数组中的空间是被循环利用的。比如要插入的节点是向CSndLossList中插入的第二个节点,则此时m_iHead仍然为0,而要插入的这个丢失packet范围的起始位置小于原有的头节点的起始SeqNo字段,则新插入的节点将被绕回到数组的尾部。

3. 处理offset小于0的情况。这表示插入的这个丢失packet范围的起始位置小于原有的头节点的起始SeqNo字段值,此时则会在loc位置插入一个新的节点以描述这段丢失packet范围。更新m_iHead和m_iLastInsertPos指向新插入的这个节点。并更新m_iLength以体现新加入的这个丢失packet范围。

向单向链表的头部插入元素总是比较简单。由此我们也看到,这个链表是以节点的起始SeqNo字段值的升序排列的有序链表。

但这个地方貌似没有处理新插入的这个丢失packet范围与原有头节点表示的丢失packet范围存在交叉的情况?没错,是没有处理,这种情况会在处理完所有的插入情况之后再统一来做。

4. 处理offset大于0的情况。这又分为两种情况:

(1). seqno1 == m_piData1[loc],表明新节点的目标插入位置中原有节点保存的丢失Packet范围的起始SeqNo与要插入的这个丢失Packets范围的起始SeqNo相同。则此时会首先更新m_iLastInsertPos为loc,还需要处理这样的几种case,

case 1:原有的范围中只有一个元素,要插入的这个范围有多个元素。

case 2:原有的范围中有多个元素,要插入的这个范围有一个元素。

case 3:原有的范围和要插入的范围都只有一个元素。

case 4:原有的范围和要插入的范围中都有多个元素,但新插入的范围完全包含原有的范围。

case 5:原有的范围和要插入的范围中都有多个元素,但原有的范围完全包含新插入的范围。

case 6:原有的范围和要插入的范围中都有多个元素,且完全相同。

这些case包含的packet范围的相对关系可以用下图来简单表示:

代码的具体写法不同,这些case中的一些可能会以不同的方式被合并成一个处理,而有些case则不需要对原有的链表进行任何的调整。

if block中处理的是case1和case3,else-if block中处理的是case 4,else block中处理的是case 2,case 5和case 6。其中case 3,case 2,case 5和case 6都不需要对链表做出调整。

以此来看,在第一个if block的内部,应该再加一个else block来直接返回0会比较好一点。

那段代码的一种等价实现形式:

            if (seqno2 != seqno1) {
                if (-1 == m_piData2[loc]) {
                    m_iLength += CSeqNo::seqlen(seqno1, seqno2) - 1;
                    m_piData2[loc] = seqno2;
                } else if (CSeqNo::seqcmp(seqno2, m_piData2[loc]) > 0) {
                    // new seq pair is longer than old pair, e.g., insert [3, 7] to [3, 5], becomes [3, 7]
                    m_iLength += CSeqNo::seqlen(m_piData2[loc], seqno2) - 1;
                    m_piData2[loc] = seqno2;
                } else {
                    return 0;
                }
            } else {
                // Do nothing if it is already there
                return 0;
            }

(2). seqno1与m_piData1[loc]不相等。这其实主要有两种可能,一是loc位置已经有了其它的节点,但该节点所表示的范围的起始SeqNo与要插入的这个范围的起始SeqNo不同;二是loc位置还没有被插入节点。但第一种可能应该是不会出现的,因而这里实际要处理的也就是loc位置还没有节点插入的情况。

对于这种情况,节点的插入位置完全不是问题,关键的问题是调整链表中一些节点的关系。可以看到这里的处理过程:

查找新插入节点前面的那个节点。该查找过程的开始位置由m_piData1[m_iLastInsertPos]与seqno1的相对大小决定,如果前者较小,则从m_iLastInsertPos开始,否则,从m_iHead开始。这大概主要是想要利用空间局部性原理来提高查找的效率。然后就是通过一个循环找到新插入节点前面的那个节点。

找到的这前面的节点所表示的丢失packet范围与要插入的节点所要表示的范围之间的关系又有这样的几种case:

case 1:前面的节点表示的范围只有一个packet。

case 2:前面的节点表示的范围含有多个packet,但它的结束SeqNo仍然小于要插入的范围的起始SeqNo。

case 3:前面的节点表示的范围含有多个packet,但它的结束SeqNo大于等于要插入的范围的起始SeqNo。

前两个case表明两个范围不相交,而case 3则表明两个范围是相交的。对于前两个case,则在前面的那个节点之后插入一个节点,链表中节点的连接关系做适当的调整即可。对于case 3,则需要将插入的这个范围合并入前面的那个节点。如果前面的那个节点包含的范围完全覆盖了要插入的范围,则什麽都不做,如果不是则需要对结束SeqNo字段做一些调整。

5. offset值等于0,表明要插入的这个范围的起始SeqNo与Head节点表示的范围的起始SeqNo相同,这个过程则与offset大于0时,seqno1 == m_piData1[loc]中的处理基本一致。

6. 从插入新节点的位置开始,合并链表中与新插入的这个丢失packet范围相交,或被包含或紧紧相邻的节点。

7. 返回新加入CSndLossList的丢失packet的总个数。

有这个函数的整个执行过程不难看出,头节点中将包含最老的丢失packets。

看完了插入,自然不能不再来看一下CSndLossList::remove():

void CSndLossList::remove(int32_t seqno) {
    CGuard listguard(m_ListLock);

    if (0 == m_iLength)
        return;

    // Remove all from the head pointer to a node with a larger seq. no. or the list is empty
    int offset = CSeqNo::seqoff(m_piData1[m_iHead], seqno);
    int loc = (m_iHead + offset + m_iSize) % m_iSize;

    if (0 == offset) {
        // It is the head. Remove the head and point to the next node
        loc = (loc + 1) % m_iSize;

        if (-1 == m_piData2[m_iHead])
            loc = m_piNext[m_iHead];
        else {
            m_piData1[loc] = CSeqNo::incseq(seqno);
            if (CSeqNo::seqcmp(m_piData2[m_iHead], CSeqNo::incseq(seqno)) > 0)
                m_piData2[loc] = m_piData2[m_iHead];

            m_piData2[m_iHead] = -1;

            m_piNext[loc] = m_piNext[m_iHead];
        }

        m_piData1[m_iHead] = -1;

        if (m_iLastInsertPos == m_iHead)
            m_iLastInsertPos = -1;

        m_iHead = loc;

        m_iLength--;
    } else if (offset > 0) {
        int h = m_iHead;

        if (seqno == m_piData1[loc]) {
            // target node is not empty, remove part/all of the seqno in the node.
            int temp = loc;
            loc = (loc + 1) % m_iSize;

            if (-1 == m_piData2[temp])
                m_iHead = m_piNext[temp];
            else {
                // remove part, e.g., [3, 7] becomes [], [4, 7] after remove(3)
                m_piData1[loc] = CSeqNo::incseq(seqno);
                if (CSeqNo::seqcmp(m_piData2[temp], m_piData1[loc]) > 0)
                    m_piData2[loc] = m_piData2[temp];
                m_iHead = loc;
                m_piNext[loc] = m_piNext[temp];
                m_piNext[temp] = loc;
                m_piData2[temp] = -1;
            }
        } else {
            // target node is empty, check prior node
            int i = m_iHead;
            while ((-1 != m_piNext[i]) && (CSeqNo::seqcmp(m_piData1[m_piNext[i]], seqno) < 0))
                i = m_piNext[i];

            loc = (loc + 1) % m_iSize;

            if (-1 == m_piData2[i])
                m_iHead = m_piNext[i];
            else if (CSeqNo::seqcmp(m_piData2[i], seqno) > 0) {
                // remove part/all seqno in the prior node
                m_piData1[loc] = CSeqNo::incseq(seqno);
                if (CSeqNo::seqcmp(m_piData2[i], m_piData1[loc]) > 0)
                    m_piData2[loc] = m_piData2[i];

                m_piData2[i] = seqno;

                m_piNext[loc] = m_piNext[i];
                m_piNext[i] = loc;

                m_iHead = loc;
            } else
                m_iHead = m_piNext[i];
        }

        // Remove all nodes prior to the new head
        while (h != m_iHead) {
            if (m_piData2[h] != -1) {
                m_iLength -= CSeqNo::seqlen(m_piData1[h], m_piData2[h]);
                m_piData2[h] = -1;
            } else
                m_iLength--;

            m_piData1[h] = -1;

            if (m_iLastInsertPos == h)
                m_iLastInsertPos = -1;

            h = m_piNext[h];
        }
    }
}

先来回忆下CSndLossList的类定义中,对于这个函数的语义的说明:移除所有不大于参数值的SeqNo。这个函数的主要执行过程如下:

1. 检查m_iLength是否为0,若为0,说明CSndLossList还没有加入任何SeqNo,则直接返回,否则继续执行。

2. 计算头节点所表示的丢失packet范围的起始SeqNo与参数seqno的offset,及可能以seqno作为起始SeqNo字段的节点的位置loc。

根据这个函数的语义,我们知道,其实是不需要处理offset小于的case的。如我们前面所了解的,头节点所表示的SeqNo范围是CSndLossList中SeqNo值最小的一个范围,而如果seqno小于这个范围的起始SeqNo的话,则说明不大于seqno的所有SeqNo都已经不存在了。

3. 处理offset == 0的情况。offset == 0,表明seqno是包含于头节点所表示的范围,而且还是这个范围的起始SeqNo。此时又主要分两种情况来处理:

(1). 头节点表示的这个范围只有一个SeqNo。

(2). 头节点表示的范围包含多个SeqNo。

对于情况(1),则头节点将向后滑动一个节点,原来头节点的存放位置会被复位。对于情况(2),为了保证两个节点的相对位置等于节点所表示的丢失packet范围的起始SeqNo的差值这样的一种节点间关系依然成立,需要将头节点保存的位置后移一个位置。对于情况(2),还会再分为两种情况来处理,一是原来的头节点中只包含2个SeqNo,则在移除seqno后只剩下了一个,此时要保持m_piData2[loc]为-1,若包含3个及以上SeqNo的,则要复制原来的头节点的结束SeqNo到新的位置。

其它就是适当地更新m_iLastInsertPos,m_iHead和m_iLength了。这里似乎补一个

        m_piNext[m_iHead] = -1;

要更好一点。

4. 处理offset大于0的情况。对于这种情况,比较麻烦的是找到seqno具体包含在哪个节点中。处理过程大致为:

(1). 检查一下,seqno与m_piData1[loc]是否相等,若相等,则要找的节点已经是找到了,且seqno为目标节点表示的丢失packet范围的起始seqno。此时则会再分为两种情况来处理,一是目标节点中只包含一个SeqNo,则使链表头指向目标节点的下一个节点。

二是目标节点中包含多个SeqNo,则需要将seqno排除在目标节点范围之外新建一个节点,将新节点保存在目标节点后面相邻的位置,若目标节点中包含2个节点,则需要设置新节点的结束SeqNo字段为-1,若大于等于3,则复制此字段的值,然后将原来的目标节点改造为只包含seqno的节点,并使它指向新建的这个节点。总之就是将原来的一个节点拆分成了两个节点,一个节点只包含seqno,另一个则包含原来的目标节点中其余的SeqNo。

(2). seqno与m_piData1[loc]不相等的情况,则需要先找到起始SeqNo字段小于seqno的SeqNo值最大的那个节点。这又可以分为3种case来处理,

case 1:找到的节点只包含一个SeqNo,此时则使链表头指向找到的节点的next节点。

case 2:找到的节点包含多个SeqNo,且seqno小于找到的节点的结束SeqNo字段,此时则需要将找到的节点裂为链表中的两个节点,一个包含的范围为[原节点的起始SeqNo,seqno],另一个包含的范围为[seqno + 1, 原节点的结束SeqNo]。同时链表头应该指向后者。

case 3:找到的节点包含多个SeqNo,且seqno大于等于找到的节点的结束SeqNo字段,此时则同样使链表头指向找到的节点的next节点。

5. 移除新找到的头节点之前所有的节点。适当地更新m_iLength等。

这个地方移除的操作看起来好罗嗦。必须要计算出offset值,而不是简单的用比较符号,是由SeqNo的递增规则决定的。但对于offset大于等于0的情况,则可以使用统一的过程来处理:找到起始SeqNo字段不大于seqno的 SeqNo值最大的那个节点,然后根据seqno与这个节点描述的范围的4种关系,即seqno等于找到的节点的起始SeqNo值,seqno大于起始SeqNo值但小于结束SeqNo值,seqno等于结束SeqNo值,seqno大于结束SeqNo值,及找到的节点所描述的范围的大小,来分类处理即可。

还有我们前面在CUDT::packData()中看到的CSndLossList::getLostSeq():

int32_t CSndLossList::getLostSeq() {
    if (0 == m_iLength)
        return -1;

    CGuard listguard(m_ListLock);

    if (0 == m_iLength)
        return -1;

    if (m_iLastInsertPos == m_iHead)
        m_iLastInsertPos = -1;

    // return the first loss seq. no.
    int32_t seqno = m_piData1[m_iHead];

    // head moves to the next node
    if (-1 == m_piData2[m_iHead]) {
        //[3, -1] becomes [], and head moves to next node in the list
        m_piData1[m_iHead] = -1;
        m_iHead = m_piNext[m_iHead];
    } else {
        // shift to next node, e.g., [3, 7] becomes [], [4, 7]
        int loc = (m_iHead + 1) % m_iSize;

        m_piData1[loc] = CSeqNo::incseq(seqno);
        if (CSeqNo::seqcmp(m_piData2[m_iHead], m_piData1[loc]) > 0)
            m_piData2[loc] = m_piData2[m_iHead];

        m_piData1[m_iHead] = -1;
        m_piData2[m_iHead] = -1;

        m_piNext[loc] = m_piNext[m_iHead];
        m_iHead = loc;
    }

    m_iLength--;

    return seqno;
}

这个函数读取首个丢失packet的SeqNo。

这个函数会取链表头节点中最小的SeqNo,返回给调用者,然后将这个SeqNo从链表头节点中移除。移除的时候又可以分为2种情况:1. 头节点描述的packet范围只包含一个SeqNo,2. 包含多个SeqNo。

对于情况1,则将链表头节点向后移动一个节点,然后移除原来的头节点。对于情况2,则将原来的头节点分裂为两个节点,一个只包含首个丢失packet的SeqNo,另一个包含原来头节点中其余的SeqNo,令链表头节点指向后一个节点,并移除前一个节点。

可见,通过CSndLossList::getLostSeq()返回给调用者的Packet是会被从CSndLossList中移除出去的。

最后可以再来看一下,经过了这些操作的蹂躏之后,这个链表可能的样子。比如有4段丢失的packet,其SeqNo范围及被插入的顺序为[8, 9],[3, 5],[12,12],[15, 19],假设缓冲区大小为20,则看起来可能为:

UDT中,丢失packet列表大体如此。

ACK

接着来看一下UDT中的ACK机制。

超时重传

再来看一下UDT中的超时重传机制

NACK

然后是NACK。

发送窗口及发送速率的调整

这里来看一下发送窗口及发送速率的控制方法。

接收端的反馈

接收端的反馈

拥塞控制

拥塞控制

Done。

时间: 2024-10-18 05:36:34

UDT协议实现分析——数据发送控制的相关文章

UDT协议实现分析——数据的发送

连接建立起来之后,我们就可以通过UDT Socket进行数据的收发了.先来看用来发送数据的几个函数.UDT提供了如下的几个函数用于不同目的下的数据发送: UDT_API int send(UDTSOCKET u, const char* buf, int len, int flags); UDT_API int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false); UDT_API i

UDT协议实现分析——数据的接收

看了UDT中数据发送的部分之后,我们转换一个角度,来看一下接收端发生的故事. 如我们前面在 UDT协议实现分析--连接的建立 一文中看到的那样,CUDT在connect()的后半场,会通过调用CRcvQueue::removeConnector()把它自己从它的CChannel的接收队列CRcvQueue的m_pRendezvousQueue队列中移除出去,以表示连接已成功建立,后面不再通过m_pRendezvousQueue接收连接相关消息,并通过调用CRcvQueue::setNewEntr

UDT协议实现分析——发送窗口大小及发送速率的调整

UDT主要通过在数据收发的过程中进行精细的控制来实现对于网络带宽更加有效的利用,并使网络中数据传输的速率尽可能快. 如我们前面在分析数据发送的控制中看到的,对于正常的顺序packet发送,发送控制主要在于两个方面,一是发送窗口的大小,也就是某个时刻已经发送但未得到相应的packet的最大个数,这一点主要由拥塞窗口大小m_dCongestionWindow和滑动窗口大小m_iFlowWindowSize来描述,发送窗口大小为两者中较小的那一个:二是控制两个数据包发送的时间间隔,也就是包的发送速率,

UDT协议实现分析——连接的建立

UDT Server在执行UDT::listen()之后,就可以接受其它节点的连接请求了.这里我们研究一下UDT连接建立的过程. 连接的发起 来看连接的发起方.如前面我们看到的那样,UDT Client创建一个Socket,可以将该Socket绑定到某个端口,也可以不绑定,然后就可以调用UDT::connect()将这个Socket连接到UDT Server了.来看UDT::connect()的定义(src/api.cpp): int CUDTUnited::connect(const UDTS

UDT协议实现分析——UDT Socket的创建

UDT API的用法 在分析 连接的建立过程 之前,先来看一下UDT API的用法.在UDT网络中,通常要有一个UDT Server监听在某台机器的某个UDP端口上,等待客户端的连接:有一个或多个客户端连接UDT Server:UDT Server接收到来自客户端的连接请求后,创建另外一个单独的UDT Socket用于与该客户端进行通信. 先来看一下UDT Server的简单的实现,UDT的开发者已经提供了一些demo程序可供参考,位于app/目录下. #include <unistd.h>

UDT协议实现分析——bind、listen与accept

UDT Server启动之后,基于UDT协议的UDP数据可靠传输才成为可能,因而接下来分析与UDT Server有关的几个主要API的实现,来了解下UDT Server是如何listening在特定UDP端口上的.主要有UDT::bind(),UDT::listen()和UDT::accept()等几个函数. bind过程 通常UDT Server在创建UDT Socket之后,首先就要调用UDT::bind(),与一个特定的本地UDP端口地址进行绑定,以便可以在希望的端口上监听.这里来看一下U

UDT协议实现分析——UDT数据收发的可靠性保障

不管是数据的发送还是数据的接收,大体的流程我们基本上是都理了一下,还分析了数据收发过程中用的数据结构,接下来就看一些UDT中数据收发更精细的一些控制. UDT数据收发的可靠性保障 来看一下UDT中数据收发的可靠性保障. 接收包丢失列表CRcvLossList 先来看一下CRcvLossList的定义: class CRcvLossList { public: CRcvLossList(int size = 1024); ~CRcvLossList(); // Functionality: //

UDT协议实现分析——UDT初始化和销毁

UDT协议是一个用于在告诉Internet上传输大量数据的基于UDP的可靠传输协议. 我们可以将UDT协议的实现看作一个比较复杂的状态机.更准确的说,是一个主状态机,外加多个子状态机.主状态机是指协议实现中全局唯一.全局共享的状态与数据结构,主要对应于CUDTUnited类.子状态机则是对于一次UDT连接或一个Listening的UDT Server的抽象,是UDT自己创建的Socket抽象,一个与系统socket相似但又不同的概念,主要对应于CUDTSocket和CUDT类.UDT的Socke

UDT协议实现分析总结

UDT的整体结构 UDT Socket是UDT中的核心,同时它也是一座桥梁,它将UDT的使用者应用程序与内部实现部分对于数据结构的管理.网络数据的传输连接起来. 应用程序通过它将数据放进发送缓冲待发送,或者借由它来获取从网络接收数据.而与网络进行交互的部分,则从它那里拿到要发送的数据进行发送,或者在收到packet时将packet dispatch给它. UDT的数据接收部分框架: UDT的数据发送部分框架: UDT的一些问题 1. 接口的合理性. 分析了UDT的这许多code,给人的感觉就是,