muduo::Acceptor、TcpServer分析

Acceptor

类Acceptor用来listen、accept,并调用回调函数来处理新到的连接。Acceptor中封装了Socket fd,它是用RAII手法初始化。accept后,如果有新连接到来,会调用handleRead()函数,在这个函数接收连接。在handleRead()函数中每次接收一个新连接,之后调用处理新连接的回调函数(如果有);但一次性可能有多个连接,这里可以改进的一点是:循环accept,一直到没有新连接到来,或者每次accept时,尝试一次接收N个。

在handleRead()中,accept后并没有考虑新到的连接是否可用,例如当文件描述符耗尽时。这里可以做个改进,拿到accept后的connfd后,如果大于0,则非阻塞poll(2)一下,看看是否可以读写,正常情况下会是writable,表明connfd可用;如果poll(2)返回错误,那么直接关闭connfd。

Acceptor.h

class Acceptor : boost::noncopyable
{
 public:
  typedef boost::function<void (int sockfd,
                                const InetAddress&)> NewConnectionCallback;//accept后调用的函数对象

  Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);
  ~Acceptor();

  void setNewConnectionCallback(const NewConnectionCallback& cb)
  { newConnectionCallback_ = cb; }

  bool listenning() const { return listenning_; }
  void listen();

 private:
  void handleRead();

  EventLoop* loop_;
  Socket acceptSocket_;
  Channel acceptChannel_;
  NewConnectionCallback newConnectionCallback_;
  bool listenning_;//是否正在监听状态
  int idleFd_;
};

Acceptor.cc

Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
  : loop_(loop),
    acceptSocket_(sockets::createNonblockingOrDie()),//初始化创建sockt fd
    acceptChannel_(loop, acceptSocket_.fd()),//初始化channel
    listenning_(false),
    idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
  assert(idleFd_ >= 0);
  acceptSocket_.setReuseAddr(true);
  acceptSocket_.setReusePort(reuseport);
  acceptSocket_.bindAddress(listenAddr);
  acceptChannel_.setReadCallback(
      boost::bind(&Acceptor::handleRead, this));//当fd可读时调用回调函数hanleRead
}

Acceptor::~Acceptor()
{
  acceptChannel_.disableAll();//将其冲poller监听集合中移除,此时为kDeleted状态
  acceptChannel_.remove();//将其从EventList events_中移除,此时为kNew状态
  ::close(idleFd_);
}

void Acceptor::listen()
{
  loop_->assertInLoopThread();
  listenning_ = true;
  acceptSocket_.listen();
  acceptChannel_.enableReading();
}

void Acceptor::handleRead()
{
  loop_->assertInLoopThread();
  InetAddress peerAddr;
  //FIXME loop until no more
  int connfd = acceptSocket_.accept(&peerAddr);//这里时真正接收连接
  if (connfd >= 0)
  {
    // string hostport = peerAddr.toIpPort();
    // LOG_TRACE << "Accepts of " << hostport;
    if (newConnectionCallback_)
    {
      newConnectionCallback_(connfd, peerAddr);//将新连接信息传送到回调函数中
    }
    else//没有回调函数则关闭client对应的fd
    {
      sockets::close(connfd);
    }
  }
  else
  {
    LOG_SYSERR << "in Acceptor::handleRead";
    // Read the section named "The special problem of
    // accept()ing when you can‘t" in libev‘s doc.
    // By Marc Lehmann, author of livev.
    if (errno == EMFILE)
    {
      ::close(idleFd_);
      idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
      ::close(idleFd_);
      idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
    }
  }
}

TcpServer

客户不直接使用Acceptor,它封装在TcpServer中。TcpServer使用比较简单,直接设置好新连接到达和消息到达的回调函数,之后start即可。

TcpServer中还封装了EventLoopThreadPool,因此TcpServer中的EventLoop对象为main Reactor,EventLoopThreadPool为sub Reactor。

当新建连接到达后,TcpServer创建一个新的TcpConnection对象来保存这个连接,设置这个新连接的回调函数,之后在EventLoopThreadPool中取一个EventLoop对象来作为这个新连接的reactor。

TcpServer用map保存了当前server对象中的TcpConnection,当TcpServer对象析构时,就会关闭所有连接。

TcpServer.h

class TcpServer : boost::noncopyable
{
 public:
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;
  enum Option
  {
    kNoReusePort,
    kReusePort,
  };

  //TcpServer(EventLoop* loop, const InetAddress& listenAddr);
  TcpServer(EventLoop* loop,
            const InetAddress& listenAddr,
            const string& nameArg,
            Option option = kNoReusePort);
  ~TcpServer();  // force out-line dtor, for scoped_ptr members.

  const string& hostport() const { return hostport_; }
  const string& name() const { return name_; }
  EventLoop* getLoop() const { return loop_; }

  /// Set the number of threads for handling input.
  ///
  /// Always accepts new connection in loop‘s thread.
  /// Must be called before @c start
  /// @param numThreads
  /// - 0 means all I/O in loop‘s thread, no thread will created.
  ///   this is the default value.
  /// - 1 means all I/O in another thread.
  /// - N means a thread pool with N threads, new connections
  ///   are assigned on a round-robin basis.
  void setThreadNum(int numThreads);
  void setThreadInitCallback(const ThreadInitCallback& cb)
  { threadInitCallback_ = cb; }
  /// valid after calling start()
  boost::shared_ptr<EventLoopThreadPool> threadPool()
  { return threadPool_; }

  /// Starts the server if it‘s not listenning.
  ///
  /// It‘s harmless to call it multiple times.
  /// Thread safe.
  void start();

  /// Set connection callback.
  /// Not thread safe.
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  /// Set message callback.
  /// Not thread safe.
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }

  /// Set write complete callback.
  /// Not thread safe.
  void setWriteCompleteCallback(const WriteCompleteCallback& cb)
  { writeCompleteCallback_ = cb; }

 private:
  /// Not thread safe, but in loop
  void newConnection(int sockfd, const InetAddress& peerAddr);
  /// Thread safe.
  void removeConnection(const TcpConnectionPtr& conn);//TcpConnectionPtr为shared_ptr<TcpConnection>
  /// Not thread safe, but in loop
  void removeConnectionInLoop(const TcpConnectionPtr& conn);

  typedef std::map<string, TcpConnectionPtr> ConnectionMap;

  EventLoop* loop_;  // the acceptor loop
  const string hostport_;
  const string name_;
  boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
  boost::shared_ptr<EventLoopThreadPool> threadPool_;
  ConnectionCallback connectionCallback_;//新连接到达时的回调函数
  MessageCallback messageCallback_;//消息到达时的回调函数
  WriteCompleteCallback writeCompleteCallback_;
  ThreadInitCallback threadInitCallback_;
  AtomicInt32 started_;
  // always in loop thread
  int nextConnId_;//用来计算标记Connection的名字
  ConnectionMap connections_;//Map的key为connection的name
};

TcpServer.cc

TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg,
                     Option option)
  : loop_(CHECK_NOTNULL(loop)),
    hostport_(listenAddr.toIpPort()),
    name_(nameArg),
    acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
    threadPool_(new EventLoopThreadPool(loop, name_)),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    nextConnId_(1)
{
  acceptor_->setNewConnectionCallback(//新连接到来时,调用的时TcpServer::newConnection函数
      boost::bind(&TcpServer::newConnection, this, _1, _2));
}

TcpServer::~TcpServer()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";

  for (ConnectionMap::iterator it(connections_.begin());//在析构函数中销毁connection
      it != connections_.end(); ++it)
  {
    TcpConnectionPtr conn = it->second;
    it->second.reset();
    conn->getLoop()->runInLoop(
      boost::bind(&TcpConnection::connectDestroyed, conn));
    conn.reset();
  }
}

void TcpServer::setThreadNum(int numThreads)//设置线程池大小
{
  assert(0 <= numThreads);
  threadPool_->setThreadNum(numThreads);
}

void TcpServer::start()
{
  if (started_.getAndSet(1) == 0)
  {
    threadPool_->start(threadInitCallback_);//启动线程池

    assert(!acceptor_->listenning());
    loop_->runInLoop(
        boost::bind(&Acceptor::listen, get_pointer(acceptor_)));//执行accept的listen
  }
}

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  EventLoop* ioLoop = threadPool_->getNextLoop();//在线程池取一个EventLoop对象
  char buf[32];
  snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);//生成这个连接的名字
  ++nextConnId_;
  string connName = name_ + buf;

  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toIpPort();
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  connections_[connName] = conn;
  conn->setConnectionCallback(connectionCallback_);//设置新连接的回调函数
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
  ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));//将新到来的连接加入到监听事件中
}

void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
  // FIXME: unsafe
  loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
           << "] - connection " << conn->name();
  size_t n = connections_.erase(conn->name());//根据connection的名字移除connection
  (void)n;
  assert(n == 1);
  EventLoop* ioLoop = conn->getLoop();
  ioLoop->queueInLoop(
      boost::bind(&TcpConnection::connectDestroyed, conn));
}

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-12 17:39:49

muduo::Acceptor、TcpServer分析的相关文章

muduo源码分析--我对muduo的理解

分为几个模块 EventLoop.TcpServer.Acceptor.TcpConnection.Channel等 对于EventLoop来说: 他只关注里面的主驱动力,EventLoop中只关注poll,这类系统调用使得其成为Reactor模式,EventLoop中有属于这个loop的所有Channel,这个loop属于哪一个Server. 几个类存在的意义: 从应用层使用的角度来看,用户需要初始化一个EventLoop,然后初始化一个TcpServer(当然也可以自定义个TcpServer

muduo源码分析--Reactor模式的在muduo中的使用(二)

一. TcpServer类: 管理所有的TCP客户连接,TcpServer供用户直接使用,生命期由用户直接控制.用户只需设置好相应的回调函数(如消息处理messageCallback)然后TcpServer::start()即可. 主要数据成员: boost::scoped_ptr<Accepter> acceptor_; 用来接受连接 std::map<string,TcpConnectionPtr> connections_; 用来存储所有连接 connectonCallbac

muduo::thread类分析

在看源代码前,先学习一个关键字:__thread. 线程共享进程的数据,如果想要每个线程都有一份独立的数据,那么可以使用__thread关键字修饰数据. __thread只能用于修饰POD类型的数据,不能修饰class,因为它无法调用构造函数和析构函数.__thread可以修饰全局变量.函数内的静态变量,不能修饰函数内的局部变量或class的普通成员变量. 在muduo/base/thread.{h, cc}中实现了线程的封装.thread的封装和一个命名空间muduo::CurrentThre

muduo源码分析--Reactor模式在muduo中的使用

一. Reactor模式简介 Reactor释义"反应堆",是一种事件驱动机制.和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为"回调函数". 二. moduo库Reactor模式的实现 muduo主要通过3个类来实现Reactor模式:EventLoop,Chann

Muduo网络库源码分析(五)Acceptor和TcpServer类

首先,我们先提一下对Socket的封装(不复杂,所以简单说一下). Endian.h : 封装了字节序转换函数(全局函数,位于muduo::net::sockets名称空间中). SocketsOps.h/ SocketsOps.cc :封装了socket相关系统调用. Socket.h/Socket.cc(Socket类): 用RAII方法封装socket file descriptor. InetAddress.h/InetAddress.cc(InetAddress类):网际地址socka

muduo整体介绍及Echo服务器流程分析

muduo是Ractor模式,整个核心是Reactor;EventLoop就充当了Reactor.下面就是muduo的简化类图结构: EventLoop是one thread per loop中的loop,每个线程只能有一个EventLoop的实体,它来负责IO和定时器事件的分派.它用eventfd来异步唤醒,不同与传统的用一对pipe.它用TimerQueue作为计时管理,Poller实现IO multiplexing. Poller是个抽象基类,它实现了pool/epoll的封装.派生类Po

Muduo网络库实战(二):实现服务器与客户端的连接

1. 方案的确定 1)基本需求 用户1000+, IO压力不大: 多个客户端打开网站,输入查询字符串strclient,发送给服务器=>服务器接收客户端发过来的数据并处理,将结果返回给客户端: 2)并发网络服务程序设计方案 详见:<Muduo_网络库使用手册>的1.6节-<详解Muduo多线程模型> @ muduo中TcpServer模式的选择:多线程模式 模式一:单线程,accept与TcpConnection用同一个线程做IO; 模式二:多线程,accept与EventL

Chapter 10-02

Please indicate the source: http://blog.csdn.net/gaoxiangnumber1 Welcome to my github: https://github.com/gaoxiangnumber1 10.4 工程项目中头文件的使用规则 10.4.1 头文件的害处 ?我认为头文件的害处主要体现在以下几方面: 1.传递性. 头文件可以再包含其他头文件.一个简单的#include 展开之后有两万多行代码,一方面造成编译缓慢:另一方面,任何一个头文件改动一点

Linux非阻塞IO(五)使用poll实现非阻塞的回射服务器客户端

前面几节我们讨论了非阻塞IO的基本概念.Buffer的设计以及非阻塞connect的实现,现在我们使用它们来完成客户端的编写. 我们在http://www.cnblogs.com/inevermore/p/4049165.html中提出过,客户端需要监听stdin.stdout和sockfd. 这里需要注意的是 只有缓冲区可写的时候,才去监听sockfd和stdin的读事件. 过去在阻塞IO中,我们总是监听sockfd的读事件,因为每当sockfd可读,我们就去调用用户的回调函数处理read事件