MIT 2012 分布式课程基础源码解析-底层通讯实现

本节内容和前节事件管理封装是息息相关的,本节内容主要包含的代码在connection{.h, .cc}中。

这里面最主要的有两个类:connection类和tcpsconn类,connetion类主要服务于单个套接字,包括套接字上的数据读取写入等,而tcpsconn类则是服务于套接字集合,如接收连接,更新失效套接字等。具体我们看头文件。

class chanmgr {
    public:
        virtual bool got_pdu(connection *c, char *b, int sz) = 0;
        virtual ~chanmgr() {}
};

我们首先看到的是这个虚基类类,这个类会以委托的形式用在connection和tcpsconn类中,它只有一个方法即got_pdu,它在RPC实现中扮演着重要角色,后面使用的时候会再次介绍它。

connection类

 1 class connection : public aio_callback {
 2     public:
 3         //内部buffer类,主要用于接收/写入数据的buffer
 4         struct charbuf {
 5             charbuf(): buf(NULL), sz(0), solong(0) {}
 6             charbuf (char *b, int s) : buf(b), sz(s), solong(0){}
 7             char *buf;
 8             int sz;
 9             int solong; //amount of bytes written or read so far
10         };
11         //m1: chanmgr, f1: socket or file,
12         connection(chanmgr *m1, int f1, int lossytest=0);
13         ~connection();
14
15         int channo() { return fd_; }
16         bool isdead();
17         void closeconn();
18
19         bool send(char *b, int sz);
20         void write_cb(int s);
21         void read_cb(int s);
22         //增加/减少引用计数
23         void incref();
24         void decref();
25         int ref();
26
27         int compare(connection *another);
28     private:
29
30         bool readpdu();
31         bool writepdu();
32
33         chanmgr *mgr_;
34         const int fd_;
35         bool dead_;
36
37         charbuf wpdu_; //write pdu
38         charbuf rpdu_; //read pdu
39
40         struct timeval create_time_;
41
42         int waiters_;
43         int refno_;
44         const int lossy_;
45
46         pthread_mutex_t m_;
47         pthread_mutex_t ref_m_; //保护更新引用计数的安全性
48         pthread_cond_t send_complete_;
49         pthread_cond_t send_wait_;
50 };

这段代码即是connetion类的定义,它继承至aio_callback,在上一节说过,aio_callback在事件管理类中作为回调类,读取或写入数据,现在connection类就相当于一个回调类。

我们从connection的构造函数中便可以得知。

connection::connection(chanmgr *m1, int f1, int l1)
: mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1)
{

    int flags = fcntl(fd_, F_GETFL, NULL);
    flags |= O_NONBLOCK;  //no blocking
    fcntl(fd_, F_SETFL, flags);
    //ignore信号
    signal(SIGPIPE, SIG_IGN);
    VERIFY(pthread_mutex_init(&m_,0)==0);
    VERIFY(pthread_mutex_init(&ref_m_,0)==0);
    VERIFY(pthread_cond_init(&send_wait_,0)==0);
    VERIFY(pthread_cond_init(&send_complete_,0)==0);

       VERIFY(gettimeofday(&create_time_, NULL) == 0);
       //事件管理类将本类作为回调类添加到相应的事件管理数组中
    PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
}

那这个类的具体作用是啥呢?其实它就是用于在给定套接字上通信用的,对于发送数据,会发送直到数据发送完成为止,未发送完成则会将该事件添加到事件管理中,在下一轮事件循环中继续发送,这一点我们可以从send函数中看出:

bool
connection::send(char *b, int sz)
{
    ScopedLock ml(&m_);
    waiters_++;
    //当活着,且write pdu中还有数据时等待数据清空(发送完)
    while (!dead_ && wpdu_.buf) {
        VERIFY(pthread_cond_wait(&send_wait_, &m_)==0);
    }
    waiters_--;
    if (dead_) {
        return false;
    }
    wpdu_.buf = b;
    wpdu_.sz = sz;
    wpdu_.solong = 0;

    if (lossy_) {
        if ((random()%100) < lossy_) {
            jsl_log(JSL_DBG_1, "connection::send LOSSY TEST shutdown fd_ %d\n", fd_);
            shutdown(fd_,SHUT_RDWR);
        }
    }

    //发送失败时
    if (!writepdu()) {
        dead_ = true;
        VERIFY(pthread_mutex_unlock(&m_) == 0);
        PollMgr::Instance()->block_remove_fd(fd_);
        VERIFY(pthread_mutex_lock(&m_) == 0);
    }else{
        if (wpdu_.solong == wpdu_.sz) {
        }else{
            //should be rare to need to explicitly add write callback
            //这会继续写,因为这会添加本类(回调),然后调用里面的回调函数write_cb,
            //就像是一个递归
            PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
            while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) {
                VERIFY(pthread_cond_wait(&send_complete_,&m_) == 0);
            }
        }
    }
    //清空写buffer
    bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
    wpdu_.solong = wpdu_.sz = 0;
    wpdu_.buf = NULL;
    if (waiters_ > 0)
        pthread_cond_broadcast(&send_wait_);  //唤醒上面的等待
    return ret;
}

send

对于读取数据,则当rpdu_(read buffer)未满时继续读,读取完成后就是用chanmgr类的got_pdu处理读取后的数据。

注意发送数据/接收数据都会首先发送数据大小/接收数据大小,然后再做后续发送数据/接收数据的工作。

除了connection类的发送/接收数据的功能外,我们还看到一个私有变量refno_变量,该变量的作用是用于引用计数,引用计数是一种很常见的编程技巧,例如在python中,引用计数用于对象的管理,当引用计数为0时,对象便会销毁,这里的引用计数也是也是同样的道理,这一点可以从decref函数中得知

void
connection::decref()
{
    VERIFY(pthread_mutex_lock(&ref_m_)==0);
    refno_ --;
    VERIFY(refno_>=0);
    //当引用计数为0时,销毁对象
    if (refno_==0) {
        VERIFY(pthread_mutex_lock(&m_)==0);
        if (dead_) {
            VERIFY(pthread_mutex_unlock(&ref_m_)==0);
            VERIFY(pthread_mutex_unlock(&m_)==0);
            delete this;
            return;
        }
        VERIFY(pthread_mutex_unlock(&m_)==0);
    }
    pthread_mutex_unlock(&ref_m_);
}

tcpscon类:

这个类则是用于管理connection的,我们先看它的定义

/**
 *  管理客户连接,将连接放入一个map中map<int, connection*>
 *
 */
class tcpsconn {
    public:
        tcpsconn(chanmgr *m1, int port, int lossytest=0);
        ~tcpsconn();

        void accept_conn();
    private:

        pthread_mutex_t m_;
        pthread_t th_;
        int pipe_[2];

        int tcp_; //file desciptor for accepting connection
        chanmgr *mgr_;
        int lossy_;
        std::map<int, connection *> conns_;

        void process_accept();
};

可看到里面定义了一个map,该map的key其实是connection类指针对应的套接字,我们看构造函数实现

tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest)
: mgr_(m1), lossy_(lossytest)
{

    VERIFY(pthread_mutex_init(&m_,NULL) == 0);

    struct sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(port);

    tcp_ = socket(AF_INET, SOCK_STREAM, 0);
    if(tcp_ < 0){
        perror("tcpsconn::tcpsconn accept_loop socket:");
        VERIFY(0);
    }

    int yes = 1;
    //设置TCP参数, reuseaddr, nodelay
    setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
    setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));

    if(bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0){
        perror("accept_loop tcp bind:");
        VERIFY(0);
    }

    if(listen(tcp_, 1000) < 0) {
        perror("tcpsconn::tcpsconn listen:");
        VERIFY(0);
    }

    jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port,
        sin.sin_port);

    if (pipe(pipe_) < 0) {
        perror("accept_loop pipe:");
        VERIFY(0);
    }

    int flags = fcntl(pipe_[0], F_GETFL, NULL);
    flags |= O_NONBLOCK;
    fcntl(pipe_[0], F_SETFL, flags);  //无阻塞管道

    VERIFY((th_ = method_thread(this, false, &tcpsconn::accept_conn)) != 0);
}

该构造函数主要是初始化服务器端连接,然后创建一个线程来等待客户端的连接,后面处理客户端连接时,会将连接的客户端套接字添加到conns_的map中,即创建套接字到connection指针的对应关系,然后遍历conns_,清除死亡的connection,从而达到及时处理死亡连接的效果。

时间: 2024-10-06 22:04:41

MIT 2012 分布式课程基础源码解析-底层通讯实现的相关文章

MIT 2012分布式课程基础源码解析一-源码概述

课程主页 课程介绍:本课程会在给出的源码的基础上要求完成8个lab lab overviewLab 1 - Lock ServerLab 2 - Basic File ServerLab 3 - MKDIR, UNLINK, and LockingLab 4 - Caching Lock ServerLab 5 - Caching Extent Server + ConsistencyLab 6 - PaxosLab 7 - Replicated lock serverLab 8 - Proje

MIT 2012分布式课程基础源码解析-事件管理封装

这部分的内容主要包括Epoll/select的封装,在封装好相应函数后,再使用一个类来管理相应事件,实现的文件为pollmgr.{h, cc}. 事件函数封装 可看到pollmgr.h文件下定一个了一个虚基类aio_mgr 1 class aio_mgr { 2 public: 3 virtual void watch_fd(int fd, poll_flag flag) = 0; 4 virtual bool unwatch_fd(int fd, poll_flag flag) = 0; 5

convnet源码解析(一):基础准备

Jeremy Lin ConvNet是一个基于GPU实现的卷积神经网络开源代码(C++11),是由多伦多大学的Geoffrey Hinton深度学习团队编写的,它的最初版本是Hinton的学生Alex Krizhevsky编写的cuda-convnet(其项目地址在google code上面),最近cuda-convnet也从1.0版本更新到2.0版本(地址). 这份开源代码的官方地址是:http://deeplearning.cs.toronto.edu/codes 在CNN的开源代码中最出名

设计模式课程 设计模式精讲 12-3 适配器模式源码解析

1 源码解析 1.1 源码解析1(在jdk中的应用) 1.2 源码解析2(Spring中的通知管理) 1.3 源码解析3(SpringMVC中的应用) 1 源码解析 1.1 源码解析1(在jdk中的应用) xmlAdapter(此类是用于适配xml的一个类,是处理xml序列化和反序列化的一个类) public abstract class XmlAdapter<ValueType,BoundType> { /** * Do-nothing constructor for the derived

设计模式课程 设计模式精讲 4-3 简单工厂源码解析

1 源码解析 1.1 Calendar源码解析 1.2 DriverManager源码解析 1 源码解析 1.1 Calendar源码解析 /** * Gets a calendar using the specified time zone and default locale. * The <code>Calendar</code> returned is based on the current time * in the given time zone with the d

设计模式课程 设计模式精讲 6-3 抽象工厂源码解析

1 源码解析 1.1 mysql源码解析 1.2 mybaties 的sqlsession源码解析 1 源码解析 1.1 mysql源码解析 1.2 mybaties 的sqlsession源码解析 原文地址:https://www.cnblogs.com/1446358788-qq/p/11295158.html

设计模式课程 设计模式精讲 8-11 单例模式源码解析(jdk+spring+mybaties)

1 源码解析 1.1 单例解析1 1.2 单例解析2(容器单例) 1.3 单例解析3 1.4 单例解析4 1 源码解析 1.1 单例解析1 java.lang.Runtime /** * 饿汉式加载,初始化的时候,就已经new出了对象 */ private static Runtime currentRuntime = new Runtime(); /** * Returns the runtime object associated with the current Java applicat

设计模式课程 设计模式精讲 10-2 外观模式源码解析

1 源码解析 1.1 源码解析1(jdk中的JDBCUtils工具类) 1.2 源码解析2 1.3 源码解析3 1.4 源码解析4 1 源码解析 1.1 源码解析1(jdk中的JDBCUtils工具类) jdbc在springJDBC中的封装 /** * Close the given JDBC Connection and ignore any thrown exception. * This is useful for typical finally blocks in manual JDB

设计模式课程 设计模式精讲 15-3 桥接模式源码解析

1 桥接模式源码解析 1.1 源码解析1 jdk中的应用(驱动类) 1 桥接模式源码解析 1.1 源码解析1 jdk中的应用(驱动类) 步骤: class.forName 调取驱动接口的静态块,触发驱动管理类DriverManager 的注册驱动方法,从而将该驱动放到CopyOnWriteArrayList中. getConnect方法是通过传入url用户名密码. 针对不同的数据库,通过driverManager中的不同方法,获取的都是相同的接口,jdbc在最初的时候设计了一套接口,再由各个数据