1. 综述
首先,推流直播的配置文件如下:
# rtmp.conf
listen 1935;
max_connections 1000;
daemon off;
srs_log_tank console;
vhost __defaultVhost__ {
}
搭建的简陋直播步骤如下:
- 启动 srs:./obj/srs -c ./conf/rtmp.conf;
- 设置并开启 obs 推流,obs 的视频来源随便,可以直接是视频获取设备或本地文件,此外 obs 的设置如下图:
注:必须填 "流名称",因为 SRS 不支持没有流名称的请求。
- 开启 obs 推流后,即可使用另一个客户端向 SRS 播放 obs 推流的视频,这里用 vlc 进行播放,vlc 播放流填:
obs 推流过程分析可见如下链接:
- SRS 如何建立对服务器端口的监听,以便监听客户端的连接请求: SRS之监听端口的管理:RTMP
- SRS 的主线程循环休眠,以便调度其他线程运行,如监听端口的线程 tcp: SRS之SrsServer::cycle()
- SRS 监听端口的线程监听到有客户端的连接请求后,会 accept 客户端的连接,并创建一个 conn 线程,专用于处理与该客户端的请求: SRS之RTMP连接处理线程conn:接收客户端推流
- 在 conn 线程的循环开始时,首先服务器会与客户端进行 handshake 过程: SRS之RTMP handshake
- handshake 成功后,会接收客户端 handshake 后的第一个命令,一般为 connect(‘xxx‘): SRS之SrsRtmpServer::connect_app详解
- 接着进入 SrsRtmpConn::service_cycle 函数: SRS之SrsRtmpConn::service_cycle详解
- 在 SrsRtmpConn::service_cycle 函数开始向客户端发送 应答窗口大小(5)、设置流带宽(6)、回复客户端 connect 的响应等消息后,开始进入循环,此时,调用 stream_service_cycle 函数: SRS之SrsRtmpConn::stream_service_cycle详解
- 在 stream_service_cycle 中,首先会鉴别客户端要做的实际请求是什么,如 publish 或 play,若为 FMLE 类型的 publish,则会调用 start_fmle_publish 函数与客户端先进行一些列消息的交互,然后开始处理真正的推流: SRS之SrsRtmpConn::publishing详解
- 若客户端为 publish,则在 publishing 函数中,会创建一个专门用于接收客户端推流数据的线程 recv,该线程会将推流数据缓存到 consumer(若此时有客户端播放推流视频的话) 中: SRS之接收推流线程:recv
下面即开始在第 8 步的基础上分析:vlc 连接 SRS,请求播放 obs 推流的的视频(前面建立连接的过程和上面 1~7 差不多)。
下面的分析会简化一下源码。
2. SrsRtmpConn::stream_service_cycle
int SrsRtmpConn::stream_service_cycle()
{
int ret = ERROR_SUCCESS;
SrsRtmpConnType type;
/* 首先,鉴别客户端连接的类型,这里应该为 play */
if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration))
!= ERROR_SUCCESS) {
...
}
/* 对 url、host 这些数据进行简化,如去掉空格或其他不必要的字符 */
req->strip();
/* 若配置文件中没有配置 security,则忽略 */
// sercurity check
if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) {
...
}
/* SRS 不支持空的流名称,因为对于 HLS 可能会通过空的流名称写到一个文件中 */
if (req->stream.empty()) {
...
}
/* 设置服务器 send/recv 的超时时间,这里都为 30*1000*1000LL */
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
/* 找到一个 source 来为该客户端提供服务 */
SrsSource* source = NULL;
if ((ret = SrsSource::fetch_or_create(req, server, &source)) != ERROR_SUCCESS) {
...
}
/* update the statistic when source disconveried. */
...
/* 若配置文件中没有配置 mode 的话,默认返回 false */
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
/* 若配置文件中没有配置 gop_cache,则默认开始 gop_cache */
bool enabled_cache = _srs_config->get_app_cache(req->vhost);
source->set_cache(enabled_cache);
/* 这里应为 SrsRtmpConnPlay */
client_type = type;
switch (type) {
case SrsRtmpConnPlay: {
/* response connection start play */
if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) {
...
}
/* 若配置文件中没有配置 http_hooks,则忽略该函数 */
if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) {
...
}
/* 这里开始向客户端发送 obs 推的流 */
ret = playing(source);
http_hooks_on_stop();
return ret;
}
...
}
return ret;
}
2.1 SrsRtmpServer::identify_client
该函数是通过接收一些客户端发来的消息来鉴别该客户端的请求的类型:publish or play。
/*
* recv some mesage to identify the client.
* @stream_id, client will createStream to play or publish by flash,
* the stream_id used to response the createStream request.
* @type, output the client type.
* @stream_name, output the client publish/play stream name. @see: SrsRequest.stream
* @duration, output the play client duration. @see: SrsRequest.duration
*/
int SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type,
string& stream_name, double& duration)
{
type = SrsRtmpConnUnknown;
int ret = ERROR_SUCCESS;
while (true) {
SrsCommonMessage* msg = NULL;
/* 获取一个完整的消息 */
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
...
}
SrsAutoFree(SrsCommonMessage, msg);
SrsMessageHeader& h = msg->header;
if (h.is_ackledgement() || h.is_set_chunk_size() ||
h.is_window_ackledgement_size() || h.is_user_control_message()) {
continue;
}
SrsPacket* pkt = NULL;
/* 解析该消息 */
if ((ret = protocol->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
...
}
SrsAutoFree(SrsPacket, pkt);
/* 接收到的消息为 createStream(‘livestream‘) */
if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) {
srs_info("identify client by create stream, play or flash publish.");
return identify_create_stream_client(dynamic_cast<SrsCreateStreamPacket*>(pkt),
stream_id, type, stream_name, duration);
}
...
}
return ret;
}
该函数检测接收到的消息为 createStream 后,接着调用 identify_create_stream_client 做进一步的处理。
2.1.1 SrsRtmpServer::identify_create_stream_client
int SrsRtmpServer::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id,
SrsRtmpConnType& type, string& stream_name, double& duration)
{
int ret = ERROR_SUCCESS;
if (true) {
/* 构造一个用于响应 createStream 消息的类 */
SrsCreateStreamResPacket* pkt =
new SrsCreateStreamResPacket(req->transaction_id, stream_id);
if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
...
}
}
while (true) {
SrsCommonMessage* msg = NULL;
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
...
}
SrsAutoFree(SrsCommonMessage, msg);
SrsMessageHeader& h = msg->header;
if (h.is_ackledgement() || h.is_set_chunk_size() ||
h.is_window_ackledgement_size() || h.is_user_control_message()) {
continue;
}
if (!h.is_amf0_command() && !h.is_amf3_command()) {
srs_trace("identify ignore messages except "
"AMF0/AMF3 command message. type=%#x", h.message_type);
continue;
}
SrsPacket* pkt = NULL;
if ((ret = protocol->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
...
}
/* 直到接收到 play 才返回 */
SrsAutoFree(SrsPacket, pkt);
if (dynamic_cast<SrsPlayPacket*>(pkt)) {
srs_info("level1 identify client by play.");
return identify_play_client(dynamic_cast<SrsPlayPacket*>(pkt),
type, stream_name, duration);
}
...
}
return ret;
}
2.1.2 SrsRtmpServer::identify_play_client
int SrsRtmpServer::identify_play_client(SrsPlayPacket* req, SrsRtmpConnType& type,
string& stream_name, double& duration)
{
int ret = ERROR_SUCCESS;
type = SrsRtmpConnPlay;
/* 客户端请求播放的流名称,可知为 livestream */
stream_name = req->stream_name;
duration = req->duration;
srs_info("identity client type=play, stream_name=%s, duration=%.2f",
stream_name.c_str(), duration);
return ret;
}
鉴别到客户端请求的行为为 play 后,接着为该请求获取一个 SrsSource 类的 source,用于为该客户端的请求提供服务。
2.2 SrsSource::fetch_or_create
/*
* create source when fetch from cache failed.
* @param r the client request.
* @param h the event handler for source.
* @param pps the matched source, if success never be NULL.
*/
int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
{
int ret = ERROR_SUCCESS;
/* 先从全局变量 SrsSource::pool 中寻找是否存在该 stream_url 对应的 source */
SrsSource* source = NULL;
if ((source = fetch(r)) != NULL) {
*pps = source;
return ret;
}
/* 若不存在,下面则是新构建一个,并将该 source 放入到 pool 中 */
/* 根据 vhost/app/stream 生成一个 stream_url */
string stream_url = r->get_stream_url();
string vhost = r->vhost;
// should always not exists for create a source.
srs_assert (pool.find(stream_url) == pool.end());
/* 构造一个新的 source */
source = new SrsSource();
if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) {
srs_freep(source);
return ret;
}
/* 将该 source 以 stream_url 为索引值放入到 pool 中 */
pool[stream_url] = source;
srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());
*pps = source;
return ret;
}
2.2.2 构造 SrsSource
/* live streaming source. */
SrsSource::SrsSource()
{
_req = NULL;
jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
mix_correct = false;
mix_queue = new SrsMixQueue();
#ifdef SRS_AUTO_HLS
hls = new SrsHls();
#endif
#ifdef SRS_AUTO_DVR
dvr = new SrsDvr();
#endif
#ifdef SRS_AUTO_TRANSCODE
encoder = new SrsEncoder();
#endif
#ifdef SRS_AUTO_HDS
hds = new SrsHds(this);
#endif
/* cache_metadata: 缓存元数据
* cache_sh_video: 缓存 sps,pps
* cache_sh_audio: 缓存 audio sequence header */
cache_metadata = cache_sh_video = cache_sh_audio = NULL;
_can_publish = true;
_pre_source_id = _source_id = -1;
die_at = -1;
play_edge = new SrsPlayEdge();
publish_edge = new SrsPublishEdge();
/* 默认开启 gop_cache */
gop_cache = new SrsGopCache();
aggregate_stream = new SrsStream();
is_monotonically_increase = false;
last_packet_time = 0;
_srs_config->subscribe(this);
atc = false;
}
接下来,开始响应客户端的 play 命名。
2.3 SrsRtmpServer::start_play
int SrsRtmpServer::start_play(int stream_id)
{
int ret = ERROR_SUCCESS;
// StreamBegin
if (true) {
SrsUserControlPacket* pkt = new SrsUserControlPacket();
pkt->event_type = SrsPCUCStreamBegin;
pkt->event_data = stream_id;
/* 向客户端发送 Stream Begin 1 的用户控制消息 */
if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
...
}
}
// onStatus(NetStream.Play.Reset)
if (true) {
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));
pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeStreamReset));
pkt->data->set(StatusDescription,
SrsAmf0Any::str("Playing and resetting stream."));
pkt->data->set(StatusDetails, SrsAmf0Any::str("stream"));
pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID));
if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
...
}
}
// onStatus(NetStream.Play.Start)
if (true) {
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));
pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeStreamStart));
pkt->data->set(StatusDescription, SrsAmf0Any::str("Started playing stream."));
pkt->data->set(StatusDetails, SrsAmf0Any::str("stream"));
pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID));
if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
...
}
}
// |RtmpSampleAccess(false, false)
if (true) {
SrsSampleAccessPacket* pkt = new SrsSampleAccessPacket();
// allow audio/video sample.
// @see: https://github.com/ossrs/srs/issues/49
pkt->audio_sample_access = true;
pkt->video_sample_access = true;
if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
...
}
srs_info("send |RtmpSampleAccess(false, false) message success.");
}
// onStatus(NetStream.Data.Start)
if (true) {
SrsOnStatusDataPacket* pkt = new SrsOnStatusDataPacket();
pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeDataStart));
if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
...
}
srs_info("send onStatus(NetStream.Data.Start) message success.");
}
srs_info("start play success.");
return ret;
}
该函数依次向客户端发送的消息如下几幅图。
send: StreamBegin
send: onStatus(NetStream.Play.Reset)
send: onStatus(NetStream.Play.Start)
send: |RtmpSampleAccess(false, false)
send: onStatus(NetStream.Data.Start)
3. SrsRtmpConn::playing
int SrsRtmpConn::playing(SrsSource* source)
{
int ret = ERROR_SUCCESS;
/* create consumer of source. */
SrsConsumer* consumer = NULL;
if ((ret = source->create_consumer(this, consumer)) != ERROR_SUCCESS) {
...
}
SrsAutoFree(SrsConsumer, consumer);
/* use isolate thread to recv,
* @see: https://github.com/ossrs/srs/issues/217 */
SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP);
/* start isolate recv thread */
if ((ret = trd.start()) != ERROR_SUCCESS) {
...
}
/* delivery message for clients playing stream. */
wakable = consumer;
ret = do_playing(source, consumer, &trd);
wakable = NULL;
/* stop isolate recv thread */
trd.stop();
/* warn for the message is dropped. */
if (!trd.empty()) {
srs_warn("drop the received %d messages", trd.size());
}
return ret;
}
3.1 SrsSource::create_consumer
int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer,
bool ds, bool dm, bool dg)
{
int ret = ERROR_SUCCESS;
consumer = new SrsConsumer(this, conn);
consumers.push_back(consumer);
double queue_size = _srs_config->get_queue_length(_req->vhost);
consumer->set_queue_size(queue_size);
/* if atc, update the sequence header to gop cache time. */
if (atc && !gop_cache->empty()) {
if (cache_metadata) {
cache_metadata->timestamp = gop_cache->start_time();
}
if (cache_sh_video) {
cache_sh_video->timestamp = gop->cache->start_time();
}
if (cache_sh_audio) {
cache_sh_audio->timestamp = gop_cache->start_time();
}
}
/* copy metadata. */
if (dm && cache_metadata &&
(ret = consumer->enqueue(cache_metadata, atc, jitter_algorithm)) != ERROR_SUCCESS) {
...
}
/* copy sequence header
* copy audio sequence first, for hls to fast parse the "right" audio codec.
* @see https://github.com/ossrs/srs/issues/301 */
if (ds && cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio, atc, jitter_algorithm)) != ERROR_SUCCESS) {
...
}
if (ds && cache_sh_video && (ret = consumer->enqueue(cache_sh_video, atc, jitter_algotithm)) != ERROR_SUCCESS) {
...
}
/* copy gop cache to client. */
if (dg && (ret = gop_cache->dump(consumer, atc, jitter_algorithm)) != ERROR_SUCCESS) {
return ret;
}
/* print status. */
if (dg) {
srs_trace("create consumer, queue_size=%.2f, jitter=%d", queue_size, jitter_algorithm);
} else {
srs_trace("create consumer, ignore gop cache, jitter=%d", jitter_algorithm);
}
/* for edge, when play edge stream, check the state */
if (_srs_config->get_vhost_is_edge(_req->vhost)) {
/* notice edge to start for the first client. */
if ((ret = play_edge->on_client_play()) != ERROR_SUCCESS) {
...
}
}
return ret;
}
原文地址:https://www.cnblogs.com/jimodetiantang/p/9113606.html
时间: 2024-11-13 01:45:41