UDT协议实现分析——数据的接收

看了UDT中数据发送的部分之后,我们转换一个角度,来看一下接收端发生的故事。

如我们前面在 UDT协议实现分析——连接的建立 一文中看到的那样,CUDT在connect()的后半场,会通过调用CRcvQueue::removeConnector()把它自己从它的CChannel的接收队列CRcvQueue的m_pRendezvousQueue队列中移除出去,以表示连接已成功建立,后面不再通过m_pRendezvousQueue接收连接相关消息,并通过调用CRcvQueue::setNewEntry()及CRcvQueue::worker()的机制,将自己加进自己的CChannel的接收队列CRcvQueue的m_pRcvUList和m_pHash中,以便于能够通过m_pRcvUList和m_pHash接收后续发送给自己的控制消息及数据消息。

CUnitQueue

如在前面CRcvQueue::worker()函数的定义中所看到的那样,一次接收过程大体为,从CUnitQueue m_UnitQueue中获取一个可用的CUnit unit,将接收到的数据/控制消息保存进unit,然后将根据消息的具体情况,将消息dispatch不同的接收者。

这里就先来看一下CUnitQueue。先是这个类的定义(src/queue.h):

struct CUnit {
    CPacket m_Packet;		// packet
    int m_iFlag;			// 0: free, 1: occupied, 2: msg read but not freed (out-of-order), 3: msg dropped
};

class CUnitQueue {
    friend class CRcvQueue;
    friend class CRcvBuffer;

 public:
    CUnitQueue();
    ~CUnitQueue();

 public:

    // Functionality:
    //    Initialize the unit queue.
    // Parameters:
    //    1) [in] size: queue size
    //    2) [in] mss: maximum segament size
    //    3) [in] version: IP version
    // Returned value:
    //    0: success, -1: failure.

    int init(int size, int mss, int version);

    // Functionality:
    //    Increase (double) the unit queue size.
    // Parameters:
    //    None.
    // Returned value:
    //    0: success, -1: failure.

    int increase();

    // Functionality:
    //    Decrease (halve) the unit queue size.
    // Parameters:
    //    None.
    // Returned value:
    //    0: success, -1: failure.

    int shrink();

    // Functionality:
    //    find an available unit for incoming packet.
    // Parameters:
    //    None.
    // Returned value:
    //    Pointer to the available unit, NULL if not found.

    CUnit* getNextAvailUnit();

 private:
    struct CQEntry {
        CUnit* m_pUnit;		// unit queue
        char* m_pBuffer;		// data buffer
        int m_iSize;		// size of each queue

        CQEntry* m_pNext;
    }*m_pQEntry,			// pointer to the first unit queue
            *m_pCurrQueue,		// pointer to the current available queue
            *m_pLastQueue;		// pointer to the last unit queue

    CUnit* m_pAvailUnit;         // recent available unit

    int m_iSize;			// total size of the unit queue, in number of packets
    int m_iCount;		// total number of valid packets in the queue

    int m_iMSS;			// unit buffer size
    int m_iIPversion;		// IP version

 private:
    CUnitQueue(const CUnitQueue&);
    CUnitQueue& operator=(const CUnitQueue&);
};

同样是一个不可复制的容器,CUnit的容器。但究竟是什么类型的容器,则还需要看一下成员函数的具体定义。先来看一下构造函数,析构函数及初始化init()函数(src/queue.cpp):

CUnitQueue::CUnitQueue()
        : m_pQEntry(NULL),
          m_pCurrQueue(NULL),
          m_pLastQueue(NULL),
          m_iSize(0),
          m_iCount(0),
          m_iMSS(),
          m_iIPversion() {
}

CUnitQueue::~CUnitQueue() {
    CQEntry* p = m_pQEntry;

    while (p != NULL) {
        delete[] p->m_pUnit;
        delete[] p->m_pBuffer;

        CQEntry* q = p;
        if (p == m_pLastQueue)
            p = NULL;
        else
            p = p->m_pNext;
        delete q;
    }
}

int CUnitQueue::init(int size, int mss, int version) {
    CQEntry* tempq = NULL;
    CUnit* tempu = NULL;
    char* tempb = NULL;

    try {
        tempq = new CQEntry;
        tempu = new CUnit[size];
        tempb = new char[size * mss];
    } catch (...) {
        delete tempq;
        delete[] tempu;
        delete[] tempb;

        return -1;
    }

    for (int i = 0; i < size; ++i) {
        tempu[i].m_iFlag = 0;
        tempu[i].m_Packet.m_pcData = tempb + i * mss;
    }
    tempq->m_pUnit = tempu;
    tempq->m_pBuffer = tempb;
    tempq->m_iSize = size;

    m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq;
    m_pQEntry->m_pNext = m_pQEntry;

    m_pAvailUnit = m_pCurrQueue->m_pUnit;

    m_iSize = size;
    m_iMSS = mss;
    m_iIPversion = version;

    return 0;
}

构造函数没有什么特别需要注意的地方。来看CUnitQueue::init()的定义,它执行了这样一些步骤:

1. 先是分配了一个CQEntry对象tempq,一个长度为size的CUnit数组tempu,和长度为(size * mss)的一段内存缓冲区tempb。

2. 初始化CUnit数组tempu中的每个元素,设置每个元素的m_iFlag为0,每个元素的CPacket类型字段m_Packet的数据区指针m_pcData指向内存缓冲区tempb的适当位置,即第0个元素的CPacket类型字段m_Packet的数据区指针m_pcData指向tempb的起始位置,后面的元素的CPacket类型字段m_Packet的数据区指针m_pcData依次指向它前面的元素所指位置之后的mss字节处。

3. 设置tempq的m_pUnit字段指向tempu,tempq的m_pBuffer字段指向tempb,tempq的m_iSize字段为size。将tempq的值赋给m_pQEntry,m_pCurrQueue和m_pLastQueue。令m_pQEntry->m_pNext为m_pQEntry,令m_pAvailUnit为m_pCurrQueue->m_pUnit,令m_iSize,m_iMSS和m_iIPversion分别为传入的参数size,mss,和version。

UDT中执行CUnitQueue::init()时实际传入的参数为传递给CRcvQueue::init()的参数qsize, payload, version,这些参数的实际值分别为32,s->m_pUDT->m_iPayloadSize和m.m_iIPversion。

然后返回0。

基本上可以认为,在CUnitQueue::init()中基本上就是构造了一个有效的CQEntry。

由此不难看到CUnitQueue中主要是一个CQEntry的环形链表。CQEntry用于管理一段用于存放接收到的数据的内存缓冲区m_pBuffer及CUnit结构m_pUnit,前者用于实际存放接收到的消息的数据部分,后者则用于将接收到的内容组织为CPacket。

(CUnitQueue + CQEntry + CUnit)这样的结构与(CSndBuffer + Buffer + Block)结构还是蛮相似的。

接着来看我们的老朋友CUnitQueue::getNextAvailUnit(),CRcvQueue::worker()正是通过调用它获得一个CUnit,用于存放接收到的消息:

CUnit* CUnitQueue::getNextAvailUnit() {
    if (m_iCount * 10 > m_iSize * 9)
        increase();

    if (m_iCount >= m_iSize)
        return NULL;

    CQEntry* entrance = m_pCurrQueue;

    do {
        for (CUnit* sentinel = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize - 1; m_pAvailUnit != sentinel;
                ++m_pAvailUnit)
            if (m_pAvailUnit->m_iFlag == 0)
                return m_pAvailUnit;

        if (m_pCurrQueue->m_pUnit->m_iFlag == 0) {
            m_pAvailUnit = m_pCurrQueue->m_pUnit;
            return m_pAvailUnit;
        }

        m_pCurrQueue = m_pCurrQueue->m_pNext;
        m_pAvailUnit = m_pCurrQueue->m_pUnit;
    } while (m_pCurrQueue != entrance);

    increase();

    return NULL;
}

这函数的执行过程为:

1. m_iCount为CUnitQueue中已经被用到的CUnit个数,m_iSize为CUnitQueue中包含的CUnit总数。这里会在CUnitQueue中CUnit的使用率超过90%时,进行容量的扩充操作increase()。

2. 再次检查CUnitQueue中已经用到的CUnit个数是否超出CUnit的总个数,若超出则直接返回。既然有了前面第1步的保证,这里的检查是否还有必要呢?后面就会明白。

3. 通过一个两层循环,来查找一个可用的CUnit。

CUnit的m_iFlag用来标记它是否已经被占用,若m_iFlag值为0,表示还没有被占用,若为1则表示已经被占用。

外层循环用于遍历CQEntry的循环链表,自m_pCurrQueue始,至m_pCurrQueue终。

内层循环则遍历一个CQEntry中的所有CUnit,若找到可用的CUnit,会直接返回给调用者。这里会在遍历结束找不到可用CUnit的情况下,再次检查m_pCurrQueue->m_pUnit,也就是一个CQEntry的首个CUnit的情况,让人看上去也是有点奇怪。

4. 上一步中没有找到可用的CUnit,这里会再次执行increase()以扩充容量,并向调用这返回NULL。

接着再来看下用于扩充容量的increase()函数:

int CUnitQueue::increase() {
    // adjust/correct m_iCount
    int real_count = 0;
    CQEntry* p = m_pQEntry;
    while (p != NULL) {
        CUnit* u = p->m_pUnit;
        for (CUnit* end = u + p->m_iSize; u != end; ++u)
            if (u->m_iFlag != 0)
                ++real_count;

        if (p == m_pLastQueue)
            p = NULL;
        else
            p = p->m_pNext;
    }
    m_iCount = real_count;
    if (double(m_iCount) / m_iSize < 0.9)
        return -1;

    CQEntry* tempq = NULL;
    CUnit* tempu = NULL;
    char* tempb = NULL;

    // all queues have the same size
    int size = m_pQEntry->m_iSize;

    try {
        tempq = new CQEntry;
        tempu = new CUnit[size];
        tempb = new char[size * m_iMSS];
    } catch (...) {
        delete tempq;
        delete[] tempu;
        delete[] tempb;

        return -1;
    }

    for (int i = 0; i < size; ++i) {
        tempu[i].m_iFlag = 0;
        tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS;
    }
    tempq->m_pUnit = tempu;
    tempq->m_pBuffer = tempb;
    tempq->m_iSize = size;

    m_pLastQueue->m_pNext = tempq;
    m_pLastQueue = tempq;
    m_pLastQueue->m_pNext = m_pQEntry;

    m_iSize += size;

    return 0;
}

int CUnitQueue::shrink() {
    // currently queue cannot be shrunk.
    return -1;
}

这个函数的执行过程如下:

1. 计算真正的已被占用的CUnit的个数real_count。这个计算过程所涵盖的CQEntry自m_pQEntry始,至m_pLastQueue(包含m_pLastQueue)。

2. 将计算所得的real_count赋值给m_iCount,并基于计算所得值,再次检查CUnit的使用率,若使用率依然低于90%,则直接返回。否则则继续执行真正的容量扩充的动作。

3. 在原有的CQEntry环形链表的基础上,新创建一个CQEntry,这包括创建一个CQEntry对象,创建CUnit的数组,创建数据缓冲区,初始化CUnit数组及正确的设置CQEntry的各字段。

这个过程与CUnitQueue::init()中创建首个有效的CQEntry对象的过程基本一致。又是大段的重复code唉。

4. 将新创建的CQEntry对象插入CQEntry对象的环形链表中,在m_pLastQueue之后,m_pQEntry之前。由此可见,m_pQEntry用于记录CQEntry对象的环形链表的头节点,而m_pLastQueue用于记录CQEntry对象的环形链表的尾节点。

5. 更新m_iSize的值并返回0。

这就是CUnitQueue类本身提供的所有东西了。奇怪的是,我们只能看到从CUnitQueue中获取CUnit的代码,而看不到想CUnitQueue中还回CUnit的代码。主要的秘密就在于CUnitQueue有将CRcvQueue和CRcvBuffer声明为它的friend class,这就允许这个类可以自由地修改CUnitQueue的成员。这种严重破坏封装的设计,还真是让人头大。

数据接收

接着回到我们的数据接收过程。建立连接之后,双方传递的消息中都会包含有效Target SocketID字段,其CUDT会存在于CRcvQueue的m_pHash中,因而CRcvQueue::worker()主要是通过下面的这段code来将消息dispatch给接收数据的UDT Socket:

        } else if (id > 0) {
            if (NULL != (u = self->m_pHash->lookup(id))) {
                if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion)) {
                    if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) {
                        if (0 == unit->m_Packet.getFlag())
                            u->processData(unit);
                        else
                            u->processCtrl(unit->m_Packet);

                        u->checkTimers();
                        self->m_pRcvUList->update(u);
                    }
                }
            } else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id))) {

可以看到,在这里会先找到目标CUDT,然后在CUDT依然处于有效的连接状态时,根据接收到的Packet的类型,将Packet dispatch给CUDT::processData()或

CUDT::processCtrl(),这里研究数据packet的接收,因而主要来看CUDT::processData()的定义(src/core.cpp):

int CUDT::processData(CUnit* unit) {
    CPacket& packet = unit->m_Packet;

    // Just heard from the peer, reset the expiration count.
    m_iEXPCount = 1;
    uint64_t currtime;
    CTimer::rdtsc(currtime);
    m_ullLastRspTime = currtime;

    m_pCC->onPktReceived(&packet);
    ++m_iPktCount;
    // update time information
    m_pRcvTimeWindow->onPktArrival();

    // check if it is probing packet pair
    if (0 == (packet.m_iSeqNo & 0xF))
        m_pRcvTimeWindow->probe1Arrival();
    else if (1 == (packet.m_iSeqNo & 0xF))
        m_pRcvTimeWindow->probe2Arrival();

    ++m_llTraceRecv;
    ++m_llRecvTotal;

    int32_t offset = CSeqNo::seqoff(m_iRcvLastAck, packet.m_iSeqNo);
    if ((offset < 0) || (offset >= m_pRcvBuffer->getAvailBufSize()))
        return -1;

    if (m_pRcvBuffer->addData(unit, offset) < 0)
        return -1;

    // Loss detection.
    if (CSeqNo::seqcmp(packet.m_iSeqNo, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0) {
        // If loss found, insert them to the receiver loss list
        m_pRcvLossList->insert(CSeqNo::incseq(m_iRcvCurrSeqNo), CSeqNo::decseq(packet.m_iSeqNo));

        // pack loss list for NAK
        int32_t lossdata[2];
        lossdata[0] = CSeqNo::incseq(m_iRcvCurrSeqNo) | 0x80000000;
        lossdata[1] = CSeqNo::decseq(packet.m_iSeqNo);

        // Generate loss report immediately.
        sendCtrl(3, NULL, lossdata, (CSeqNo::incseq(m_iRcvCurrSeqNo) == CSeqNo::decseq(packet.m_iSeqNo)) ? 1 : 2);

        int loss = CSeqNo::seqlen(m_iRcvCurrSeqNo, packet.m_iSeqNo) - 2;
        m_iTraceRcvLoss += loss;
        m_iRcvLossTotal += loss;
    }

    // This is not a regular fixed size packet...
    //an irregular sized packet usually indicates the end of a message, so send an ACK immediately
    if (packet.getLength() != m_iPayloadSize)
        CTimer::rdtsc(m_ullNextACKTime);

    // Update the current largest sequence number that has been received.
    // Or it is a retransmitted packet, remove it from receiver loss list.
    if (CSeqNo::seqcmp(packet.m_iSeqNo, m_iRcvCurrSeqNo) > 0)
        m_iRcvCurrSeqNo = packet.m_iSeqNo;
    else
        m_pRcvLossList->remove(packet.m_iSeqNo);

    return 0;
}

这个函数主要执行了这样一个过程:

1. 将m_iEXPCount置为1。m_iEXPCount具体的含义会在后面研究UDT的定时器时再来详细地分析。

2. 获取当前的时间,并赋值给m_ullLastRspTime。m_ullLastRspTime具体的含义会在后面研究UDT的定时器时再来详细地分析。

3. 执行拥塞控制器m_pCC的回调,onPktReceived(&packet)。后面研究拥塞控制时会再来分析这个回调。

4. 递增m_iPktCount。

5. 更新时间信息,执行接收时间窗口的m_pRcvTimeWindow->onPktArrival()。

6. 检查是否是probing packet对,并根据具体情况分别执行m_pRcvTimeWindow->probe1Arrival()或m_pRcvTimeWindow->probe2Arrival()。

7. 递增m_llTraceRecv和m_llRecvTotal。用于进行统计的字段,前者表示一次trace,接收到的所有数据packet的个数,后者则表示自UDT Socket连接成功起,接收到的总的packet个数。两者的区别在于,前者会在执行CUDT::sample()获取trace信息之后被复位,后者则不会。

8. 计算接收到的packet的SeqNo与m_iRcvLastAck的偏移offset。m_iRcvLastAck表示上一次发送的ACK确认的SeqNo,SeqNo值小于m_iRcvLastAck的所有packet都已经被接收或者不再需要被接收,同时m_iRcvLastAck也是处于接收窗口中的SeqNo最小的packet。在连接建立的过程中,双方会进行协商来确定m_iRcvLastAck,会被置为双方约定的首个数据packet的SeqNo。

9. 检查offset值。若offset小于0,表明接收窗口已经滑过了接收到的这个packet,若大于接收缓冲区m_pRcvBuffer的可用大小,表明接收窗口还没有滑至包含接收到的packet的范围。对于这两种情况,则直接返回-1退出,否则继续执行。

10. 将unit添加进接收缓冲区。若失败则返回-1退出,否则继续执行。

11. 比较接收到的packet的SeqNo与CSeqNo::incseq(m_iRcvCurrSeqNo)。m_iRcvCurrSeqNo表示接收到的最大的Sequence number,在连接建立过程中,双方会协商确定具体值,该值为(双方约定的首个数据packet的SeqNo - 1)。若packet.m_iSeqNo大于CSeqNo::incseq(m_iRcvCurrSeqNo),表明packet没有连续到达,或可能有packet丢失。此时则会将[CSeqNo::incseq(m_iRcvCurrSeqNo),CSeqNo::decseq(packet.m_iSeqNo)]区间添加进接收丢失包列表m_pRcvLossList。并向发送端发送一个NACK,即Loss Report。

然后计算丢失的packet的个数,并更新m_iTraceRcvLoss和m_iRcvLossTotal,m_iTraceRcvLoss和m_iRcvLossTotal也是用来做统计的,用来统计packet丢失的数量。

12. 如果接收到的packet的数据长度不为m_iPayloadSize,通常意味着这是一个Msg的结束packet,则应该向发送端发送一个ACK消息,于是将当前时间读进m_ullNextACKTime。

可见在UDT中,对ACK消息的发送都不是在接收到数据包之后立即直接来进行的,而主要是在定时器Timer中进行的。

13. 比较packet.m_iSeqNo与m_iRcvCurrSeqNo,若前者较大,则会将后者更新为前者的值,否则,可能是接收到了一个原本丢失的packet,需要将该packet从丢失接收packet列表m_pRcvLossList中移除出去。

14. 返回0。

总结一下,在CRcvQueue::worker()这一层接收到数据包的dispatch过程,主要是将数据包保存在CUDT的接收缓冲区中。如果发现有疑似丢包现象,则将丢失的packet区间保存在接收包丢失列表中,并立即发送NACK消息给发送端,如果收到的packet为Msg的最后一个packet,则调度发送ACK消息。

接收缓冲区CRcvBuffer

这里再来看一下接收缓冲区CRcvBuffer。这个class的定义为(src/buffer.h):

class CRcvBuffer {
 public:
    CRcvBuffer(CUnitQueue* queue, int bufsize = 65536);
    ~CRcvBuffer();

    // Functionality:
    //    Write data into the buffer.
    // Parameters:
    //    0) [in] unit: pointer to a data unit containing new packet
    //    1) [in] offset: offset from last ACK point.
    // Returned value:
    //    0 is success, -1 if data is repeated.
    int addData(CUnit* unit, int offset);

    // Functionality:
    //    Read data into a user buffer.
    // Parameters:
    //    0) [in] data: pointer to user buffer.
    //    1) [in] len: length of user buffer.
    // Returned value:
    //    size of data read.
    int readBuffer(char* data, int len);

    // Functionality:
    //    Read data directly into file.
    // Parameters:
    //    0) [in] file: C++ file stream.
    //    1) [in] len: expected length of data to write into the file.
    // Returned value:
    //    size of data read.
    int readBufferToFile(std::fstream& ofs, int len);

    // Functionality:
    //    Update the ACK point of the buffer.
    // Parameters:
    //    0) [in] len: size of data to be acknowledged.
    // Returned value:
    //    1 if a user buffer is fulfilled, otherwise 0.
    void ackData(int len);

    // Functionality:
    //    Query how many buffer space left for data receiving.
    // Parameters:
    //    None.
    // Returned value:
    //    size of available buffer space (including user buffer) for data receiving.
    int getAvailBufSize() const;

    // Functionality:
    //    Query how many data has been continuously received (for reading).
    // Parameters:
    //    None.
    // Returned value:
    //    size of valid (continous) data for reading.
    int getRcvDataSize() const;

    // Functionality:
    //    mark the message to be dropped from the message list.
    // Parameters:
    //    0) [in] msgno: message nuumer.
    // Returned value:
    //    None.
    void dropMsg(int32_t msgno);

    // Functionality:
    //    read a message.
    // Parameters:
    //    0) [out] data: buffer to write the message into.
    //    1) [in] len: size of the buffer.
    // Returned value:
    //    actuall size of data read.
    int readMsg(char* data, int len);

    // Functionality:
    //    Query how many messages are available now.
    // Parameters:
    //    None.
    // Returned value:
    //    number of messages available for recvmsg.
    int getRcvMsgNum();

 private:
    bool scanMsg(int& start, int& end, bool& passack);

 private:
    CUnit** m_pUnit;                     // pointer to the protocol buffer
    int m_iSize;                         // size of the protocol buffer
    CUnitQueue* m_pUnitQueue;		// the shared unit queue

    int m_iStartPos;                     // the head position for I/O (inclusive)
    int m_iLastAckPos;                   // the last ACKed position (exclusive)
    // EMPTY: m_iStartPos = m_iLastAckPos   FULL: m_iStartPos = m_iLastAckPos + 1
    int m_iMaxPos;			// the furthest data position

    int m_iNotch;			// the starting read point of the first unit

 private:
    CRcvBuffer();
    CRcvBuffer(const CRcvBuffer&);
    CRcvBuffer& operator=(const CRcvBuffer&);
};

仅仅由类的定义,实在是难以看出太多的信息。

构造与析构

接着来看一下成员函数的实现,首先是构造函数和析构函数(src/buffer.cpp):

CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize)
        : m_pUnit(NULL),
          m_iSize(bufsize),
          m_pUnitQueue(queue),
          m_iStartPos(0),
          m_iLastAckPos(0),
          m_iMaxPos(0),
          m_iNotch(0) {
    m_pUnit = new CUnit*[m_iSize];
    for (int i = 0; i < m_iSize; ++i)
        m_pUnit[i] = NULL;
}

CRcvBuffer::~CRcvBuffer() {
    for (int i = 0; i < m_iSize; ++i) {
        if (NULL != m_pUnit[i]) {
            m_pUnit[i]->m_iFlag = 0;
            --m_pUnitQueue->m_iCount;
        }
    }

    delete[] m_pUnit;
}

在构造函数中主要是常见了一个CUnit指针的数组m_pUnit,长度为m_iSize,然后初始化它。而在析构函数中则主要是更新抓到的CUnit的状态为未被使用状态,并适当地更新m_pUnitQueue的已用CUnit个数字段m_iCount,然后delete掉在构造函数中分配的CUnit指针数组。

addData

然后来看CRcvBuffer::addData():

int CRcvBuffer::addData(CUnit* unit, int offset) {
    int pos = (m_iLastAckPos + offset) % m_iSize;
    if (offset > m_iMaxPos)
        m_iMaxPos = offset;

    if (NULL != m_pUnit[pos])
        return -1;

    m_pUnit[pos] = unit;

    unit->m_iFlag = 1;
    ++m_pUnitQueue->m_iCount;

    return 0;
}

这个函数的执行过程为:

1. 计算unit在接受缓冲区的CUnit指针数组m_pUnit中的存放位置pos。

2. 若offset大于m_iMaxPos,则会将m_iMaxPos更新为offset。

3. 将unit放进CUnit指针数组m_pUnit的pos位置处。

4. 将unit的m_iFlag置为1,表示该unit被占用,在CUnitQueue::getNextAvailUnit()中不能再被返回用于存放接收来的数据了。

5. 递增m_pUnitQueue的m_iCount,m_pUnitQueue的m_iCount表示它的已被占用CUnit的个数。

6. 返回0值。

CRcvBuffer中抓到CUnitQueue的指针,然后对后者的私有成员变量搞东搞西,也真是让人醉了。

readBuffer

接收缓冲区CRcvBuffer就如同我们前面分析的发送缓冲区CSndBuffer一样,一边有线程向里面塞数据,另一边有线程从中读取数据,向其中塞数据的正是我们刚刚分析的CRcvQueue::worker()->...->CRcvBuffer::addData()。接着我们就看看从中读取数据那一边的函数,先来看RcvBuffer::readBuffer():

int CRcvBuffer::readBuffer(char* data, int len) {
    int p = m_iStartPos;
    int lastack = m_iLastAckPos;
    int rs = len;

    while ((p != lastack) && (rs > 0)) {
        int unitsize = m_pUnit[p]->m_Packet.getLength() - m_iNotch;
        if (unitsize > rs)
            unitsize = rs;

        memcpy(data, m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
        data += unitsize;

        if ((rs > unitsize) || (rs == m_pUnit[p]->m_Packet.getLength() - m_iNotch)) {
            CUnit* tmp = m_pUnit[p];
            m_pUnit[p] = NULL;
            tmp->m_iFlag = 0;
            --m_pUnitQueue->m_iCount;

            if (++p == m_iSize)
                p = 0;

            m_iNotch = 0;
        } else
            m_iNotch += rs;

        rs -= unitsize;
    }

    m_iStartPos = p;
    return len - rs;
}

这个函数主要通过一个循环来尽可能多的读取已经接收到的完整连续的数据packet。

m_iStartPos为下一次读取时,读取的起始packet在m_pUnit数组中的index,即index在m_iStartPos之前的所有packet/CUnit都已经被读取了。

m_iLastAckPos表示最后被ACK的packet的index,即index在m_iLastAckPos之前的所有packet都已经收到了,m_iLastAckPos为读取的结束位置。

在传入的数据缓冲区还没有塞满,同时还没有读到结束位置之前一直会尝试着去读取。

在循环体中会试图去读取一个数据packet中包含的数据。这个读取过程为:

1. 计算要读取的packet中剩余未读取的数据的数量unitsize。一个已经完全接收到的数据packet中的数据,可能由于调用者请求的数据量比较小,而存在剩余的未读取的数据在。m_iNotch用于记录 要读取的packet中,上一次读取时读到的数据的大小。

2. 比较unitsize和rs,若unitsize大于rs,则将unitsize设置为rs。rs表示剩余要读取的数据的大小,unitsize表示本轮读取操作应该读取的数据的大小。通过这个地方的检查,可以防止向data缓冲区中写入数据时超出边界。

3. 执行一次memcpy,将packet中的数据读取到data缓冲区中,并更新data缓冲区的位置。

4. 如果rs大于unitsize或rs等于(m_pUnit[p]->m_Packet.getLength() - m_iNotch),表明当前的这个packet中的数据已经完全被读取了,此时需要将这个packet归还给CUnitQueue m_pUnitQueue,跳到下一个CUnit/packet,即递增p,并将m_iNotch置为0。这个地方的if语句等价于下面的这个if语句:

        if (rs >= m_pUnit[p]->m_Packet.getLength() - m_iNotch) {

或下面的这个:

if (unitsize == m_pUnit[p]->m_Packet.getLength() - m_iNotch) {

这个语句看起来还要更加清晰简洁一点呢。

5. 对于当前的这个packet中的数据没有被读完的情况则是简单的将m_iNotch加rs,此时rs应该等于unitsize。就各个变量的语义而言,这里似乎给m_iNotch加unitsize要更好读一点。

6. 在循环体的结尾处会更新rs,主要是从中去除本次已经读取的大小。

7. 在循环体外会更新m_iStartPos为p。

8. 向调用者返回实际读取的数据的大小。

readBufferToFile

接着我们再来看一下CRcvBuffer::readBufferToFile():

int CRcvBuffer::readBufferToFile(fstream& ofs, int len) {
    int p = m_iStartPos;
    int lastack = m_iLastAckPos;
    int rs = len;

    while ((p != lastack) && (rs > 0)) {
        int unitsize = m_pUnit[p]->m_Packet.getLength() - m_iNotch;
        if (unitsize > rs)
            unitsize = rs;

        ofs.write(m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
        if (ofs.fail())
            break;

        if ((rs > unitsize) || (rs == m_pUnit[p]->m_Packet.getLength() - m_iNotch)) {
            CUnit* tmp = m_pUnit[p];
            m_pUnit[p] = NULL;
            tmp->m_iFlag = 0;
            --m_pUnitQueue->m_iCount;

            if (++p == m_iSize)
                p = 0;

            m_iNotch = 0;
        } else
            m_iNotch += rs;

        rs -= unitsize;
    }

    m_iStartPos = p;

    return len - rs;
}

这个函数的执行与CRcvBuffer::readBuffer()的执行极其相似。仅有的差别在于,后者是将一个packet的数据memcpy进调用者传入的数据缓冲区,而这个函数则是将数据write进传入的fstream,并会在失败时跳出循环。

又是大段大段的重复code。

readMsg

然后再来看一下CRcvBuffer::readMsg():

int CRcvBuffer::readMsg(char* data, int len) {
    int p, q;
    bool passack;
    if (!scanMsg(p, q, passack))
        return 0;

    int rs = len;
    while (p != (q + 1) % m_iSize) {
        int unitsize = m_pUnit[p]->m_Packet.getLength();
        if ((rs >= 0) && (unitsize > rs))
            unitsize = rs;

        if (unitsize > 0) {
            memcpy(data, m_pUnit[p]->m_Packet.m_pcData, unitsize);
            data += unitsize;
            rs -= unitsize;
        }

        if (!passack) {
            CUnit* tmp = m_pUnit[p];
            m_pUnit[p] = NULL;
            tmp->m_iFlag = 0;
            --m_pUnitQueue->m_iCount;
        } else
            m_pUnit[p]->m_iFlag = 2;

        if (++p == m_iSize)
            p = 0;
    }

    if (!passack)
        m_iStartPos = (q + 1) % m_iSize;

    return len - rs;
}

在这个函数中主要完成了如下过程:

1. 执行scanMsg()找到下一个未读取的Msg相关的信息,若找不到则直接返回0,否则继续执行。

p为Msg的起始packet的index,q为结束packet的index。UDT的Msg与UDP的Msg有相似的地方,它们都是只能整体的读写,但又有非常大的不同,前者的长度可以非常长,而后者则要短许多。

2. 通过一个循环读取index范围在[p, q]的所有packet,循环每执行一轮,就读取一个packet,但这里总是整个整个packet的读。在循环体中,一个packet的读取过程大体为:

(1). 获取当前packet可读的数据的长度unitsize。

(2). 主要是根据rs值,也就是传入的数据缓冲区剩余长度值,来修正unitsize,以防止后面的memcpy操作越界。

(3). 在unitsize大于0时,执行memcpy将数据复制进传入的数据缓冲区。并更新数据缓冲区data的位置,和传入的数据缓冲区剩余长度值rs。

(4). 如果passack为false,则会将这个packet归还给CUnitQueue m_pUnitQueue;如果为true,则会将读取的CUnit的m_iFlag置为2。

(5). 跳到下一个CUnit/packet,即递增p。

由这段逻辑可见,如果传入的数据缓冲区空间不够大的话,那么会在把数据缓冲区塞满之后,将相同的Msg后面的所有数据直接丢弃。

3. 如果passack为false,则会更新m_iStartPos为(q + 1) % m_iSize。

4. 返回读入到数据缓冲区中的数据的长度。

这里再来看一下CRcvBuffer::scanMsg():

bool CRcvBuffer::scanMsg(int& p, int& q, bool& passack) {
    // empty buffer
    if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0))
        return false;

    //skip all bad msgs at the beginning
    while (m_iStartPos != m_iLastAckPos) {
        if (NULL == m_pUnit[m_iStartPos]) {
            if (++m_iStartPos == m_iSize)
                m_iStartPos = 0;
            continue;
        }

        if ((1 == m_pUnit[m_iStartPos]->m_iFlag) && (m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() > 1)) {
            bool good = true;

            // look ahead for the whole message
            for (int i = m_iStartPos; i != m_iLastAckPos;) {
                if ((NULL == m_pUnit[i]) || (1 != m_pUnit[i]->m_iFlag)) {
                    good = false;
                    break;
                }

                if ((m_pUnit[i]->m_Packet.getMsgBoundary() == 1) || (m_pUnit[i]->m_Packet.getMsgBoundary() == 3))
                    break;

                if (++i == m_iSize)
                    i = 0;
            }

            if (good)
                break;
        }

        CUnit* tmp = m_pUnit[m_iStartPos];
        m_pUnit[m_iStartPos] = NULL;
        tmp->m_iFlag = 0;
        --m_pUnitQueue->m_iCount;

        if (++m_iStartPos == m_iSize)
            m_iStartPos = 0;
    }

    p = -1;                  // message head
    q = m_iStartPos;         // message tail
    passack = m_iStartPos == m_iLastAckPos;
    bool found = false;

    // looking for the first message
    for (int i = 0, n = m_iMaxPos + getRcvDataSize(); i <= n; ++i) {
        if ((NULL != m_pUnit[q]) && (1 == m_pUnit[q]->m_iFlag)) {
            switch (m_pUnit[q]->m_Packet.getMsgBoundary()) {
                case 3:  // 11
                    p = q;
                    found = true;
                    break;

                case 2:  // 10
                    p = q;
                    break;

                case 1:  // 01
                    if (p != -1)
                        found = true;
                    break;
            }
        } else {
            // a hole in this message, not valid, restart search
            p = -1;
        }

        if (found) {
            // the msg has to be ack‘ed or it is allowed to read out of order, and was not read before
            if (!passack || !m_pUnit[q]->m_Packet.getMsgOrderFlag())
                break;

            found = false;
        }

        if (++q == m_iSize)
            q = 0;

        if (q == m_iLastAckPos)
            passack = true;
    }

    // no msg found
    if (!found) {
        // if the message is larger than the receiver buffer, return part of the message
        if ((p != -1) && ((q + 1) % m_iSize == p))
            found = true;
    }

    return found;
}

1. 在这个函数中会先检查一下是否是空buffer,若是则直接返回false,表示没有Msg可读,否则继续执行。

2. 通过一个两层循环来检查是否有一个Msg存在,在外层循环中主要来查找Msg的起始packet,而在内存循环中则主要用于查找结束packet。更具体的来看外层循环做的事情:

(1). 检查m_pUnit[m_iStartPos]是否为null,若为null,则递增m_iStartPos并进入下一轮循环,以跳过开头一些无效的packet,否则继续执行。

(2). 若m_iStartPos指向的packet已经被占用,同时它的MsgBoundary大于1,则表明这是一个Msg的起始packet。此时则先进入内层循环,从m_iStartPos开始至m_iLastAckPos结束,寻找Msg的结束packet。good变量用于表示是否找到了一个完整的Msg或一个中间没有洞但还未完全接收到的Msg。

在内层循环中,会检查一下当前CUnit是否为null或还未处于被占用状态,若是则表明Msg中有洞,因而将good置为false,直接跳出内层循环,否则继续执行内层循环后面的逻辑。然后检查当前CUnit的MsgBoundary值是否为1,或为3,若是则表明找到了Msg的结束packet,则直接跳出内层循环,否则继续执行内层循环后面的逻辑。最后递增循环计数器。

如果good为true,则跳出外层循环,否则继续执行。

关于MsgBoundary的判断,可以对照CSndBuffer::addBuffer()的code来看。

(3). 将m_iStartPos指向的当前CUnit还回给CUnitQueue m_pUnitQueue。

(4). 递增m_iStartPos。

内层循环会在一个Msg中有空洞时,将good置为false,并从内层循环中跳出。由此可见,在内层循环外,检查good值,发现good为false时就知道应该跳过并还回index范围在区间[m_iStartPos,i]内的所有CUnit了。但index范围在区间[m_iStartPos,i]内的所有CUnit却是由外层循环最后的那一段code来执行跳过和还回的。

总感觉那段查找一个Msg的code应该写成下面这样才比较好一点:

        if ((1 == m_pUnit[m_iStartPos]->m_iFlag) && (m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() = 3)) {
            break;
        } else if ((1 == m_pUnit[m_iStartPos]->m_iFlag) && (m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() = 2)) {
            bool good = true;

            // look ahead for the whole message
            for (int i = m_iStartPos + 1; i != m_iLastAckPos;) {
                if ((NULL == m_pUnit[i]) || (1 != m_pUnit[i]->m_iFlag)) {
                    good = false;
                    break;
                }

                if (m_pUnit[i]->m_Packet.getMsgBoundary() == 2 || m_pUnit[i]->m_Packet.getMsgBoundary() == 3) {
                    good = false;
                    break;
                } else if (m_pUnit[i]->m_Packet.getMsgBoundary() == 1) {
                    break;
                }

                if (++i == m_iSize)
                    i = 0;
            }

            if (good)
                break;
        }

这种写法将单个packet Msg和多packet Msg的处理逻辑分开,明显要清晰许多。

3. 确定Msg的具体边界。

在前一个步骤中,已经有跳过所有的含空洞的Msg等无效Msg,但在这里依然做了是否为NULL的判断。

passack为true表示对于Msg的查找已经到了最后Ack的那个packet了。这段code似乎修改一下循环控制部分看起来会更清晰一点:

    for (q = m_iStartPos; q != m_iLastAckPos;) {

4. 如果没有找到Msg,同时(p != -1) && ((q + 1) % m_iSize == p),则表明一个Msg的packet把整个接收缓冲区都充爆了,还是没有消息的终止packet,则会让调用端先读取一部分。

对于接收缓冲区CRcvBuffer的研究就先到这里。

应用程序接收数据

来看下UDT都给应用程序提供了哪些API以用于接收数据。UDT提供了如下的几个函数用于不同方式的数据接收:

UDT_API int recv(UDTSOCKET u, char* buf, int len, int flags);

UDT_API int recvmsg(UDTSOCKET u, char* buf, int len);

UDT_API int64_t recvfile(UDTSOCKET u, std::fstream& ofs, int64_t& offset, int64_t size, int block = 7280000);

UDT_API int64_t recvfile2(UDTSOCKET u, const char* path, int64_t* offset, int64_t size, int block = 7280000);

recv()用于进行流式的数据接收。recvmsg()用于进行数据报式的数据接收。recvfile()和recvfile2()用来直接将流式接收的数据写入文件,两者的执行过程基本相同,仅有的区别就在于以不同的形式来提供文件参数。

recv

先来看一下UDT::recv()函数:

int CUDT::recv(char* data, int len) {
    if (UDT_DGRAM == m_iSockType)
        throw CUDTException(5, 10, 0);

    // throw an exception if not connected
    if (!m_bConnected)
        throw CUDTException(2, 2, 0);
    else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
        throw CUDTException(2, 1, 0);

    if (len <= 0)
        return 0;

    CGuard recvguard(m_RecvLock);

    if (0 == m_pRcvBuffer->getRcvDataSize()) {
        if (!m_bSynRecving)
            throw CUDTException(6, 2, 0);
        else {
#ifndef WIN32
            pthread_mutex_lock(&m_RecvDataLock);
            if (m_iRcvTimeOut < 0) {
                while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
                    pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
            } else {
                uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL;
                timespec locktime;

                locktime.tv_sec = exptime / 1000000;
                locktime.tv_nsec = (exptime % 1000000) * 1000;

                while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize())) {
                    pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &locktime);
                    if (CTimer::getTime() >= exptime)
                        break;
                }
            }
            pthread_mutex_unlock(&m_RecvDataLock);
#else
            if (m_iRcvTimeOut < 0)
            {
                while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
                WaitForSingleObject(m_RecvDataCond, INFINITE);
            }
            else
            {
                uint64_t enter_time = CTimer::getTime();

                while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
                {
                    int diff = int(CTimer::getTime() - enter_time) / 1000;
                    if (diff >= m_iRcvTimeOut)
                    break;
                    WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut - diff ));
                }
            }
#endif
        }
    }

    // throw an exception if not connected
    if (!m_bConnected)
        throw CUDTException(2, 2, 0);
    else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
        throw CUDTException(2, 1, 0);

    int res = m_pRcvBuffer->readBuffer(data, len);

    if (m_pRcvBuffer->getRcvDataSize() <= 0) {
        // read is not available any more
        s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
    }

    if ((res <= 0) && (m_iRcvTimeOut >= 0))
        throw CUDTException(6, 3, 0);

    return res;
}

int CUDT::recv(UDTSOCKET u, char* buf, int len, int) {
    try {
        CUDT* udt = s_UDTUnited.lookup(u);
        return udt->recv(buf, len);
    } catch (CUDTException &e) {
        s_UDTUnited.setError(new CUDTException(e));
        return ERROR;
    } catch (...) {
        s_UDTUnited.setError(new CUDTException(-1, 0, 0));
        return ERROR;
    }
}

int recv(UDTSOCKET u, char* buf, int len, int flags) {
    return CUDT::recv(u, buf, len, flags);
}

这个API的实现结构与UDT::send()非常相似,此处不再赘述。直接来看CUDT::recv(char* data, int len)的执行:

1. 检查UDT Socket的类型是否为UDT_DGRAM,若是则直接抛异常退出。这个检查限定只有类型为UDT_STREAM的UDT Socket才能执行recv操作。

2. 检查CUDT的状态,若不处于有效的连接状态,或处于正在关闭状态且没有额外的数据可以读取会直接抛异常退出。这一步用于保证只有处于有效的连接状态,或正在关闭但有数据未读完的CUDT才能recv。

3. 检查参数len,也就是数据缓冲区参数的长度是否为一个有效的值,即大于0。若不是则没有必要执行后续的recv动作,直接返回0值退出。

4. 在接收数据大小为0,同时UDT Socket处于有效的连接状态时等待数据的到达。

case 1:UDT Socket处于非m_bSynRecving状态,则直接抛异常退出。

case 2:UDT Socket处于m_bSynRecving状态,m_iRcvTimeOut小于0,此时只要UDT Socket依然处于有效的连接状态,就无限制的等待。

case 3:UDT Socket处于m_bSynRecving状态,m_iRcvTimeOut大于等于0,此时则最长等待m_iRcvTimeOut ms。

5. 再次检查CUDT的状态,若不处于有效的连接状态,或处于正在关闭状态且没有额外的数据可以读取会直接抛异常退出。在这些条件下无需执行后面进一步的数据读取操作。

6. 执行m_pRcvBuffer->readBuffer(data, len)从接收缓冲区中读取数据。

7. 若没有读到数据,而且m_iRcvTimeOut则说明是读取超时了,则抛出异常退出,否则继续执行。

8. 返回读到的数据的长度。

recvmsg

然后来看UDT::recvmsg():

int CUDT::recvmsg(char* data, int len) {
    if (UDT_STREAM == m_iSockType)
        throw CUDTException(5, 9, 0);

    // throw an exception if not connected
    if (!m_bConnected)
        throw CUDTException(2, 2, 0);

    if (len <= 0)
        return 0;

    CGuard recvguard(m_RecvLock);

    if (m_bBroken || m_bClosing) {
        int res = m_pRcvBuffer->readMsg(data, len);

        if (m_pRcvBuffer->getRcvMsgNum() <= 0) {
            // read is not available any more
            s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
        }

        if (0 == res)
            throw CUDTException(2, 1, 0);
        else
            return res;
    }

    if (!m_bSynRecving) {
        int res = m_pRcvBuffer->readMsg(data, len);
        if (0 == res)
            throw CUDTException(6, 2, 0);
        else
            return res;
    }

    int res = 0;
    bool timeout = false;

    do {
#ifndef WIN32
        pthread_mutex_lock(&m_RecvDataLock);

        if (m_iRcvTimeOut < 0) {
            while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len))))
                pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
        } else {
            uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL;
            timespec locktime;

            locktime.tv_sec = exptime / 1000000;
            locktime.tv_nsec = (exptime % 1000000) * 1000;

            if (pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &locktime) == ETIMEDOUT)
                timeout = true;

            res = m_pRcvBuffer->readMsg(data, len);
        }
        pthread_mutex_unlock(&m_RecvDataLock);
#else
        if (m_iRcvTimeOut < 0)
        {
            while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len))))
            WaitForSingleObject(m_RecvDataCond, INFINITE);
        }
        else
        {
            if (WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut)) == WAIT_TIMEOUT)
            timeout = true;

            res = m_pRcvBuffer->readMsg(data, len);
        }
#endif

        if (m_bBroken || m_bClosing)
            throw CUDTException(2, 1, 0);
        else if (!m_bConnected)
            throw CUDTException(2, 2, 0);
    } while ((0 == res) && !timeout);

    if (m_pRcvBuffer->getRcvMsgNum() <= 0) {
        // read is not available any more
        s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
    }

    if ((res <= 0) && (m_iRcvTimeOut >= 0))
        throw CUDTException(6, 3, 0);

    return res;
}

int CUDT::recvmsg(UDTSOCKET u, char* buf, int len) {
    try {
        CUDT* udt = s_UDTUnited.lookup(u);
        return udt->recvmsg(buf, len);
    } catch (CUDTException e) {
        s_UDTUnited.setError(new CUDTException(e));
        return ERROR;
    } catch (...) {
        s_UDTUnited.setError(new CUDTException(-1, 0, 0));
        return ERROR;
    }
}

int recvmsg(UDTSOCKET u, char* buf, int len) {
    return CUDT::recvmsg(u, buf, len);
}

这个API的实现结构与UDT::recv()非常相似,此处不再赘述。直接来看CUDT::recvmsg(char* data, int len)的执行:

1. 检查UDT Socket的类型是否为UDT_STREAM,若是则直接抛异常退出。这个检查限定只有类型为UDT_DGRAM的UDT Socket才能执行recvmsg操作。

2. 处理UDT Socket处于非Connected的情况。处理方法是,直接抛异常退出。

3. 检查参数len,也就是数据缓冲区参数的长度是否为一个有效的值,即大于0。若不是则没有必要执行后续的recvmsg动作,直接返回0值退出。

4. 处理UDT Socket依然处于Connected状态,但同时已经在Closing或被Broken时的情况。处理方法即是,尝试读取一个Msg。若读取到了数据,就返回读取到的数据的长度;若没有读取到就抛出一个异常退出。

5. 处理UDT Socket处于有效的连接状态,但是不是SynRecving模式的情况。处理方法即是,尝试读取一个Msg。若读取到了数据,就返回读取到的数据的长度;若没有读取到就抛出一个异常退出。

6. 处理UDT Socket处于有效的连接状态,且是SynRecving模式的情况。这种情况会在读取不到数据时进行等待,然后再尝试读取。这里用了一个do-while循环来实现等待-读取的逻辑。分两种情况来处理,一是m_iRcvTimeOut小于0,即只要UDT Socket处于有效的连接状态时就无限制等待的情况;二是m_iRcvTimeOut大于等于0,即UDT Socket处于有效的连接状态的情况下等待一定时间的情况。

在循环体的最后还会处理由于UDT Socket状态改变而导致等待过程被终止的情况。

循环会在没有接收到数据同时没有超时的情况下一直等待。循环体即将结束时对于状态的检查使得状态的改变能够终结等待过程,而不会出现死循环。

7. 循环退出之后,发现是由于timeout而结束的情况就抛个异常退出。

8. 一切顺利读取了Msg的数据,则返回读取到的数据的长度。

这个函数做的事情与UDT::recv()做的事情极其相似,但是写法的差别看上去比较大。不同写法个有好坏吧。

recvfile和recvfile2

接着再来看一下recvfile和recvfile2:

int64_t CUDT::recvfile(fstream& ofs, int64_t& offset, int64_t size, int block) {
    if (UDT_DGRAM == m_iSockType)
        throw CUDTException(5, 10, 0);

    if (!m_bConnected)
        throw CUDTException(2, 2, 0);
    else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
        throw CUDTException(2, 1, 0);

    if (size <= 0)
        return 0;

    CGuard recvguard(m_RecvLock);

    int64_t torecv = size;
    int unitsize = block;
    int recvsize;

    // positioning...
    try {
        ofs.seekp((streamoff) offset);
    } catch (...) {
        throw CUDTException(4, 3);
    }

    // receiving... "recvfile" is always blocking
    while (torecv > 0) {
        if (ofs.fail()) {
            // send the sender a signal so it will not be blocked forever
            int32_t err_code = CUDTException::EFILE;
            sendCtrl(8, &err_code);

            throw CUDTException(4, 4);
        }

#ifndef WIN32
        pthread_mutex_lock(&m_RecvDataLock);
        while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
            pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
        pthread_mutex_unlock(&m_RecvDataLock);
#else
        while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize()))
        WaitForSingleObject(m_RecvDataCond, INFINITE);
#endif

        if (!m_bConnected)
            throw CUDTException(2, 2, 0);
        else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
            throw CUDTException(2, 1, 0);

        unitsize = int((torecv >= block) ? block : torecv);
        recvsize = m_pRcvBuffer->readBufferToFile(ofs, unitsize);

        if (recvsize > 0) {
            torecv -= recvsize;
            offset += recvsize;
        }
    }

    if (m_pRcvBuffer->getRcvDataSize() <= 0) {
        // read is not available any more
        s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
    }

    return size - torecv;
}

int64_t CUDT::recvfile(UDTSOCKET u, fstream& ofs, int64_t& offset, int64_t size, int block) {
    try {
        CUDT* udt = s_UDTUnited.lookup(u);
        return udt->recvfile(ofs, offset, size, block);
    } catch (CUDTException &e) {
        s_UDTUnited.setError(new CUDTException(e));
        return ERROR;
    } catch (...) {
        s_UDTUnited.setError(new CUDTException(-1, 0, 0));
        return ERROR;
    }
}

int64_t recvfile(UDTSOCKET u, fstream& ofs, int64_t& offset, int64_t size, int block) {
    return CUDT::recvfile(u, ofs, offset, size, block);
}

int64_t recvfile2(UDTSOCKET u, const char* path, int64_t* offset, int64_t size, int block) {
    fstream ofs(path, ios::binary | ios::out);
    int64_t ret = CUDT::recvfile(u, ofs, *offset, size, block);
    ofs.close();
    return ret;
}

这个API的实现结构与UDT::recv()和CUDT::recvmsg()非常相似,此处不再赘述。直接来看CUDT::recvfile(fstream& ofs, int64_t& offset, int64_t size, int block)的执行:

1. 与UDT::recv()一样,确认UDT Socket的类型为UDT_STREAM。

2. 检查CUDT的状态,处理方式也与UDT::recv()的对应部分一样。

3. 检查参数size,也就是要读取的数据长度为一个有效值,即大于0。若不是则没有必要执行后续的recvfile动作,直接返回0值退出。

4. seek到文件的目的offset,若失败就抛出异常退出。

5. 通过一个循环不停地读取文件,直到读取了期望的数据大小。在循环体中:

(1). 首先检查ofs是否处于fail状态,若是就向发送端发送一个Error消息,否则继续执行。

(2). 在接收缓冲区中无数据可读时等待,而且是不顾及m_iRcvTimeOut及m_bSynRecving模式的无限制等待,直到UDT Socket不再处于有效的连接状态。

(3). 检查UDT Socket的状态,是否不再处于有效的连接状态且没有其它数据可以读了,若是就抛异常退出。

(4). 通过torecv和block值计算本次读操作应该读取的数据的大小。

(5). 由接收缓冲区向文件读取数据。

(6). 读到了数据时更新torecv及偏移两offset。

这个循环仅有的退出条件就是一些异常的情况,ofs fail,或者UDT Socket不再处于有效的连接状态。

6. 返回实际读取到的数据的长度。当然这个地方的计算过程略显莫名其妙,就前面读取数据的循环而言,在读取到请求的数据长度之前貌似是不会结束的。

应用这个函数所需要的条件与UDT::recv()比较相似。但执行的操作又差别比较大,在这个函数中貌似就是一定要读到请求大小的数据才会结束。

Done。

时间: 2024-10-25 22:20:26

UDT协议实现分析——数据的接收的相关文章

UDT协议实现分析——数据的发送

连接建立起来之后,我们就可以通过UDT Socket进行数据的收发了.先来看用来发送数据的几个函数.UDT提供了如下的几个函数用于不同目的下的数据发送: UDT_API int send(UDTSOCKET u, const char* buf, int len, int flags); UDT_API int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false); UDT_API i

UDT协议实现分析——数据发送控制

在前文中,我们有看到,数据发送的过程,大体是发送者CUDT将要发送的数据放进它的CSndBuffer m_pSndBuffer,并将它自己添加进它的CSndQueue m_pSndQueue的CSndUList m_pSndUList的堆里,后面CSndQueue m_pSndQueue的worker线程会通过CSndUList::pop()从CSndUList m_pSndUList的堆顶CUDT中获取一个要发送的包来发送,包的获取主要是通过CUDT::packData()来完成,而这个函数正

UDT协议实现分析——UDT Socket的创建

UDT API的用法 在分析 连接的建立过程 之前,先来看一下UDT API的用法.在UDT网络中,通常要有一个UDT Server监听在某台机器的某个UDP端口上,等待客户端的连接:有一个或多个客户端连接UDT Server:UDT Server接收到来自客户端的连接请求后,创建另外一个单独的UDT Socket用于与该客户端进行通信. 先来看一下UDT Server的简单的实现,UDT的开发者已经提供了一些demo程序可供参考,位于app/目录下. #include <unistd.h>

UDT协议实现分析——bind、listen与accept

UDT Server启动之后,基于UDT协议的UDP数据可靠传输才成为可能,因而接下来分析与UDT Server有关的几个主要API的实现,来了解下UDT Server是如何listening在特定UDP端口上的.主要有UDT::bind(),UDT::listen()和UDT::accept()等几个函数. bind过程 通常UDT Server在创建UDT Socket之后,首先就要调用UDT::bind(),与一个特定的本地UDP端口地址进行绑定,以便可以在希望的端口上监听.这里来看一下U

UDT协议实现分析——连接的建立

UDT Server在执行UDT::listen()之后,就可以接受其它节点的连接请求了.这里我们研究一下UDT连接建立的过程. 连接的发起 来看连接的发起方.如前面我们看到的那样,UDT Client创建一个Socket,可以将该Socket绑定到某个端口,也可以不绑定,然后就可以调用UDT::connect()将这个Socket连接到UDT Server了.来看UDT::connect()的定义(src/api.cpp): int CUDTUnited::connect(const UDTS

UDT协议实现分析——UDT数据收发的可靠性保障

不管是数据的发送还是数据的接收,大体的流程我们基本上是都理了一下,还分析了数据收发过程中用的数据结构,接下来就看一些UDT中数据收发更精细的一些控制. UDT数据收发的可靠性保障 来看一下UDT中数据收发的可靠性保障. 接收包丢失列表CRcvLossList 先来看一下CRcvLossList的定义: class CRcvLossList { public: CRcvLossList(int size = 1024); ~CRcvLossList(); // Functionality: //

UDT协议实现分析——UDT初始化和销毁

UDT协议是一个用于在告诉Internet上传输大量数据的基于UDP的可靠传输协议. 我们可以将UDT协议的实现看作一个比较复杂的状态机.更准确的说,是一个主状态机,外加多个子状态机.主状态机是指协议实现中全局唯一.全局共享的状态与数据结构,主要对应于CUDTUnited类.子状态机则是对于一次UDT连接或一个Listening的UDT Server的抽象,是UDT自己创建的Socket抽象,一个与系统socket相似但又不同的概念,主要对应于CUDTSocket和CUDT类.UDT的Socke

UDT协议实现分析——发送窗口大小及发送速率的调整

UDT主要通过在数据收发的过程中进行精细的控制来实现对于网络带宽更加有效的利用,并使网络中数据传输的速率尽可能快. 如我们前面在分析数据发送的控制中看到的,对于正常的顺序packet发送,发送控制主要在于两个方面,一是发送窗口的大小,也就是某个时刻已经发送但未得到相应的packet的最大个数,这一点主要由拥塞窗口大小m_dCongestionWindow和滑动窗口大小m_iFlowWindowSize来描述,发送窗口大小为两者中较小的那一个:二是控制两个数据包发送的时间间隔,也就是包的发送速率,

UDT协议实现分析总结

UDT的整体结构 UDT Socket是UDT中的核心,同时它也是一座桥梁,它将UDT的使用者应用程序与内部实现部分对于数据结构的管理.网络数据的传输连接起来. 应用程序通过它将数据放进发送缓冲待发送,或者借由它来获取从网络接收数据.而与网络进行交互的部分,则从它那里拿到要发送的数据进行发送,或者在收到packet时将packet dispatch给它. UDT的数据接收部分框架: UDT的数据发送部分框架: UDT的一些问题 1. 接口的合理性. 分析了UDT的这许多code,给人的感觉就是,