SRS: Playing a Published Stream

1. Overview

First, the configuration file used for publish/live streaming is as follows:

# rtmp.conf

listen              1935;
max_connections     1000;
daemon              off;
srs_log_tank        console;
vhost __defaultVhost__ {
}

The steps for setting up this minimal live-streaming test are:

  1. Start SRS: ./objs/srs -c ./conf/rtmp.conf;
  2. Configure OBS and start publishing. The video source in OBS can be anything, either a capture device or a local file; the OBS stream settings used are shown in a screenshot in the original post:

    Note: the "stream key" (stream name) must be filled in, because SRS does not accept requests without a stream name.

  3. Once OBS is publishing, another client can connect to SRS and play the stream that OBS publishes. Here VLC is used as the player, with the following stream URL:

    rtmp://192.168.56.101:1935/live/livestream
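
As an aside, if OBS is not available, a local file can also be published with ffmpeg and played back with ffplay. This is only an illustrative sketch, assuming the file is already H.264/AAC and the ffmpeg build has RTMP support; the IP address and file name are placeholders:

# publish a local file to SRS (demo.mp4 and the IP are placeholders)
ffmpeg -re -i demo.mp4 -c copy -f flv rtmp://192.168.56.101:1935/live/livestream

# play it back (VLC works equally well)
ffplay rtmp://192.168.56.101:1935/live/livestream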

The analysis of the OBS publishing process can be found in the following posts:

  1. How SRS sets up listening on server ports so that it can accept client connection requests: SRS之监听端口的管理:RTMP
  2. How the SRS main thread loops and sleeps so that other threads, such as the TCP listening thread, get scheduled: SRS之SrsServer::cycle()
  3. When the listening thread detects a client connection request, it accepts the connection and creates a conn thread dedicated to handling that client's requests: SRS之RTMP连接处理线程conn:接收客户端推流
  4. At the start of the conn thread's loop, the server first performs the handshake with the client: SRS之RTMP handshake
  5. After the handshake succeeds, the server receives the client's first command after the handshake, usually connect('xxx'): SRS之SrsRtmpServer::connect_app详解
  6. It then enters the SrsRtmpConn::service_cycle function: SRS之SrsRtmpConn::service_cycle详解
  7. After SrsRtmpConn::service_cycle sends the Window Acknowledgement Size (5), Set Peer Bandwidth (6), and the response to the client's connect, it enters a loop and calls stream_service_cycle: SRS之SrsRtmpConn::stream_service_cycle详解
  8. In stream_service_cycle, the server first identifies what the client actually wants to do, e.g. publish or play. For an FMLE-type publish it calls start_fmle_publish to exchange a series of messages with the client, and then starts handling the actual publishing: SRS之SrsRtmpConn::publishing详解
  9. If the client is publishing, the publishing function creates a recv thread dedicated to receiving the client's stream data; that thread caches the data into consumers (if any clients are currently playing the stream): SRS之接收推流线程:recv

The analysis below builds on step 8: VLC connects to SRS and requests to play the video published by OBS (the connection setup beforehand is essentially the same as steps 1-7 above).

The source code in the analysis below is somewhat simplified.

2. SrsRtmpConn::stream_service_cycle

int SrsRtmpConn::stream_service_cycle()
{
    int ret = ERROR_SUCCESS;

    SrsRtmpConnType type;
    /* First, identify the type of the client connection; here it should be play */
    if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration))
        != ERROR_SUCCESS) {
        ...
    }
    /* Normalize fields such as url and host, e.g. strip spaces and other unneeded characters */
    req->strip();

    /* If security is not configured in the config file, this check is a no-op */
    // security check
    if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) {
        ...
    }

    /* SRS does not support an empty stream name, because HLS might then write to a file path derived from an empty stream name */
    if (req->stream.empty()) {
        ...
    }

    /* Set the server's send/recv timeouts; both are 30*1000*1000LL here (30 seconds, in microseconds) */
    rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);

    /* Find a source to serve this client */
    SrsSource* source = NULL;
    if ((ret = SrsSource::fetch_or_create(req, server, &source)) != ERROR_SUCCESS) {
        ...
    }

    /* update the statistic when the source is discovered. */
    ...

    /* If mode is not configured in the config file, this returns false by default */
    bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
    /* If gop_cache is not configured in the config file, GOP caching is enabled by default */
    bool enabled_cache = _srs_config->get_app_cache(req->vhost);
    source->set_cache(enabled_cache);

    /* Here type should be SrsRtmpConnPlay */
    client_type = type;
    switch (type) {
        case SrsRtmpConnPlay: {
            /* response connection start play */
            if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) {
                ...
            }
            /* If http_hooks is not configured in the config file, this call does nothing */
            if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) {
                ...
            }

            /* From here the stream published by OBS is sent to the client */
            ret = playing(source);
            http_hooks_on_stop();

            return ret;
        }

        ...
    }

    return ret;
}
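
For reference, the two vhost-level options mentioned in the comments above, mode (which makes a vhost an edge) and gop_cache, are configured roughly as follows. This is a hedged sketch using the usual SRS 2.x config syntax; the edge vhost name and origin address are just examples, not something from the original post:

# GOP caching is enabled by default; it can be turned off per vhost
vhost __defaultVhost__ {
    gop_cache       off;
}

# a vhost in edge mode, for which get_vhost_is_edge() returns true
vhost edge.example.com {
    mode            remote;
    origin          127.0.0.1:1935 localhost:1935;
}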

2.1 SrsRtmpServer::identify_client

This function identifies the type of the client's request, publish or play, by receiving and inspecting messages sent by the client.

/*
 * recv some message to identify the client.
 * @stream_id, client will createStream to play or publish by flash,
 *     the stream_id used to response the createStream request.
 * @type, output the client type.
 * @stream_name, output the client publish/play stream name. @see: SrsRequest.stream
 * @duration, output the play client duration. @see: SrsRequest.duration
 */
int SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type,
    string& stream_name, double& duration)
{
    type = SrsRtmpConnUnknown;
    int ret = ERROR_SUCCESS;

    while (true) {
        SrsCommonMessage* msg = NULL;
        /* Receive one complete message */
        if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
            ...
        }

        SrsAutoFree(SrsCommonMessage, msg);
        SrsMessageHeader& h = msg->header;

        if (h.is_ackledgement() || h.is_set_chunk_size() ||
            h.is_window_ackledgement_size() || h.is_user_control_message()) {
            continue;
        }

        SrsPacket* pkt = NULL;
        /* Decode the message */
        if ((ret = protocol->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
            ...
        }

        SrsAutoFree(SrsPacket, pkt);

        /* The received message is createStream('livestream') */
        if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) {
            srs_info("identify client by create stream, play or flash publish.");
            return identify_create_stream_client(dynamic_cast<SrsCreateStreamPacket*>(pkt),
                   stream_id, type, stream_name, duration);
        }

        ...
    }

    return ret;
}

After this function detects that the received message is createStream, it calls identify_create_stream_client for further processing.

2.1.1 SrsRtmpServer::identify_create_stream_client

int SrsRtmpServer::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id,
    SrsRtmpConnType& type, string& stream_name, double& duration)
{
    int ret = ERROR_SUCCESS;

    if (true) {
        /* Construct the packet used to respond to the createStream message */
        SrsCreateStreamResPacket* pkt =
            new SrsCreateStreamResPacket(req->transaction_id, stream_id);
        if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
            ...
        }
    }

    while (true) {
        SrsCommonMessage* msg = NULL;
        if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
            ...
        }

        SrsAutoFree(SrsCommonMessage, msg);
        SrsMessageHeader& h = msg->header;

        if (h.is_ackledgement() || h.is_set_chunk_size() ||
            h.is_window_ackledgement_size() || h.is_user_control_message()) {
            continue;
        }

        if (!h.is_amf0_command() && !h.is_amf3_command()) {
            srs_trace("identify ignore messages except "
                "AMF0/AMF3 command message. type=%#x", h.message_type);
            continue;
        }

        SrsPacket* pkt = NULL;
        if ((ret = protocol->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
            ...
        }

        /* Keep looping until a play command is received, then return */
        SrsAutoFree(SrsPacket, pkt);
        if (dynamic_cast<SrsPlayPacket*>(pkt)) {
            srs_info("level1 identify client by play.");
            return identify_play_client(dynamic_cast<SrsPlayPacket*>(pkt),
                   type, stream_name, duration);
        }

        ...
    }

    return ret;
}

2.1.2 SrsRtmpServer::identify_play_client

int SrsRtmpServer::identify_play_client(SrsPlayPacket* req, SrsRtmpConnType& type,
    string& stream_name, double& duration)
{
    int ret = ERROR_SUCCESS;

    type = SrsRtmpConnPlay;
    /* The stream name the client requests to play; here it is livestream */
    stream_name = req->stream_name;
    duration = req->duration;

    srs_info("identity client type=play, stream_name=%s, duration=%.2f",
             stream_name.c_str(), duration);

    return ret;
}

After the client's request is identified as play, a source of class SrsSource is fetched or created for the request, to serve this client.

2.2 SrsSource::fetch_or_create

/*
 * create source when fetch from cache failed.
 * @param r the client request.
 * @param h the event handler for source.
 * @param pps the matched source, if success never be NULL.
 */
int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
{
    int ret = ERROR_SUCCESS;

    /* First check the global SrsSource::pool for an existing source matching this stream_url */
    SrsSource* source = NULL;
    if ((source = fetch(r)) != NULL) {
        *pps = source;
        return ret;
    }

    /* If none exists, construct a new one below and put it into the pool */

    /* Build a stream_url from vhost/app/stream */
    string stream_url = r->get_stream_url();
    string vhost = r->vhost;

    // should always not exists for create a source.
    srs_assert (pool.find(stream_url) == pool.end());

    /* Construct a new source */
    source = new SrsSource();
    if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) {
        srs_freep(source);
        return ret;
    }

    /* Insert the source into the pool, keyed by stream_url */
    pool[stream_url] = source;
    srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());

    *pps = source;

    return ret;
}
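
For completeness, the fetch(r) call at the top of fetch_or_create simply looks the stream_url up in the static pool. A simplified sketch of what it does (close to, but not verbatim, the SRS 2.x source):

SrsSource* SrsSource::fetch(SrsRequest* r)
{
    string stream_url = r->get_stream_url();

    /* pool is the static std::map<std::string, SrsSource*> keyed by stream_url */
    if (pool.find(stream_url) == pool.end()) {
        return NULL;
    }
    SrsSource* source = pool[stream_url];

    /* refresh the cached request with the new client's connect info */
    source->_req->update_auth(r);

    return source;
}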

2.2.2 Constructing SrsSource

/* live streaming source. */
SrsSource::SrsSource()
{
    _req = NULL;
    jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
    mix_correct = false;
    mix_queue = new SrsMixQueue();

#ifdef SRS_AUTO_HLS
    hls = new SrsHls();
#endif
#ifdef SRS_AUTO_DVR
    dvr = new SrsDvr();
#endif
#ifdef SRS_AUTO_TRANSCODE
    encoder = new SrsEncoder();
#endif
#ifdef SRS_AUTO_HDS
    hds = new SrsHds(this);
#endif

    /* cache_metadata: caches the stream metadata
     * cache_sh_video: caches the video sequence header (SPS/PPS)
     * cache_sh_audio: caches the audio sequence header */
    cache_metadata = cache_sh_video = cache_sh_audio = NULL;

    _can_publish = true;
    _pre_source_id = _source_id = -1;
    die_at = -1;

    play_edge = new SrsPlayEdge();
    publish_edge = new SrsPublishEdge();
    /* GOP cache, enabled by default */
    gop_cache = new SrsGopCache();
    aggregate_stream = new SrsStream();

    is_monotonically_increase = false;
    last_packet_time = 0;

    _srs_config->subscribe(this);
    atc = false;
}
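
The gop_cache member created here is what lets a player that joins mid-stream start rendering immediately: SRS keeps every audio/video message since the most recent video keyframe and replays that cached GOP to each new consumer (see gop_cache->dump() in create_consumer below). A purely conceptual sketch of that behaviour, not the actual SrsGopCache code:

/* Conceptual sketch only; the real class is SrsGopCache in srs_app_source. */
class ConceptualGopCache
{
private:
    /* all messages received since the last video keyframe */
    std::vector<SrsSharedPtrMessage*> gop;

public:
    void cache(SrsSharedPtrMessage* msg)
    {
        /* a new video keyframe starts a new GOP, so the old one is discarded */
        if (msg->is_video() && SrsFlvCodec::video_is_keyframe(msg->payload, msg->size)) {
            clear();
        }
        gop.push_back(msg->copy());
    }

    int dump(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag)
    {
        /* replay the cached GOP to a consumer that just joined */
        int ret = ERROR_SUCCESS;
        for (size_t i = 0; i < gop.size(); i++) {
            if ((ret = consumer->enqueue(gop[i], atc, ag)) != ERROR_SUCCESS) {
                return ret;
            }
        }
        return ret;
    }

    void clear()
    {
        for (size_t i = 0; i < gop.size(); i++) {
            srs_freep(gop[i]);
        }
        gop.clear();
    }
};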

Next, the server responds to the client's play command.

2.3 SrsRtmpServer::start_play

int SrsRtmpServer::start_play(int stream_id)
{
    int ret = ERROR_SUCCESS;

    // StreamBegin
    if (true) {
        SrsUserControlPacket* pkt = new SrsUserControlPacket();
        pkt->event_type = SrsPCUCStreamBegin;
        pkt->event_data = stream_id;
        /* Send the Stream Begin user control message (event data = stream_id) to the client */
        if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
            ...
        }
    }

    // onStatus(NetStream.Play.Reset)
    if (true) {
        SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();

        pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));
        pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeStreamReset));
        pkt->data->set(StatusDescription,
            SrsAmf0Any::str("Playing and resetting stream."));
        pkt->data->set(StatusDetails, SrsAmf0Any::str("stream"));
        pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID));

        if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
            ...
        }
    }

    // onStatus(NetStream.Play.Start)
    if (true) {
        SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();

        pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));
        pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeStreamStart));
        pkt->data->set(StatusDescription, SrsAmf0Any::str("Started playing stream."));
        pkt->data->set(StatusDetails, SrsAmf0Any::str("stream"));
        pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID));

        if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
            ...
        }
    }

    // |RtmpSampleAccess(false, false)
    if (true) {
        SrsSampleAccessPacket* pkt = new SrsSampleAccessPacket();

        // allow audio/video sample.
        // @see: https://github.com/ossrs/srs/issues/49
        pkt->audio_sample_access = true;
        pkt->video_sample_access = true;

        if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
            ...
        }
        srs_info("send |RtmpSampleAccess(false, false) message success.");
    }

    // onStatus(NetStream.Data.Start)
    if (true) {
        SrsOnStatusDataPacket* pkt = new SrsOnStatusDataPacket();
        pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeDataStart));
        if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
            ...
        }
        srs_info("send onStatus(NetStream.Data.Start) message success.");
    }

    srs_info("start play success.");

    return ret;
}

This function sends the following messages to the client, in order (each is shown as a packet-capture screenshot in the original post):

send: StreamBegin

send: onStatus(NetStream.Play.Reset)

send: onStatus(NetStream.Play.Start)

send: |RtmpSampleAccess(false, false)

send: onStatus(NetStream.Data.Start)

3. SrsRtmpConn::playing

int SrsRtmpConn::playing(SrsSource* source)
{
    int ret = ERROR_SUCCESS;

    /* create consumer of source. */
    SrsConsumer* consumer = NULL;
    if ((ret = source->create_consumer(this, consumer)) != ERROR_SUCCESS) {
        ...
    }
    SrsAutoFree(SrsConsumer, consumer);

    /* use isolate thread to recv,
     * @see: https://github.com/ossrs/srs/issues/217 */
    SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP);

    /* start isolate recv thread */
    if ((ret = trd.start()) != ERROR_SUCCESS) {
        ...
    }

    /* delivery message for clients playing stream. */
    wakable = consumer;
    ret = do_playing(source, consumer, &trd);
    wakable = NULL;

    /* stop isolate recv thread */
    trd.stop();

    /* warn for the message is dropped. */
    if (!trd.empty()) {
        srs_warn("drop the received %d messages", trd.size());
    }

    return ret;
}
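
do_playing() contains the actual forwarding loop. Below is a trimmed-down sketch of its core logic, based on the SRS 2.x implementation but heavily simplified (error handling, disposal/expiration checks, timeout refresh, and edge logic are omitted, and the wait thresholds are placeholders):

int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd)
{
    int ret = ERROR_SUCCESS;

    // array that holds one batch of messages pulled from the consumer
    SrsMessageArray msgs(SRS_PERF_MW_MSGS);

    // the real loop also checks connection disposal/expiration and refreshes timeouts
    while (true) {
        // messages received by the isolated recv thread (e.g. pause) are handled here
        while (!trd->empty()) {
            SrsCommonMessage* msg = trd->pump();
            if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
                return ret;
            }
        }

        // wait until the consumer has queued some messages from the publisher
        // (the real code guards this with a perf macro and derives the thresholds
        // from config; the constants here are placeholders)
        consumer->wait(SRS_PERF_MW_MIN_MSGS, SRS_PERF_MW_SLEEP);

        // pull a batch of cached messages out of the consumer ...
        int count = 0;
        if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) {
            return ret;
        }

        // ... and send them to the player in one call
        if (count > 0 &&
            (ret = rtmp->send_and_free_messages(msgs.msgs, count, res->stream_id)) != ERROR_SUCCESS) {
            return ret;
        }
    }

    return ret;
}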

3.1 SrsSource::create_consumer

int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer,
    bool ds, bool dm, bool dg)
{
    int ret = ERROR_SUCCESS;

    consumer = new SrsConsumer(this, conn);
    consumers.push_back(consumer);

    double queue_size = _srs_config->get_queue_length(_req->vhost);
    consumer->set_queue_size(queue_size);

    /* if atc, update the sequence header to gop cache time. */
    if (atc && !gop_cache->empty()) {
        if (cache_metadata) {
            cache_metadata->timestamp = gop_cache->start_time();
        }
        if (cache_sh_video) {
            cache_sh_video->timestamp = gop_cache->start_time();
        }
        if (cache_sh_audio) {
            cache_sh_audio->timestamp = gop_cache->start_time();
        }
    }

    /* copy metadata. */
    if (dm && cache_metadata &&
        (ret = consumer->enqueue(cache_metadata, atc, jitter_algorithm)) != ERROR_SUCCESS) {
        ...
    }

    /* copy sequence header
     * copy audio sequence first, for hls to fast parse the "right" audio codec.
     * @see https://github.com/ossrs/srs/issues/301 */
    if (ds && cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio, atc, jitter_algorithm)) != ERROR_SUCCESS) {
        ...
    }

    if (ds && cache_sh_video && (ret = consumer->enqueue(cache_sh_video, atc, jitter_algorithm)) != ERROR_SUCCESS) {
        ...
    }

    /* copy gop cache to client. */
    if (dg && (ret = gop_cache->dump(consumer, atc, jitter_algorithm)) != ERROR_SUCCESS) {
        return ret;
    }

    /* print status. */
    if (dg) {
        srs_trace("create consumer, queue_size=%.2f, jitter=%d", queue_size, jitter_algorithm);
    } else {
        srs_trace("create consumer, ignore gop cache, jitter=%d", jitter_algorithm);
    }

    /* for edge, when play edge stream, check the state */
    if (_srs_config->get_vhost_is_edge(_req->vhost)) {
        /* notice edge to start for the first client. */
        if ((ret = play_edge->on_client_play()) != ERROR_SUCCESS) {
            ...
        }
    }

    return ret;
}
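
Each enqueue() call above passes the message through the consumer's jitter correction and then into its bounded message queue; the recv thread on the publishing side later feeds the same queue with live audio/video. A rough sketch of what SrsConsumer::enqueue does (simplified, and the internal signatures may differ slightly between SRS versions):

int SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)
{
    int ret = ERROR_SUCCESS;

    // the consumer keeps its own lightweight copy of the shared message
    SrsSharedPtrMessage* msg = shared_msg->copy();

    // unless ATC is enabled, correct the timestamp with the configured jitter algorithm
    // (the correct() signature shown here is simplified)
    if (!atc) {
        if ((ret = jitter->correct(msg, ag)) != ERROR_SUCCESS) {
            srs_freep(msg);
            return ret;
        }
    }

    // push into the bounded queue; when it overflows, old messages are shrunk/dropped
    if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) {
        return ret;
    }

    return ret;
}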

Original article: https://www.cnblogs.com/jimodetiantang/p/9113606.html
