red5源码分析---6

red5源码分析—客户端和服务器的命令处理

在《red5源码分析—5》中可以知道,在RTMP握手完毕后,客户端会向服务器发送connect命令,connect命令的主要作用就是要和red5服务器上的某个Scope相连接,连接完成后,会向客户端发送带宽协调的指令,ping指令,和一个带宽检测指令。下面先分析ping指令。

ping指令

服务端代码

这里先贴一下在服务器将客户端和某个Scope相连后发出的ping指令代码,

    ...
    conn.ping(new Ping(Ping.STREAM_BEGIN, 0, -1));
    ...

Ping的构造函数很简单,就是一些基本的赋值,下面来看conn的ping函数,

    public void ping(Ping ping) {
        getChannel(2).write(ping);
    }

因此这里就是简单的发送给客户端了。

客户端代码

客户端接收到服务器发来的ping消息后,根据前面几章的分析,最后会调用到BaseRTMPClientHandler的onPing函数,下面来看,

    protected void onPing(RTMPConnection conn, Channel channel, Header source, Ping ping) {
        switch (ping.getEventType()) {
            case Ping.PING_CLIENT:
            case Ping.STREAM_BEGIN:
            case Ping.RECORDED_STREAM:
            case Ping.STREAM_PLAYBUFFER_CLEAR:
                Ping pong = new Ping();
                pong.setEventType(Ping.PONG_SERVER);
                pong.setValue2((int) (System.currentTimeMillis() & 0xffffffff));
                conn.ping(pong);
                break;
            case Ping.STREAM_DRY:
                break;
            case Ping.CLIENT_BUFFER:
                ...
                break;
            case Ping.PING_SWF_VERIFY:
                ...
                break;
            case Ping.BUFFER_EMPTY:
                break;
            case Ping.BUFFER_FULL:
                break;
            default:
        }
    }

因为服务器发来的ping命令的eventType是STREAM_BEGIN,因此只看前面一部分,客户端将自己当前的毫秒数发送给服务器。下面再来看服务器端的处理。

服务端代码

服务器收到客户端的PONG_SERVER消息后,最终会进入RTMPHandler的onPing函数,代码如下

    protected void onPing(RTMPConnection conn, Channel channel, Header source, Ping ping) {
        switch (ping.getEventType()) {
            case Ping.CLIENT_BUFFER:
                ...
                break;
            case Ping.PONG_SERVER:
                conn.pingReceived(ping);
                break;
            default:
        }
    }

这里直接调用RTMPMinaConnection的pingReceived函数,定义如下

    public void pingReceived(Ping pong) {
        long now = System.currentTimeMillis();
        Number previousPingValue = lastPingSentOn.get() & 0xffffffff;
        if (pong.getValue2() == previousPingValue) {
            lastPingRoundTripTime.set((int) ((now & 0xffffffff) - pong.getValue2().intValue()));
        } else {
            if (getPendingMessages() > 4) {
                Number pingRtt = (now & 0xffffffff) - pong.getValue2().intValue();
            }
        }
        lastPongReceivedOn.set(now);
    }

这里就是简单的赋值,记录一下本次ping的各个结果值。

长连接的处理

服务器端代码

既然说到了ping,这里就分析一下上一章中出现的KeepAliveTask,出现在startRoundTripMeasurement函数中,

    ...
    keepAliveTask = scheduler.scheduleAtFixedRate(new KeepAliveTask(), pingInterval);
    ...

该任务用于保持长连接,下面就来看,

        public void run() {
            if (state.getState() == RTMP.STATE_CONNECTED) {
                if (running.compareAndSet(false, true)) {
                    try {
                        if (isConnected()) {
                            long now = System.currentTimeMillis();
                            long currentReadBytes = getReadBytes();
                            long previousReadBytes = lastBytesRead.get();
                            if (currentReadBytes > previousReadBytes) {
                                if (lastBytesRead.compareAndSet(previousReadBytes, currentReadBytes)) {
                                    lastBytesReadTime = now;
                                }
                                if (isIdle()) {
                                    onInactive();
                                }
                            } else {
                                long lastPingTime = lastPingSentOn.get();
                                long lastPongTime = lastPongReceivedOn.get();
                                if (lastPongTime > 0 && (lastPingTime - lastPongTime > maxInactivity) && (now - lastBytesReadTime > maxInactivity)) {
                                    onInactive();
                                } else {
                                    ping();
                                }
                            }
                        } else {
                            onInactive();
                        }
                    } catch (Exception e) {
                    } finally {
                        running.compareAndSet(true, false);
                    }
                }
            }
        }

首先,如果在每次KeepAliveTask启动间隙有数据读入,就通过isIdle检查ping的状态,

    public boolean isIdle() {
        long lastPingTime = lastPingSentOn.get();
        long lastPongTime = lastPongReceivedOn.get();
        boolean idle = (lastPongTime > 0 && (lastPingTime - lastPongTime > maxInactivity));
        return idle;
    }

lastPingTime表示最近一次ping的时间,lastPongTime表示最后一次客户端相应ping的时间,它们的差就表示客户端最后一次响应ping的时间距离最近一次ping的时间有多长,当超过这个时间时,就表示服务器发送了很多ping请求但是客户端没响应,因此返回true,继而调用onInactive关闭连接。

回到KeepAliveTask中,假设服务器在KeepAliveTask启动间隙没有数据读入,就要判断是否很久没有响应ping请求了,并且已经过了很长时间没有读入数据了,“很久”的界限就是maxInactivity,如果满足,就关闭连接,如果不满足,就向客户端发送ping命令。这里不带参的ping函数如下,

    public void ping() {
        long newPingTime = System.currentTimeMillis();
        if (lastPingSentOn.get() == 0) {
            lastPongReceivedOn.set(newPingTime);
        }
        Ping pingRequest = new Ping();
        pingRequest.setEventType(Ping.PING_CLIENT);
        lastPingSentOn.set(newPingTime);
        int now = (int) (newPingTime & 0xffffffff);
        pingRequest.setValue2(now);
        ping(pingRequest);
    }

这里设置了前面提到的lastPingSentOn,主要是事件类型改变为了PING_CLIENT,但是客户端的处理方式和前面事件类型为STREAM_BEGIN时一样,所以往下就不分析了。

BandWidth命令

服务器端代码

当客户端发送connect命令连接服务器scope时,在连接过程中,服务器会发送两个和带宽相关的指令回给客户端,这段代码如下,

    two.write(new ServerBW(defaultServerBandwidth));
    two.write(new ClientBW(defaultClientBandwidth, (byte) limitType));

因此发送了ServerBW和ClientBW两个类,它们对应的dataType分别是TYPE_SERVER_BANDWIDTH和TYPE_CLIENT_BANDWIDTH。

客户端代码

根据上面说的dataType,当带宽的信息到达客户端后,会分别调用onServerBandwidth和onClientBandwidth进行处理,

    protected void onServerBandwidth(RTMPConnection conn, Channel channel, ServerBW message) {
        int bandwidth = message.getBandwidth();
        if (bandwidth != bytesReadWindow) {
            ClientBW clientBw = new ClientBW(bandwidth, (byte) 2);
            channel.write(clientBw);
        }
    }

    protected void onClientBandwidth(RTMPConnection conn, Channel channel, ClientBW message) {
        int bandwidth = message.getBandwidth();
        if (bandwidth != bytesWrittenWindow) {
            ServerBW serverBw = new ServerBW(bandwidth);
            channel.write(serverBw);
        }
    }

无论是发还是收的带宽,这里就是和本地的Window相比,如果不相等就发送给服务器。实际情况可以修改这部分代码以调整客户端带宽。

服务器端代码

服务器端接收到客户端反馈的信息后,也会调用本地的onServerBandwidth和onClientBandwidth函数继续处理,但这两个函数在服务器端是空函数,也即服务器什么也不做了。

checkBandWidth命令

服务器端代码

同样,在服务器端处理connect命令时,会发送checkBandWidth的命令,代码如下,

    public void checkBandwidth() {
        ServerClientDetection detection = new ServerClientDetection();
        detection.checkBandwidth(Red5.getConnectionLocal());
    }

ServerClientDetection的构造函数为空,因此直接看checkBandwidth函数,

    public void checkBandwidth(IConnection conn) {
        calculateClientBw(conn);
    }

    public void calculateClientBw(IConnection conn) {
        this.conn = conn;
        Random rnd = new Random();
        rnd.nextBytes(payload);
        rnd.nextBytes(payload1);
        startBytesWritten = conn.getWrittenBytes();
        startTime = System.nanoTime();
        callBWCheck("");
    }

payload和payload1是两个随机数组,startBytesWritten记录该连接发出的字节数,startTime是当前纳秒数,最关键的是最后调用的callBWCheck,

    private void callBWCheck(Object payload) {
        IConnection conn = Red5.getConnectionLocal();
        Map<String, Object> statsValues = new HashMap<String, Object>();
        statsValues.put("count", packetsReceived.get());
        statsValues.put("sent", packetsSent.get());
        statsValues.put("timePassed", timePassed);
        statsValues.put("latency", latency);
        statsValues.put("cumLatency", cumLatency);
        statsValues.put("payload", payload);
        if (conn instanceof IServiceCapableConnection) {
            packetsSent.incrementAndGet();
            ((IServiceCapableConnection) conn).invoke("onBWCheck", new Object[] { statsValues }, this);
        }
    }

这里进行相应的设置后就调用RTMPMinaConnection的invoke函数发送请求了,该函数定义如下,

    public void invoke(String method, Object[] params, IPendingServiceCallback callback) {
        IPendingServiceCall call = new PendingCall(method, params);
        if (callback != null) {
            call.registerCallback(callback);
        }
        invoke(call);
    }

因此,这里注册了回调函数,然后就向客户端发送该invoke请求了。

客户端代码

客户端对应的onBWCheck函数为空,处理函数只是原数据返回,所以这里不看。

服务器端代码

服务器端对应的代码也为空,因此也是留给开发人员去添加函数进行处理了。

时间: 2024-08-01 22:25:02

red5源码分析---6的相关文章

red5源码分析---10

red5源码分析-服务器处理publish命令 和前几章的分析一样,服务器接收到客户端发来的publish命令后,最终会执行RTMPHandler的onCommand函数,再参考<red5源码分析-8>的分析,最终会调用StreamService的publish方法,代码如下 public void publish(String name, String mode) { Map<String, String> params = null; if (name != null &

red5源码分析---12

red5源码分析-服务器处理视频数据 接着<red5源码分析-11>,本章假设客户端发来的是视频数据,下面就分析服务器如何处理这些数据的. 根据前面几章的分析,基于mina框架,数据到达服务器后,最终会到达RTMPHandler的messageReceived函数,messageReceived定义在RTMPHandler的父类BaseRTMPHandler中, public void messageReceived(RTMPConnection conn, Packet packet) th

red5源码分析---9

red5源码分析-客户端publish流 接着上一章的分析结果,参考<red5源码分析-7>的分析结论,当服务器返回steamId后,客户端会执行BaseRTMPClientHandler的onCommand函数,onCommand函数会根据返回的方法名"_result"开始执行handlePendingCallResult函数,handlePendingCallResult会获取之前注册的回调函数,根据<red5源码分析-7>,该回调函数就为CreateStr

red5源码分析---8

red5源码分析-服务器处理createStream命令 服务器接到createStream命令后,经过过滤器层层处理,最后会调用BaseRTMPHandler的messageReceived函数, public void messageReceived(RTMPConnection conn, Packet packet) throws Exception { if (conn != null) { IRTMPEvent message = null; try { message = pack

red5源码分析---7

red5源码分析-客户端处理connect命令并发送createStream命令 在<red5源码分析-5>中提到过,当客户端发送connect命令后,服务器经过处理会将其connect命令返回,不同的是服务器返回的结果包含了一些连接后需要发送给客户端的信息,包括服务器版本.模式等等.当返回的信息经过服务器的发送过滤器RTMPMinaProtocolEncoder时,会调用其中的RTMPProtocolEncoder的encodeCommand函数,下面来看其中的一段代码, protected

TeamTalk源码分析之login_server

login_server是TeamTalk的登录服务器,负责分配一个负载较小的MsgServer给客户端使用,按照新版TeamTalk完整部署教程来配置的话,login_server的服务端口就是8080,客户端登录服务器地址配置如下(这里是win版本客户端): 1.login_server启动流程 login_server的启动是从login_server.cpp中的main函数开始的,login_server.cpp所在工程路径为server\src\login_server.下表是logi

Android触摸屏事件派发机制详解与源码分析二(ViewGroup篇)

1 背景 还记得前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>中关于透过源码继续进阶实例验证模块中存在的点击Button却触发了LinearLayout的事件疑惑吗?当时说了,在那一篇咱们只讨论View的触摸事件派发机制,这个疑惑留在了这一篇解释,也就是ViewGroup的事件派发机制. PS:阅读本篇前建议先查看前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>,这一篇承接上一篇. 关于View与ViewGroup的区别在前一篇的A

HashMap与TreeMap源码分析

1. 引言     在红黑树--算法导论(15)中学习了红黑树的原理.本来打算自己来试着实现一下,然而在看了JDK(1.8.0)TreeMap的源码后恍然发现原来它就是利用红黑树实现的(很惭愧学了Java这么久,也写过一些小项目,也使用过TreeMap无数次,但到现在才明白它的实现原理).因此本着"不要重复造轮子"的思想,就用这篇博客来记录分析TreeMap源码的过程,也顺便瞅一瞅HashMap. 2. 继承结构 (1) 继承结构 下面是HashMap与TreeMap的继承结构: pu

Linux内核源码分析--内核启动之(5)Image内核启动(rest_init函数)(Linux-3.0 ARMv7)【转】

原文地址:Linux内核源码分析--内核启动之(5)Image内核启动(rest_init函数)(Linux-3.0 ARMv7) 作者:tekkamanninja 转自:http://blog.chinaunix.net/uid-25909619-id-4938395.html 前面粗略分析start_kernel函数,此函数中基本上是对内存管理和各子系统的数据结构初始化.在内核初始化函数start_kernel执行到最后,就是调用rest_init函数,这个函数的主要使命就是创建并启动内核线