muduo::TcpConnection分析

TcpConnection是使用shared_ptr来管理的类,因为它的生命周期模糊。TcpConnection表示已经建立或正在建立的连接,状态只有kConnecting、kConnected、kDisconnected、kDisconnecting,它初始化时,构造函数的sockfd表示正在建立连接kConnecting。

建立连接后,用户只需处理收发数据,发送数据它会自动处理,收取数据后,会调用用户设置的MessageCallback函数来处理收到的数据。

TcpConnection中封装了inputBuffer和outputBuffer,用来表示应用层的缓冲。在发送数据时,如果不能一次将呕吐Buffer中的数据发送完毕,它还会enable channel中的wirte事件,当sockfd可写时,会再次发送。

高水位回调和低水位回调:

在发送数据时,如果发送过快会造成数据在本地积累。muduo解决这个问题的办法是用了高水位回调和低水位回调,分别用函数HighWaterMarkCallback和WriteCompleteCallback代表。原理为:设置一个发送缓冲区的上限值,如果大于这个上限值,停止接收数据;WriteCompleteCallback函数为发送缓冲区为空时调用,在这个函数重启开启接收数据。

调用send时,可能不是TcpConnection所属的IO线程,这是通过loop_->runInLoop可以轮转到其所属的IO线程。因为TcpConnection中保存了其所属EventLoop的指针,可以通过EventLoop::runInLoop将所调用的函数添加到所属EventLoop的任务队列中。

断开连接:

TcpConnection的断开是采用被动方式,即对方先关闭连接,本地read(2)返回0后,调用顺序如下:

handleClose()->TcpServer::removeConnection->TcpConnection::connectDestroyed()。

当建立连接后,TcpServer中的map持有TcpConnection的shared_ptr指针,因此TcpConnection在被移除map前不会析构,其shared_ptr计数器不小于1。在TcpConnection内部使用其shared_ptr时会调用shared_from_this()来获取。当map移除shared_ptr指针后,如果用户不持有TcpConnection的shared_ptr指针,那么在调用TcpConnection::connectDestroyed()后,TcpConnection会自动销毁。

setTcpNoDelay表示使用/不使用Nagle算法。禁用Nagle算法可以避免连续发包出现延迟,适用于低延迟的网络服务。

TcpConnection.h

class TcpConnection : boost::noncopyable,
                      public boost::enable_shared_from_this<TcpConnection>
{
 public:
  /// Constructs a TcpConnection with a connected sockfd
  ///
  /// User should not create this object.
  TcpConnection(EventLoop* loop,
                const string& name,
                int sockfd,
                const InetAddress& localAddr,
                const InetAddress& peerAddr);
  ~TcpConnection();

  EventLoop* getLoop() const { return loop_; }
  const string& name() const { return name_; }
  const InetAddress& localAddress() const { return localAddr_; }
  const InetAddress& peerAddress() const { return peerAddr_; }
  bool connected() const { return state_ == kConnected; }
  bool disconnected() const { return state_ == kDisconnected; }
  // return true if success.
  bool getTcpInfo(struct tcp_info*) const;
  string getTcpInfoString() const;

  // void send(string&& message); // C++11
  void send(const void* message, int len);
  void send(const StringPiece& message);
  // void send(Buffer&& message); // C++11
  void send(Buffer* message);  // this one will swap data
  void shutdown(); // NOT thread safe, no simultaneous calling
  // void shutdownAndForceCloseAfter(double seconds); // NOT thread safe, no simultaneous calling
  void forceClose();
  void forceCloseWithDelay(double seconds);
  void setTcpNoDelay(bool on);//是否使用Nagle算法

  void setContext(const boost::any& context)
  { context_ = context; }

  const boost::any& getContext() const
  { return context_; }

  boost::any* getMutableContext()
  { return &context_; }

  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }

  void setWriteCompleteCallback(const WriteCompleteCallback& cb)//writeCompleteCallback_和highWaterMarckCallback_对应,解决发送过快问题
  { writeCompleteCallback_ = cb; }

  void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
  { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }

  /// Advanced interface
  Buffer* inputBuffer()
  { return &inputBuffer_; }

  Buffer* outputBuffer()
  { return &outputBuffer_; }

  /// Internal use only.
  void setCloseCallback(const CloseCallback& cb)
  { closeCallback_ = cb; }

  // called when TcpServer accepts a new connection
  void connectEstablished();   // should be called only once
  // called when TcpServer has removed me from its map
  void connectDestroyed();  // should be called only once

 private:
  enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
  void handleRead(Timestamp receiveTime);
  void handleWrite();
  void handleClose();
  void handleError();
  // void sendInLoop(string&& message);
  void sendInLoop(const StringPiece& message);
  void sendInLoop(const void* message, size_t len);
  void shutdownInLoop();
  // void shutdownAndForceCloseInLoop(double seconds);
  void forceCloseInLoop();
  void setState(StateE s) { state_ = s; }
  const char* stateToString() const;

  EventLoop* loop_;
  const string name_;
  StateE state_;  // FIXME: use atomic variable
  // we don‘t expose those classes to client.
  boost::scoped_ptr<Socket> socket_;//封装的socket
  boost::scoped_ptr<Channel> channel_;//封装的channel
  const InetAddress localAddr_;
  const InetAddress peerAddr_;//远程端的信息
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;//用户自定义的,消息到达时怎么处理消息
  WriteCompleteCallback writeCompleteCallback_;
  HighWaterMarkCallback highWaterMarkCallback_;
  CloseCallback closeCallback_;
  size_t highWaterMark_;//发送缓冲区数据“上限阀值”,超过这个值
  Buffer inputBuffer_;//输入Buffer
  Buffer outputBuffer_; // FIXME: use list<Buffer> as output buffer.
  boost::any context_;
  // FIXME: creationTime_, lastReceiveTime_
  //        bytesReceived_, bytesSent_
};

typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;

TcpConnection.cc

//默认的几个回调函数
void muduo::net::defaultConnectionCallback(const TcpConnectionPtr& conn)
{
  LOG_TRACE << conn->localAddress().toIpPort() << " -> "
            << conn->peerAddress().toIpPort() << " is "
            << (conn->connected() ? "UP" : "DOWN");
  // do not call conn->forceClose(), because some users want to register message callback only.
}

void muduo::net::defaultMessageCallback(const TcpConnectionPtr&,
                                        Buffer* buf,
                                        Timestamp)
{
  buf->retrieveAll();
}

TcpConnection::TcpConnection(EventLoop* loop,
                             const string& nameArg,
                             int sockfd,//这里的fd没有封装
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),
    name_(nameArg),
    state_(kConnecting),
    socket_(new Socket(sockfd)),
    channel_(new Channel(loop, sockfd)),
    localAddr_(localAddr),
    peerAddr_(peerAddr),
    highWaterMark_(64*1024*1024)
{
  channel_->setReadCallback(
      boost::bind(&TcpConnection::handleRead, this, _1));
  channel_->setWriteCallback(
      boost::bind(&TcpConnection::handleWrite, this));
  channel_->setCloseCallback(
      boost::bind(&TcpConnection::handleClose, this));
  channel_->setErrorCallback(
      boost::bind(&TcpConnection::handleError, this));
  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
            << " fd=" << sockfd;
  socket_->setKeepAlive(true);//使用
}

TcpConnection::~TcpConnection()
{
  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
            << " fd=" << channel_->fd()
            << " state=" << stateToString();
  assert(state_ == kDisconnected);
}

bool TcpConnection::getTcpInfo(struct tcp_info* tcpi) const//获取当前连接状态
{
  return socket_->getTcpInfo(tcpi);
}

string TcpConnection::getTcpInfoString() const
{
  char buf[1024];
  buf[0] = ‘\0‘;
  socket_->getTcpInfoString(buf, sizeof buf);
  return buf;
}

void TcpConnection::send(const void* data, int len)
{
  send(StringPiece(static_cast<const char*>(data), len));
}

void TcpConnection::send(const StringPiece& message)
{
  if (state_ == kConnected)
  {
    if (loop_->isInLoopThread())//如果是其OwnerIO线程,则发送
    {
      sendInLoop(message);
    }
    else//否则轮转到其Owner IO线程
    {
      loop_->runInLoop(
          boost::bind(&TcpConnection::sendInLoop,
                      this,     // FIXME
                      message.as_string()));
                    //std::forward<string>(message)));
    }
  }
}

// FIXME efficiency!!!
void TcpConnection::send(Buffer* buf)
{
  if (state_ == kConnected)
  {
    if (loop_->isInLoopThread())
    {
      sendInLoop(buf->peek(), buf->readableBytes());
      buf->retrieveAll();
    }
    else
    {
      loop_->runInLoop(
          boost::bind(&TcpConnection::sendInLoop,
                      this,     // FIXME
                      buf->retrieveAllAsString()));
                    //std::forward<string>(message)));
    }
  }
}

void TcpConnection::sendInLoop(const StringPiece& message)
{
  sendInLoop(message.data(), message.size());
}

void TcpConnection::sendInLoop(const void* data, size_t len)
{
  loop_->assertInLoopThread();
  ssize_t nwrote = 0;
  size_t remaining = len;
  bool faultError = false;
  if (state_ == kDisconnected)
  {
    LOG_WARN << "disconnected, give up writing";
    return;
  }
  // if no thing in output queue, try writing directly
  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)//Buffer中没数据,可以直接发送
  {
    nwrote = sockets::write(channel_->fd(), data, len);
    if (nwrote >= 0)
    {
      remaining = len - nwrote;
      if (remaining == 0 && writeCompleteCallback_)//将缓存发送完毕,调用回调函数
      {
        loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
      }
    }
    else // nwrote < 0
    {
      nwrote = 0;
      if (errno != EWOULDBLOCK)
      {
        LOG_SYSERR << "TcpConnection::sendInLoop";
        if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
        {
          faultError = true;
        }
      }
    }
  }

  assert(remaining <= len);
  if (!faultError && remaining > 0)//还有数据可以发送
  {
    size_t oldLen = outputBuffer_.readableBytes();
    if (oldLen + remaining >= highWaterMark_
        && oldLen < highWaterMark_
        && highWaterMarkCallback_)//缓冲区数据太多,
    {
      loop_->queueInLoop(boost::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
    }
    outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);//将数据放到outBuffer中
    if (!channel_->isWriting())
    {
      channel_->enableWriting();//Buffer中有数据,开始writeable事件
    }
  }
}

void TcpConnection::shutdown()
{
  // FIXME: use compare and swap
  if (state_ == kConnected)
  {
    setState(kDisconnecting);
    // FIXME: shared_from_this()?
    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
  }
}

void TcpConnection::shutdownInLoop()
{
  loop_->assertInLoopThread();
  if (!channel_->isWriting())//正在发送时不关闭
  {
    // we are not writing
    socket_->shutdownWrite();
  }
}

// void TcpConnection::shutdownAndForceCloseAfter(double seconds)
// {
//   // FIXME: use compare and swap
//   if (state_ == kConnected)
//   {
//     setState(kDisconnecting);
//     loop_->runInLoop(boost::bind(&TcpConnection::shutdownAndForceCloseInLoop, this, seconds));
//   }
// }

// void TcpConnection::shutdownAndForceCloseInLoop(double seconds)
// {
//   loop_->assertInLoopThread();
//   if (!channel_->isWriting())
//   {
//     // we are not writing
//     socket_->shutdownWrite();
//   }
//   loop_->runAfter(
//       seconds,
//       makeWeakCallback(shared_from_this(),
//                        &TcpConnection::forceCloseInLoop));
// }

void TcpConnection::forceClose()//关闭Connection
{
  // FIXME: use compare and swap
  if (state_ == kConnected || state_ == kDisconnecting)
  {
    setState(kDisconnecting);
    loop_->queueInLoop(boost::bind(&TcpConnection::forceCloseInLoop, shared_from_this()));
  }
}

void TcpConnection::forceCloseWithDelay(double seconds)//延迟关闭,使用了定时器
{
  if (state_ == kConnected || state_ == kDisconnecting)
  {
    setState(kDisconnecting);
    loop_->runAfter(
        seconds,
        makeWeakCallback(shared_from_this(),
                         &TcpConnection::forceClose));  // not forceCloseInLoop to avoid race condition
  }
}

void TcpConnection::forceCloseInLoop()
{
  loop_->assertInLoopThread();
  if (state_ == kConnected || state_ == kDisconnecting)
  {
    // as if we received 0 byte in handleRead();
    handleClose();
  }
}

const char* TcpConnection::stateToString() const
{
  switch (state_)
  {
    case kDisconnected:
      return "kDisconnected";
    case kConnecting:
      return "kConnecting";
    case kConnected:
      return "kConnected";
    case kDisconnecting:
      return "kDisconnecting";
    default:
      return "unknown state";
  }
}

void TcpConnection::setTcpNoDelay(bool on)
{
  socket_->setTcpNoDelay(on);
}

void TcpConnection::connectEstablished()//连接建立。在TcpServer中建立连接后会调用次函数
{
  loop_->assertInLoopThread();
  assert(state_ == kConnecting);
  setState(kConnected);
  channel_->tie(shared_from_this());
  channel_->enableReading();//等待数据到来

  connectionCallback_(shared_from_this());
}

void TcpConnection::connectDestroyed()
{
  loop_->assertInLoopThread();
  if (state_ == kConnected)
  {
    setState(kDisconnected);
    channel_->disableAll();

    connectionCallback_(shared_from_this());
  }
  channel_->remove();//将channel_移除
}

void TcpConnection::handleRead(Timestamp receiveTime)//数据到来
{
  loop_->assertInLoopThread();
  int savedErrno = 0;
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
  {
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);//这个回调函数为用户设置
  }
  else if (n == 0)//读到数据为零,则关闭连接
  {
    handleClose();
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
}

void TcpConnection::handleWrite()//发送数据
{
  loop_->assertInLoopThread();
  if (channel_->isWriting())
  {
    ssize_t n = sockets::write(channel_->fd(),
                               outputBuffer_.peek(),
                               outputBuffer_.readableBytes());
    if (n > 0)
    {
      outputBuffer_.retrieve(n);
      if (outputBuffer_.readableBytes() == 0)//如果已经发送完毕
      {
        channel_->disableWriting();//disable可写
        if (writeCompleteCallback_)
        {
          loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
        }
        if (state_ == kDisconnecting)//表示之前已经关闭过sockfd,但是正在发送数据,没有关闭。当发送完毕后再关闭
        {
          shutdownInLoop();
        }
      }
    }
    else
    {
      LOG_SYSERR << "TcpConnection::handleWrite";
      // if (state_ == kDisconnecting)
      // {
      //   shutdownInLoop();
      // }
    }
  }
  else
  {
    LOG_TRACE << "Connection fd = " << channel_->fd()
              << " is down, no more writing";
  }
}

void TcpConnection::handleClose()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
  assert(state_ == kConnected || state_ == kDisconnecting);
  // we don‘t close fd, leave it to dtor, so we can find leaks easily.
  setState(kDisconnected);//设置状态
  channel_->disableAll();//此后不再收发数据

  TcpConnectionPtr guardThis(shared_from_this());
  connectionCallback_(guardThis);//默认调用defaultConnectionCallback
  // must be the last line
  closeCallback_(guardThis);//调用TcpServer::removeConnection。在TcpServer::removeConnection调用TcpConnection::connectDestroyed
}

void TcpConnection::handleError()
{
  int err = sockets::getSocketError(channel_->fd());
  LOG_ERROR << "TcpConnection::handleError [" << name_
            << "] - SO_ERROR = " << err << " " << strerror_tl(err);
}

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-11-05 21:48:43

muduo::TcpConnection分析的相关文章

muduo源代码分析--Reactor模式在muduo中的使用

一. Reactor模式简单介绍 Reactor释义"反应堆",是一种事件驱动机制.和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完毕处理.而是恰恰相反.Reactor逆置了事件处理流程,应用程序须要提供对应的接口并注冊到Reactor上,假设对应的时间发生,Reactor将主动调用应用程序注冊的接口,这些接口又称为"回调函数". 二. moduo库Reactor模式的实现 muduo主要通过3个类来实现Reactor模式:EventLoop,Cha

muduo::ThreadPoll分析

线程池本质上是一个生产者消费者的模型.在线程池有一个存放现场的ptr_vector,相当于消费者;有一个存放任务的deque,相当于仓库.线程(消费者)去仓库取任务,然后执行;当有新程序员是生产者,当有新任务时,就把任务放到deque(仓库). 任务队列(仓库)是有边界的,所以在实现时需要有两个信号量,相当与BoundedBlockingQueue. 每个线程在第一次运行时都会调用一次回调函数threadInitCallback_,为线程执行做准备. 在线程池开始运行之前,要先设置任务队列的大小

muduo::EventLoop分析

EventLoop是整个Reactor的核心.其类图如下: one loop per thread意味着每个线程只能有一个EventLoop对象,用变量 __thread EventLoop* t_loopInThisThread = 0; 表示,在创建EventLoop对象时将t_loopInThisThread赋值,以后再创建时就可以检查这个变量,如果已经赋值就说明当前线程已经创建过EventLoop对象了.线程调用静态函数EventLoop::getEventLoopOfCurrentTh

muduo源代码分析--Reactor在模型muduo使用(两)

一. TcpServer分类: 管理所有的TCP客户连接,TcpServer对于用户直接使用,直接控制由用户生活. 用户只需要设置相应的回调函数(消息处理messageCallback)然后TcpServer::start()就可以. 主要数据成员: boost::scoped_ptr<Accepter> acceptor_; 用来接受连接 std::map<string,TcpConnectionPtr> connections_; 用来存储全部连接 connectonCallb

Muduo 设计与实现之一:Buffer 类的设计

[开源访谈]Muduo 作者陈硕访谈实录 http://www.oschina.net/question/28_61182 开源访谈是开源中国推出的一系列针对国内优秀开源软件作者的访谈,以文字的方式记录并传播.我们希望开源访谈能全面的展现国内开源软件.开源软件作者的现状,着实推动国内开源软件的应用与发展. [嘉宾简介] 陈硕 北京师范大学硕士,擅长 C++ 多线程网络编程和实时分布式系统架构.现任职于香港某跨国金融公司 IT 部门,从事实时外汇交易系统开发.编写了开源 C++ 网络库 muduo

muduo网络库预备知识点

TCP网络编程的三个半事件 非阻塞网络编程中应用层要使用缓冲区 发送方应用层为什么使用缓冲区 接收方应用层方为什么使用缓冲 如何设计使用缓冲区 什么是Reactor模式 non-blocking IO IO multiplexing muduo推荐的模式 线程池大小的阻抗匹配原则 Eventloop采用level trigger的原因 前面都在分析muduo/base中的源码,这些是辅助网络库的.在分析网络库前,先总结一下相关知识点. TCP网络编程要关注哪些问题?muduo网络库总结为三个半事

Muduo网络库源码分析(六)TcpConnection 的生存期管理

TcpConnection是使用shared_ptr来管理的类,因为它的生命周期模糊.TcpConnection表示已经建立或正在建立的连接,建立连接后,用户只需要在上层类如TcpServer中设置连接到来和消息到来的处理函数,继而回调TcpConnection中的 setConnectionCallback和setMessageCallback函数,实现对事件的处理.用户需要关心的事件是有限的,其他都由网络库负责. TcpConnection中封装了InputBuffer和OutputBuff

Muduo网络库源码分析(一) EventLoop事件循环(Poller和Channel)

从这一篇博文起,我们开始剖析Muduo网络库的源码,主要结合<Linux多线程服务端编程>和网上的一些学习资料! (一)TCP网络编程的本质:三个半事件 1. 连接的建立,包括服务端接受(accept) 新连接和客户端成功发起(connect) 连接.TCP 连接一旦建立,客户端和服务端是平等的,可以各自收发数据. 2. 连接的断开,包括主动断开(close 或shutdown) 和被动断开(read(2) 返回0). 3. 消息到达,文件描述符可读.这是最为重要的一个事件,对它的处理方式决定

muduo源码分析--我对muduo的理解

分为几个模块 EventLoop.TcpServer.Acceptor.TcpConnection.Channel等 对于EventLoop来说: 他只关注里面的主驱动力,EventLoop中只关注poll,这类系统调用使得其成为Reactor模式,EventLoop中有属于这个loop的所有Channel,这个loop属于哪一个Server. 几个类存在的意义: 从应用层使用的角度来看,用户需要初始化一个EventLoop,然后初始化一个TcpServer(当然也可以自定义个TcpServer