1. 综述
SRS 关于 HLS 的具体配置可见: HLS部署实例
SRS 关于 hls 的配置文件内容如下:
listen 1935;
max_connections 1000;
daemon off;
srs_log_tank console;
vhost __defaultVhost__ {
hls {
enabled on;
hls_fragment 10;
hls_window 60;
hls_path ./objs/nginx/html;
hls_m3u8_file [app]/[stream].m3u8;
hls_ts_file [app]/[stream]-[seq].ts;
}
}
SRS 端具体流程分析如下:
- SRS 建立对 1935 端口的监听: SRS之监听端口的管理:RTMP
- 在建立对 1935 端口监听的过程中创建了针对该端口的 tcp 监听线程,在该线程中 accept 客户端的连接请求,接着针对该客户端创建一个 conn 线程,以便为该客户端提供服务: SRS之RTMP连接处理线程conn:接收客户端推流
- 在 conn 线程的循环开始时,首先服务器会与客户端进行 handshake 过程: SRS之RTMP handshake
- handshake 成功后,会接收客户端 handshake 后的第一个命令,一般为 connect(‘xxx‘): SRS之SrsRtmpServer::connect_app详解
- 接着进入 SrsRtmpConn::service_cycle 函数: SRS之SrsRtmpConn::service_cycle详解
- 在 SrsRtmpConn::service_cycle 函数开始向客户端发送 应答窗口大小(5)、设置流带宽(6)、设置块大小(1)、response connect 的响应等消息后,开始进入循环,此时,调用 stream_service_cycle 函数
对于 hls 的分析,下面开始从 stream_service_cycle 开始分析。
2. SrsRtmpConn::stream_service_cycle
int SrsRtmpConn::stream_service_cycle()
{
int ret = ERROR_SUCCESS;
SrsRtmpConnType type;
/* 接收一些消息,然后根据该消息的内容识别该客户端接下来是 publish 还是 play */
if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration))
!= ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("identify client failed. ret=%d", ret);
}
return ret;
}
req->strip();
/* security check: 若 vhost 中没有配置 security,则默认返回 false */
if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) {
srs_error("security check failed. ret=%d", ret);
return ret;
}
/* SRS 不支持流名称为空的请求,因为对于 HLS 需要通过流名称写入到文件中 */
if (req->stream.empty()) {
ret = ERROR_RTMP_STREAM_NAME_EMPTY;
srs_error("RTMP: Empty stream name not allowed, ret=%d", ret);
return ret;
}
/* client is identified, set the timeout to service timeout. */
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
/* find a source to serve. */
SrsSource* source = NULL;
if ((ret = SrsSource::fetch_or_create(req, server, &source)) != ERROR_SUCCESS)
{
return ret;
}
srs_assert(source != NULL);
/* update the statistic when source disconveried. */
SrsStatistic* stat = SrsStatistic::instance();
if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS)
{
srs_error("stat client failed. ret=%d", ret);
return ret;
}
/* 若 vhost 没有配置 mode,则默认返回 false */
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
/* 默认开启 gop_cache */
bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
source->set_cache(enabled_cache);
client_type = type;
switch (type) {
...
case SrsRtmpConnFMLEPublish: {
srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());
if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) {
srs_error("start to publish stream failed. ret=%d", ret);
return ret;
}
return publishing(source);
}
...
default: {
ret = ERROR_SYSTEM_CLIENT_INVALID;
srs_info("invalid client type=%d. ret=%d", type, ret);
return ret;
}
}
return ret;
}
2.1 SrsSource::fetch_or_create
/* 该全局变量 pool 用于保存创建的 SrsSource 类对象,
* key: vhost/app/stream
* value: SrsSource* */
std::map<std::string, SrsSource*> SrsSource::pool;
/*
* create source when fetch from cache failed.
* @param r, the client request.
* @param h, the event handlr for source.
* @param pps, the matched source, if success never be NULL.
*/
int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
{
int ret = ERROR_SUCCESS;
SrsSource* source = NULL;
/* 检测 pool 中是否有 key(如vhost/live/livestream)对应的 SrsSource*,
* 若有则返回该 SrsSource* */
if ((source = fetch(r)) != NULL) {
*pps = source;
return ret;
}
string stream_url = r->get_stream_url();
string vhost = r->vhost;
/* should always not exists for create a source. */
srs_assert (pool.find(stream_url) == pool.end());
/* 获取或构建一个新的 SrsSource */
source = new SrsSource();
if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) {
srs_freep(source);
return ret;
}
/* 将创建成功并初始化后 source 放入到 pool 中 */
pool[stream_url] = source;
*pps = source;
return ret;
}
下面主要分析 SrsSource 的构造。
2.1.1 SrsSource 构造
/* live streaming source. */
SrsSource::SrsSource()
{
/* deep copy of client request. */
_req = NULL;
/* the time jitter algorithm for vhost. 这里初始化关闭 */
jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
mix_correct = false;
mix_queue = new SrsMixQueue();
#ifdef SRS_AUTO_HLS
/* 构造 SrsHls 类 */
hls = new SrsHls();
#endif
...
cache_metadata = cache_sh_video = cache_sh_audio = NULL;
/* can publish, true when is not streaming */
_can_publish = true;
_pre_source_id = _source_id = -1;
/* last die time, when all consumers quit and no publisher,
* we will remove the source when source die. */
die_at = -1;
/* edge control service */
play_edge = new SrsPlayEdge();
publish_edge = new SrsPublishEdge();
/* gop cache for client fast startup. */
gop_cache = new SrsGopCache();
/* for aggregate message */
aggregate_stream = new SrsStream();
is_monotonically_increase = false;
last_packet_time = 0;
_srs_config->subscribe(this);
atc = false;
}
2.1.2 SrsHls 构造
- SrsHls: delivery RTMP stream to HLS(m3u8 and ts), SrsHls provides interface with SrsSource.
SrsHls::SrsHls()
{
_req = NULL;
source = NULL;
hls_enabled = false;
hls_can_dispose = false;
last_update_time = 0;
codec = new SrsAvcAacCodec();
sample = new SrsCodecSample();
jitter = new SrsRtmpJitter();
muxer = new SrsHlsMuxer();
hls_cache = new SrsHlsCache();
pprint = SrsPithyPrint::create_hls();
/*
* we store the stream dts,
* for when we notice the hls cache to publish,
* it need to know the segment start dts.
*
* for example. when republish, the stream dts will
* monotonically increase, and the ts dts should start
* from current dts.
*
* or, simply because the HlsCache never free when unpublish,
* so when publish or republish it must start at stream dts,
* not zero dts.
*/
stream_dts = 0;
}
2.1.3 SrsAvcAacCodec 构造
- the h264/avc and aac codec, for media stream.
- to demux the FLV/RTMP video/audio packet to sample,
add eache NALUs of h.264 as a sample unit to sample,
while the entire aac raw data as a sample unit.
- for sequence header, demux it and save it in the avc_extra_data and aac_extra_data.
- for the codec info, such as audio sample rate, decode from FLV/RTMP header,
then use codec info in sequence header ro override it.
SrsAvcAacCodec::SrsAvcAacCodec()
{
/* for sequence header, whether parse the h.264 sps. */
avc_parse_sps = true;
width = 0;
height = 0;
duration = 0;
NAL_unit_length = 0;
frame_rate = 0;
video_data_rate = 0;
video_codec_id = 0;
audio_data_rate = 0;
audio_codec_id = 0;
avc_profile = SrsAvcProfileReserved;
avc_level = SrsAvcLevelReserved;
aac_object = SrsAacObjectTypeReserved;
aac_sample_rate = SRS_AAC_SAMPLE_RATE_UNSET; // sample rate ignored
aac_channels = 0;
avc_extra_size = 0;
avc_extra_data = NULL;
aac_extra_size = 0;
aac_extra_data = NULL;
sequenceParameterSetLength = 0;
sequenceParameterSetNALUnit = NULL;
pictureParameterSetLength = 0;
pictureParameterSetNALUnit = NULL;
/* the avc payload format. */
payload_format = SrsAvcPayloadFormatGuess;
stream = new SrsStream();
}
2.1.4 SrsCodecSample 构造
- the samples in the flv audio/video packet.
the sample used to analysis a video/audio packet,
split the h.264 NALUs to buffers, or aac raw data to a buffer,
and decode the video/audio specified infos.
- the sample unit:
- a video packet codec in h.264 contains many NALUs, each is a sample unit.
- a audio packet codec in aac is sample unit.
- @remark, the video/audio sequence header is not sample unit,
all sequence header stores as extra data,
@see SrsAvcAacCodec.avc_extra_data and SrsAvcAacCodec.aac_extra_data
- @remark, user must clear all samples before decode a new video/audio packet.
SrsCodecSample::SrsCodecSample()
{
clear();
}
/**
* clear all samples.
* the sample units never copy the bytes, it directly use the ptr,
* so when video/audio packet is destroyed, the sample must be clear.
* in a word, user must clear sample before demux it.
* @remark demux sample use SrsAvcAacCodec.audio_aac_demux or video_avc_demux.
*/
void SrsCodecSample::clear()
{
/* whether the sample is video sample which demux from video packet. */
is_video = false;
/*
* each audio/video raw data packet will dumps to one or multiple buffers,
* the buffers will write to hls and clear to reset.
* generally, aac audio packet corresponding to one buffer,
* where avc/h264 video packet may contains multiple buffer.
*/
nb_sample_units = 0;
/*
* CompositionTime, video_file_format_spec_v10_1.pdf, page 78.
* cts = pts - dts, where dts = flvheader->timestamp.
*/
cts = 0;
frame_type = SrsCodecVideoAVCFrameReserved;
avc_packet_type = SrsCodecVideoAVCTypeReserved;
has_sps_pps = has_aud = has_idr = false;
first_nalu_type = SrsAvcNaluTypeReserved;
acodec = SrsCodecAudioReserved1;
sound_rate = SrsCodecAudioSampleRateReserved;
sound_size = SrsCodecAudioSampleSizeReserved;
sound_type = SrsCodecAudioSoundTypeReserved;
aac_packet_type = SrsCodecAudioTypeReserved;
}
2.1.5 SrsHlsMuxer 构造
- muxer the HLS stream(m3u8 and ts files). generally, the m3u8 muxer only provides methods to open/close segments, to flush video/audio, without any mechenisms.
- that is, user must use HlsCache, which will control the methods of muxer, and provides HLS mechenisms.
SrsHlsMuxer::SrsHlsMuxer()
{
req = NULL;
hls_fragment = hls_window = 0;
hls_aof_ratio = 1.0;
/*
* the deviation in piece to adjust the fragment to be more
* bigger or smaller.
*/
deviation_ts = 0;
hls_cleanup = true;
hls_wait_keyframe = true;
previous_floor_ts = 0;
/*
* the previous reap floor timestamp,
* used to detect the dup or jmp or ts.
*/
accept_floor_ts = 0;
/* whether use floor algorithm for timestamp. */
hls_ts_floor = false;
max_td = 0;
_sequence_no = 0;
/* current writing segment. */
current = NULL;
/*
* the current audio codec, when open new muxer,
* set the muxer audio codec.
* @see https://github.com/ossrs/srs/issues/301
*/
acodec = SrsCodecAudioReserved1;
should_write_cache = false;
should_write_file = true;
/* 构建了一个可重复使用(SrsReusableThread)的线程:async */
async = new SrsAsyncCallWorker();
/* the ts context, to keep cc continous between ts. */
context = new SrsTsContext();
}
2.1.6 SrsAsyncCallWorker 构造
- the async callback for dvr. when worker call with the task, the worker will do it in isolate thread. that is, the task is execute/call in async mode.
SrsAsyncCallWorker::SrsAsyncCallWorker()
{
/* 创建一个可重复使用的线程:async */
pthread = new SrsReusableThread("async", this, SRS_AUTO_ASYNC_CALLBACL_SLEEP_US);
}
2.1.6.1 SrsReusableThread 构造
SrsReusableThread::SrsReusableThread(const char* n, ISrsReusableThreadHandler* h,
int64_t interval_us)
{
handler = h;
pthread = new internal::SrsThread(n, this, interval_us, true);
}
2.1.6.2 SrsThread 构造
SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable)
{
_name = name;
handler = thread_handler;
cycle_interval_us = interval_us;
tid = NULL;
loop = false;
really_terminated = true;
_cid = -1;
_joinable = joinable;
disposed = false;
// in start(), the thread cycle method maybe stop and remove the thread itself,
// and the thread start() is waiting for the _cid, and segment fault then.
// @see https://github.com/ossrs/srs/issues/110
// thread will set _cid, callback on_thread_start(), then wait for the can_run signal.
can_run = false;
}
2.1.7 SrsHlsCache 构造
- hls stream cache, use to cache hls stream and flush to hls muxer.
- when write stream to ts file:
- video frame will directly flush to M3u8Muxer
- audio frame need to cache, because it‘s small and flv tbn problem.
- whatever, the Hls cache used to cache video/audio, and flush video/audio to m3u8 muxer if needed.
- about the flv tbn problem:
- flv tbn is 1/1000, ts tbn is 1/90000, when timestamp convert to flv tbn, it will loose precise, so we must gather audio frame together, and recalc the timestamp @see SrsTsAacJitter, we use a aac jitter to correct the audio pts.
SrsHlsCache::SrsHlsCache()
{
cache = new SrsTsCache();
}
2.1.8 SrsTsCache 构造
- ts stream cache, use to cache ts stream.
- about the flv tbn problem:
- flv tbn is 1/1000, ts tbn is 1/90000, when timestamp convert to flv tbn, it will loose precise, so we must gather audio frame together, and recalc the timestamp @see SrsTsAacJitter, we use a aac jitter to correct the audio pts.
SrsTsCache::SrsTsCache()
{
/* current ts message. */
audio = NULL;
video = NULL;
}
上面构造好 SrsSource 后,接着调用 SrsSource 的 initialize 函数进行初始化。
2.1.9 SrsSource::initialize
int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
{
int ret = ERROR_SUCCESS;
srs_assert(h);
srs_assert(!_req);
/* the event handler. */
handler = h;
/* deep copy of client request. */
_req = r->copy();
/* 若 vhost 中没有配置 atc,默认返回 false */
atc = _srs_config->get_atc(_req->vhost);
#ifdef SRS_AUTO_HLS
if ((ret = hls->initialize(this)) != ERROR_SUCCESS) {
return ret;
}
#endif
..
if ((ret = play_edge->initialize(this, _req)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = publish_edge->initialize(this, _req)) != ERROR_SUCCESS) {
return ret;
}
/* 若 vhost 中没有配置 queue_length,则默认播放队列的大小为 30s
*
* in seconds, the live queue length
* #define SRS_PERF_PLAY_QUEUE 30
*/
double queue_size = _srs_config->get_queue_length(_req->vhost);
publish_edge->set_queue_size(queue_size);
/* the time jitter algorithm for vhost.
* 若 vhost 中没有配置 time_jitter,则默认返回值为 SrsRtmpJitterAlgorithmFULL */
jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(_req->vhost);
/* 若 vhost 中没有配置 mix_correct,则默认返回 false,
* 即禁止使用 interlaved/mixed algorithm 去校正 timstamp */
mix_correct = _srs_config->get_mix_correct(_req->vhost);
return ret;
}
2.1.10 SrsHls::initialize
initialize the hls by handler and source.
int SrsHls::initialize(SrsSource* s)
{
int ret = ERROR_SUCCESS;
source = s;
/* 初始化 hls muxer */
if ((ret = muxer->initialize()) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
2.1.11 SrsHlsMuxer::initialize
/* initialize the hls muxer. */
int SrsHlsMuxer::initialize()
{
int ret = ERROR_SUCCESS;
/* 启动 async 线程 */
if ((ret = async->start()) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
2.1.12 SrsAsyncCallWorker::start
int SrsAsyncCallWorker::start()
{
return pthread->start();
}
2.1.13 SrsReusableThread::start
int SrsReusableThread::start()
{
return pthread->start();
}
2.1.14 SrsThread::start
int SrsThread::start()
{
int ret = ERROR_SUCCESS;
if(tid) {
srs_info("thread %s already running.", _name);
return ret;
}
/* 创建线程 */
if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){
ret = ERROR_ST_CREATE_CYCLE_THREAD;
srs_error("st_thread_create failed. ret=%d", ret);
return ret;
}
disposed = false;
// we set to loop to true for thread to run.
loop = true;
// wait for cid to ready, for parent thread to get the cid.
while (_cid < 0) {
st_usleep(10 * 1000);
}
// now, cycle thread can run.
can_run = true;
return ret;
}
3. SrsRtmpConn::publishing
int SrsRtmpConn::publishing(SrsSource* source)
{
int ret = ERROR_SUCCESS;
/* vhost 中没有配置 refer_publish,忽略 */
if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost)))
!= ERROR_SUCCESS) {
srs_error("check publish_refer failed. ret=%d", ret);
return ret;
}
/* vhost 中没有配置 http_hooks,ignore */
if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) {
srs_error("http hook on_publish failed. ret=%d", ret);
return ret;
}
/* 这里返回 false */
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
if ((ret = acquire_publish(source, vhost_is_edge)) == ERROR_SUCCESS) {
SrsPublishRecvThread tcp(rtmp, req,
st_netfd_fileno(stfd), 0, this, source,
client_type != SrsRtmpConnFlashPublish,
vhost_is_edge);
ret = do_publishing(source, &trd);
/* stop isolate recv thread */
trd.stop();
}
/*
* whatever the acquire publish, always release publish.
* when the acquire error in the middle-way, the publish state changed,
* but failed, so we must cleanup it.
* @remark, when stream is busy, should never release it.
*/
if (ret != ERROR_SYSTEM_STREAM_BUSY) {
release_publish(source, vhost_is_edge);
}
http_hooks_on_unpublish();
return ret;
}
3.1 SrsRtmpConn::acquire_publish
int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge)
{
int ret = ERROR_SUCCESS;
/* 这里直接返回 _can_publish 的值,source 构建时初始化为 true */
if (!source->can_publish(is_edge)) {
ret = ERROR_SYSTEM_STREAM_BUSY;
srs_warn("stream %s is already publishing. ret=%d",
req->get_stream_url().c_str(), ret);
return ret;
}
/* vhost 中没有配置 mode,因此 is_edge 为 false */
/* when edge, ignore the publish event, directly proxy it. */
if (is_edge) {
if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) {
srs_error("notice edge start publish stream failed. ret=%d", ret);
return ret;
}
} else {
if ((ret = source->on_publish()) != ERROR_SUCCESS) {
srs_error("notify publish failed. ret=%d", ret);
return ret;
}
}
return ret;
}
3.1.1 SrsSource::on_publish
/*
* publish stream event notify.
* @param _req, the request from client, the source will deep copy it,
* for when reload the request of client maybe invalid.
*/
int SrsSource::on_publish()
{
int ret = ERROR_SUCCESS;
/* update the request object. */
srs_assert(_req);
_can_publish = false;
/* whatever, the publish thread is the source or edge source,
* save its id to srouce id. */
on_source_id_changed(_srs_context->get_id());
/* reset the mix queue. */
mix_queue->clear();
/* detect the monotonically again. */
is_monotonically_increase = true;
last_packet_time = 0;
/* create forwarders */
/* vhost 没有配置 forward,ignore */
if ((ret = create_forwarders()) != ERROR_SUCCESS) {
srs_error("create forwarders failed. ret=%d", ret);
return ret;
}
/* TODO: FIXME: use initialize to set req. */
#ifdef SRS_AUTO_TRANSCODE
...
#endif
/* TODO: FIXME: use initialize to set req. */
#ifdef SRS_AUTO_HLS
if ((ret = hls->on_publish(_req, false)) != ERROR_SUCCESS) {
srs_error("start hls failed. ret=%d", ret);
return ret;
}
#endif
/* TODO: FIXME: use initialize to set req. */
#ifdef SRS_AUTO_DVR
...
#endif
#ifdef SRS_AUTO_HDS
...
#endif
/* notify the handler. */
srs_assert(handler);
/* 调用子类 SrsServer 实现的 on_publish 函数,没有开启相关功能,ignore */
if ((ret = handler->on_publish(this, _req)) != ERROR_SUCCESS) {
srs_error("handle on publish failed. ret=%d", ret);
return ret;
}
SrsStatistic* stat = SrsStatistic::instance();
stat->on_stream_publish(_req, _source_id);
return ret;
}
3.1.2 SrsSource::on_source_id_changed
int SrsSource::on_source_id_changed(int id)
{
int ret = ERROR_SUCCESS;
if (_source_id == id) {
return ret;
}
if (_pre_source_id == -1) {
_pre_source_id = id;
} else if (_pre_source_id != _source_id) {
_pre_source_id = _source_id;
}
_source_id = id;
/* notice all consumer */
std::vector<SrsConsumer*>::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsConsumer* consumer = *it;
/* 该函数将 should_update_source_id 置为 true */
consumer->update_source_id();
}
return ret;
}
3.1.3 SrsHls::on_publish
/*
* publish stream event, continue to write the m3u8,
* for the muxer object not destroyed.
* @param fetch_sequence_header, whether fetch sequence from source.
*/
int SrsHls::on_publish(SrsRequest* req, bool fetch_sequence_header)
{
int ret = ERROR_SUCCESS;
srs_freep(_req);
_req = req->copy();
/* update the hls time, for hls_dispose. */
last_update_time = srs_get_system_time_ms();
/* SrsHls 构建时初始化为 false,因此不支持多次 publish */
/* support multiple publish. */
if (hls_enabled) {
return ret;
}
/* vhost 中使能了 hls */
std::string vhost = req->vhost;
if (!_srs_config->get_hls_enabled(vhost)) {
return ret;
}
/* 初始化 muxer,创建 m3u8 目录和文件等等 */
if ((ret = hls_cache->on_publish(muxer, req, stream_dts)) != ERROR_SUCCESS) {
return ret;
}
/* if enabled, open the muxer. */
hls_enabled = true;
/* ok, the hls can be dispose, or need to be dispose. */
hls_can_dispose = true;
/* 由上面调用知,传入的值为 false */
/* when publish, don‘t need to fetch sequence header, which is old and maybe corrupt.
* when reload, we must fetch the sequence header from source cache. */
if (fetch_sequence_header) {
/* notice the source to get the cached sequence header.
* when reload to start hls, hls will never get the sequence header in stream,
* use the SrsSource.on_hls_start to push the sequence header to HLS. */
if ((ret = source->on_hls_start()) != ERROR_SUCCESS) {
srs_error("callback source hls start failed. ret=%d", ret);
return ret;
}
}
return ret;
}
3.1.4 SrsHlsCache::on_publish
int SrsHlsCache::on_publish(SrsHlsMuxer* muxer, SrsRequest* req,
int64_t segment_start_dts)
{
int ret = ERROR_SUCCESS;
std::string vhost = req->vhost;
std::string stream = req->stream;
std::string app = req->app;
/* get the hls fragment time, in seconds.
* 配置文件中设置 hls_fragment 为 10 */
double hls_fragment = _srs_config->get_hls_fragment(vhost);
/* get the hls window time, in seconds.
* a window is a set of ts, the ts collection in m3u8.
* @remark SRS will delete the ts exceed the window.
* 配置文件中设置 hls_window 为 60 */
double hls_window = _srs_config->get_hls_window(vhost);
/* get the hls m3u8 ts list entry prefix config */
/* get the HLS m3u8 list ts segment entry prefix info.
* vhost 中没有配置 hls_entry_prefix,因此返回 "" */
std::string entry_prefix = _srs_config->get_hls_entry_prefix(vhost);
/* get the HLS ts/m3u8 file store path.
* 配置文件中 hls_path 为 ./objs/nginx/html */
std::string path = _srs_config->get_hls_path(vhost);
/* get the HLS m3u8 file path template.
* 配置文件中 hls_m3u8_file 为 [app]/[stream].m3u8 */
std::string m3u8_file = _srs_config->get_hls_m3u8_file(vhost);
/* get the HLS ts file path template.
* 配置文件中 hls_ts_file 为 [app]/[stream]-[seq].ts */
std::string ts_file = _srs_config->get_hls_ts_file(vhost);
/* whether cleanup the old ts files.
* hls 中没有配置 hls_cleanup,因此默认返回 true */
bool cleanup = _srs_config->get_hls_cleanup(vhost);
/* whether reap the ts when got keyframe.
* hls 中没有配置 hls_wait_keyframe,默认返回 true,即表示等待 I帧 */
bool wait_keyframe = _srs_config->get_hls_wait_keyframe(vhost);
/* get the hls aof(audio overflow) ratio.
* hls 没有配置 hls_aof_ratio,默认返回 2.0 */
double hls_aof_ratio = _srs_config->get_hls_aof_ratio(vhost);
/* whether use floor(timestamp/hls_fragment) for variable timestamp
* hls 中没有配置 hls_ts_floor,默认返回 false*/
bool ts_floor = _srs_config->get_hls_ts_floor(vhost);
/* the seconds to dispose the hls.
* hls 中没有配置 hls_dispose,默认返回 0 */
int hls_dispose = _srs_config->get_hls_dispose(vhost);
/* TODO: FIXME: support load exists m3u8, to continue publish stream.
* for the HLS donot requires the EXT-X-MEDIA-SEQUENCE be monotonically increase */
/* open muxer */
if ((ret = muxer->update_config(req, entry_prefix,
path, m3u8_file, ts_file, hls_fragment, hls_window, ts_floor, hls_aof_ratio,
cleanup, wait_keyframe)) != ERROR_SUCCESS
) {
srs_error("m3u8 muxer update config failed. ret=%d", ret);
return ret;
}
if ((ret = muxer->segment_open(segment_start_dts)) != ERROR_SUCCESS) {
srs_error("m3u8 muxer open segment failed. ret=%d", ret);
return ret;
}
return ret;
}
3.1.5 SrsHlsMuxer::update_config
/* when publish, update the config for muxer. */
int SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix,
string path, string m3u8_file, string ts_file, double fragment, double window,
bool ts_floor, double aof_ratio, bool cleanup, bool wait_keyframe
) {
int ret = ERROR_SUCCESS;
srs_freep(req);
req = r->copy();
hls_entry_prefix = entry_prefix;
hls_path = path;
hls_ts_file = ts_file;
hls_fragment = fragment;
hls_aof_ratio = aof_ratio;
hls_ts_floor = ts_floor;
hls_cleanup = cleanup;
hls_wait_keyframe = wait_keyframe;
previous_floor_ts = 0;
accept_floor_ts = 0;
hls_window = window;
deviation_ts = 0;
/* generate the m3u8 dir and path. *
m3u8_url = srs_path_build_stream(m3u8_file, req->vhost, req->app, req->stream);
m3u8 = path + "/" + m3u8_url;
/* when update config, reset the history target duration. */
/* get_hls_td_ratio: get the hls td(target duration) ratio.
* hls 没有配置 hls_td_ratio,默认返回 1.5,而 fragment 配置为 10,
* 因此 max_td 为 15 */
max_td = (int)(fragment * _srs_config->get_hls_td_ratio(r->vhost));
/* TODO: FIXME: refine better for SRS2 only support disk. */
should_write_cache = false;
should_write_file = true;
/* create m3u8 dir once. */
/* 返回 m3u8 的目录路径,即 ./objs/nginx/html/live */
m3u8_dir = srs_path_dirname(m3u8);
if (should_write_file && (ret = srs_create_dir_recursively(m3u8_dir)) != ERROR_SUCCESS)
{
srs_error("create app dir %s failed. ret=%d", m3u8_dir.c_str(), ret);
return ret;
}
srs_info("create m3u8 dir %s ok", m3u8_dir.c_str());
return ret;
}
3.1.6 srs_path_build_stream
/*
* build the path according to vhost/app/stream, where replace variables:
* [vhost], the vhost of stream.
* [app], the app of stream.
* [stream], the stream name of stream.
* @return the replaced path.
*/
string srs_path_build_stream(string template_path, string vhost, string app, string stream)
{
std::string path = template_path;
/* variable [vhost] */
path = srs_string_replace(path, "[vhost]", vhost);
/* variable [app] */
path = srs_string_replace(path, "[app]", app);
/* variable [stream] */
path = srs_string_replace(path, "[stream]", stream);
return path;
}
3.1.7 SrsHlsMuxer::segment_open
/*
* open a new segment(a new ts file),
* @param segment_start_dts, use to calc the segment duration,
* use 0 for the first segment of HLS.
*/
int SrsHlsMuxer::segment_open(int64_t segment_start_dts)
{
int ret = ERROR_SUCCESS;
/* current: current writing segment. 初始化时为 NULL */
if (current) {
srs_warn("ignore the segment open, for segment is already open.");
return ret;
}
/* when segment open, the current segment must be NULL. */
srs_assert(!current);
/* load the default acodec from config. */
SrsCodecAudio default_acodec = SrsCodecAudioAAC;
if (true) {
/* hls 没有配置 hls_acodec,默认返回 "aac" */
std::string default_acodec_str = _srs_config->get_hls_acodec(req->vhost);
if (default_acodec_str == "mp3") {
default_acodec = SrsCodecAudioMP3;
srs_info("hls: use default mp3 acodec");
} else if (default_acodec_str == "aac") {
default_acodec = SrsCodecAudioAAC;
srs_info("hls: use default aac acodec");
} else if (default_acodec_str == "an") {
default_acodec = SrsCodecAudioDisabled;
srs_info("hls: use default an acodec for pure video");
} else {
srs_warn("hls: use aac for other codec=%s", default_acodec_str.c_str());
}
}
/* load the default vcodec from config. */
SrsCodecVideo default_vcodec = SrsCodecVideoAVC;
if (true) {
/* hls 中没有配置 hls_vcodec,默认返回 "h264" */
std::string default_vcodec_str = _srs_config->get_hls_vcodec(req->vhost);
if (default_vcodec_str == "h264") {
default_vcodec = SrsCodecVideoAVC;
srs_info("hls: use default h264 vcodec");
} else if (default_vcodec_str == "vn") {
default_vcodec = SrsCodecVideoDisabled;
srs_info("hls: use default vn vcodec for pure audio");
} else {
srs_warn("hls: use h264 for other codec=%s", default_vcodec_str.c_str());
}
}
/* new segment. */
current = new SrsHlsSegment(context, should_write_cache, should_write_file,
default_acodec, default_vcodec);
/* _sequence_no: sequence number in m3u8. */
current->sequence_no = _sequence_no++;
/* 若为 HLS 的 first segment,则 segment_start_dts 为 0 */
current->segment_start_dts = segment_start_dts;
/* generate filename. */
std::string ts_file = hls_ts_file;
ts_file = srs_path_build_stream(ts_file, req->vhost, req->app, req->stream);
/* SrsHlsMuxer 构造时,初始化该值为 false */
if (hls_ts_floor) {
...
} else {
ts_file = srs_path_build_timestamp(ts_file);
}
if (true) {
std::stringstream ss;
ss << current->sequence_no;
ts_file = srs_string_replace(ts_file, "[seq]", ss.str());
}
current->full_path = hls_path + "/" + ts_file;
/* the ts url, relative or absolute url. */
std::string ts_url = current->full_path;
if (srs_string_starts_with(ts_url, m3u8_dir)) {
ts_url = ts_url.substr(m3u8_dir.length());
}
while (srs_string_starts_with(ts_url, "/")) {
ts_url = ts_url.substr(1);
}
current->uri += hls_entry_prefix;
if (!hls_entry_prefix.empty() && !srs_string_ends_with(hls_entry_prefix, "/"))
{
current->uri += "/";
/* add the http dir to uri. */
string http_dir = srs_path_dirname(m3u8_url);
if (!http_dir.empty()) {
current->uri += http_dir + "/";
}
}
current->uri += ts_url;
/* create dir recursively for hls. */
std::string ts_dir = srs_path_dirname(current->full_path);
if (should_write_file && (ret = srs_create_dir_recursively(ts_dir))
!= ERROR_SUCCESS) {
srs_error("create app dir %s failed. ret=%d", ts_dir.c_str(), ret);
return ret;
}
/* open temp ts file. */
std::string tmp_file = current->full_path + ".tmp";
if ((ret = current->muxer->open(tmp_file.c_str())) != ERROR_SUCCESS) {
srs_error("open hls muxer failed. ret=%d", ret);
return ret;
}
/* set the segment muxer audio codec. */
if (acodec != SrsCodecAudioReserved1) {
current->muxer->update_acodec(acodec);
}
return ret;
}
3.1.8 SrsHlsSegment 构造
- the wrapper of m3u8 segment from specification:
- EXTINF: The EXTINF tag specifies the duration of a media segment.
SrsHlsSegment::SrsHlsSegment(SrsTsContext* c, bool write_cache,
bool write_file, SrsCodecAudio ac, SrsCodecVideo vc)
{
/* duration in seconds in m3u8. */
duration = 0;
/* sequence number in m3u8. */
sequence_no = 0;
/* current segment start dts for m3u8 */
segment_start_dts = 0;
/* whether current segement is sequence header. */
is_sequence_header = false;
/* 由前知 write_cache 为 false,write_file 为 true */
writer = new SrsHlsCacheWriter(write_cache, write_file);
muxer = new SrsTSMuxer(writer, c, ac, vc);
}
3.1.9 SrsHlsCacheWriter 构造
- write to file and cache.
SrsHlsCacheWriter::SrsHlsCacheWriter(bool write_cache, bool write_file)
{
should_write_cache = write_cache;
should_write_file = write_file;
}
3.1.10 SrsTSMuxer 构造
- write data from frame(header info) and buffer(data) to ts file. it‘s a simple object wrapper for utility from nginx-rtmp: SrsMpegtsWriter
SrsTSMuxer::SrsTSMuxer(SrsFileWriter* w, SrsTsContext* c, SrsCodecAudio ac,
SrsCodecVideo vc)
{
writer = w;
context = c;
acodec = ac;
vcodec = vc;
}
3.1.11 srs_path_build_timestamp
/*
* build the path according to timestamp, where replace variables:
* [2006], replace this const to current year.
* [01], replace this const to current month.
* [02], replace this const to current date.
* [15], replace this const to current hour.
* [04], repleace this const to current minute.
* [05], repleace this const to current second.
* [999], repleace this const to current millisecond.
* [timestamp],replace this const to current UNIX timestamp in ms.
* @return the replace path.
*/
string srs_path_build_timestamp(string template_path)
{
std::string path = template_path;
/* data and time substitude
* clock time */
timeval tv;
if (gettimeofday(&tv, NULL) == -1) {
return path;
}
/* to calendar time */
struct tm* tm;
/* 若没有配置 utc_time,则默认返回 false,即不使用 utc 时间 */
if (_srs_config->get_utc_time()) {
if ((tm = gmtime(&tv.tv_sec)) == NULL) {
return path;
}
} else {
if ((tm = localtime(&tv.tv_sec)) == NULL) {
return path;
}
}
/* the buffer to format the date and time. */
char buf[64];
/* [2006], replace with current year. */
if (true) {
snprintf(buf, sizeof(buf), "%04d", 1900 + tm->tm_year);
path = srs_string_replace(path, "[2006]", buf);
}
/* [01], replace this const to current month. */
if (true) {
snprintf(buf, sizeof(buf), "%02d", 1 + tm->tm_mon);
path = srs_string_replace(path, "[01]", buf);
}
/* [02], replace this const to current date. */
if (true) {
snprintf(buf, sizeof(buf), "%02d", tm->tm_mday);
path = srs_string_replace(path, "[02]", buf);
}
/* [15], replace this const to current hour. */
if (true) {
snprintf(buf, sizeof(buf), "%02d", tm->tm_hour);
path = srs_string_replace(path, "[15]", buf);
}
/* [04], repleace this const to current minute. */
if (true) {
snprintf(buf, sizeof(buf), "%02d", tm->tm_min);
path = srs_string_replace(path, "[04]", buf);
}
/* [05], repleace this const to current second. */
if (true) {
snprintf(buf, sizeof(buf), "%02d", tm->tm_sec);
path = srs_string_replace(path, "[05]", buf);
}
/* [999], repleace this const to current millisecond. */
if (true) {
snprintf(buf, sizeof(buf), "%03d", (int)(tv.tv_usec / 1000));
path = srs_string_replace(path, "[999]", buf);
}
/* [timestamp],replace this const to current UNIX timestamp in ms. */
if (true) {
int64_t now_us = ((int64_t)tv.tv_sec) * 1000 * 1000 + (int64_t)tv.tv_usec;
snprintf(buf, sizeof(buf), "%"PRId64, now_us / 1000);
path = srs_string_replace(path, "[timestamp]", buf);
}
return path;
}
3.1.12 SrsTSMuxer::open
/*
* open the writer, donot write the PSI of ts.
* @param p, a string indicates the path of ts file to mux to.
*/
int SrsTSMuxer::open(string p)
{
int ret = ERROR_SUCCESS;
path = p;
close();
/* reset the context for a new ts start. */
context->reset();
/* 调用子类 SrsHlsCacheWriter 实现的 open 函数 */
if ((ret = writer->open(path)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
3.1.13 SrsHlsCacheWriter::open
int SrsHlsCacheWriter::open(string file)
{
if (!should_write_file) {
return ERROR_SUCCESS;
}
return impl.open(file);
}
3.1.14 SrsFileWriter::open
/*
* open file writer, in truncate mode.
* @param p, a string indicates the path of file to open.
*/
int SrsFileWriter::open(string p)
{
int ret = ERROR_SUCCESS;
if (fd > 0) {
ret = ERROR_SYSTEM_FILE_ALREADY_OPENED;
srs_error("file %s already opened. ret=%d", path.c_str(), ret);
return ret;
}
int flags = O_CREAT|O_WRONLY|O_TRUNC;
mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH;
if ((fd = ::open(p.c_str(), flags, mode)) < 0) {
ret = ERROR_SYSTEM_FILE_OPENE;
srs_error("open file %s failed. ret=%d", p.c_str(), ret);
return ret;
}
path = p;
return ret;
}
上面分析完 acquire_publish 函数,下面接着分析 SrsPublishRecvThread 线程的构建。
3.2 SrsPublishRecvThread 构造
在该构造函数中,创建了一个可重复使用的 recv 线程,专门用于接收客户端推流的音视频数据。
- the publish recv thread got message and callback the source method to process message.
SrsPublishRecvThread::SrsPublishRecvThread(
SrsRtmpServer* rtmp_sdk,
SrsRequest* _req, int mr_sock_fd, int timeout_ms,
SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge
): trd(this, rtmp_sdk, timeout_ms)
{
rtmp = rtmp_sdk;
_conn = conn;
/* the params for conn callback. */
_source = source;
/* 由前知,为 true */
_is_fmle = is_fmle;
/* false */
_is_edge = is_edge;
recv_error_code = ERROR_SUCCESS;
/* the msgs already got. */
_nb_msgs = 0;
/* The video frames we got. */
video_frames = 0;
/* the error timeout cond */
error = st_cond_new();
ncid = cid = 0;
req = _req;
mr_fd = mr_sock_fd;
/* the mr settings,
* @see https://github.com/ossrs/srs/issues/241 */
/* 没有配置 mr,默认返回 false */
mr = _srs_config->get_mr_enabled(req->vhost);
/* 默认返回 350 */
mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
/* 没有配置 min_latency,默认返回 false */
realtime = _srs_config->get_realtime_enabled(req->vhost);
_srs_config->subscribe(this);
}
调用该 SrsPublishRecvThread 的构造函数前,先调用 SrsRecvThread 类的构造函数。
3.2.1 SrsRecvThread 构造
- the recv thread, use message handler to handle each received message.
SrsRecvThread::SrsRecvThread(ISrsMessageHandler* msg_handler,
SrsRtmpServer* rtmp_sdk, int timeout_ms)
{
timeout = timeout_ms;
handler = msg_handler;
rtmp = rtmp_sdk;
trd = new SrsReusableThread2("recv", this);
}
3.2.2 SrsReusableThread2 构造
SrsReusableThread2::SrsReusableThread2(const char* n, ISrsReusableThread2Handler* h,
int64_t interval_us)
{
handler = h;
pthread = new internal::SrsThread(n, this, interval_us, true);
}
3.2.3 SrsThread 构造
SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler,
int64_t interval_us, bool joinable)
{
_name = name;
handler = thread_handler;
cycle_interval_us = interval_us;
tid = NULL;
loop = false;
really_terminated = true;
_cid = -1;
_joinable = joinable;
disposed = false;
// in start(), the thread cycle method maybe stop and remove the thread itself,
// and the thread start() is waiting for the _cid, and segment fault then.
// @see https://github.com/ossrs/srs/issues/110
// thread will set _cid, callback on_thread_start(), then wait for the can_run signal.
can_run = false;
}
4. SrsRtmpConn::do_publishing
int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
{
int ret = ERROR_SUCCESS;
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
SrsAutoFree(SrsPithyPrint, pprint);
/* start isolate recv thread. */
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("start isolate recv thread failed. ret=%d", ret);
return ret;
}
/* change the isolate recv thread context id,
* merge its log to current thread. */
int receive_thread_cid = trd->get_cid();
trd->set_cid(_srs_context->get_id());
/* initialize the publish timeout. */
publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost);
/* set the sock options. */
set_sock_options();
if (true) {
/* 没有启用 mr 功能,为 false */
bool mr = _srs_config->get_mr_enabled(req->vhost);
int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
}
int64_t nb_msgs = 0;
uint64_t nb_frames = 0;
while (!disposed) {
pprint->elapse();
/* when source is set to expired, disconnect it. */
if (expired) {
ret = ERROR_USER_DISCONNECT;
srs_error("connection expired. ret=%d", ret);
return ret;
}
/* conn wait for timeout */
if (nb_msgs == 0) {
/* when not got msgs, wait for a larger timeout. */
trd->wait(publish_1stpkt_timeout);
} else {
trd->wait(publish_normal_timeout);
}
/* check the thread error code */
if ((ret = trd->error_code()) != ERROR_SUCCESS) {
if (!srs_is_system_control_error(ret) &&
!srs_is_client_gracefully_close(ret)) {
srs_error("recv thread failed. ret=%d", ret);
}
return ret;
}
/* when not any messages, timeout */
if (trd->nb_msgs() <= nb_msgs) {
ret = ERROR_SOCKET_TIMEOUT;
srs_warn("publish timeout %dms, nb_msgs=%"PRId64", ret=%d",
nb_msgs? publish_normal_timeout : publish_1stpkt_timeout, nb_msgs, ret);
break;
}
nb_msgs = trd->nb_msgs();
/* update the stat for video fps. */
SrsStatistic* stat = SrsStatistic::instance();
if ((ret = stat->on_video_frames(req, (int)(trd->nb_video_frames() - nb_frames)))
!= ERROR_SUCCESS) {
return ret;
}
nb_frames = trd->nb_video_frames();
/* reportable */
if (pprint->can_print()) {
kbps->sample();
bool mr = _srs_config->get_mr_enabled(req->vhost);
int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
);
}
}
return ret;
}
该 while 循环主要是循环进行 error 条件变量上进入休眠状态,等待该条件变量成立(即若 recv 线程发生错误,则会通过 error 条件变量唤醒在该条件变量上等待的线程,以便该线程进行处理 recv error),或者等待的超时时间到了才唤醒。
5. recv 线程
该线程具体分析可见: SRS之接收推流线程:recv
原文地址:https://www.cnblogs.com/jimodetiantang/p/9130634.html