yate学习--关于yrtpchan模块

转载请声明出处:http://blog.csdn.net/u012377333

本文简介:yrtpchan是一个关于yate自定义的处理rtp协议的模块(关于rtp协议的知识,大家可以网上百度、谷歌)。本文主要是对于yrtpchan处理rtp.chan消息做了简要的分析和理解。

对于yate的每个模块如何去分析和查看(因为没有main函数和其他的函数调用该模块的函数,所以一般常规的阅读代码的方式不是很好),首先是从继承Module类去查看。关于yrtpchan模块,就是从这里开始查看了:

class YRTPPlugin : public Module
{
public:
    YRTPPlugin();
    virtual ~YRTPPlugin();
    virtual void initialize();
    virtual bool received(Message& msg, int id);
    virtual void statusParams(String& str);
    virtual void statusDetail(String& str);
    virtual void genUpdate(Message& msg);

private:
    bool reflectSetup(Message& msg, const char* id, RTPTransport& rtp, const char* rHost, const char* leg);
    bool reflectStart(Message& msg, const char* id, RTPTransport& rtp, SocketAddr& rAddr);
    void reflectDrop(YRTPReflector*& refl, Lock& mylock);
    void reflectExecute(Message& msg);
    void reflectAnswer(Message& msg, bool ignore);
    void reflectHangup(Message& msg);
    bool m_first;
};

还定义了一个静态的全局变量

static YRTPPlugin splugin;

在加载模块的时候会调用

<pre name="code" class="cpp">void YRTPPlugin::initialize()
{
    Output("Initializing module YRTP");
    Configuration cfg(Engine::configFile("yrtpchan"));
    s_ipv6 = SocketAddr::supports(SocketAddr::IPv6) &&
	cfg.getBoolValue("general","ipv6_support",false);
    s_minport = cfg.getIntValue("general","minport",MIN_PORT);
    s_maxport = cfg.getIntValue("general","maxport",MAX_PORT);
    s_bufsize = cfg.getIntValue("general","buffer",BUF_SIZE);
    s_minJitter = cfg.getIntValue("general","minjitter",50);
    s_maxJitter = cfg.getIntValue("general","maxjitter",Engine::clientMode() ? 120 : 0);
    s_tos = cfg.getIntValue("general","tos",Socket::tosValues());
    s_udpbuf = cfg.getIntValue("general","udpbuf",0);
    s_localip = cfg.getValue("general","localip");
    s_autoaddr = cfg.getBoolValue("general","autoaddr",true);
    s_anyssrc = cfg.getBoolValue("general","anyssrc",true);
    s_padding = cfg.getIntValue("general","padding",0);
    s_rtcp = cfg.getBoolValue("general","rtcp",true);
    s_interval = cfg.getIntValue("general","rtcp_interval",4500);
    s_drill = cfg.getBoolValue("general","drillhole",Engine::clientMode());
    s_monitor = cfg.getBoolValue("general","monitoring",false);
    s_sleep = cfg.getIntValue("general","defsleep",5);
    RTPGroup::setMinSleep(cfg.getIntValue("general","minsleep"));
    s_priority = Thread::priority(cfg.getValue("general","thread"));
    s_rtpWarnSeq = cfg.getBoolValue("general","rtp_warn_seq",true);
    s_timeout = cfg.getIntValue("timeouts","timeout",3000);
    s_udptlTimeout = cfg.getIntValue("timeouts","udptl_timeout",25000);
    s_notifyMsg = cfg.getValue("timeouts","notifymsg");
    s_warnFirst = cfg.getBoolValue("timeouts","warnfirst",true);
    s_warnLater = cfg.getBoolValue("timeouts","warnlater",false);
    setup();
    if (m_first) {
	m_first = false;
	installRelay(Execute,50);
	installRelay(Ringing,50);
	installRelay(Progress,50);
	installRelay(Answered,50);
	installRelay(Private,"chan.hangup",50);
	Engine::install(new AttachHandler);
	Engine::install(new RtpHandler);
	Engine::install(new DTMFHandler);
    }
}

读取配置文件里面的参数,如果没有配置文件的话,使用的都是默认参数

    Configuration cfg(Engine::configFile("yrtpchan"));
    s_ipv6 = SocketAddr::supports(SocketAddr::IPv6) &&
	cfg.getBoolValue("general","ipv6_support",false);
    s_minport = cfg.getIntValue("general","minport",MIN_PORT);
    s_maxport = cfg.getIntValue("general","maxport",MAX_PORT);
    s_bufsize = cfg.getIntValue("general","buffer",BUF_SIZE);
    s_minJitter = cfg.getIntValue("general","minjitter",50);
    s_maxJitter = cfg.getIntValue("general","maxjitter",Engine::clientMode() ? 120 : 0);
    s_tos = cfg.getIntValue("general","tos",Socket::tosValues());
    s_udpbuf = cfg.getIntValue("general","udpbuf",0);
    s_localip = cfg.getValue("general","localip");
    s_autoaddr = cfg.getBoolValue("general","autoaddr",true);
    s_anyssrc = cfg.getBoolValue("general","anyssrc",true);
    s_padding = cfg.getIntValue("general","padding",0);
    s_rtcp = cfg.getBoolValue("general","rtcp",true);
    s_interval = cfg.getIntValue("general","rtcp_interval",4500);
    s_drill = cfg.getBoolValue("general","drillhole",Engine::clientMode());
    s_monitor = cfg.getBoolValue("general","monitoring",false);
    s_sleep = cfg.getIntValue("general","defsleep",5);
    RTPGroup::setMinSleep(cfg.getIntValue("general","minsleep"));
    s_priority = Thread::priority(cfg.getValue("general","thread"));
    s_rtpWarnSeq = cfg.getBoolValue("general","rtp_warn_seq",true);
    s_timeout = cfg.getIntValue("timeouts","timeout",3000);
    s_udptlTimeout = cfg.getIntValue("timeouts","udptl_timeout",25000);
    s_notifyMsg = cfg.getValue("timeouts","notifymsg");
    s_warnFirst = cfg.getBoolValue("timeouts","warnfirst",true);
    s_warnLater = cfg.getBoolValue("timeouts","warnlater",false);

声明该模块可以处理那些消息

installRelay(Execute,50);
	installRelay(Ringing,50);
	installRelay(Progress,50);
	installRelay(Answered,50);
	installRelay(Private,"chan.hangup",50);
	Engine::install(new AttachHandler);
	Engine::install(new RtpHandler);
	Engine::install(new DTMFHandler);

这里我们重点查看处理RTP消息的类

class RtpHandler : public MessageHandler
{
public:
    RtpHandler() : MessageHandler("chan.rtp",100,splugin.name()) { }
    virtual bool received(Message &msg);
};

每当有模块发布消息"chan.rtp"的时候,都会进入received函数

bool RtpHandler::received(Message &msg)
{
    bool udptl = false;
    const String& trans = msg[YSTRING("transport")];
    if (trans && !trans.startsWith("RTP/")) {
	if (trans &= "udptl")
	    udptl = true;
	else
	    return false;
    }
    Debug(&splugin,DebugAll,"%s message received",(trans ? trans.c_str() : "No-transport"));
    bool terminate = msg.getBoolValue(YSTRING("terminate"),false);
    const String& dir = msg[YSTRING("direction")];
    RTPSession::Direction direction = terminate ? RTPSession::FullStop : RTPSession::SendRecv;
    bool d_recv = false;
    bool d_send = false;
    if (dir == YSTRING("bidir")) {
	d_recv = true;
	d_send = true;
    }
    else if (dir == YSTRING("receive")) {
	d_recv = true;
	direction = RTPSession::RecvOnly;
    }
    else if (dir == YSTRING("send")) {
	d_send = true;
	direction = RTPSession::SendOnly;
    }

    CallEndpoint* ch = YOBJECT(CallEndpoint,msg.userData());
    DataEndpoint* de = YOBJECT(DataEndpoint,msg.userData());
    const char* media = udptl ? "image" : "audio";
    media = msg.getValue(YSTRING("media"),(de ? de->name().c_str() : media));
    RefPointer<YRTPWrapper> w = YRTPWrapper::find(ch,media);
    if (w)
	Debug(&splugin,DebugAll,"Wrapper %p found by CallEndpoint %p",(YRTPWrapper*)w,ch);
    else {
	const String& rid = msg[YSTRING("rtpid")];
	w = YRTPWrapper::find(rid);
	if (w)
	    Debug(&splugin,DebugAll,"Wrapper %p found by ID '%s'",(YRTPWrapper*)w,rid.c_str());
    }
    if (w)
	w->deref();
    if (terminate) {
	if (w) {
	    if (w->host())
		msg.setParam("localip",w->host());
	    if (w->port())
		msg.setParam("localport",String(w->port()));
	    w->terminate(msg);
	    msg.setParam("status","terminated");
	    return true;
	}
	return false;
    }
    if (!(ch || de || w)) {
	Debug(&splugin,DebugWarn,"Neither call channel nor RTP wrapper found!");
	return false;
    }

    const String& rip = msg[YSTRING("remoteip")];
    const char* status = "updated";

    if (!w) {
	// it would be pointless to create an unreferenced wrapper
	if (!(d_recv || d_send))
	    return false;
	String lip(msg.getValue(YSTRING("localip")));
	bool ipv6 = msg.getBoolValue(YSTRING("ipv6_support"),s_ipv6);
	if (lip.null())
	    YRTPWrapper::guessLocal(rip,lip,ipv6);
	if (lip.null()) {
	    Debug(&splugin,DebugWarn,"RTP request with no local address!");
	    return false;
	}

	status = "created";
	w = new YRTPWrapper(lip,ch,media,direction,msg,udptl,ipv6);
	w->setMaster(msg.getValue(YSTRING("id")));

	w->deref();
    }
    else if (w->valid())
	w->addDirection(direction);
    else
	return false;

    if (d_recv) {
	if (ch && !ch->getSource(media)) {
	    DataSource* s = w->getSource();
	    ch->setSource(s,media);
	    s->deref();
	}
	else if (de && !de->getSource()) {
	    DataSource* s = w->getSource();
	    de->setSource(s);
	    s->deref();
	}
    }

    if (d_send) {
	if (ch && !ch->getConsumer(media)) {
	    DataConsumer* c = w->getConsumer();
	    ch->setConsumer(c,media);
	    c->deref();
	}
	else if (de && !de->getConsumer()) {
	    DataConsumer* c = w->getConsumer();
	    de->setConsumer(c);
	    c->deref();
	}
    }

    if (w->refcount() <= 1)
	return false;

    w->setParams(rip,msg);
    w->setFaxDivert(msg);
    msg.setParam("localip",w->host());
    msg.setParam("localport",String(w->port()));
    msg.setParam("rtpid",w->id());
    msg.setParam("status",status);

    if (msg.getBoolValue(YSTRING("getsession"),!msg.userData()))
	msg.userData(w);
    return true;
}

这个里面的内容比较多,我感觉可以分成两个部分来查看:

一个是

RefPointer<YRTPWrapper> w = YRTPWrapper::find(ch,media);
<pre name="code" class="cpp">w = YRTPWrapper::find(rid);

另外一个

w = new YRTPWrapper(lip,ch,media,direction,msg,udptl,ipv6);

不知道大家有没有注意过,yate对于转发两个yate client的rtp包的时候,是创建了两个ip、port去对应两个客户端的接受和发送。这里举一个比较简单的例子:

S是服务器,A和C是客户端,假如是A呼叫C,从S上面抓包(rtp)的格式大致是这样的:

a.ip:port-------------------------------------------->s.ip2:port2

s.ip1:port1----------------------------------------->c.ip:port

s.ip2:port2----------------------------------------->a.ip:port

c.ip:port--------------------------------------------->s.ip1:port

思考:

也就是说服务器创建了两个rtp的套接字去发送和接受两个客户端的媒体数据,为什么会这样?为什么不是一个套接字来处理A到C的媒体数据另外一个套接字来处理C到A的媒体数据?

未完,待续......

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-08-25 03:19:00

yate学习--关于yrtpchan模块的相关文章

Python学习之cookielib模块

cookielib是一个用于处理客户端HTTP cookie的模块 https://docs.python.org/2/library/cookielib.html?highlight=cookielib#cookielib In [191]: import cookielib,urllib2 In [192]: cj=cookielib.CookieJar() In [193]: openner=urllib2.build_opener(urllib2.HTTPCookieProcessor(

Nginx学习指南之模块的应用

一.自述 Nginx模块功能也是相当的丰富,但对于Apache Web来说,两者之间还是有区别的,大家都知道Nginx模块是直接被编译进了nginx.conf配置文件,而Apache则是被编译成为.SO文件,有些是需要在httpd.conf配置文件中指定是否加载,这样才能激活模块功能.Nginx模块有三个角色,Handlers(处理一个HTTP请求,并产生输出):Filters(处理由一个handler生成的输出):load-balancers(负载均衡器,当后端服务器符合一个以上后,依据算法选

python基础学习日志day5-各模块文章导航

python基础学习日志day5---模块使用 http://www.cnblogs.com/lixiang1013/p/6832475.html python基础学习日志day5---time和datetime模块 http://www.cnblogs.com/lixiang1013/p/6848245.html python基础学习日志day5---random模块http://www.cnblogs.com/lixiang1013/p/6849162.html python基础学习日志da

学习python argparse模块下载安装和使用

Python中argparse模块已经替代以前的optparse模块,常被用来实现命令行参数和选项的解析作用. 位置参数: 从一个最基本的程序开始(它并没有实现什么现实的功能): import argparseparser = argparse.ArgumentParser()parser.parse_args() 下面是运行之后的结果: $ python prog.py$ python prog.py --helpusage: prog.py [-h]optional arguments: 

菜菜鸟Zend Framework 2 不完全学习涂鸦(四)-- 模块

菜菜鸟Zend Framework 2 不完全学习涂鸦(四)-- 模块 这是涂鸦的第四篇 模块(Modules) ZF2 是一个模块系统,而你需要在每个模块中组织你主要应用代码.由模板(skeleton)所提供的应用程序模块在整个应用程序中被用作引导(bootstrapping),错误(error)和路由设置(routing configuration).它经常被用作提供应用级别控制,例如,应用程序的首页.但是在这个教程中我们不使用默认的模块,我们将使用唱片列表来作为应用程序的首页. 我们将代码

yate学习--./yate/packingyate.logrotate

上一篇文章记录了yate学习--./yate/tools/log_rotate.sh: 今天继续看和日志部分的脚本./yate/packingyate.logrotate,先看看脚本源码. # Have to rotate the log and CDR files before each reaches 2GB in size /var/log/yate /var/log/yate-cdr.tsv { size=100M rotate 5 missingok sharedscripts not

【python标准库学习】re模块

1.什么是re 正则表达式一门相对通用的语言,在python中也有对正则表达式的支持,那就是的内置re模块.正则表达式就是一系列的规则去匹配字符串然后进行相应的操作,这些规则网上一搜一大片,而re则是运用正则表达式来提供一系列的功能强大的接口让我们来调用.通常我们在对日志文件进行操作的时候会对正则表达式运用的比较多来得到我们希望得到的数据. 2.python中的转义符 正则表达式中通常用反斜杠'\'来代表转义,'\d'代表数字等,但是python本身也是通过反斜杠'\'来表示转义,所以就和正则表

puppet学习之四 编写模块

puppet学习之四 编写模块 这里不赘述编写的语法,只是简单记录下一个模块编写的简单流程 [[email protected] ~]# cd /etc/puppet/modules/ [[email protected] modules]# mkdir puppet [[email protected] modules]# cd puppet/ [[email protected] puppet]# mkdir files manifests templates #创建模块目录结构 [[ema

第39天python学习内置模块time模块

#自带模块学习time #time模块:1.时间戳;意义是方便作做运算: 2.结构化时间 :3.字符串时间import time# #时间戳# print(time.time())#表示1970年1月1日0点0时0分到现在用了多少秒,1970年是liunx的诞生时间# #结构化时间————当地时间# print(time.localtime())#获得的结果:time.struct_time(tm_year=2019, tm_mon=10, tm_mday=3, tm_hour=17, tm_m