SRS学习笔记12-SRSSOUCE类分析3

int SrsPublishEdge::initialize(SrsSource* source, SrsRequest* req)
{
    int ret = ERROR_SUCCESS;

    if ((ret = forwarder->initialize(source, this, req)) != ERROR_SUCCESS) {
        return ret;
    }

    return ret;
}

bool SrsPublishEdge::can_publish()
{
    return state != SrsEdgeStatePublish;
}

int SrsPublishEdge::on_client_publish()
{
    int ret = ERROR_SUCCESS;

    // error when not init state.
    if (state != SrsEdgeStateInit) {
        ret = ERROR_RTMP_EDGE_PUBLISH_STATE;
        srs_error("invalid state for client to publish stream on edge. "
            "state=%d, ret=%d", state, ret);
        return ret;
    }

    // @see https://github.com/ossrs/srs/issues/180
    // to avoid multiple publish the same stream on the same edge,
    // directly enter the publish stage.
    if (true) {
        SrsEdgeState pstate = state;
        state = SrsEdgeStatePublish;
        srs_trace("edge change from %d to state %d (push).", pstate, state);
    }

    // start to forward stream to origin.
    ret = forwarder->start();

    // @see https://github.com/ossrs/srs/issues/180
    // when failed, revert to init
    if (ret != ERROR_SUCCESS) {
        SrsEdgeState pstate = state;
        state = SrsEdgeStateInit;
        srs_trace("edge revert from %d to state %d (push). ret=%d", pstate, state, ret);
    }

    return ret;
}
SrsPublishEdge::on_client_publish这个函数在encoder 向 edge 推流是会调用这个函数,用来启动farwarder,在 encoder 和 origin 之间转发 rtmp message.

这一篇分析 SrsRtmpConn::process_publish_message函数

对SrsSouce类的几个处理函数的代码

1. SrsSource::on_edge_proxy_publish

int SrsSource::on_edge_proxy_publish(SrsCommonMessage* msg)
{
    return publish_edge->on_proxy_publish(msg);
}

如果是edge,直接调用SrsPublishEdge类的on_proxy_publish方法,将msg转发到origin

int SrsPublishEdge::on_proxy_publish(SrsCommonMessage* msg)
{
    return forwarder->proxy(msg);
}

调用 SrsEdgeForwarder类的proxy函数

int SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
{
    int ret = ERROR_SUCCESS;

    if ((ret = send_error_code) != ERROR_SUCCESS) {
        srs_error("publish edge proxy thread send error, ret=%d", ret);
        return ret;
    }

    // the msg is auto free by source,
    // so we just ignore, or copy then send it.
    if (msg->size <= 0
        || msg->header.is_set_chunk_size()
        || msg->header.is_window_ackledgement_size()
        || msg->header.is_ackledgement()
    ) {
        return ret;
    }

    SrsSharedPtrMessage copy;
    if ((ret = copy.create(msg)) != ERROR_SUCCESS) {
        srs_error("initialize the msg failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("initialize shared ptr msg success.");

    copy.stream_id = stream_id;
    if ((ret = queue->enqueue(copy.copy())) != ERROR_SUCCESS) {
        srs_error("enqueue edge publish msg failed. ret=%d", ret);
    }

    return ret;
}
int SrsEdgeForwarder::proxy(SrsCommonMessage* msg)函数用来将从 encoder接收的 rtmp msg 保存在 queue中
int SrsEdgeForwarder::start()
{
    int ret = ERROR_SUCCESS;

    send_error_code = ERROR_SUCCESS;

    std::string ep_server, ep_port;
    if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) {
        return ret;
    }
    srs_assert(client);

    client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);

    SrsRequest* req = _req;

    if ((ret = client->handshake()) != ERROR_SUCCESS) {
        srs_error("handshake with server failed. ret=%d", ret);
        return ret;
    }
    if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) {
        srs_error("connect with server failed. ret=%d", ret);
        return ret;
    }
    if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
        srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
        return ret;
    }

    if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) {
        srs_error("publish failed, stream=%s, stream_id=%d. ret=%d",
            req->stream.c_str(), stream_id, ret);
        return ret;
    }

    return pthread->start();
}

int SrsEdgeForwarder::start()函数

在encoder 向 edge推流是调用

作用是用 SrsRtmpClient类连接到 origin

SrsEdgeForwarder包含

SrsReusableThread2类

#define SYS_MAX_EDGE_SEND_MSGS 128
int SrsEdgeForwarder::cycle()
{
    int ret = ERROR_SUCCESS;

    client->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);

    SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
    SrsAutoFree(SrsPithyPrint, pprint);

    SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);

    while (!pthread->interrupted()) {
        if (send_error_code != ERROR_SUCCESS) {
            st_usleep(SRS_EDGE_FORWARDER_ERROR_US);
            continue;
        }

        // read from client.
        if (true) {
            SrsCommonMessage* msg = NULL;
            ret = client->recv_message(&msg);

            srs_verbose("edge loop recv message. ret=%d", ret);
            if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
                srs_error("edge push get server control message failed. ret=%d", ret);
                send_error_code = ret;
                continue;
            }
            //为什么没见到msg的处理函数直接就释放了           srs_freep(msg);
        }

        // forward all messages.
        // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
        int count = 0;
        if ((ret = queue->dump_packets(msgs.max, msgs.msgs, count)) != ERROR_SUCCESS) {
            srs_error("get message to push to origin failed. ret=%d", ret);
            return ret;
        }

        pprint->elapse();

        // pithy print
        if (pprint->can_print()) {
            kbps->sample();
            srs_trace("-> "SRS_CONSTS_LOG_EDGE_PUBLISH
                " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
                pprint->age(), count,
                kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
                kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
        }
        // count 是 queue中消息的个数,也就是encoder发送到edge的rtmp msg得个数// count<=0 没有包需要有 edge 转发到 origin       // ignore when no messages.
        if (count <= 0) {
            srs_verbose("no packets to push.");
            continue;
        }

        // sendout messages, all messages are freed by send_and_free_messages().
        if ((ret = client->send_and_free_messages(msgs.msgs, count, stream_id)) != ERROR_SUCCESS) {
            srs_error("edge publish push message to server failed. ret=%d", ret);
            return ret;
        }
    }

    return ret;
}

SrsEdgeForwarder::cycle的主要功能

有 SrsRtmpClient类来完成

1. 调用SrsRtmpClient::recv_message从 origin接收消息

2.

queue->dump_packets(msgs.max, msgs.msgs, count)将从encoder接收到的rtmp msg 保存在类型为

SrsMessageArray的变量msgs中,

client->send_and_free_messages(msgs.msgs, count, stream_id)将rtmp msg 转发到originclient是 edge到 origin rtmp 连接,类型是SrsRtmpClient,主要用于在encoder和origin转发 rtmp msg

edge 是的推流rtmp msg就分析完了,没做过多处理转发到origin接下来分析origin的rtmp msg处理
int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge)
{
    int ret = ERROR_SUCCESS;

    if (!source->can_publish(is_edge)) {
        ret = ERROR_SYSTEM_STREAM_BUSY;
        srs_warn("stream %s is already publishing. ret=%d",
            req->get_stream_url().c_str(), ret);
        return ret;
    }

    // when edge, ignore the publish event, directly proxy it.
    if (is_edge) {//在这里启动一个SrsEdgeForwarder转发encoder的rtmp msg      if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) {
            srs_error("notice edge start publish stream failed. ret=%d", ret);
            return ret;
        }
    } else {// origin 会运行到此处,从SrsSource的on_publish开始分析       if ((ret = source->on_publish()) != ERROR_SUCCESS) {
            srs_error("notify publish failed. ret=%d", ret);
            return ret;
        }
    }

    return ret;
}
int SrsSource::on_publish()
{
    int ret = ERROR_SUCCESS;

    // update the request object.
    srs_assert(_req);

    _can_publish = false;

    // whatever, the publish thread is the source or edge source,
    // save its id to srouce id.// 更新_source_id,并通知所有的SrsConsumer(call SrsConsumer::update_source_id)
    on_source_id_changed(_srs_context->get_id());

    // reset the mix queue.
    mix_queue->clear();

    // detect the monotonically again.
    is_monotonically_increase = true;
    last_packet_time = 0;

    // create forwarders// 在vhost 里配置 forward         127.0.0.1:1936 127.0.0.1:1937;// 以后再具体分析  if ((ret = create_forwarders()) != ERROR_SUCCESS) {
        srs_error("create forwarders failed. ret=%d", ret);
        return ret;
    }
    // 用 ffmpeg对流进行转码// 配置 transcode --> engine --> output          rtmp://127.0.0.1:[port]/[app]?vhost=[vhost]/[stream]_[engine];// 以后具体分析   // TODO: FIXME: use initialize to set req.
#ifdef SRS_AUTO_TRANSCODE
    if ((ret = encoder->on_publish(_req)) != ERROR_SUCCESS) {
        srs_error("start encoder failed. ret=%d", ret);
        return ret;
    }
#endif

    // TODO: FIXME: use initialize to set req.// hls相关,以后具体分析
#ifdef SRS_AUTO_HLS
    if ((ret = hls->on_publish(_req, false)) != ERROR_SUCCESS) {
        srs_error("start hls failed. ret=%d", ret);
        return ret;
    }
#endif
    //DVR 相关,以后具体分析   // TODO: FIXME: use initialize to set req.
#ifdef SRS_AUTO_DVR
    if ((ret = dvr->on_publish(_req)) != ERROR_SUCCESS) {
        srs_error("start dvr failed. ret=%d", ret);
        return ret;
    }
#endif
//HDS相关,以后具体分析ifdef SRS_AUTO_HDS
    if ((ret = hds->on_publish(_req)) != ERROR_SUCCESS) {
        srs_error("start hds failed. ret=%d", ret);
        return ret;
    }
#endif

    // notify the handler.
    srs_assert(handler);
    if ((ret = handler->on_publish(this, _req)) != ERROR_SUCCESS) {
        srs_error("handle on publish failed. ret=%d", ret);
        return ret;
    }
    SrsStatistic* stat = SrsStatistic::instance();
    stat->on_stream_publish(_req, _source_id);

    return ret;
}

主要分析

handler->on_publish(this, _req)

b srs_app_source.cpp:2203

推流后断下

bt command output above:

#0  SrsServer::on_publish (this=0x8d53e0, s=0x8fa0f0, r=0x8fcf00) at src/app/srs_app_server.cpp:1426
#1  0x000000000048e817 in SrsSource::on_publish (this=0x8fa0f0) at src/app/srs_app_source.cpp:2203
#2  0x0000000000481fb6 in SrsRtmpConn::acquire_publish (this=0x8f4ec0, source=0x8fa0f0, is_edge=false)
    at src/app/srs_app_rtmp_conn.cpp:980
#3  0x000000000048122e in SrsRtmpConn::publishing (this=0x8f4ec0, source=0x8fa0f0) at src/app/srs_app_rtmp_conn.cpp:847
#4  0x000000000047f759 in SrsRtmpConn::stream_service_cycle (this=0x8f4ec0) at src/app/srs_app_rtmp_conn.cpp:547
#5  0x000000000047e9d8 in SrsRtmpConn::service_cycle (this=0x8f4ec0) at src/app/srs_app_rtmp_conn.cpp:416
#6  0x000000000047d936 in SrsRtmpConn::do_cycle (this=0x8f4ec0) at src/app/srs_app_rtmp_conn.cpp:211
#7  0x000000000047bc6d in SrsConnection::cycle (this=0x8f4f48) at src/app/srs_app_conn.cpp:89
#8  0x00000000004ad6a3 in SrsOneCycleThread::cycle (this=0x8f4f90) at src/app/srs_app_thread.cpp:372
#9  0x00000000004ace3b in internal::SrsThread::thread_cycle (this=0x8f4fb0) at src/app/srs_app_thread.cpp:207
#10 0x00000000004ad049 in internal::SrsThread::thread_fun (arg=0x8f4fb0) at src/app/srs_app_thread.cpp:245
#11 0x0000000000535371 in _st_thread_main () at sched.c:327
#12 0x0000000000535ae1 in st_thread_create (start=0x7ffff7fdbc20, arg=0x1f7fdbb60, joinable=0, stk_size=5480568) at sched.c:591
SrsServer::on_publish代码如下
int SrsServer::on_publish(SrsSource* s, SrsRequest* r)
{
    int ret = ERROR_SUCCESS;

#ifdef SRS_AUTO_HTTP_SERVER
    if ((ret = http_server->http_mount(s, r)) != ERROR_SUCCESS) {
        return ret;
    }
#endif

    return ret;
}

SrsHttpServer::http_mount

参见http://www.cnblogs.com/yan-shi-yi/p/6837359.html

 
时间: 2024-11-05 13:33:18

SRS学习笔记12-SRSSOUCE类分析3的相关文章

SRS学习笔记10-SrsConnection及其子类分析

SrsConnection类代表一个client的连接,其中封装了st thread,用于在一个单独的st thread里处理一个client的服务请求. SrsConnection在 int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)里创建 SrsConnection* conn = NULL; if (type == SrsListenerRtmpStream) { conn = new Srs

Caliburn.Micro学习笔记(一)----引导类和命名匹配规则

Caliburn.Micro学习笔记(一)----引导类和命名匹配规则 用了几天时间看了一下开源框架Caliburn.Micro 这是他源码的地址http://caliburnmicro.codeplex.com/ 文档也写的很详细,自己在看它的文档和代码时写了一些demo和笔记,还有它实现的原理记录一下 学习Caliburn.Micro要有MEF和MVVM的基础 先说一下他的命名规则和引导类 以后我会把Caliburn.Micro的 Actions IResult,IHandle ICondu

python基础教程_学习笔记12:充电时刻——模块

充电时刻--模块 python的标准安装包括一组模块,称为标准库. 模块 >>> import math >>> math.sin(0) 0.0 模块是程序 任何python程序都可以作为模块导入. $ cat hello.py #!/usr/bin/python print "Hello,signjing!" $ ./hello.py Hello,signjing! 假设将python程序保存在/home/ggz2/magiccube/mysh/p

C++ Primer 学习笔记_45_STL实践与分析(19)--泛型算法的结构

STL实践与分析 --泛型算法的结构 引言: 正如全部的容器都建立在一致的设计模式上一样,算法也具有共同的设计基础. 算法最主要的性质是须要使用的迭代器种类.全部算法都指定了它的每一个迭代器形參可使用的迭代器类型.比方,假设形參必须为随机訪问迭代器则可提供vector或 deque类型的迭代器,或者提供指向数组的指针.而其它容器的迭代器不能用在这类算法上. C++还提供了另外两种算法模式:一种模式由算法所带的形參定义;还有一种模式则通过两种函数命名和重载的规范定义. 一.算法的形參模式 大多数的

python 学习笔记 12 -- 写一个脚本获取城市天气信息

最近在玩树莓派,前面写过一篇在树莓派上使用1602液晶显示屏,那么能够显示后最重要的就是显示什么的问题了.最容易想到的就是显示时间啊,CPU利用率啊,IP地址之类的.那么我觉得呢,如果能够显示当前时间.温度也是甚好的,作为一个桌面小时钟还是很精致的. 1. 目前有哪些工具 目前比较好用的应该是 weather-util, 之前我获取天气信息一般都是通过它. 使用起来也很简单: (1) Debian/Ubuntu 用户使用 sudo apt-get install weather-util 安装

Swift学习笔记(12)--数组和字典的复制

Swift中,数组Array和字典Dictionary是用结构来实现的,但是数组与字典和其它结构在进行赋值或者作为参数传递给函数的时候有一些不同. 并且数组和字典的这些操作,又与Foundation中的NSArray和NSDictionary不同,它们是用类来实现的. 注意:下面的小节将会介绍数组,字典,字符串等的复制操作.这些复制操作看起来都已经发生,但是Swift只会在确实需要复制的时候才会完整复制,从而达到最优的性能. 字典的赋值和复制操作 每次将一个字典Dictionary类型赋值给一个

angular学习笔记(九)-css类和样式3

再来看一个选择li列表的例子: 点击li中的任意项,被点击的li高亮显示: <!DOCTYPE html> <html ng-app> <head> <title>6.3css类和样式</title> <meta charset="utf-8"> <script src="../angular.js"></script> <script src="scri

angular学习笔记(九)-css类和样式2

在上一个例子中,元素的类名使用拼接的方法,这样,类名中就不得不带有true或false,并且不易维护,所以,angular使用ng-class属性来控制元素的类名: 我们来看一个小例子,点击error按钮,顶部提示错误框,点击warning按钮,顶部提示警告框. 错误框的类名是.err,警告框的类名是.warn: <!DOCTYPE html> <html ng-app> <head> <title>6.2css类和样式</title> <

C++ Primer Plus学习笔记之继承类的初始化顺序

C++ Primer Plus学习笔记之继承类的初始化顺序 基类的构造函数,析构函数和操作符函数operator=是不能被派生类继承的: 那么,当创建一个派生类对象时,怎样调用基类的构造函数对基类的数据进行初始化呢??? 答案是:构造函数执行时遵行先兄长(基类),再客人(对象成员),后自己(派生类)的顺序: 另一方面,执行析构函数时,先执行派生类的析构函数,再执行基类的析构函数.原因是,对基类的破坏隐含了对派生类的破坏,所以派生类的析构函数必须先执行: #include<iostream> u

Lua学习笔记4:类及集成的实现

-- Lua中类的实现 -------------------------------- 基类 ---------------------------- classBase = {x = 0,y = 0} -- x,y为类的成员变量 classBase.__index = classBase -- 这句是重定义元表的索引,必须要有 --模拟构造体,一般名称为new() function classBase:new(x,y) local self = {}     -- 初始化对象自身 setme