Acceptor
类Acceptor用来listen、accept,并调用回调函数来处理新到的连接。Acceptor中封装了Socket fd,它是用RAII手法初始化。accept后,如果有新连接到来,会调用handleRead()函数,在这个函数接收连接。在handleRead()函数中每次接收一个新连接,之后调用处理新连接的回调函数(如果有);但一次性可能有多个连接,这里可以改进的一点是:循环accept,一直到没有新连接到来,或者每次accept时,尝试一次接收N个。
在handleRead()中,accept后并没有考虑新到的连接是否可用,例如当文件描述符耗尽时。这里可以做个改进,拿到accept后的connfd后,如果大于0,则非阻塞poll(2)一下,看看是否可以读写,正常情况下会是writable,表明connfd可用;如果poll(2)返回错误,那么直接关闭connfd。
Acceptor.h
class Acceptor : boost::noncopyable
{
public:
typedef boost::function<void (int sockfd,
const InetAddress&)> NewConnectionCallback;//accept后调用的函数对象
Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);
~Acceptor();
void setNewConnectionCallback(const NewConnectionCallback& cb)
{ newConnectionCallback_ = cb; }
bool listenning() const { return listenning_; }
void listen();
private:
void handleRead();
EventLoop* loop_;
Socket acceptSocket_;
Channel acceptChannel_;
NewConnectionCallback newConnectionCallback_;
bool listenning_;//是否正在监听状态
int idleFd_;
};
Acceptor.cc
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
: loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie()),//初始化创建sockt fd
acceptChannel_(loop, acceptSocket_.fd()),//初始化channel
listenning_(false),
idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
assert(idleFd_ >= 0);
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(reuseport);
acceptSocket_.bindAddress(listenAddr);
acceptChannel_.setReadCallback(
boost::bind(&Acceptor::handleRead, this));//当fd可读时调用回调函数hanleRead
}
Acceptor::~Acceptor()
{
acceptChannel_.disableAll();//将其冲poller监听集合中移除,此时为kDeleted状态
acceptChannel_.remove();//将其从EventList events_中移除,此时为kNew状态
::close(idleFd_);
}
void Acceptor::listen()
{
loop_->assertInLoopThread();
listenning_ = true;
acceptSocket_.listen();
acceptChannel_.enableReading();
}
void Acceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddress peerAddr;
//FIXME loop until no more
int connfd = acceptSocket_.accept(&peerAddr);//这里时真正接收连接
if (connfd >= 0)
{
// string hostport = peerAddr.toIpPort();
// LOG_TRACE << "Accepts of " << hostport;
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr);//将新连接信息传送到回调函数中
}
else//没有回调函数则关闭client对应的fd
{
sockets::close(connfd);
}
}
else
{
LOG_SYSERR << "in Acceptor::handleRead";
// Read the section named "The special problem of
// accept()ing when you can‘t" in libev‘s doc.
// By Marc Lehmann, author of livev.
if (errno == EMFILE)
{
::close(idleFd_);
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
::close(idleFd_);
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
}
}
}
TcpServer
客户不直接使用Acceptor,它封装在TcpServer中。TcpServer使用比较简单,直接设置好新连接到达和消息到达的回调函数,之后start即可。
TcpServer中还封装了EventLoopThreadPool,因此TcpServer中的EventLoop对象为main Reactor,EventLoopThreadPool为sub Reactor。
当新建连接到达后,TcpServer创建一个新的TcpConnection对象来保存这个连接,设置这个新连接的回调函数,之后在EventLoopThreadPool中取一个EventLoop对象来作为这个新连接的reactor。
TcpServer用map保存了当前server对象中的TcpConnection,当TcpServer对象析构时,就会关闭所有连接。
TcpServer.h
class TcpServer : boost::noncopyable
{
public:
typedef boost::function<void(EventLoop*)> ThreadInitCallback;
enum Option
{
kNoReusePort,
kReusePort,
};
//TcpServer(EventLoop* loop, const InetAddress& listenAddr);
TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option = kNoReusePort);
~TcpServer(); // force out-line dtor, for scoped_ptr members.
const string& hostport() const { return hostport_; }
const string& name() const { return name_; }
EventLoop* getLoop() const { return loop_; }
/// Set the number of threads for handling input.
///
/// Always accepts new connection in loop‘s thread.
/// Must be called before @c start
/// @param numThreads
/// - 0 means all I/O in loop‘s thread, no thread will created.
/// this is the default value.
/// - 1 means all I/O in another thread.
/// - N means a thread pool with N threads, new connections
/// are assigned on a round-robin basis.
void setThreadNum(int numThreads);
void setThreadInitCallback(const ThreadInitCallback& cb)
{ threadInitCallback_ = cb; }
/// valid after calling start()
boost::shared_ptr<EventLoopThreadPool> threadPool()
{ return threadPool_; }
/// Starts the server if it‘s not listenning.
///
/// It‘s harmless to call it multiple times.
/// Thread safe.
void start();
/// Set connection callback.
/// Not thread safe.
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }
/// Set message callback.
/// Not thread safe.
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }
/// Set write complete callback.
/// Not thread safe.
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{ writeCompleteCallback_ = cb; }
private:
/// Not thread safe, but in loop
void newConnection(int sockfd, const InetAddress& peerAddr);
/// Thread safe.
void removeConnection(const TcpConnectionPtr& conn);//TcpConnectionPtr为shared_ptr<TcpConnection>
/// Not thread safe, but in loop
void removeConnectionInLoop(const TcpConnectionPtr& conn);
typedef std::map<string, TcpConnectionPtr> ConnectionMap;
EventLoop* loop_; // the acceptor loop
const string hostport_;
const string name_;
boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
boost::shared_ptr<EventLoopThreadPool> threadPool_;
ConnectionCallback connectionCallback_;//新连接到达时的回调函数
MessageCallback messageCallback_;//消息到达时的回调函数
WriteCompleteCallback writeCompleteCallback_;
ThreadInitCallback threadInitCallback_;
AtomicInt32 started_;
// always in loop thread
int nextConnId_;//用来计算标记Connection的名字
ConnectionMap connections_;//Map的key为connection的name
};
TcpServer.cc
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)),
hostport_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
threadPool_(new EventLoopThreadPool(loop, name_)),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
nextConnId_(1)
{
acceptor_->setNewConnectionCallback(//新连接到来时,调用的时TcpServer::newConnection函数
boost::bind(&TcpServer::newConnection, this, _1, _2));
}
TcpServer::~TcpServer()
{
loop_->assertInLoopThread();
LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";
for (ConnectionMap::iterator it(connections_.begin());//在析构函数中销毁connection
it != connections_.end(); ++it)
{
TcpConnectionPtr conn = it->second;
it->second.reset();
conn->getLoop()->runInLoop(
boost::bind(&TcpConnection::connectDestroyed, conn));
conn.reset();
}
}
void TcpServer::setThreadNum(int numThreads)//设置线程池大小
{
assert(0 <= numThreads);
threadPool_->setThreadNum(numThreads);
}
void TcpServer::start()
{
if (started_.getAndSet(1) == 0)
{
threadPool_->start(threadInitCallback_);//启动线程池
assert(!acceptor_->listenning());
loop_->runInLoop(
boost::bind(&Acceptor::listen, get_pointer(acceptor_)));//执行accept的listen
}
}
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
EventLoop* ioLoop = threadPool_->getNextLoop();//在线程池取一个EventLoop对象
char buf[32];
snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);//生成这个连接的名字
++nextConnId_;
string connName = name_ + buf;
LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();
InetAddress localAddr(sockets::getLocalAddr(sockfd));
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);//设置新连接的回调函数
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(
boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));//将新到来的连接加入到监听事件中
}
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
// FIXME: unsafe
loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
loop_->assertInLoopThread();
LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
<< "] - connection " << conn->name();
size_t n = connections_.erase(conn->name());//根据connection的名字移除connection
(void)n;
assert(n == 1);
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop(
boost::bind(&TcpConnection::connectDestroyed, conn));
}
版权声明:本文为博主原创文章,未经博主允许不得转载。