TcpConnection是使用shared_ptr来管理的类,因为它的生命周期模糊。TcpConnection表示已经建立或正在建立的连接,状态只有kConnecting、kConnected、kDisconnected、kDisconnecting,它初始化时,构造函数的sockfd表示正在建立连接kConnecting。
建立连接后,用户只需处理收发数据,发送数据它会自动处理,收取数据后,会调用用户设置的MessageCallback函数来处理收到的数据。
TcpConnection中封装了inputBuffer和outputBuffer,用来表示应用层的缓冲。在发送数据时,如果不能一次将outputBuffer中的数据发送完毕,它还会enable channel中的write事件,当sockfd可写时,会再次发送。
高水位回调和低水位回调:
在发送数据时,如果发送过快会造成数据在本地积累。muduo解决这个问题的办法是用了高水位回调和低水位回调,分别用函数HighWaterMarkCallback和WriteCompleteCallback代表。原理为:设置一个发送缓冲区的上限值,如果缓冲区中的数据量超过这个上限值,就调用HighWaterMarkCallback,用户可以在其中停止接收数据;WriteCompleteCallback在发送缓冲区被清空时调用,用户可以在这个函数中重新开启接收数据。
调用send时,当前线程可能不是TcpConnection所属的IO线程,这时通过loop_->runInLoop可以转到其所属的IO线程执行。因为TcpConnection中保存了其所属EventLoop的指针,可以通过EventLoop::runInLoop将所调用的函数添加到所属EventLoop的任务队列中。
断开连接:
TcpConnection的断开是采用被动方式,即对方先关闭连接,本地read(2)返回0后,调用顺序如下:
handleClose()->TcpServer::removeConnection->TcpConnection::connectDestroyed()。
当建立连接后,TcpServer中的map持有TcpConnection的shared_ptr指针,因此TcpConnection在被移除map前不会析构,其shared_ptr计数器不小于1。在TcpConnection内部使用其shared_ptr时会调用shared_from_this()来获取。当map移除shared_ptr指针后,如果用户不持有TcpConnection的shared_ptr指针,那么在调用TcpConnection::connectDestroyed()后,TcpConnection会自动销毁。
setTcpNoDelay表示使用/不使用Nagle算法。禁用Nagle算法可以避免连续发包出现延迟,适用于低延迟的网络服务。
TcpConnection.h
/// One TCP connection, identified by a name and bound to a single EventLoop.
///
/// The class derives from enable_shared_from_this so that its member
/// functions can hand a shared_ptr to themselves to callbacks and queued
/// tasks. It is noncopyable.
class TcpConnection : boost::noncopyable,
public boost::enable_shared_from_this<TcpConnection>
{
public:
/// Constructs a TcpConnection with a connected sockfd
///
/// User should not create this object.
TcpConnection(EventLoop* loop,
const string& name,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr);
~TcpConnection();
// Simple accessors for the owning loop, the connection name and both endpoints.
EventLoop* getLoop() const { return loop_; }
const string& name() const { return name_; }
const InetAddress& localAddress() const { return localAddr_; }
const InetAddress& peerAddress() const { return peerAddr_; }
// State queries; both read state_ without synchronization.
bool connected() const { return state_ == kConnected; }
bool disconnected() const { return state_ == kDisconnected; }
// return true if success.
bool getTcpInfo(struct tcp_info*) const;
string getTcpInfoString() const;
// void send(string&& message); // C++11
void send(const void* message, int len);
void send(const StringPiece& message);
// void send(Buffer&& message); // C++11
void send(Buffer* message); // this one will swap data
void shutdown(); // NOT thread safe, no simultaneous calling
// void shutdownAndForceCloseAfter(double seconds); // NOT thread safe, no simultaneous calling
void forceClose();
void forceCloseWithDelay(double seconds);
void setTcpNoDelay(bool on);// forwards to Socket::setTcpNoDelay; controls whether Nagle's algorithm is used
// Opaque user context attached to this connection.
void setContext(const boost::any& context)
{ context_ = context; }
const boost::any& getContext() const
{ return context_; }
boost::any* getMutableContext()
{ return &context_; }
// Callback setters; each one simply stores a copy of the callback.
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }
void setWriteCompleteCallback(const WriteCompleteCallback& cb)// counterpart of highWaterMarkCallback_; together they let the user throttle a too-fast sender
{ writeCompleteCallback_ = cb; }
void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
{ highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }
/// Advanced interface
Buffer* inputBuffer()
{ return &inputBuffer_; }
Buffer* outputBuffer()
{ return &outputBuffer_; }
/// Internal use only.
void setCloseCallback(const CloseCallback& cb)
{ closeCallback_ = cb; }
// called when TcpServer accepts a new connection
void connectEstablished(); // should be called only once
// called when TcpServer has removed me from its map
void connectDestroyed(); // should be called only once
private:
// Connection life-cycle states.
enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
// Event handlers registered on channel_ by the constructor.
void handleRead(Timestamp receiveTime);
void handleWrite();
void handleClose();
void handleError();
// void sendInLoop(string&& message);
// The *InLoop functions assert that they run on loop_'s thread.
void sendInLoop(const StringPiece& message);
void sendInLoop(const void* message, size_t len);
void shutdownInLoop();
// void shutdownAndForceCloseInLoop(double seconds);
void forceCloseInLoop();
void setState(StateE s) { state_ = s; }
const char* stateToString() const;
EventLoop* loop_;
const string name_;
StateE state_; // FIXME: use atomic variable
// we don't expose those classes to client.
boost::scoped_ptr<Socket> socket_;// owned Socket wrapper around the sockfd
boost::scoped_ptr<Channel> channel_;// owned Channel for the same sockfd
const InetAddress localAddr_;
const InetAddress peerAddr_;// address of the remote peer
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;// user-supplied handler invoked when data has been read
WriteCompleteCallback writeCompleteCallback_;
HighWaterMarkCallback highWaterMarkCallback_;
CloseCallback closeCallback_;
size_t highWaterMark_;// threshold for bytes pending in outputBuffer_; crossing it queues highWaterMarkCallback_
Buffer inputBuffer_;// application-level input buffer
Buffer outputBuffer_; // FIXME: use list<Buffer> as output buffer.
boost::any context_;
// FIXME: creationTime_, lastReceiveTime_
// bytesReceived_, bytesSent_
};
// Shared-ownership handle to a TcpConnection; this is the type passed to the callbacks below.
typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;
TcpConnection.cc
// Default callback functions.

/// Default connection callback: traces "<local> -> <peer> is UP/DOWN"
/// according to conn->connected() and does nothing else.
void muduo::net::defaultConnectionCallback(const TcpConnectionPtr& conn)
{
  const char* const status = conn->connected() ? "UP" : "DOWN";
  LOG_TRACE << conn->localAddress().toIpPort() << " -> "
            << conn->peerAddress().toIpPort() << " is "
            << status;
  // do not call conn->forceClose(), because some users want to register message callback only.
}
/// Default message callback: consumes the whole buffer via retrieveAll()
/// without looking at the data. The connection and timestamp are unused.
void muduo::net::defaultMessageCallback(const TcpConnectionPtr& /*conn*/,
                                        Buffer* buf,
                                        Timestamp /*receiveTime*/)
{
  buf->retrieveAll();
}
/// Builds a connection in state kConnecting around a raw socket descriptor.
///
/// @param loop      owning event loop; checked with CHECK_NOTNULL
/// @param nameArg   connection name, copied into name_
/// @param sockfd    raw descriptor; wrapped here in both a Socket and a Channel
/// @param localAddr local endpoint
/// @param peerAddr  remote endpoint
TcpConnection::TcpConnection(EventLoop* loop,
                             const string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),
    name_(nameArg),
    state_(kConnecting),
    socket_(new Socket(sockfd)),
    channel_(new Channel(loop, sockfd)),
    localAddr_(localAddr),
    peerAddr_(peerAddr),
    highWaterMark_(64*1024*1024)
{
  // Route the channel's four event kinds to the matching member handlers.
  Channel* const ch = channel_.get();
  ch->setReadCallback(
      boost::bind(&TcpConnection::handleRead, this, _1));
  ch->setWriteCallback(
      boost::bind(&TcpConnection::handleWrite, this));
  ch->setCloseCallback(
      boost::bind(&TcpConnection::handleClose, this));
  ch->setErrorCallback(
      boost::bind(&TcpConnection::handleError, this));

  LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
            << " fd=" << sockfd;

  // Keep-alive is always requested on the wrapped socket.
  socket_->setKeepAlive(true);
}
/// Logs the destruction and asserts that the connection has already reached
/// kDisconnected.
TcpConnection::~TcpConnection()
{
  LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this
            << " fd=" << channel_->fd()
            << " state=" << stateToString();

  assert(state_ == kDisconnected);
}
/// Fills *tcpi by delegating to the wrapped Socket.
/// @return whatever Socket::getTcpInfo returns (true on success per the header comment).
bool TcpConnection::getTcpInfo(struct tcp_info* tcpi) const
{
  const bool ok = socket_->getTcpInfo(tcpi);
  return ok;
}
/// Returns the socket's TCP info rendered as text.
///
/// The text is produced by Socket::getTcpInfoString into a 1024-byte stack
/// buffer. The buffer is pre-terminated, so an empty string is returned if
/// the socket writes nothing.
string TcpConnection::getTcpInfoString() const
{
  char buf[1024];
  buf[0] = '\0';  // plain ASCII quotes: the typographic quotes in the paste do not compile
  socket_->getTcpInfoString(buf, sizeof buf);
  return buf;
}
/// Sends len bytes starting at data by forwarding to the StringPiece overload.
void TcpConnection::send(const void* data, int len)
{
  const char* const bytes = static_cast<const char*>(data);
  send(StringPiece(bytes, len));
}
void TcpConnection::send(const StringPiece& message)
{
if (state_ == kConnected)
{
if (loop_->isInLoopThread())//如果是其OwnerIO线程,则发送
{
sendInLoop(message);
}
else//否则轮转到其Owner IO线程
{
loop_->runInLoop(
boost::bind(&TcpConnection::sendInLoop,
this, // FIXME
message.as_string()));
//std::forward<string>(message)));
}
}
}
// FIXME efficiency!!!
/// Sends the readable contents of buf and empties it, but only if the
/// connection is currently kConnected; otherwise buf is left untouched.
///
/// On the loop thread the bytes are passed to sendInLoop() in place and then
/// retrieved. From any other thread the contents are moved out as a string
/// and the send is handed to the owning loop via runInLoop().
void TcpConnection::send(Buffer* buf)
{
  if (state_ != kConnected)
  {
    return;
  }

  if (loop_->isInLoopThread())
  {
    sendInLoop(buf->peek(), buf->readableBytes());
    buf->retrieveAll();
  }
  else
  {
    // Bind a shared_ptr rather than raw `this`, so the connection cannot be
    // destroyed while the task is still waiting in the loop's queue.
    loop_->runInLoop(
        boost::bind(&TcpConnection::sendInLoop,
                    shared_from_this(),
                    buf->retrieveAllAsString()));
  }
}
void TcpConnection::sendInLoop(const StringPiece& message)
{
sendInLoop(message.data(), message.size());
}
/// Writes len bytes from data on the loop thread.
///
/// If nothing is queued and the channel is not watching for writability, a
/// direct write is attempted first. Whatever could not be written is appended
/// to outputBuffer_ and write events are enabled so handleWrite() can finish.
/// Data is dropped when the connection is already kDisconnected, or when the
/// direct write fails with EPIPE or ECONNRESET.
void TcpConnection::sendInLoop(const void* data, size_t len)
{
  loop_->assertInLoopThread();
  if (state_ == kDisconnected)
  {
    LOG_WARN << "disconnected, give up writing";
    return;
  }

  ssize_t nwrote = 0;
  size_t remaining = len;
  bool faultError = false;

  // if no thing in output queue, try writing directly
  const bool outputIdle =
      !channel_->isWriting() && outputBuffer_.readableBytes() == 0;
  if (outputIdle)
  {
    nwrote = sockets::write(channel_->fd(), data, len);
    if (nwrote < 0)
    {
      nwrote = 0;
      if (errno != EWOULDBLOCK)
      {
        LOG_SYSERR << "TcpConnection::sendInLoop";
        if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
        {
          faultError = true;
        }
      }
    }
    else
    {
      remaining = len - nwrote;
      // Everything went out in one go: notify the user asynchronously.
      if (remaining == 0 && writeCompleteCallback_)
      {
        loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
      }
    }
  }

  assert(remaining <= len);
  if (faultError || remaining == 0)
  {
    return;
  }

  // Part (or all) of the data is still unsent: queue it.
  const size_t oldLen = outputBuffer_.readableBytes();
  // Fire the high-water-mark callback only when this append crosses the
  // threshold, not while the buffer is already above it.
  if (oldLen + remaining >= highWaterMark_
      && oldLen < highWaterMark_
      && highWaterMarkCallback_)
  {
    loop_->queueInLoop(boost::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
  }
  outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
  if (!channel_->isWriting())
  {
    // Start watching for writability so handleWrite() can drain the buffer.
    channel_->enableWriting();
  }
}
void TcpConnection::shutdown()
{
// FIXME: use compare and swap
if (state_ == kConnected)
{
setState(kDisconnecting);
// FIXME: shared_from_this()?
loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
}
}
/// Loop-thread half of shutdown(): closes the write side of the socket unless
/// output is still pending.
void TcpConnection::shutdownInLoop()
{
  loop_->assertInLoopThread();
  if (channel_->isWriting())
  {
    // Still flushing queued output; handleWrite() calls this again once the
    // buffer is empty and the state is kDisconnecting.
    return;
  }
  // we are not writing
  socket_->shutdownWrite();
}
// void TcpConnection::shutdownAndForceCloseAfter(double seconds)
// {
// // FIXME: use compare and swap
// if (state_ == kConnected)
// {
// setState(kDisconnecting);
// loop_->runInLoop(boost::bind(&TcpConnection::shutdownAndForceCloseInLoop, this, seconds));
// }
// }
// void TcpConnection::shutdownAndForceCloseInLoop(double seconds)
// {
// loop_->assertInLoopThread();
// if (!channel_->isWriting())
// {
// // we are not writing
// socket_->shutdownWrite();
// }
// loop_->runAfter(
// seconds,
// makeWeakCallback(shared_from_this(),
// &TcpConnection::forceCloseInLoop));
// }
/// Forcibly closes the connection.
///
/// Does nothing unless the state is kConnected or kDisconnecting. Otherwise
/// the state becomes kDisconnecting and forceCloseInLoop() is queued on the
/// owning loop, holding a shared_ptr to this object.
void TcpConnection::forceClose()
{
  // FIXME: use compare and swap
  if (state_ != kConnected && state_ != kDisconnecting)
  {
    return;
  }
  setState(kDisconnecting);
  loop_->queueInLoop(boost::bind(&TcpConnection::forceCloseInLoop, shared_from_this()));
}
/// Like forceClose(), but deferred: a timer on the owning loop calls
/// forceClose() after the given number of seconds, through a weak callback.
///
/// Does nothing unless the state is kConnected or kDisconnecting.
void TcpConnection::forceCloseWithDelay(double seconds)
{
  if (state_ != kConnected && state_ != kDisconnecting)
  {
    return;
  }
  setState(kDisconnecting);
  loop_->runAfter(
      seconds,
      makeWeakCallback(shared_from_this(),
                       &TcpConnection::forceClose)); // not forceCloseInLoop to avoid race condition
}
/// Loop-thread half of forceClose(): runs handleClose() if the connection is
/// still kConnected or kDisconnecting.
void TcpConnection::forceCloseInLoop()
{
  loop_->assertInLoopThread();
  if (state_ != kConnected && state_ != kDisconnecting)
  {
    return;
  }
  // as if we received 0 byte in handleRead();
  handleClose();
}
/// Returns the enumerator name of the current state, or "unknown state" if
/// state_ holds none of the four StateE values.
const char* TcpConnection::stateToString() const
{
  if (state_ == kDisconnected)
  {
    return "kDisconnected";
  }
  if (state_ == kConnecting)
  {
    return "kConnecting";
  }
  if (state_ == kConnected)
  {
    return "kConnected";
  }
  if (state_ == kDisconnecting)
  {
    return "kDisconnecting";
  }
  return "unknown state";
}
void TcpConnection::setTcpNoDelay(bool on)
{
socket_->setTcpNoDelay(on);
}
void TcpConnection::connectEstablished()//连接建立。在TcpServer中建立连接后会调用次函数
{
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading();//等待数据到来
connectionCallback_(shared_from_this());
}
void TcpConnection::connectDestroyed()
{
loop_->assertInLoopThread();
if (state_ == kConnected)
{
setState(kDisconnected);
channel_->disableAll();
connectionCallback_(shared_from_this());
}
channel_->remove();//将channel_移除
}
/// Read-event handler: pulls data from the socket into inputBuffer_.
///
/// - bytes read > 0: invokes the user's message callback
/// - bytes read == 0: treats it as a close and calls handleClose()
/// - bytes read < 0: restores errno from readFd, logs, and calls handleError()
void TcpConnection::handleRead(Timestamp receiveTime)
{
  loop_->assertInLoopThread();

  int savedErrno = 0;
  const ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);

  if (n < 0)
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
  else if (n == 0)
  {
    // Zero bytes read: close the connection.
    handleClose();
  }
  else
  {
    // This callback is supplied by the user.
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
}
/// Write-event handler: flushes as much of outputBuffer_ as the socket takes.
///
/// Once the buffer is empty, write events are disabled, the write-complete
/// callback (if set) is queued, and a pending shutdown (state kDisconnecting)
/// is carried out through shutdownInLoop().
void TcpConnection::handleWrite()
{
  loop_->assertInLoopThread();

  if (!channel_->isWriting())
  {
    LOG_TRACE << "Connection fd = " << channel_->fd()
              << " is down, no more writing";
    return;
  }

  const ssize_t n = sockets::write(channel_->fd(),
                                   outputBuffer_.peek(),
                                   outputBuffer_.readableBytes());
  if (n <= 0)
  {
    LOG_SYSERR << "TcpConnection::handleWrite";
    // if (state_ == kDisconnecting)
    // {
    //   shutdownInLoop();
    // }
    return;
  }

  outputBuffer_.retrieve(n);
  if (outputBuffer_.readableBytes() != 0)
  {
    // More data remains; wait for the next writable event.
    return;
  }

  // Everything has been sent: stop watching for writability.
  channel_->disableWriting();
  if (writeCompleteCallback_)
  {
    loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
  }
  // shutdown() was requested while output was still pending; finish it now.
  if (state_ == kDisconnecting)
  {
    shutdownInLoop();
  }
}
// Close handler, run on the loop thread. Marks the connection kDisconnected,
// disables all channel events, then notifies first the connection callback
// and finally the close callback. The order of the last two calls matters:
// the close callback must come last.
void TcpConnection::handleClose()
{
loop_->assertInLoopThread();
LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
assert(state_ == kConnected || state_ == kDisconnecting);
// we don't close fd, leave it to dtor, so we can find leaks easily.
setState(kDisconnected);// update the state
channel_->disableAll();// no channel events are watched from here on
// Hold a shared_ptr for the rest of the function so the callbacks receive a live handle.
TcpConnectionPtr guardThis(shared_from_this());
connectionCallback_(guardThis);// per the original author's note: defaultConnectionCallback unless the user replaced it
// must be the last line
closeCallback_(guardThis);// per the original author's note: this is TcpServer::removeConnection, which ends up calling TcpConnection::connectDestroyed
}
/// Error handler: fetches the socket's pending error code and logs it
/// together with its text. It takes no other action.
void TcpConnection::handleError()
{
  const int soError = sockets::getSocketError(channel_->fd());
  LOG_ERROR << "TcpConnection::handleError [" << name_
            << "] - SO_ERROR = " << soError << " " << strerror_tl(soError);
}
版权声明:本文为博主原创文章,未经博主允许不得转载。