red5源码分析---8

red5源码分析—服务器处理createStream命令

服务器接到createStream命令后,经过过滤器层层处理,最后会调用BaseRTMPHandler的messageReceived函数,

    public void messageReceived(RTMPConnection conn, Packet packet) throws Exception {
        if (conn != null) {
            IRTMPEvent message = null;
            try {
                message = packet.getMessage();
                final Header header = packet.getHeader();
                final Number streamId = header.getStreamId();
                final Channel channel = conn.getChannel(header.getChannelId());
                final IClientStream stream = conn.getStreamById(streamId);
                conn.setStreamId(streamId);
                conn.messageReceived();
                message.setSource(conn);
                final byte headerDataType = header.getDataType();
                switch (headerDataType) {
                    ...
                    case TYPE_INVOKE:
                    case TYPE_FLEX_MESSAGE:
                        onCommand(conn, channel, header, (Invoke) message);
                        IPendingServiceCall call = ((Invoke) message).getCall();
                        if (message.getHeader().getStreamId().intValue() != 0 && call.getServiceName() == null && StreamAction.PUBLISH.equals(call.getServiceMethodName())) {
                            if (stream != null) {
                                ((IEventDispatcher) stream).dispatchEvent(message);
                            }
                        }
                        break;
                    ...
                    default:
                }
                if (message instanceof Unknown) {

                }
            } catch (Throwable t) {

            }
            ...
        }
    }

createStream命令的消息属于TYPE_INVOKE消息,最后会调用onCommand函数,onCommand定义在RTMPHandler中,在《red5源码分析—5》中已经介绍了一部分OnCommand的代码,现在看另一部分,

    protected void onCommand(RTMPConnection conn, Channel channel, Header source, ICommand command) {
        final IServiceCall call = command.getCall();
        final String action = call.getServiceMethodName();
        if ("_result".equals(action) || "_error".equals(action)) {
            handlePendingCallResult(conn, (Invoke) command);
            return;
        }
        boolean disconnectOnReturn = false;
        boolean connected = conn.isConnected();
        if (connected) {
            if (call.getServiceName() == null) {
                StreamAction streamAction = StreamAction.getEnum(action);
                if (dispatchStreamActions) {
                    try {
                        conn.getScope().getHandler().handleEvent(new StreamActionEvent(streamAction));
                    } catch (Exception ex) {

                    }
                }
                switch (streamAction) {
                    case DISCONNECT:
                        conn.close();
                        break;
                    case CREATE_STREAM:
                    case INIT_STREAM:
                    case CLOSE_STREAM:
                    case RELEASE_STREAM:
                    case DELETE_STREAM:
                    case PUBLISH:
                    case PLAY:
                    case PLAY2:
                    case SEEK:
                    case PAUSE:
                    case PAUSE_RAW:
                    case RECEIVE_VIDEO:
                    case RECEIVE_AUDIO:
                        IStreamService streamService = (IStreamService) ScopeUtils.getScopeService(conn.getScope(), IStreamService.class, StreamService.class);
                        try {
                            if (invokeCall(conn, call, streamService)) {

                            } else {

                            }
                        } catch (Throwable err) {
                            ...
                        }
                        break;
                    default:
                        invokeCall(conn, call);
                }
            } else {
                invokeCall(conn, call);
            }
        } else {
             ...
        }
        if (command instanceof Invoke) {
            if ((source.getStreamId().intValue() != 0) && (call.getStatus() == Call.STATUS_SUCCESS_VOID || call.getStatus() == Call.STATUS_SUCCESS_NULL)) {
                return;
            }
            boolean sendResult = true;
            if (call instanceof IPendingServiceCall) {
                IPendingServiceCall psc = (IPendingServiceCall) call;
                Object result = psc.getResult();
                if (result instanceof DeferredResult) {
                    DeferredResult dr = (DeferredResult) result;
                    dr.setServiceCall(psc);
                    dr.setChannel(channel);
                    dr.setTransactionId(command.getTransactionId());
                    conn.registerDeferredResult(dr);
                    sendResult = false;
                }
            }
            if (sendResult) {
                Invoke reply = new Invoke();
                reply.setCall(call);
                reply.setTransactionId(command.getTransactionId());
                channel.write(reply);
                if (disconnectOnReturn) {
                    conn.close();
                }
            }
        }
    }

OnCommand的前后两部分代码都在《red5源码分析—5》中看过了,不同之处是这里已经与某个服务器的Scope连接了,因此isConnected返回true。再往下就是调用ScopeUtils的getScopeService获得StreamService,getScopeService的定义如下

    public static Object getScopeService(IScope scope, Class<?> intf, Class<?> defaultClass, boolean checkHandler) {

        String attr = IPersistable.TRANSIENT_PREFIX + SERVICE_CACHE_PREFIX + intf.getCanonicalName();
        if (scope.hasAttribute(attr)) {
            return scope.getAttribute(attr);
        }

        Object handler = null;
        if (checkHandler) {
            IScope current = scope;
            while (current != null) {
                IScopeHandler scopeHandler = current.getHandler();
                if (intf.isInstance(scopeHandler)) {
                    handler = scopeHandler;
                    break;
                }
                if (!current.hasParent()) {
                    break;
                }
                current = current.getParent();
            }
        }

        if (handler == null && IScopeService.class.isAssignableFrom(intf)) {
            Field key = null;
            Object serviceName = null;
            try {
                key = intf.getField("BEAN_NAME");
                serviceName = key.get(null);
                if (serviceName instanceof String) {
                    handler = getScopeService(scope, (String) serviceName, defaultClass);
                }
            } catch (Exception e) {

            }
        }
        if (handler == null && defaultClass != null) {
            try {
                handler = defaultClass.newInstance();
            } catch (Exception e) {

            }
        }
        scope.setAttribute(attr, handler);
        return handler;
    }

首先这里会尝试从Scope的缓存中获取已经实例化的StreamService,如果不行就从该Scope开始一直向其父类寻找其中的handler是不是(根据以前的章节可知道,这时这里基本获得的handler是CoreHandler),再往下就是查看BEAN_NAME,并尝试通过Spring获得或初始化该类,如果还不行,最后就直接通过Java实例化了。

回到onCommand中,假设获得了StreamService,下面就通过invokeCall开始处理了,

    private boolean invokeCall(RTMPConnection conn, IServiceCall call, Object service) {
        final IScope scope = conn.getScope();
        final IContext context = scope.getContext();
        return context.getServiceInvoker().invoke(call, service);
    }

根据前面几章的分析,这里获得的context为org.red5.server.Context,根据red5-default.xml的配置,getServiceInvoker返回org.red5.server.service.ServiceInvoker,因此下面来看它的invoke函数,

    public boolean invoke(IServiceCall call, Object service) {
        IConnection conn = Red5.getConnectionLocal();
        String methodName = call.getServiceMethodName();
        if (methodName.charAt(0) == ‘@‘) {
            methodName = methodName.substring(1);
        }
        Object[] args = call.getArguments();
        Object[] argsWithConnection;
        if (args != null) {
            argsWithConnection = new Object[args.length + 1];
            argsWithConnection[0] = conn;
            for (int i = 0; i < args.length; i++) {
                argsWithConnection[i + 1] = args[i];
            }
        } else {
            argsWithConnection = new Object[] { conn };
        }
        Object[] methodResult = null;
        methodResult = ReflectionUtils.findMethodWithExactParameters(service, methodName, argsWithConnection);
        if (methodResult.length == 0 || methodResult[0] == null) {
            methodResult = ReflectionUtils.findMethodWithExactParameters(service, methodName, args);
            if (methodResult.length == 0 || methodResult[0] == null) {
                methodResult = ReflectionUtils.findMethodWithListParameters(service, methodName, argsWithConnection);
                if (methodResult.length == 0 || methodResult[0] == null) {
                    methodResult = ReflectionUtils.findMethodWithListParameters(service, methodName, args);
                    if (methodResult.length == 0 || methodResult[0] == null) {
                        call.setStatus(Call.STATUS_METHOD_NOT_FOUND);
                        if (args != null && args.length > 0) {
                            call.setException(new MethodNotFoundException(methodName, args));
                        } else {
                            call.setException(new MethodNotFoundException(methodName));
                        }
                        return false;
                    }
                }
            }
        }
        Object result = null;
        Method method = (Method) methodResult[0];
        Object[] params = (Object[]) methodResult[1];
        try {
            if (method.isAnnotationPresent(DeclarePrivate.class)) {
                throw new NotAllowedException("Access denied, method is private");
            }
            final DeclareProtected annotation = method.getAnnotation(DeclareProtected.class);
            if (annotation != null) {
                if (!conn.getClient().hasPermission(conn, annotation.permission())) {
                    throw new NotAllowedException("Access denied, method is protected");
                }
            }
            if (method.getReturnType().equals(Void.TYPE)) {
                method.invoke(service, params);
                call.setStatus(Call.STATUS_SUCCESS_VOID);
            } else {
                result = method.invoke(service, params);
                call.setStatus(result == null ? Call.STATUS_SUCCESS_NULL : Call.STATUS_SUCCESS_RESULT);
            }
            if (call instanceof IPendingServiceCall) {
                ((IPendingServiceCall) call).setResult(result);
            }
        } catch (Exception e) {
            ...
            return false;
        }
        return true;
    }

这里为方便阅读,对异常部分作了改写。这段代码虽然稍微长点,但是其实做的事情很简单,就是从StreamService中获取方法,并调用该方法,具体的代码可以自己分析了,因为不是只找一次,是一次一次放宽了条件来找,找到方法后,就调用其invoke执行方法,并处理返回值。接下来就是调用StreamService的createStream方法,

    public Number createStream() {
        IConnection conn = Red5.getConnectionLocal();
        if (conn instanceof IStreamCapableConnection) {
            try {
                Number streamId = ((IStreamCapableConnection) conn).reserveStreamId();
                }
                return streamId;
            } catch (IndexOutOfBoundsException e) {
                return -1;
            }
        }
        return -1;
    }

这里会简单调用reserveStreamId获得一个随机的streamId并返回,

    public Number reserveStreamId() {
        double d = 1.0d;
        for (; d < MAX_RESERVED_STREAMS; d++) {
            if (reservedStreams.add(d)) {
                break;
            }
        }
        if (d == MAX_RESERVED_STREAMS) {
            throw new IndexOutOfBoundsException("Unable to reserve new stream");
        }
        return d;
    }

回到前面invoke方法中,最后Call的result里保存了刚刚分配的流的ID,并且状态为STATUS_SUCCESS_RESULT,然后返回给客户端。从这里也可以看出,createStream只负责分配一个ID,没有任何文件的创建、Socket的创建等等的功能。

时间: 2024-08-01 22:25:00

red5源码分析---8的相关文章

red5源码分析---10

red5源码分析-服务器处理publish命令 和前几章的分析一样,服务器接收到客户端发来的publish命令后,最终会执行RTMPHandler的onCommand函数,再参考<red5源码分析-8>的分析,最终会调用StreamService的publish方法,代码如下 public void publish(String name, String mode) { Map<String, String> params = null; if (name != null &

red5源码分析---6

red5源码分析-客户端和服务器的命令处理 在<red5源码分析-5>中可以知道,在RTMP握手完毕后,客户端会向服务器发送connect命令,connect命令的主要作用就是要和red5服务器上的某个Scope相连接,连接完成后,会向客户端发送带宽协调的指令,ping指令,和一个带宽检测指令.下面先分析ping指令. ping指令 服务端代码 这里先贴一下在服务器将客户端和某个Scope相连后发出的ping指令代码, ... conn.ping(new Ping(Ping.STREAM_BE

red5源码分析---12

red5源码分析-服务器处理视频数据 接着<red5源码分析-11>,本章假设客户端发来的是视频数据,下面就分析服务器如何处理这些数据的. 根据前面几章的分析,基于mina框架,数据到达服务器后,最终会到达RTMPHandler的messageReceived函数,messageReceived定义在RTMPHandler的父类BaseRTMPHandler中, public void messageReceived(RTMPConnection conn, Packet packet) th

red5源码分析---9

red5源码分析-客户端publish流 接着上一章的分析结果,参考<red5源码分析-7>的分析结论,当服务器返回steamId后,客户端会执行BaseRTMPClientHandler的onCommand函数,onCommand函数会根据返回的方法名"_result"开始执行handlePendingCallResult函数,handlePendingCallResult会获取之前注册的回调函数,根据<red5源码分析-7>,该回调函数就为CreateStr

red5源码分析---7

red5源码分析-客户端处理connect命令并发送createStream命令 在<red5源码分析-5>中提到过,当客户端发送connect命令后,服务器经过处理会将其connect命令返回,不同的是服务器返回的结果包含了一些连接后需要发送给客户端的信息,包括服务器版本.模式等等.当返回的信息经过服务器的发送过滤器RTMPMinaProtocolEncoder时,会调用其中的RTMPProtocolEncoder的encodeCommand函数,下面来看其中的一段代码, protected

TeamTalk源码分析之login_server

login_server是TeamTalk的登录服务器,负责分配一个负载较小的MsgServer给客户端使用,按照新版TeamTalk完整部署教程来配置的话,login_server的服务端口就是8080,客户端登录服务器地址配置如下(这里是win版本客户端): 1.login_server启动流程 login_server的启动是从login_server.cpp中的main函数开始的,login_server.cpp所在工程路径为server\src\login_server.下表是logi

Android触摸屏事件派发机制详解与源码分析二(ViewGroup篇)

1 背景 还记得前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>中关于透过源码继续进阶实例验证模块中存在的点击Button却触发了LinearLayout的事件疑惑吗?当时说了,在那一篇咱们只讨论View的触摸事件派发机制,这个疑惑留在了这一篇解释,也就是ViewGroup的事件派发机制. PS:阅读本篇前建议先查看前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>,这一篇承接上一篇. 关于View与ViewGroup的区别在前一篇的A

HashMap与TreeMap源码分析

1. 引言     在红黑树--算法导论(15)中学习了红黑树的原理.本来打算自己来试着实现一下,然而在看了JDK(1.8.0)TreeMap的源码后恍然发现原来它就是利用红黑树实现的(很惭愧学了Java这么久,也写过一些小项目,也使用过TreeMap无数次,但到现在才明白它的实现原理).因此本着"不要重复造轮子"的思想,就用这篇博客来记录分析TreeMap源码的过程,也顺便瞅一瞅HashMap. 2. 继承结构 (1) 继承结构 下面是HashMap与TreeMap的继承结构: pu

Linux内核源码分析--内核启动之(5)Image内核启动(rest_init函数)(Linux-3.0 ARMv7)【转】

原文地址:Linux内核源码分析--内核启动之(5)Image内核启动(rest_init函数)(Linux-3.0 ARMv7) 作者:tekkamanninja 转自:http://blog.chinaunix.net/uid-25909619-id-4938395.html 前面粗略分析start_kernel函数,此函数中基本上是对内存管理和各子系统的数据结构初始化.在内核初始化函数start_kernel执行到最后,就是调用rest_init函数,这个函数的主要使命就是创建并启动内核线