SRS学习笔记10-SrsConnection及其子类分析

SrsConnection类代表一个client的连接,其中封装了st thread,用于在一个单独的st thread里处理一个client的服务请求.

SrsConnection在 int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)里创建

SrsConnection* conn = NULL;
    if (type == SrsListenerRtmpStream) {
        conn = new SrsRtmpConn(this, client_stfd);
    } else if (type == SrsListenerHttpApi) {
#ifdef SRS_AUTO_HTTP_API
        conn = new SrsHttpApi(this, client_stfd, http_api_mux);
#else
        srs_warn("close http client for server not support http-api");
        srs_close_stfd(client_stfd);
        return ret;
#endif
    } else if (type == SrsListenerHttpStream) {
#ifdef SRS_AUTO_HTTP_SERVER
        conn = new SrsResponseOnlyHttpConn(this, client_stfd, http_server);
#else
        srs_warn("close http client for server not support http-server");
        srs_close_stfd(client_stfd);
        return ret;
#endif
    } else {
        // TODO: FIXME: handler others
    }
    srs_assert(conn);

创建完成后调用 conn->start()启动,此后这个client的请求都在这个st thread 里完成

SrsConection类

/**
 * the manager for connection. * SrsServer类实现了这一接口,用来对SrsConnection类进行管理.
 */
class IConnectionManager
{
public:
    IConnectionManager();
    virtual ~IConnectionManager();
public:
    /**
     * remove the specified connection.
     */
    virtual void remove(SrsConnection* c) = 0;
};

/**
* the basic connection of SRS,
* all connections accept from listener must extends from this base class,
* server will add the connection to manager, and delete it when remove.
*/
class SrsConnection : public virtual ISrsOneCycleThreadHandler, public virtual IKbpsDelta
{
private:
    /**
    * each connection start a green thread,
    * when thread stop, the connection will be delete by server.  * 业务处理线程
    */
    SrsOneCycleThread* pthread;
    /**
    * the id of connection.
    */
    int id;
protected:
    /**
    * the manager object to manage the connection.
    */
    IConnectionManager* manager;
    /**
    * the underlayer st fd handler.
    */
    st_netfd_t stfd;
    /**
    * the ip of client.
    */
    std::string ip;
    /**
     * whether the connection is disposed,
     * when disposed, connection should stop cycle and cleanup itself.
     */
    bool disposed;
    /**
     * whether connection is expired, application definition.
     * when expired, the connection must never be served and quit ASAP.
     */
    bool expired;
public:
    SrsConnection(IConnectionManager* cm, st_netfd_t c);
    virtual ~SrsConnection();
public:
    /**
     * to dipose the connection.
     */
    virtual void dispose();
    /**
    * start the client green thread.
    * when server get a client from listener,
    * 1. server will create an concrete connection(for instance, RTMP connection),
    * 2. then add connection to its connection manager,
    * 3. start the client thread by invoke this start()
    * when client cycle thread stop, invoke the on_thread_stop(), which will use server
    * to remove the client by server->remove(this).
    */
    virtual int start();
// interface ISrsOneCycleThreadHandler
public:
    /**
    * the thread cycle function,
    * when serve connection completed, terminate the loop which will terminate the thread,
    * thread will invoke the on_thread_stop() when it terminated.
    */
    virtual int cycle();
    /**
    * when the thread cycle finished, thread will invoke the on_thread_stop(),
    * which will remove self from server, server will remove the connection from manager
    * then delete the connection.
    */
    virtual void on_thread_stop();
public:
    /**
    * get the srs id which identify the client.
    */
    virtual int srs_id();
    /**
     * set connection to expired.
     */
    virtual void expire();
protected:
    /**
    * for concrete connection to do the cycle.  * 具体connection 的处理函数,参见SrsRtmpConn.
    */
    virtual int do_cycle() = 0;
};
int SrsConnection::start()
{  // 启动SrsOneCycleThread类
    return pthread->start();
}

int SrsConnection::cycle()
{
    int ret = ERROR_SUCCESS;
    //这是个全局的id生成器,也用来生成st thread的id
    _srs_context->generate_id();
    id = _srs_context->get_id();
    // 取得客户端的ip地址
    ip = srs_get_peer_ip(st_netfd_fileno(stfd));
    // 调用子类如SrsRtmpConn的处理函数
    ret = do_cycle();

    // if socket io error, set to closed.
    if (srs_is_client_gracefully_close(ret)) {
        ret = ERROR_SOCKET_CLOSED;
    }

    // success.
    if (ret == ERROR_SUCCESS) {
        srs_trace("client finished.");
    }

    // client close peer.
    if (ret == ERROR_SOCKET_CLOSED) {
        srs_warn("client disconnect peer. ret=%d", ret);
    }

    return ERROR_SUCCESS;
}

void SrsConnection::on_thread_stop()
{
    // TODO: FIXME: never remove itself, use isolate thread to do cleanup.  // 用在SrsServer类的conns向量数组中删除此对象   manager->remove(this);
}

int SrsConnection::srs_id()
{
    return id;
}

void SrsConnection::expire()
{
    expired = true;
}

SrsRtmpConn类

SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c)
    : SrsConnection(svr, c)
{
    server = svr;
    req = new SrsRequest();
    res = new SrsResponse();
    skt = new SrsStSocket(c);//实现ISrsProtocolReaderWriter接口,即st socket的读写函数,以及读写超时,发送和接受到的字节数
    rtmp = new SrsRtmpServer(skt);// 提供rtmp-command-protocol服务,是一个上层的面向协议,media stream的服务,// 例如连接vhost/app,播放流,获取视频/音频数据,与之对应的是 SrsRtmpClient.
    refer = new SrsRefer();
    bandwidth = new SrsBandwidth();
    security = new SrsSecurity();
    duration = 0;
    kbps = new SrsKbps();
    kbps->set_io(skt, skt);
    wakable = NULL;

    mw_sleep = SRS_PERF_MW_SLEEP;
    mw_enabled = false;
    realtime = SRS_PERF_MIN_LATENCY_ENABLED;
    send_min_interval = 0;
    tcp_nodelay = false;
    client_type = SrsRtmpConnUnknown;
    // 在配置改变时,支持reload
    _srs_config->subscribe(this);
}

构造函数初始化成员变量

// TODO: return detail message when error for client.
int SrsRtmpConn::do_cycle()
{
    int ret = ERROR_SUCCESS;

    srs_trace("RTMP client ip=%s", ip.c_str());
// 设置SrsRtmpServer里面的底层数据通讯类SrsProtocol的读写超时
    rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
    // 握手
    if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
        srs_error("rtmp handshake failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("rtmp handshake success");
    // 获取连接的app
    if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {
        srs_error("rtmp connect vhost/app failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("rtmp connect app success");

    // set client ip to request.
    req->ip = ip;
    // 假设推流地址 rtmp://192.168.151.151/live/marstv
    // discovery vhost, resolve the vhost from config  req->vhost 是192.168.151.151
    SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);// 没有设置 192.168.151.151 这样的vhost parsed_vhost是默认的vhost
    if (parsed_vhost) {
        req->vhost = parsed_vhost->arg0();// req->vhost 是 __defaultVhost__
    }

    srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s",
        req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str());

    if (req->schema.empty() || req->vhost.empty() || req->port.empty() || req->app.empty()) {
        ret = ERROR_RTMP_REQ_TCURL;
        srs_error("discovery tcUrl failed. "
            "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",
            req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), ret);
        return ret;
    }

    // check vhost

// 在 check_vhost里面会调用 http_hooks_on_connect
    if ((ret = check_vhost()) != ERROR_SUCCESS) {
        srs_error("check vhost failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("check vhost success.");

    srs_trace("connect app, "
        "tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, args=%s",
        req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
        req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
        req->app.c_str(), (req->args? "(obj)":"null"));

    // show client identity
    if(req->args) {
        std::string srs_version;
        std::string srs_server_ip;
        int srs_pid = 0;
        int srs_id = 0;

        SrsAmf0Any* prop = NULL;
        if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {
            srs_version = prop->to_str();
        }
        if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {
            srs_server_ip = prop->to_str();
        }
        if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {
            srs_pid = (int)prop->to_number();
        }
        if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {
            srs_id = (int)prop->to_number();
        }

        srs_info("edge-srs ip=%s, version=%s, pid=%d, id=%d",
            srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
        if (srs_pid > 0) {
            srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d",
                srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
        }
    }

    ret = service_cycle();

    http_hooks_on_close();

    return ret;
}
时间: 2024-11-07 15:04:17

SRS学习笔记10-SrsConnection及其子类分析的相关文章

Java集合源码学习笔记(二)ArrayList分析

Java集合源码学习笔记(二)ArrayList分析 >>关于ArrayList ArrayList直接继承AbstractList,实现了List. RandomAccess.Cloneable.Serializable接口,为什么叫"ArrayList",因为ArrayList内部是用一个数组存储元素值,相当于一个可变大小的数组,也就是动态数组. (1)继承和实现继承了AbstractList,实现了List:ArrayList是一个数组队列,提供了相关的添加.删除.修

python基础教程_学习笔记10:异常

异常 什么是异常 Python用异常对象来表示异常情况.遇到错误后,会引发异常.如果异常对象并未被处理或捕捉,程序就会用所谓的回溯(Traceback,一种错误信息)终止执行: >>> 1/0 Traceback (most recent call last): File "<pyshell#0>", line 1, in <module> 1/0 ZeroDivisionError: integer division or modulo by

mybatis学习笔记(9)-订单商品数据模型分析

mybatis学习笔记(9)-订单商品数据模型分析 mybatis学习笔记9-订单商品数据模型分析 数据模型分析思路 数据模型分析 订单商品数据模型建表sql 本文对接下来几篇博客中用到的数据模型进行分析,并附上建表sql文件和测试数据文件 数据模型分析思路 每张表记录的数据内容 分模块对每张表记录的内容进行熟悉,相当于你学习系统需求(功能)的过程. 每张表重要的字段设置 非空字段.外键字段 数据库级别表与表之间的关系 外键关系 表与表之间的业务关系 在分析表与表之间的业务关系时一定要建立在某个

memcached学习笔记——存储命令源码分析下篇

上一篇回顾:<memcached学习笔记——存储命令源码分析上篇>通过分析memcached的存储命令源码的过程,了解了memcached如何解析文本命令和mencached的内存管理机制. 本文是延续上一篇,继续分析存储命令的源码.接上一篇内存分配成功后,本文主要讲解:1.memcached存储方式:2.add和set命令的区别. memcached存储方式 哈希表(HashTable) 哈希表在实践中使用的非常广泛,例如编译器通常会维护的一个符号表来保存标记,很多高级语言中也显式的支持哈希

Cocos2d-x学习笔记(五)CCLayer分析及输入事件处理(触摸、重力传感器、按键)

原创文章,转载请注明出处:http://blog.csdn.net/sfh366958228/article/details/38733415 简介 上一讲我们简单的介绍了CCScene,这一讲我们继续来看另一个核心组件CCLayer,他和CCScene有些类似,都是用来收纳其他节点,但是按照层次来说的话,CCLayer应该包含在CCScene之中.老规矩,我们从代码看起. 源码分析 class CC_DLL CCLayer : public CCNode, public CCTouchDele

Hadoop学习笔记(10) ——搭建源码学习环境

Hadoop学习笔记(10) ——搭建源码学习环境 上一章中,我们对整个hadoop的目录及源码目录有了一个初步的了解,接下来计划深入学习一下这头神象作品了.但是看代码用什么,难不成gedit?,单步调试呢? 看程序不能调那多痛苦啊,想看跟踪一下变量,想看一下执行路径都难. 所以这里,我们得把这个调试环境搭建起来.Hadoop的主要代码是用java编写的,所以这里就选用eclipse作为环境. Hadoop目录下,本身就可以为作eclipse的一个工程来操作,但这里我不想,我想自己来建一个工程,

memcached学习笔记——存储命令源码分析上

原创文章,转载请标明,谢谢. 上一篇分析过memcached的连接模型,了解memcached是如何高效处理客户端连接,这一篇分析memcached源码中的process_update_command函数,探究memcached客户端的set命令,解读memcached是如何解析客户端文本命令,剖析memcached的内存管理,LRU算法是如何工作等等. 解析客户端文本命令 客户端向memcached server发出set操作,memcached server读取客户端的命令,客户端的连接状态

jQuery学习笔记10:Ajax技术

jQuery 库拥有完整的 Ajax 兼容套件.其中的函数和方法允许我们在不刷新浏览器的情况下从服务器加载数据. jQuery 采用了三层封装:最底层的封装方法为:$.ajax(),而通过这层封装了第二层有三种方法:.load().$.get()和$.post(),最高层是$.getScript()和$.getJSON()方法. 函数 描述 jQuery.ajax() 执行异步 HTTP (Ajax) 请求. .ajaxComplete() 当 Ajax 请求完成时注册要调用的处理程序.这是一个

Android:日常学习笔记(10)———使用LitePal操作数据库

Android:日常学习笔记(10)---使用LitePal操作数据库 引入LitePal 什么是LitePal LitePal是一款开源的Android数据库框架,采用了对象关系映射(ORM)的模式,将平时开发时最常用的一些数据库功能进行了封装,使得开发者不用编写一行SQL语句就可以完成各种建表.増删改查的操作.并且LitePal很"轻",jar包大小不到100k,而且近乎零配置,这一点和Hibernate这类的框架有很大区别.目前LitePal的源码已经托管到了GitHub上. 关

sqlite学习笔记10:C语言中使用sqlite之查询和更新数据

前面说到的 sqlite_exec() 中的第三个参数, SQLite 将为 sql 参数内执行的每个 SELECT 语句中处理的每个记录调用这个回调函数. 本节添加了两个函数,selectFromTable和updateTable. 实例程序如下: #include <stdio.h> #include <stdlib.h> #include "sqlite/sqlite3.h" #define DB_NANE "sqlite/test.db&quo