接收缓冲区相对比较简单,其主要功能是接收发送方的数据并生成接收块、块排序、丢包判断和反馈、读事件通知等。以下是接收缓冲区的定义:
class RUDPRecvBuffer { public: ... //来自网络中的数据 int32_t on_data(uint64_t seq, const uint8_t* data, int32_t data_size); //定时事件 void on_timer(uint64_t now_timer, uint32_t rtc); //读取BUFFER中的数据 int32_t read(uint8_t* data, int32_t data_size); //检查缓冲区是否可读 void check_buffer(); //检查丢包 bool check_loss(uint64_t now_timer, uint32_t rtc); ... protected: IRUDPNetChannel* net_channel_; //接收窗口 RecvWindowMap recv_window_; //已完成的连续数据片 RecvDataList recv_data_; //丢包序列 LossIDTSMap loss_map_; //当前BUFFER中最大连续数据片的SEQ uint64_t first_seq_; //当期BUFFER中受到的最大的数据片ID uint64_t max_seq_; //最后一次发送ACK的时刻 uint64_t last_ack_ts_; //在上次发送ACK到现在,受到新的连续报文的标志 bool recv_new_packet_; ... };
在上面定义中,核心的函数主要是on_data和on_timer。on_data是接收来自发送端的RUDP数据报文,在这个函数里面首先会进行接收到报文和缓冲去里面的报文进行比较判断是否丢包和重复包。如果有丢包,记录到loss_map中。如果是重复包,则丢弃。如果接收到的包和缓冲区里的报文可以组成连续的块序列。则对上层触发on_read读事件。一下是这个函数的伪代码:
int32_t RUDPRecvBuffer::on_data(uint64_t seq, const uint8_t* data, int32_t data_size) { //报文合法性检测 if(seq > first_seq_ + MAX_SEQ_INTNAL || data_size > MAX_SEGMENT_SIZE) { //报告异常 RUDP_RECV_DEBUG("on data exception!!"); net_channel_->on_exception(); return -1; } RUDPRecvSegment* seg = NULL; if(first_seq_ + 1 == seq)//连续报文 { recv_new_packet_= true; //将数据缓冲到队列中 GAIN_RECV_SEG(seg); seg->seq_ = seq; seg->data_size_ = data_size; memcpy(seg->data_, data, data_size); recv_data_.push_back(seg); first_seq_ = seq; //判断缓冲区中的块是否连续,并进行排序 check_recv_window(); //触发可读事件 net_channel_->on_read(); //删除丢包 loss_map_.erase(seq); } else if(seq > first_seq_ + 1) //非连续报文 { RecvWindowMap::iterator it = recv_window_.find(seq); if(it == recv_window_.end()) //记录到接收窗口中 { //将数据缓冲到队列中 GAIN_RECV_SEG(seg); seg->seq_ = seq; seg->data_size_ = data_size; memcpy(seg->data_, data, data_size); recv_window_[seq] = seg; } //判断丢包 if(seq > max_seq_ + 1) { uint64_t ts = CBaseTimeValue::get_time_value().msec(); for(uint64_t i = max_seq_ + 1; i < seq; ++ i) //缓冲区中最大的报文和收到的报文之间的报文全部列入丢包范围中,并记录丢包时刻 loss_map_[i] = ts; } else { //删除丢包 loss_map_.erase(seq); } } //更新缓冲区最大SEQ if(max_seq_ < seq) max_seq_ = seq; return 0; }
on_timer是定时触发的,一般是5MS触发一次。主要是向发送端发送报告消息(ack/nack)、检查缓冲区是否可读两个操作。发送ack状态消息的条件是
uint32_t rtc_threshold = core_min(20, rtc / 2);
发送ack
其中rtc是RTT的修正值,由CCC计算得来。间隔不大于20MS发送一次。recv_new_packet_是一个收到正常连续报文的标志。如果发送了NACK,就不发送ACK,如果有丢包的话,就会触发发送nack,在on_timer的时候就会检测是本定时周期是否有丢包,如果有,就将丢包的序号通过nack发往发送端做丢包补偿。
void RUDPRecvBuffer::on_timer(uint64_t now_timer, uint32_t rtc) { //检查丢包 if(check_loss(now_timer, rtc)) recv_new_packet_ = false; //检查是否需要发送ack uint32_t rtc_threshold = core_min(20, rtc / 2); if(last_ack_ts_ + rtc_threshold <= now_timer && recv_new_packet_) send_ack(); //检查缓冲区是否可读 if(!recv_data_.empty() && net_channel_ != NULL) net_channel_->on_read(); }
CCC核心控制
CCC的核心控制就是慢启动、快恢复、RTT评估三个部分。
慢启动过程描述如下:
1、发送端的初始化发送窗口(send_wnd)为16
2、当发送端收到第一个ACK时,send_wnd = send_wnd + (本ACK周期内发送成功的报文数量)
时间: 2024-10-05 11:41:43