总结下网络编程中关键的细节问题,包含连接建立、连接断开、消息到达、发送消息等等;
连接建立
包括服务端接受 (accept) 新连接和客户端成功发起 (connect) 连接。 accept接受连接的问题在本文最后会聊到,这里谈谈connect的关键点; 使用非阻塞连接建立需要注意: connect/select返回后,可能没有连接上;需要再次确认是否成功连接;
步骤为:
- 使用异步connect直接连接一次,因为使用了非阻塞,函数立刻返回;
- 检查返回值,为0成功连接,否则加入到select/epoll中监控;
- 当有写事件时,连接成功;当即可读又可写时,可能是有错误或者连接成功后有数据已经发过来;所以,此时,需要用getsockopt()读取socket的错误选项,二次确认是否真的连接成功:
Fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
error = 0;
if ( (n = connect(sockfd, saptr, salen)) < 0)
if (errno != EINPROGRESS)
return(-1);
/* Do whatever we want while the connect is taking place. */
if (n == 0)
goto done; /* connect completed immediately */
if ( (n = Select(sockfd+1, &rset, &wset, NULL,
nsec ? &tval : NULL)) == 0) {
close(sockfd); /* timeout */
errno = ETIMEDOUT;
return(-1);
}
if (FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) {
len = sizeof(error);
//二次确认是否真的连接成功
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0)
return(-1); /* Solaris pending error */
} else
err_quit("select error: sockfd not set");
连接断开
包括主动断开 (close 或 shutdown) 和被动断开 (read 返回 0)。
当打算关闭网络连接时,如何能知道对方已经发送了数据自己还没有收到? 在TCP层面解决:主动关闭的时候只使用半关闭shutdown(), 这样,服务端这边之时关闭了写端,还可以正常读;客户端收到关闭的信号后(read返回0),会再调用shutdown关闭整个连接; 在应用层面解决:双方通过某个标记协商,在标记之后不再读写数据,这样就可以完全的关闭连接了;
关闭连接时需要注意的: 是否还有未发送的数据,需要保证应用缓冲区中的数据都发送完毕之后再关闭缓冲区; TCP缓存区不用我们考虑,因为在调用shutdown或close的时候,TCP的实现是会将TCP的发送缓冲区中的数据都发送出去,然后再发送FIN报文(也可能是组合成一个报文发送);
消息到达
消息到达是最重要的事件;对它的处理决定了网络编程的风格:是阻塞还是非阻塞、分包的处理、应用层的缓冲如何设计等等;
处理分包
所谓分包,就是在一个个字节流消息中如何区分出一个个消息来; 常见的分包方法有:
- 固定长度;
- 特殊的结尾符,比如字符串的\0,或者回车换行等;
- 固定的消息头中指定后续的消息的长度,然后跟上一个消息体内容;
- 使用协议本身的格式,比如json格式头尾配对(XML也一样);
字节序转换注意字节对齐
如果传输的是二进制类型,在字节流的缓存区中直接强转可能core dump;因为有的系统访问地址需要字节对齐,不能在任意地址上访问二进制类型(如整形),合理的方式是将其copy到一个本地变量中,然后再做字节序的转换:
int32_t peekInt32() const
{
assert(readableBytes() >= sizeof(int32_t));
int32_t be32 = 0;
::memcpy(&be32,readerIndex_, sizeof(be32) );
return be32toh(be32);
}
应用层缓存区的实现
数据到达时处理需要注意: socket读事件来到,必须一次将所有的数据都读完,否则会造成一直有可读事件,造成busy-loop;读到的数据当然就需要有个应用层的缓冲区来存放; 因为应用的缓存区是有限的,可以默认设置一个大小,比如2kb,或者根本就不设置初始大小,用多少分配多少;muduo中使用的是vector 来作为缓存区,可以动态增长;
muduo buffer使用的技巧: buffe采用了vector自动增长的数据结构; 从系统内核中调用的时候,在应用层需要有足够大的缓冲区,最好能一次将系统recv到的缓冲区给读空,一次系统调用就搞定一切事情; 而应用缓冲区考虑到有很多个并发的可能,针对每个连接一次都分配较大的缓冲区浪费严重,陈硕推荐使用readv一次读入到两个地址中,首先将第一个地址填满,如果还有更多数据,就写入到临时缓冲区中,然后append到应用缓冲区;
读的时候使用readv,局部使用一个足够大的额外空间(64KB),这样,一次读取就足以将socket中的缓存区读空(一般不会超过64K,tcp buffer如果确实要设置大的缓存区,需要调整系统参数);如果数据不多,可能内部buffer就装下了,没有额外操作,否则,多的数据读到了外部的缓存区,再append到内部缓存区:
ssize_t Buffer::readFd(int fd, int* savedErrno)
{
// saved an ioctl()/FIONREAD call to tell how much to read
char extrabuf[65536];
struct iovec vec[2];
const size_t writable = writableBytes();
vec[0].iov_base = begin()+writerIndex_;
vec[0].iov_len = writable;
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;
// when there is enough space in this buffer, don‘t read into extrabuf.
// when extrabuf is used, we read 128k-1 bytes at most.
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
//只有一次系统调用:这里的实现比较巧妙
const ssize_t n = sockets::readv(fd, vec, iovcnt);
if (n < 0)
{
*savedErrno = errno;
}
else if (implicit_cast<size_t>(n) <= writable)
{
writerIndex_ += n;
}
else
{
writerIndex_ = buffer_.size();
append(extrabuf, n - writable);
}
// if (n == writable + sizeof extrabuf)
// {
// goto line_30;
// }
return n;
}
发送消息
网络编程中数据发送比数据接受要难处理; 数据的接收,只需要peek足够的数据后,就可以从应用缓冲区接收出来,然后处理;而数据的发送,还需要考虑对方接受缓慢的情况,导致tcp发送缓冲区累积,最终导致应用缓冲区累积;
举个例子:某客户端对echo服务器只发送,但故意不接收; 客户端如果只是发送,但从不接收的话,那么这边发送过去的报文,首先会导致客户端的tcp接收缓冲区满,然后通过ack报文告诉服务器端,这边的滑动窗口为0了,不能再发了;后续客户端发送的报文就把服务器端TCP发送缓冲区积满,然后累积应用层的发送缓冲区(因为是非阻塞),最终导致服务端的应用缓存区满或者内存撑爆;
需要发送数据的时候,优先直接调用write()发送,如果发送不成功,或没有全部发送完毕,才加入到发送缓存区,等待可写事件到来后发送; 直接调用write()发送数据时,需要先将本次需要发送的数据添加到缓存区,然后发送缓存区,不可直接发送本次数据(因为缓存区中可能有遗留的数据未发送完)
void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting())
{
//注意,这里只调用了一次write,而没有反复调用write直到出现EAGAIN错误,
//原因是如果第一次调用没有发送完全部的数据,第二次调用几乎肯定是EAGAIN错误,
//因此这里减少了一次系统调用,这么做不影响正确性,却能够降低系统时延
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0)
{
//如果发送缓存区为空,不再关注写事件,避免 busy loop
channel_->disableWriting();
//如果还有写完成之后的回调,加入待执行回调队列
if (writeCompleteCallback_)
{
loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
}
//如果此时正在关闭,调用shutdownInLoop 继续执行关闭过程
if (state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
}
else
{
LOG_TRACE << "Connection fd = " << channel_->fd()
<< " is down, no more writing";
}
}
消息发送完毕
对于低流量的服务,可以不必关心这个事件;另外,这里“发送完毕”是指将数据写入操作系统的缓冲区,后续由 TCP 协议栈负责数据的发送与重传,不代表对方已经收到数据。
其它问题
IO multiplexing 是否可以配合阻塞套接字使用?
一般都配合非阻塞socket使用,如果使用阻塞IO,可能在读写事件上阻塞当前线程,造成无法继续处理已经就绪的事件; 初学网络编程可能都会有这个想法,select返回后,如果是读事件,那么这时候tcp读缓冲区肯定是有数据,这时即使使用阻塞套接字来read,应该也不会阻塞;但这样忽略了一个点,缓冲区确实是有数据,但是很可能到达的数据并不满足你要求读的数据大小,这样read调用还是会阻塞,直到有足够的数据才返回; 那么,对于数据读不可以,对accept()总可以吧,连接事件返回,一般都是有新用户接入,这时候阻塞的accept()应该总是能够返回;但在某些情况下,可能对方刚连接上就断开了,并给服务端发送了一个RST请求,造成服务端这边将已经就绪的连接请求又移除了,这样的场景下,select返回,但是accept却无法获取新的连接,造成阻塞,直到下一个连接请求到来;(这方面的例子详见《UNIX网络编程卷1:套接字联网API》16.6节非阻塞accept() ) 所以任何时候,IO multiplexing都需要配合非阻塞IO使用;
零拷贝的实现
对于内核层的实现,底层调用的是系统调用sendFile()方法; zerocopy技术省去了将操作系统的read buffer拷贝到程序的buffer, 以及从程序buffer拷贝到socket buffer的步骤, 直接将 read buffer 拷贝到 socket buffer; 详见:http://www.cnblogs.com/zemliu/p/3695549.html
应用层上的实现,对于自定义的结构,一般是交换内部指针(使用C++11,可以使用move操作来实现高效交换结构体) 如果是vector等结构,使用其成员函数swap()就能达到高效的交换(类似C++11中的move操作); 例如muduo中buffer实现:通过swap实现了缓存区的指针交换,从而达到数据交换的目的,而不用拷贝缓冲区;
void swap(Buffer& rhs)
{
buffer_.swap(rhs.buffer_); // std::vector<char> buffer_;
std::swap(readerIndex_, rhs.readerIndex_);
std::swap(writerIndex_, rhs.writerIndex_);
}
epoll使用LT
epoll使用是LT而非ET,原因如下:
- LT编程方便,select的经验都可同样适用;
- 读的时候只需要一次系统调用,而ET必须读到EAGAIN错误;减少一次系统调用,降低时延;
一般认为 edge-trigger 模式的优势在于能够减少 epoll 相关系统调用,这话不假,但网络服务程序里可不是只有 epoll 相关系统调用,为了绕过饿死问题,edge-trigger 模式下用户要自行进行 read/write 循环处理,这其中增加的系统调用和减少的 epoll 系统调用加起来,总体性能收益究竟如何?只有实际测量才知道,无法一概而论。为了降低处理逻辑复杂度,常用的事件处理库大部分都选择了 level-trigger 模式(如 libevent、boost::asio、muduo等)