除了发送函数以外,发送缓冲区对象还会响应来自网络的on_ack和on_nack消息,这两个消息分别是处理正常的状态报告和丢包情况下的网络报告。如果收到on_ack,缓冲区对象会把已经接收端报告过来的报文ID全部从发送窗口中删除,然后调用attempt_send尝试新的块发送。如果收到的是on_nack,表示对端有丢包,则先会记录丢包的ID到loss_set中,再调用on_ack进行处理。
触发attempt_send还有可能是定时器Timer,定时器每5MS会检查一下发送缓冲区,并调用attempt_send尝试发送并且会检查缓冲区是否可写。
attempt_send函数伪代码如下:
void RUDPSendBuffer::attempt_send(uint64_t now_timer) { uint32_t cwnd_size = send_window_.size(); uint32_t rtt = ccc_->get_rtt(); uint32_t ccc_cwnd_size = ccc_->get_send_window_size(); RUDPSendSegment* seg = NULL; uint32_t send_packet_number = 0; if(!loss_set_.empty()) //重发丢失的片段 { //发送丢包队列中的报文 uint64_t loss_last_ts = 0; uint64_t loss_last_seq = 0; for(LossIDSet::iterator it = loss_set_.begin(); it != loss_set_.end();) //检查丢失报文是否要重发 { if(send_packet_number >= ccc_cwnd_size) //超过发送窗口 break; SendWindowMap::iterator cwnd_it = send_window_.find(*it); if(cwnd_it != send_window_.end() && cwnd_it->second->last_send_ts_ + rtt < now_timer) //丢失报文必须在窗口中 { seg = cwnd_it->second; //UDP网络发送 net_channel_->send_data(0, seg->seq_, seg->data_, seg->data_size_, now_timer); if(cwnd_max_seq_ < seg->seq_) cwnd_max_seq_ = seg->seq_; //判断是否可以更改TS if(loss_last_ts < seg->last_send_ts_) { loss_last_ts = seg->last_send_ts_; if(loss_last_seq < *it) loss_last_seq = *it; } seg->last_send_ts_ = now_timer; seg->send_count_ ++; send_packet_number ++; loss_set_.erase(it ++); //报告CCC有重发 ccc_->add_resend(); } else ++ it; } //更新重发包范围内未重发报文的时刻,防止下一次定时器到来时重复发送 for(SendWindowMap::iterator it = send_window_.begin(); it != send_window_.end(); ++it) { if(it->second->push_ts_ < loss_last_ts && loss_last_seq >= it->first) it->second->last_send_ts_ = now_timer; else if(loss_last_seq < it->first) break; } } else if(send_window_.size() > 0)//丢包队列为空,重发所有窗口中超时的分片 { //发送间时间隔阈值 uint32_t rtt_threshold = (uint32_t)ceil(rtt * 1.25); rtt_threshold = (core_max(rtt_threshold, 30)); SendWindowMap::iterator end_it = send_window_.end(); for(SendWindowMap::iterator it = send_window_.begin(); it != end_it; ++it) { if(send_packet_number >= ccc_cwnd_size || (it->second->push_ts_ + rtt_threshold > now_timer)) break; seg = it->second; //重发块的触发条件是上一次发送的时间距离现在大于特定的阈值或者压入时间很长并且是属于发送缓冲区靠前的块 if(seg->last_send_ts_ + rtt_threshold < now_timer || (seg->push_ts_ + rtt_threshold * 5 < now_timer && seg->seq_ < dest_max_seq_ + 3 && seg->last_send_ts_ + rtt_threshold / 2 < now_timer)) { net_channel_->send_data(0, seg->seq_, seg->data_, seg->data_size_, now_timer); if(cwnd_max_seq_ < seg->seq_) cwnd_max_seq_ = seg->seq_; seg->last_send_ts_ = now_timer; seg->send_count_ ++; send_packet_number ++; //报告CCC有重发块 ccc_->add_resend(); } } } //判断是否可以发送新的报文 if(ccc_cwnd_size > send_packet_number) { while(!send_data_.empty()) { RUDPSendSegment* seg = send_data_.front(); //判断NAGLE算法,NAGLE最少需要在100MS凑1024个字节报文 if(cwnd_size > 0 && nagle_ && seg->push_ts_ + NAGLE_DELAY > now_timer && seg->data_size_ < MAX_SEGMENT_SIZE - 256) break; //判断发送窗口 if(cwnd_size < ccc_cwnd_size) { send_data_.pop_front(); send_window_.insert(SendWindowMap::value_type(seg->seq_, seg)); cwnd_size ++; seg->push_ts_ = now_timer; seg->last_send_ts_ = now_timer; seg->send_count_ = 1; //UDP网络发送 net_channel_->send_data(0, seg->seq_, seg->data_, seg->data_size_, now_timer); if(cwnd_max_seq_ < seg->seq_) cwnd_max_seq_ = seg->seq_; } else //发送窗口满,则停止发送 break; } } }
从上可得知,attempt_send是首先检查是否可以发送丢失的报文,然后再检查窗口中太老的报文是否要重发,最后才加入新的发送报文。所有的前提约束是不超过发送窗口。这个函数里CCC决定的发送窗口大小和RTT直接控制着发送速度和发送策略。 在这里值得一提的是NAGLE的实现,RUDP为了防止小包过多,实现了一个nagle算法,如果设置了此开关,假如只有1个块在缓冲队列中,会等数据达到1024的长度才进行发送。如果等100MS没到1024长度也会发送,也就是最大等100MS.开关可以通过rudp interface设置的。
接收缓冲区
时间: 2024-10-08 12:43:22