LIVE555研究之五:RTPServer(二)

接上文,main函数的几行代码创建了RTSPServer类的子类DynamicRTSPServer对象。RTPServer类是服务器类的基类,DynamicRTSPServer代表具体的服务器子类。我们今天介绍的服务器程序就是基于该类实现的。

在创建DynamicRTSPServer时传入了值为554的端口号。这是因为RTSP默认端口号为554,与http默认使用80端口是一样的。

DynamicRTSPServer

继承关系:

Medium是很多类的基类。内部定义了指向环境类的引用和一个char类型媒体名称。并定义了按照媒体名称,查找对应媒体的成员函数lookupByName。由于MediaSink、MediaSouce、MediaSession、RTSPClient、RTPServer均继承自该类,因此在Medium中定义了很多判断该类是哪个媒体类型的函数:

  virtual Boolean isSource() const;

  virtual Boolean isSink() const;

  virtual Boolean isRTCPInstance() const;

  virtual Boolean isRTSPClient() const;

  virtual Boolean isRTSPServer() const;

  virtual Boolean isMediaSession() const;

  virtual Boolean isServerMediaSession() const;

  virtual Boolean isDarwinInjector() const;

Medium中的实现均是返回false。在对应的子类中均会重定义对应函数,并返回true。

TaskToken fNextTask用来保存延迟任务的ID。保存的任务ID用于被重新调度,或者在该媒体对象被销毁时从延迟队列中取消调度。

RTPServer类是服务器类的基类,代表了服务器对象。在整个服务器运行期间,该对象一直存在。

定义了以下成员变量:

  HashTable* fServerMediaSessions; 

  HashTable* fClientConnections; 

  HashTable* fClientConnectionsForHTTPTunneling;   

  HashTable* fClientSessions; 

  HashTable* fPendingRegisterRequests;

从其成员变量可以看到RTPServer中维护了ServerMediaSession对象、ClientConnection、ClientSession对象的HashTable。

ServerMediaSessionSession对应服务器端一个媒体文件,当客户端请求多个媒体文件时,RTPServer内会维护对应的多个ServerMediaSession对象。ServerMediaSession对象通过媒体文件名进行标识,如客户端请求a.264文件,则服务器就会在保存ServerMediaSession的HashTable中搜索对应文件名为a.264的ServerMediaSession。如未找到,则说明还未为该媒体文件创建对应的ServerMediaSession。并创建一个新的ServerMediaSession与媒体文件名关联后添加到HashTable。

lookupServerMediaSession用于在map中搜索对应媒体文件名对应的ServerMediaSession。

void addServerMediaSession(ServerMediaSession* serverMediaSession);

  virtual ServerMediaSession* lookupServerMediaSession(char const* streamName);

  void removeServerMediaSession(ServerMediaSession* serverMediaSession);

  void removeServerMediaSession(char const* streamName);

以上三个成员函数分别用来添加、查询和删除对应ServerMediaSession项。

removeServerMediaSession被调用后,在RTPServer中维护的fServerMediaSession的HashTable中,该ServerMediaSession会被删除。但是对应的ServerMediaSession对象并不一定会被释放。因为此时其他客户端还有可能在使用该媒体文件。只有当其他客户端都释放了对该媒体文件的引用后,该对象才会被释放。

closeAllClientSessionsForServerMediaSession用于删除所有客户端对某一个媒体文件的引用。

deleteServerMediaSession在从fServerMediaSession中删除对应项目时同时也会删除所有客户端的引用,此后该对象的引用计数为0可以被安全释放。

在removeServerMediaSession时会检查引用计数,只有当引用计数为0时该对象才会被释放。

if (serverMediaSession->referenceCount() == 0) //只有当引入计数为0时才会被释放
{
    Medium::close(serverMediaSession);
}
else
{
  serverMediaSession->deleteWhenUnreferenced() = True;
}

ClientConnection对象

ClientConnection对象定义在RTPServer内部,为其内部类。主要用于和客户端的通信。当有新的客户端连接到服务器时,会新建ClientConnection对象。其内部定义了发送、接收socket以及发送和接收缓冲区,并对客户端的命令进行处理和回应。

void handleRequestBytes(int newBytesRead);

用于处理客户端命令,在对RTSP命令进行分析后,提取出各种信息,然后进行分流处理。

对于OPTIONS、DESCRIBE、命令不支持、命令有误等其他错误命令的响应会直接在ClientConnection中进行处理。     而对于SETUP、PLAY、PAUSE、TERARDOWN等命令会传递到ClientSession中进行处理。

以下为分流代码:

 else if (strcmp(cmdName, "TEARDOWN") == 0
               || strcmp(cmdName, "PLAY") == 0
               || strcmp(cmdName, "PAUSE") == 0
               || strcmp(cmdName, "GET_PARAMETER") == 0
               || strcmp(cmdName, "SET_PARAMETER") == 0)
         {
              if (clientSession != NULL) {

                clientSession->handleCmd_withinSession(this, cmdName, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);

              } else
              {
                handleCmd_sessionNotFound();
              }

ClientSession对象会在客户端请求SETUP命令时在ClientConnection中创建,并分配一个ClientSessionID。对于SETUP之前和对一些出错处理命令会在ClientConnection中进行响应。

ClientConnection维护了RTPServer的指针,可以在新建ClientSession对象后将其加入到RTPServer维护的fClientSessions中。

ClientSession中定义的成员:

RTSPServer& fOurServer;

 u_int32_t fOurSessionId;

ServerMediaSession* fOurServerMediaSession;

 

ClientSession也维护了对RTPServer的引用。同时也保存了指向ServerMediaSession的指针。在对SETUP的响应中,有这样一句话:

  if (fOurServerMediaSession == NULL)
{
       // We‘re accessing the "ServerMediaSession" for the first time.
       fOurServerMediaSession = sms;
       fOurServerMediaSession->incrementReferenceCount();
 }
else if (sms != fOurServerMediaSession)
{
       // The client asked for a stream that‘s different from the one originally requested for this stream id.  Bad request:
       ourClientConnection->handleCmd_bad();
       break;
}

由此我们知道按照目前的实现,每个clientSession只能对应一个ServerMediaSession。即每个客户端只能请求一个媒体文件,不能同时请求两个媒体文件。如果需要同时支持多个媒体文件,就需要在ClientSession中维护一个ServerMediaSession集合。

ClientSession的noteLiveness用于客户端保活。其内部实现如下:
void RTSPServer::RTSPClientSession::noteLiveness()

{
   if (fOurServer.fReclamationTestSeconds > 0)
    {
     envir().taskScheduler()
       .rescheduleDelayedTask(fLivenessCheckTask,
                          fOurServer.fReclamationTestSeconds*1000000,
                          (TaskFunc*)livenessTimeoutTask, this);
    }
}

上述代码向调度器请求重新调度一个延迟任务,在fReclamationTestSeconds后会调用livenessTimeoutTask。其实现很简单仅仅删除自身。

void RTSPServer::RTSPClientSession
   ::livenessTimeoutTask(RTSPClientSession* clientSession)
{
   delete clientSession;
}

当服务器收到对应客户端的RR包时会调用noteLiveness,重新计时。

fReclamationTestSeconds在RTPServer构造时传入,默认为65s。表示如65s内未收到客户端RTCP包即认为客户端已断开。

如果在fReclamationTestSeconds的时间内再次调用noteLiveness,则该延迟任务会被设置成新的时间,原来的调度不再起作用。

  struct streamState
  {

     ServerMediaSubsession* subsession;
      void* streamToken;
  } * fStreamStates;

fStreamStates指向一个动态分配的数组。fNumStreamStates表示该数组包含的元素个数。

ServerMediaSession代表一个track(媒体流)。streamToken是void*类型的指针,但它指向StreamState类的对象。StreamState对象代表一个真正流动起来的数据流。这个流从XXXXFileSouce流向RTPSink。

可以看到一个ServerMediaSubSession对应一个StreamState。但ServerMediaSubSession对应一个静态的流,可以被多个客户端重用。如:多个客户端可能会请求同一个媒体文件中的track。StreamState代表一个动态的流。

ServerMediaSession

ServerMediaSession代表服务器端一个媒体文件。

其成员如下:

ServerMediaSubsession* fSubsessionsHead;

  ServerMediaSubsession* fSubsessionsTail;

  unsigned fSubsessionCounter;
  char* fStreamName;
  char* fInfoSDPString;
  char* fDescriptionSDPString;
  char* fMiscSDPLines;
  struct timeval fCreationTime;
  unsigned fReferenceCount;
  Boolean fDeleteWhenUnreferenced;

可以看到其主要成员为fSubsessionsHead、fSubsessionsTail。代表该媒体文件中的多个媒体流track。fStreamName为该媒体文件名。fDescritionSDPString代表SDP字符串。用于在客户端发送DESCRIBE命令时返回给客户端。

fReferenceCount为引用计数。当将fDeleteWhenUnreferenced设置为true,且引用计数为0时,ServerMediaSession会被释放。该值在构造函数中默认赋值为false。即所有ServerMediaSession即使不存在被客户端引用时,也不会被释放。对于长时间运行的服务器程序将会出现内存消耗耗尽的情况。解决方案就是在构造时将fDeleteWhenUnreferenced的默认值赋值为true。

其他成员函数是用来操纵MediaSubSession。

MediaSubSession

如果一个媒体文件中既包含音频流又包含视频流,我们称这个媒体文件中包含两个track。每个track对应一个ServerMediaSubsession。

ServerMediaSession* fParentSession;
 netAddressBits fServerAddressForSDP;
 portNumBits fPortNumForSDP;
private:
  ServerMediaSubsession* fNext;
  unsigned fTrackNumber; // within an enclosing ServerMediaSession
  char const* fTrackId;

fParentSession指向该MediaSubSession所属的ServerMediaSession。

fNext指向下一个同属于一个ServerMediaSession的ServerMediaSubsession。如果只包含一个媒体流,则fNext指针为NULL。

fTrackNumber为track号。在客户端发送DESCRIBE命令时,服务器端会为每个媒体流分配一个TrackID。

fTrackId 为字符串指针,该字符串由”track”和fTrackNumber拼接而成。如track1、track2。

ServerMediaSubsession中仅仅定义了空的接口,具体实现均放在其子类。

OnDemandServerMediaSubsession

HashTable* fDestinationsHashTable; 存储sessionID和Destinations的映射。

Destinations为目的地址。每个ClientSession在HashTable中都有与自己对应的项。

Destinations可以维护一对RTP和RTCP的端口和地址信息。

StreamState

前面说过StreamState代表一个真正流动的流,现在让我们看下StreamState的究竟实现了什么功能。

  OnDemandServerMediaSubsession& fMaster;
  Boolean fAreCurrentlyPlaying;
  unsigned fReferenceCount;
  Port fServerRTPPort, fServerRTCPPort;
  RTPSink* fRTPSink;
  BasicUDPSink* fUDPSink;
  float fStreamDuration;
  unsigned fTotalBW;
  RTCPInstance* fRTCPInstance;
  FramedSource* fMediaSource;
  float fStartNPT;
  Groupsock* fRTPgs;
  Groupsock* fRTCPgs;

fMaster为对OnDemandServerMediaSubsession或其子类的引用。

fReferenceCount为引用计数。

fServerRTPPort为RTP端口

fServerRTCPPort为RTCP端口

fRTPSink抽象Sink类。

fMediaSource为Souce基类。

可以看到StreamState既维护了Sink,又维护了Souce。其实在StreamState

GroupSock主要用于处理组播,但也可以处理单播。

Groupsock* fRTPgs和   Groupsock* fRTCPgs为RTP和RTCP的地址,用于向RTP和RTCP端口发送数据。

RTCPInstance

RTCPInsance是对RTCP通信的封装。RTCP的功能是统计包的收发,为流量统计提供依据。由于其封装的比较完整,因此RTCPInstance与其他类间的关系不是那么紧密。

RTCPInstance靠RTPInterface提供支持,所以它既支持RTP over UDP,又支持RTP over TCP。

  void setByeHandler(TaskFunc* handlerTask, void* clientData,
            Boolean handleActiveParticipantsOnly = True);
  void setSRHandler(TaskFunc* handlerTask, void* clientData);
  void setRRHandler(TaskFunc* handlerTask, void* clientData);
  void setSpecificRRHandler(netAddressBits fromAddress, Port fromPort,
               TaskFunc* handlerTask, void* clientData);

以上四个成员函数均是用来设置回调函数。在满足一定条件时该回调被调用。

setByeHandler用于设置在客户端结束与服务器的RTCP通信时的回调。

setSRHandler用于设置在收到客户端的SR包时的回调。在收到SR包时该回调被调用。

setRRHandler用于设置在收到客户端的RR包时的回调。在收到RR包时该回调被调用。

setSpecificRRHandler该成员函数与SetRRHandler的区别在于,它可以设置针对某一客户端的RR包的回调。RTPClientSession就是调用此回调,为指定客户端注册noteClientLiveness。用于检测客户端保活。如在一定时间内收不到RR包时即认为客户端已经断开了连接。此时将会删除对应的clientSession对象。这里提供了一种监视客户端运行状态的好方法。

每个MediaSubSession对应一个StreamState对象。它们被保存在在ServerMediaClient中被StreamState数组中。在收到客户端的PLAYM命令后,ServerMediaClient的响应函数内会为每个StreamState调用play:

// Now, start streaming:

  for (i = 0; i < fNumStreamStates; ++i)

 {
    if (subsession == NULL /* means: aggregated operation */
       || subsession == fStreamStates[i].subsession)
     {
      unsigned short rtpSeqNum = 0;
      unsigned rtpTimestamp = 0;
      if (fStreamStates[i].subsession == NULL) continue;
      fStreamStates[i].subsession->startStream(fOurSessionId,
                         fStreamStates[i].streamToken,
                         (TaskFunc*)noteClientLiveness, this,
                         rtpSeqNum, rtpTimestamp,                         

      //略去部分代码

     }
 }

RTSPClientSession的handleCmd_SETUP中会根据ServerMediaSubSession的个数创建streamStates数组。

if (fStreamStates == NULL)
{
      // 计算ServerMediaSubSession个数
       ServerMediaSubsessionIterator iter(*fOurServerMediaSession);
      for (fNumStreamStates = 0; iter.next() != NULL; ++fNumStreamStates) {}
      fStreamStates = new struct streamState[fNumStreamStates];
      iter.reset();
      ServerMediaSubsession* subsession;
      //将ServerMediaSubSession与streamStates通过fStreamStates数组进行关联
       for (unsigned i = 0; i < fNumStreamStates; ++i)
     {
        subsession = iter.next();
        fStreamStates[i].subsession = subsession;
        fStreamStates[i].streamToken = NULL;
     }
}

上述代码中与ServerMediaSubSession 关联的streamToken被赋值为NULL。并会在后面的getStreamParameters中被赋值,最后一个参数为指针的引用,用于在getStreamParameters中修改该指针。

subsession->getStreamParameters(fOurSessionId, ourClientConnection->fClientAddr.sin_addr.s_addr,
                  clientRTPPort, clientRTCPPort,
                  tcpSocketNum, rtpChannelId, rtcpChannelId,
                  destinationAddress, destinationTTL, fIsMulticast,
                  serverRTPPort, serverRTCPPort,
                  fStreamStates[streamNum].streamToken);

getStreamParameters在OnDemandServerMediaSubsession重新定义,可以看到创建StreamStates对象的代码:

// Set up the state of the stream.  The stream will get started later:
    streamToken = fLastStreamToken
      = new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
           streamBitrate, mediaSource,
           rtpGroupsock, rtcpGroupsock);

可以看到StreamStates关联了Sink和Souce。之所以要在OnDemandServerMediaSubsession重新定义的getStreamParameters中分配StreamStates对象,是因为它定义了新的创建具体MediaSouce和MediaSink的虚函数。

virtual FramedSource* createNewStreamSource(unsigned clientSessionId,
                       unsigned& estBitrate) = 0;
     // "estBitrate" is the stream‘s estimated bitrate, in kbps
   virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock,
                  unsigned char rtpPayloadTypeIfDynamic,
                  FramedSource* inputSource) = 0;
 

StreamStates关联的MediaSouce和MediaSink均是具体的子类。若媒体文件为H264码流,则对应的Souce为H264VideoStreamFramer,对应的Sink为H264VideoRTPSink。

在RTSPClientSession的handleCmd_PLAY中为每个MediaSubSession循环调用startStream,并传入与MediaSubSession关联的StramStates对象指针:

for (i = 0; i < fNumStreamStates; ++i)
{
    if (subsession == NULL /* means: aggregated operation */
      || subsession == fStreamStates[i].subsession)
   {
      unsigned short rtpSeqNum = 0;
      unsigned rtpTimestamp = 0;
      if (fStreamStates[i].subsession == NULL) continue;
      fStreamStates[i].subsession->startStream(fOurSessionId,
                         fStreamStates[i].streamToken,
                        (TaskFunc*)noteClientLiveness, this,
                         rtpSeqNum, rtpTimestamp,
                         RTSPServer::RTSPClientConnection::handleAlternativeRequestByte, ourClientConnection);
     }

}

startStram内部调用了StreamStates的startPlaying:

void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId,
                     void* streamToken,
                     TaskFunc* rtcpRRHandler,
                     void* rtcpRRHandlerClientData,
                     unsigned short& rtpSeqNum,
                     unsigned& rtpTimestamp,
                     ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
                      void* serverRequestAlternativeByteHandlerClientData) {
  StreamState* streamState = (StreamState*)streamToken;
  Destinations* destinations
    = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
  if (streamState != NULL)
  {
    streamState->startPlaying(destinations,
                 rtcpRRHandler, rtcpRRHandlerClientData,
                    serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
    RTPSink* rtpSink = streamState->rtpSink(); // alias
    if (rtpSink != NULL)
    {
      rtpSeqNum = rtpSink->currentSeqNo();

      rtpTimestamp = rtpSink->presetNextTimestamp();
    }
  }
}

streamStates的startPlaying内部则创建了RTCPInstance对象并调用了RTPSink的startPlaying函数:

fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);

第一个参数即为具体的MediaSouce子类。StartPlaying之后,Sink会调用Souce的getNextFrame获得一帧数据。

上面介绍的各种类是支撑LIVE555的各种基础设施,对于各种码流都是通用的。

2014.8.28于浙江杭州

时间: 2025-01-13 23:38:11

LIVE555研究之五:RTPServer(二)的相关文章

LIVE555研究之三LIVE555基础

LIVE555基础 LIVE555是为流媒体提供解决方案的跨平台C++开源项目.从今天起我们将正式开始深入LIVE555代码. 一.各库简要介绍 LIVE555下包含LiveMedia.UsageEnvironment.BasicUsageEnvironment.GroupSock库,MediaServer简单服务器程序以及其他多个测试demo.     LiveMedia库:包含一系列处理不同编码格式和封装格式的类,基类是Medium.     UsageEnvironment库:环境类,用于

闲话缓存:ZFS 读缓存深入研究-ARC(二)

Solaris ZFS ARC的改动(相对于IBM ARC) 如我前面所说,ZFS实现的ARC和IBM提出的ARC淘汰算法并不是完全一致的.在某些方面,它做了一些扩展: ·         ZFS ARC是一个缓存容量可变的缓存算法,它的容量可以根据系统可用内存的状态进行调整.当系统内存比较充裕的时候,它的容量可以自动增加.当系统内存比较紧张(其它事情需要内存)的时候,它的容量可以自动减少. ·         ZFS ARC可以同时支持多种块大小.原始的实现假设所有的块都是相同大小的. ·  

LIVE555研究之三:LIVE555基础

LIVE555基础 LIVE555是为流媒体提供解决方式的跨平台C++开源项目.从今天起我们将正式開始深入LIVE555代码. 一.各库简要介绍 LIVE555下包括LiveMedia.UsageEnvironment.BasicUsageEnvironment.GroupSock库,MediaServer简单server程序以及其它多个測试demo.     LiveMedia库:包括一系列处理不同编码格式和封装格式的类.基类是Medium.     UsageEnvironment库:环境类

Live555研究之一 源代码编译

Live555 是一个为流媒体提供解决方案的跨平台的C++开源项目,它实现了对标准流媒体传输协议如RTP/RTCP.RTSP.SIP等的支持.Live555实现了对多种音视频编码格式的音视频数据的流化.接收和处理等支持,包括MPEG.H.263+.DV.JPEG视频和多种音频编码.同时由于良好的设计,Live555非常容易扩展对其他格式的支持.目前,Live555已经被用于多款播放器的流媒体播放功能的实现,如VLC(VideoLan).MPlayer. 从今天开始我们将一起学习live555源码

【云快讯】之五十二《华为的公有云即将发布,名字还是个谜》

2015-07-22 张晓东 东方云洞察 点击上面的链接文字,可以快速关注"东方云洞察"公众号 前两天,笔者在微信中收到华为公有云战略与业务发布会的邀请函,打开看了一下发布会议程,放在了7月30日,还真的是7月最后一天啊,看来是希望的能准备的更充分些. 由华为的轮值CEO徐直军先生来进行发布,也足见华为对于这词发布的重视,看邀请的嘉宾也比较重量级,就是人数少了点. 华为的公有云其实在2010年就出现了,不过在市场推广上并没有下功夫,这和华为高层的态度有关系,毕竟华为很看重国内几大运营商

背包九讲之五(二维费用的背包问题)

http://acm.fafu.edu.cn/problem.php?id=1499 1 /* 2 二维费用的背包问题是指:对于每件物品,具有两种不同的费用, 3 选择这件物品就必须付出这两种代价,每种代价都有可付出的最大值(背包容量) 4 问怎么选择物品才能得到最大价值.费用增加了一维,那么只需要状态增加一维就可以了. 5 dp[i][j][k] 前i件物品付出两种代价为j和k的最大价值 6 dp[i][j][k] = max(dp[i-1][j][k],dp[i-1][j-a[i]][k-b

【管理心得之五十二】一次报复、一次转折

场景再现 ================ {甲}和{乙}是一起步入公司的同年生. 但是他们的工作的内容却不相同.{甲}是HR,{乙}则是业务员. 工作上他们之间很少有交集,但是私下里吐槽就却是常有的事. {乙}:我恨透这个公司了,感觉这里对我不公平,我要离开这里,找一个“伯乐”去. {甲}:好,你要离开这里我举双手赞成.既然公司对你这样的不公平,你就没想要报复公司吗? {乙}:有报复的机会当然好了. {甲}:不过你现在离开还不是机会,你只是“小卒”罢了,你的离职给不了公司多大损失. 我建议你,

(转载)VS2010/MFC编程入门之五十二(Ribbon界面开发:创建Ribbon样式的应用程序框架)

上一节中鸡啄米讲了GDI对象之画刷CBrush,至此图形图像的入门知识就讲完了.从本节开始鸡啄米将为大家带来Ribbon界面开发的有关内容.本文先来说说如何创建Ribbon样式的应用程序框架. Ribbon界面就是微软从Office2007开始引入的一种为了使应用程序的功能更加易于发现和使用.减少了点击鼠标的次数的新型界面,从实际效果来看,不仅外观漂亮,而且功能直观,用户操作简洁方便. 鸡啄米将以图文结合的方式来说明利用MFC向导创建Ribbon样式的单文档应用程序框架的过程,其实Ribbon应

scrapy研究探索(二)——爬w3school.com.cn

下午被一个问题困扰了好一阵.终于使用还有一种方式解决. 開始教程二.关于Scrapy安装.介绍等请移步至教程(一)(http://blog.csdn.net/u012150179/article/details/32343635). 在開始之前如果你已经安装成功一切所需,整怀着一腔热血想要抓取某站点. 一起来have a try. 1. 前期基础准备. Oh,不能在准备了,直接来. (1) 创建项目. 输入: scapy startproject w3school 以上创建项目w3school.