red5源码分析---10

red5源码分析—服务器处理publish命令

和前几章的分析一样,服务器接收到客户端发来的publish命令后,最终会执行RTMPHandler的onCommand函数,再参考《red5源码分析—8》的分析,最终会调用StreamService的publish方法,代码如下

    public void publish(String name, String mode) {
        Map<String, String> params = null;
        if (name != null && name.contains("?")) {
            params = new HashMap<String, String>();
            String tmp = name;
            if (name.charAt(0) != ‘?‘) {
                tmp = name.split("\\?")[1];
            } else if (name.charAt(0) == ‘?‘) {
                tmp = name.substring(1);
            }
            String[] kvs = tmp.split("&");
            for (String kv : kvs) {
                String[] split = kv.split("=");
                params.put(split[0], split[1]);
            }
            name = name.substring(0, name.indexOf("?"));
        }
        IConnection conn = Red5.getConnectionLocal();
        if (conn instanceof IStreamCapableConnection) {
            IScope scope = conn.getScope();
            IStreamCapableConnection streamConn = (IStreamCapableConnection) conn;
            Number streamId = conn.getStreamId();
            if (StringUtils.isEmpty(name)) {
                return;
            }
            IStreamSecurityService security = (IStreamSecurityService) ScopeUtils.getScopeService(scope, IStreamSecurityService.class);
            if (security != null) {
                Set<IStreamPublishSecurity> handlers = security.getStreamPublishSecurity();
                for (IStreamPublishSecurity handler : handlers) {
                    if (!handler.isPublishAllowed(scope, name, mode)) {
                        return;
                    }
                }
            }
            IBroadcastScope bsScope = getBroadcastScope(scope, name);
            if (bsScope != null && !bsScope.getProviders().isEmpty()) {
                return;
            }
            IClientStream stream = streamConn.getStreamById(streamId);
            if (stream != null && !(stream instanceof IClientBroadcastStream)) {
                return;
            }
            boolean created = false;
            if (stream == null) {
                stream = streamConn.newBroadcastStream(streamId);
                created = true;
            }
            IClientBroadcastStream bs = (IClientBroadcastStream) stream;
            try {
                bs.setPublishedName(name);
                if (params != null) {
                    bs.setParameters(params);
                }
                IContext context = conn.getScope().getContext();
                IProviderService providerService = (IProviderService) context.getBean(IProviderService.BEAN_NAME);
                if (providerService.registerBroadcastStream(conn.getScope(), name, bs)) {
                    bsScope = getBroadcastScope(conn.getScope(), name);
                    bsScope.setClientBroadcastStream(bs);
                    if (conn instanceof BaseConnection) {
                        ((BaseConnection) conn).registerBasicScope(bsScope);
                    }
                }
                if (IClientStream.MODE_RECORD.equals(mode)) {
                    bs.start();
                    bs.saveAs(name, false);
                } else if (IClientStream.MODE_APPEND.equals(mode)) {
                    bs.start();
                    bs.saveAs(name, true);
                } else {
                    bs.start();
                }
                bs.startPublishing();
            } catch (IOException e) {
                sendNSFailed(streamConn, StatusCodes.NS_RECORD_NOACCESS, "The file could not be created/written to.", name, streamId);
                bs.close();
                if (created) {
                    streamConn.deleteStreamById(streamId);
                }
            } catch (Exception e) {

            }
        }
    }

publish函数首先对传入的参数name进行处理,从name中提取出请求参数并保存在params中,将name去除参数部分,这个name就可以唯一表示一个publish流;publish函数接下来执行ScopeUtils的getScopeService方法构造StreamSecurityService用来对安全方面进行验证,这里不管它;然后通过getBroadcastScope检查对应的name下是否已经创建过BroadcastScope,一个BoradcastScope表示某个Scope下的某种类型的Scope,代码如下

    public IBroadcastScope getBroadcastScope(IScope scope, String name) {
        return scope.getBroadcastScope(name);
    }
    public IBroadcastScope getBroadcastScope(String name) {
        return (IBroadcastScope) children.getBasicScope(ScopeType.BROADCAST, name);
    }

如果对应的name下已经创建过BroadcastScope并且已被占用,getBroadcastScope就直接返回。

回到publish中,再往下就根据streamId检查RTMPMinaConnection连接里对应的IClientStream是否也被占用,如果被占用也直接返回。这里假设BroadcastScope和IClientStream都没有创建或被占用,因此接下来通过newBroadcastStream创建stream,newBroadcastStream定义在RTMPConnection中,

    public IClientBroadcastStream newBroadcastStream(Number streamId) {
        if (isValidStreamId(streamId)) {
            ClientBroadcastStream cbs = (ClientBroadcastStream) scope.getContext().getBean("clientBroadcastStream");
            customizeStream(streamId, cbs);
            if (!registerStream(cbs)) {
                cbs = null;
            }
            return cbs;
        }
        return null;
    }

newBroadcastStream函数首先通过isValidStreamId检查streamId的合法性,以及是否在该Id下已经创建了流。如果合法,就通过Spring创建ClientBroadcastStream,然后通过customizeStream设置ClientBroadcastStream对应的连接、Scope、name以及streamId,

    private void customizeStream(Number streamId, AbstractClientStream stream) {
        Integer buffer = streamBuffers.get(streamId.doubleValue());
        if (buffer != null) {
            stream.setClientBufferDuration(buffer);
        }
        stream.setName(createStreamName());
        stream.setConnection(this);
        stream.setScope(this.getScope());
        stream.setStreamId(streamId);
    }

newBroadcastStream接下来执行registerStream向RTMPMinaConnection注册刚刚创建的流ClientBroadcastStream,

    private boolean registerStream(IClientStream stream) {
        if (streams.putIfAbsent(stream.getStreamId().doubleValue(), stream) == null) {
            usedStreams.incrementAndGet();
            return true;
        }
        return false;
    }

回到publish中,再往下对刚刚创建的ClientBroadcastStream进行基本的设置。然后就通过Spring获得ProviderService,并调用其registerBroadcastStream注册刚刚创建的ClientBroadcastStream,代码如下

    public boolean registerBroadcastStream(IScope scope, String name, IBroadcastStream bs) {
        IBroadcastScope broadcastScope = scope.getBroadcastScope(name);
        if (broadcastScope == null) {
            broadcastScope = new BroadcastScope(scope, name);
            if (scope.addChildScope(broadcastScope)) {

            } else {

            }
        }
        if (broadcastScope != null && bs instanceof IClientBroadcastStream) {
            broadcastScope.setClientBroadcastStream((IClientBroadcastStream) bs);
        }
        return broadcastScope.subscribe(bs.getProvider(), null);
    }

registerBroadcastStream函数首先会创建一个BroadcastScope,其构造函数如下,

    public BroadcastScope(IScope parent, String name) {
        super(parent, ScopeType.BROADCAST, name, false);
        pipe = new InMemoryPushPushPipe(this);
        keepOnDisconnect = true;
    }

注意这里会创建一个管道InMemoryPushPushPipe,后面的章节会分析到这个管道。

registerBroadcastStream函数接下来会调用addChildScope将刚刚创建的BroadcastScope注册到其父Scope上,该函数之前的章节分析过了;接着会将对应的stream流设置进BroadcastScope里;最后通过subscribe注册自身成为Provider,BroadcastScope里的Provider表示一个流的提供者,getProvider返回的是ClientBroadcastStream自身,subscribe定义如下,

    public boolean subscribe(IProvider provider, Map<String, Object> paramMap) {
        return !removed && pipe.subscribe(provider, paramMap);
    }

pipe就是刚刚在BroadcastScope构造函数中创建的InMemoryPushPushPipe,其subscribe函数如下,

    public boolean subscribe(IProvider provider, Map<String, Object> paramMap) {
        boolean success = super.subscribe(provider, paramMap);
        if (success) {
            fireProviderConnectionEvent(provider, PipeConnectionEvent.PROVIDER_CONNECT_PUSH, paramMap);
        }
        return success;
    }

InMemoryPushPushPipe的父类为AbstractPipe,其subscribe如下,

    public boolean subscribe(IProvider provider, Map<String, Object> paramMap) {
        boolean success = providers.addIfAbsent(provider);
        if (success && provider instanceof IPipeConnectionListener) {
            listeners.addIfAbsent((IPipeConnectionListener) provider);
        }
        return success;
    }

AbstractPipe的subscribe函数就是向InMemoryPushPushPipe的providers注册Provider,即流的提供者,并且添加监听器,因为ClientBroadcastStream自身实现了IPipeConnectionListener接口。

回到InMemoryPushPushPipe的subscribe函数中,接下来通过fireProviderConnectionEvent触发监听回调函数,fireProviderConnectionEvent的代码如下,

    protected void fireProviderConnectionEvent(IProvider provider, int type, Map<String, Object> paramMap) {
        PipeConnectionEvent event = new PipeConnectionEvent(this);
        event.setProvider(provider);
        event.setType(type);
        event.setParamMap(paramMap);
        firePipeConnectionEvent(event);
    }

这里构造了PipeConnectionEvent,即代表管道连接的事件,接着通过firePipeConnectionEvent触发监听器,

    protected void firePipeConnectionEvent(PipeConnectionEvent event) {
        for (IPipeConnectionListener element : listeners) {
            try {
                element.onPipeConnectionEvent(event);
            } catch (Throwable t) {

            }
        }
        if (taskExecutor == null) {
            taskExecutor = Executors.newCachedThreadPool(new CustomizableThreadFactory("Pipe-"));
        }
        for (Runnable task : event.getTaskList()) {
            try {
                taskExecutor.execute(task);
            } catch (Throwable t) {

            }
        }
        event.getTaskList().clear();
    }

首先通过onPipeConnectionEvent触发监听器事件,接着启动线程处理各个task。因为前面的subscribe函数中注册了监听器,即ClientBroadcastStream自身,这里的onPipeConnectionEvent定义在ClientBroadcastStream中,

    public void onPipeConnectionEvent(PipeConnectionEvent event) {
        switch (event.getType()) {
            case PipeConnectionEvent.PROVIDER_CONNECT_PUSH:
                if (event.getProvider() == this && event.getSource() != connMsgOut && (event.getParamMap() == null || !event.getParamMap().containsKey("record"))) {
                    this.livePipe = (IPipe) event.getSource();
                    for (IConsumer consumer : this.livePipe.getConsumers()) {
                        subscriberStats.increment();
                    }
                }
                break;
            case PipeConnectionEvent.PROVIDER_DISCONNECT:
                ...
                break;
            case PipeConnectionEvent.CONSUMER_CONNECT_PUSH:
                ...
                break;
            case PipeConnectionEvent.CONSUMER_DISCONNECT:
                ...
                break;
            default:
        }
    }

因为前面设置的PipeConnectionEvent的事件类型为PROVIDER_CONNECT_PUSH,因此只看这一部分代码,onPipeConnectionEvent函数其实只是设置了成员变量livePipe,其实也是InMemoryPushPushPipe自身。

回到publish中,假设registerBroadcastStream注册成功,接下来通过getBroadcastScope获得前面创建的BroadcastScope并设置对应的ClientBroadcastStream;接着调用registerBasicScope设置刚刚创建的BroadcastScope,registerBasicScope定义在BaseConnection中,

    public void registerBasicScope(IBroadcastScope basicScope) {
        basicScopes.add(basicScope);
        basicScope.addEventListener(this);
    }

再回到publish中,假设前面客户端的请求中参数mode是MODE_LIVE,因此调用ClientBroadcastStream的start函数和startPublishing函数,下面分别分析。

start函数

start定义在ClientBroadcastStream中,

    public void start() {
        checkVideoCodec = true;
        checkAudioCodec = true;
        firstPacketTime = -1;
        latestTimeStamp = -1;
        bytesReceived = 0;
        IConsumerService consumerManager = (IConsumerService) getScope().getContext().getBean(IConsumerService.KEY);
        connMsgOut = consumerManager.getConsumerOutput(this);
        if (connMsgOut != null && connMsgOut.subscribe(this, null)) {
            setCodecInfo(new StreamCodecInfo());
            creationTime = System.currentTimeMillis();
            closed = false;
        } else {

        }
    }

在进行一些成员变量的初始化后,start函数通过Spring获得ConsumerService,并调用其getConsumerOutput函数创建另一个管道并作相应的设置,

    public IMessageOutput getConsumerOutput(IClientStream stream) {
        IStreamCapableConnection streamConn = stream.getConnection();
        if (streamConn != null && streamConn instanceof RTMPConnection) {
            RTMPConnection conn = (RTMPConnection) streamConn;
            OutputStream o = conn.createOutputStream(stream.getStreamId());
            IPipe pipe = new InMemoryPushPushPipe();
            pipe.subscribe(new ConnectionConsumer(conn, o.getVideo(), o.getAudio(), o.getData()), null);
            return pipe;
        }
        return null;
    }

getConsumerOutput首先通过ClientBroadcastStream获得RTMPMinaConnection连接,并执行其createOutputStream创建输出流,该函数前面的章节已经分析过了,这里再看一次,

    public OutputStream createOutputStream(Number streamId) {
        int channelId = getChannelIdForStreamId(streamId);
        final Channel data = getChannel(channelId++);
        final Channel video = getChannel(channelId++);
        final Channel audio = getChannel(channelId++);
        return new OutputStream(video, audio, data);
    }

createOutputStream通过获得数据、音频、视频的Channel构造了一个OutputStream并返回。

回到getConsumerOutput函数中,接下来构造了一个ConnectionConsumer,然后创建了另一个管道InMemoryPushPushPipe并执行该管道的subscribe函数,

    public boolean subscribe(IConsumer consumer, Map<String, Object> paramMap) {
        if (consumer instanceof IPushableConsumer) {
            boolean success = super.subscribe(consumer, paramMap);
            if (success) {
                fireConsumerConnectionEvent(consumer, PipeConnectionEvent.CONSUMER_CONNECT_PUSH, paramMap);
            }
            return success;
        } else {

        }
    }

前面已经分析了一遍subscribe函数,但是这里传入的参数不是IProvider,而是IConsumer,继续看其父类的subscribe函数,

    public boolean subscribe(IConsumer consumer, Map<String, Object> paramMap) {
        boolean success = consumers.addIfAbsent(consumer);
        if (success && consumer instanceof IPipeConnectionListener) {
            listeners.addIfAbsent((IPipeConnectionListener) consumer);
        }
        return success;
    }

该函数只是添加了Consumer以及监听器。

继续回到子类InMemoryPushPushPipe的subscribe函数中,假设执行成功,就调用fireConsumerConnectionEvent触发监听事件了,

    protected void fireConsumerConnectionEvent(IConsumer consumer, int type, Map<String, Object> paramMap) {
        PipeConnectionEvent event = new PipeConnectionEvent(this);
        event.setConsumer(consumer);
        event.setType(type);
        event.setParamMap(paramMap);
        firePipeConnectionEvent(event);
    }

这里依然构造了PipeConnectionEvent事件,并调用firePipeConnectionEvent开始处理,

    protected void firePipeConnectionEvent(PipeConnectionEvent event) {
        for (IPipeConnectionListener element : listeners) {
            try {
                element.onPipeConnectionEvent(event);
            } catch (Throwable t) {

            }
        }
        if (taskExecutor == null) {
            taskExecutor = Executors.newCachedThreadPool(new CustomizableThreadFactory("Pipe-"));
        }
        for (Runnable task : event.getTaskList()) {
            try {
                taskExecutor.execute(task);
            } catch (Throwable t) {

            }
        }
        event.getTaskList().clear();
    }

首先通过onPipeConnectionEvent触发监听器事件,接着启动线程处理各个task。这里的onPipeConnectionEvent定义在ClientBroadcastStream中,

    public void onPipeConnectionEvent(PipeConnectionEvent event) {
        switch (event.getType()) {
            case PipeConnectionEvent.PROVIDER_CONNECT_PUSH:
                ...
                break;
            case PipeConnectionEvent.PROVIDER_DISCONNECT:
                ...
                break;
            case PipeConnectionEvent.CONSUMER_CONNECT_PUSH:
                IPipe pipe = (IPipe) event.getSource();
                if (this.livePipe == pipe) {
                    notifyChunkSize();
                }
                subscriberStats.increment();
                break;
            case PipeConnectionEvent.CONSUMER_DISCONNECT:
                ...
                break;
            default:
        }
    }

和前面不同的是,这里触发的是PipeConnectionEvent.CONSUMER_CONNECT_PUSH事件,但其实什么也没做。

值得注意的subscribe函数会回调刚刚创建的ConnectionConsumer的onPipeConnectionEvent函数,但其实什么都没做,代码如下,

    public void onPipeConnectionEvent(PipeConnectionEvent event) {
        switch (event.getType()) {
            case PipeConnectionEvent.PROVIDER_DISCONNECT:
                closeChannels();
                break;
            default:
        }
    }

回到ClientBroadcastStream的start函数中,返回的InMemoryPushPushPipe保存在connMsgOut中,接下来又执行了一遍subscribe,继续往InMemoryPushPushPipe注册Consumer,这里的回调函数就定义在ClientBroadcastStream中,刚刚也已经分析过了。假设注册成功,接下来就创建一个StreamCodecInfo并进行相应的设置,后面的章节会分析到。

startPublishing函数

startPublishing定义在ClientBroadcastStream中,代码如下,

    public void startPublishing() {
        sendStartNotifications(Red5.getConnectionLocal());
        if (automaticRecording) {
            try {
                saveAs(publishedName, false);
            } catch (Exception e) {

            }
        }
    }

首先通过sendStartNotifications发送流即将开始的通知,代码如下,

    private void sendStartNotifications(IEventListener source) {
        if (sendStartNotification) {
            sendStartNotification = false;
            if (source instanceof IConnection) {
                IScope scope = ((IConnection) source).getScope();
                if (scope.hasHandler()) {
                    final Object handler = scope.getHandler();
                    if (handler instanceof IStreamAwareScopeHandler) {
                        if (recordingListener != null && recordingListener.get().isRecording()) {
                            ((IStreamAwareScopeHandler) handler).streamRecordStart(this);
                        } else {
                            try {
                                File file = getRecordFile(scope, publishedName);
                                if (file != null && file.exists()) {
                                    if (!file.delete()) {

                                    }
                                }
                            } catch (Exception e) {

                            }
                            ((IStreamAwareScopeHandler) handler).streamPublishStart(this);
                        }
                    }
                }
            }
            sendPublishStartNotify();
            if (recordingListener != null && recordingListener.get().isRecording()) {
                sendRecordStartNotify();
            }
            notifyBroadcastStart();
        }
    }

sendStartNotification在ClientBroadcastStream创建时就默认为true,因为sendStartNotifications只执行一次,因此进入后首先将sendStartNotification设置为false。

传入的参数source实际上就是RTMPMinaConnection,通过getScope获得其对应的Scope后,再通过getHandler获得对应的handler,实际上就是CoreHandler,并没有实现IStreamAwareScopeHandler接口。

再往下通过sendPublishStartNotify向管道推送消息,

    private void sendPublishStartNotify() {
        Status publishStatus = new Status(StatusCodes.NS_PUBLISH_START);
        publishStatus.setClientid(getStreamId());
        publishStatus.setDetails(getPublishedName());

        StatusMessage startMsg = new StatusMessage();
        startMsg.setBody(publishStatus);
        pushMessage(startMsg);
    }

这里主要创建了状态为StatusCodes.NS_PUBLISH_START的Status,包装为消息后,然后通过pushMessage发送该消息,

    protected void pushMessage(StatusMessage msg) {
        if (connMsgOut != null) {
            try {
                connMsgOut.pushMessage(msg);
            } catch (IOException err) {

            }
        } else {

        }
    }

connMsgOut就是前面在ClientBroadcastStream中创建的InMemoryPushPushPipe,来看它的pushMessage函数,

    public void pushMessage(IMessage message) throws IOException {
        for (IConsumer consumer : consumers) {
            try {
                IPushableConsumer pcon = (IPushableConsumer) consumer;
                if (message instanceof RTMPMessage) {
                    RTMPMessage rtmpMessage = (RTMPMessage) message;
                    IRTMPEvent body = rtmpMessage.getBody();
                    int time = body.getTimestamp();
                    pcon.pushMessage(this, message);
                    body.setTimestamp(time);
                } else {
                    pcon.pushMessage(this, message);
                }
            } catch (Throwable t) {

            }
        }
    }

刚函数获取该管道注册过的IConsumer,跟着前面的分析,这里传入的message并没有继承或实现RTMPMessage,因此直接调用每个IConsumer的pushMessage函数。

根据本章前面的分析结果,之前一共注册了两个IConsumer,一个是在ClientBroadcastStream的getConsumerOutput函数中创建的ConnectionConsumer,另一个也是在ClientBroadcastStream的start函数中注册的自身,因为ClientBroadcastStream实现的IPushableConsumer接口继承自IConsumer,下面分别来看它们的pushMessage函数,

ConnectionConsumer的pushMessage函数的代码如下,

    public void pushMessage(IPipe pipe, IMessage message) {
        if (message instanceof ResetMessage) {

        } else if (message instanceof StatusMessage) {
            StatusMessage statusMsg = (StatusMessage) message;
            data.sendStatus(statusMsg.getBody());
        } else if (message instanceof RTMPMessage) {
            ...
        } else {

        }
    }

因为传入的message就是StatusMessage,因此这里通过ConnectionConsumer的数据Channel(data)发送出去了,sendStatus定义在Channel中,代码如下,

    public void sendStatus(Status status) {
        if (connection != null) {
            final boolean andReturn = !status.getCode().equals(StatusCodes.NS_DATA_START);
            final Invoke event = new Invoke();
            if (andReturn) {
                final PendingCall call = new PendingCall(null, CALL_ON_STATUS, new Object[] { status });
                if (status.getCode().equals(StatusCodes.NS_PLAY_START)) {
                    ...
                }
                event.setCall(call);
            } else {
                ...
            }
            write(event, connection.getStreamIdForChannelId(id));
        }
    }

根据前面的分析,传入的statusCode是NS_PUBLISH_START,因此andReturn为true,sendStatus设置PendingCall后,就直接调用write将数据发送给客户端了。

ClientBroadcastStream的pushMessage函数为空。

回到sendStartNotifications函数中,接下来通过notifyBroadcastStart触发别的监听器,但其实什么也没做。

再回到startPublishing函数中,接下来调用saveAs继续处理,

    public void saveAs(String name, boolean isAppend) throws IOException {
        IStreamCapableConnection conn = getConnection();
        if (recordingListener == null) {
            IRecordingListener listener = new RecordingListener();
            if (listener.init(conn, name, isAppend)) {
                IStreamCodecInfo codecInfo = getCodecInfo();
                if (codecInfo instanceof StreamCodecInfo) {
                    StreamCodecInfo info = (StreamCodecInfo) codecInfo;
                    IVideoStreamCodec videoCodec = info.getVideoCodec();
                    if (videoCodec != null) {
                        IoBuffer config = videoCodec.getDecoderConfiguration();
                        if (config != null) {
                            VideoData videoConf = new VideoData(config.asReadOnlyBuffer());
                            try {                       listener.getFileConsumer().setVideoDecoderConfiguration(videoConf);
                            } finally {
                                videoConf.release();
                            }
                        }
                    } else {

                    }
                    IAudioStreamCodec audioCodec = info.getAudioCodec();
                    if (audioCodec != null) {
                        IoBuffer config = audioCodec.getDecoderConfiguration();
                        if (config != null) {
                            AudioData audioConf = new AudioData(config.asReadOnlyBuffer());
                            try {                         listener.getFileConsumer().setAudioDecoderConfiguration(audioConf);
                            } finally {
                                audioConf.release();
                            }
                        }
                    } else {

                    }
                }
                recordingListener = new WeakReference<IRecordingListener>(listener);
                addStreamListener(listener);
                listener.start();
            } else {

            }
        } else {

        }
    }

saveAs函数首先创建了一个RecordingListener,并调用其init函数进行初始化,init函数的代码如下,

    public boolean init(IConnection conn, String name, boolean isAppend) {
        return init(conn.getScope(), name, isAppend);
    }

    public boolean init(IScope scope, String name, boolean isAppend) {
        File file = getRecordFile(scope, name);
        if (file != null) {
            if (!isAppend) {
                if (file.exists()) {
                    if (!file.delete()) {
                        return false;
                    }
                }
            } else {
                if (file.exists()) {
                    appending = true;
                } else {
                    isAppend = false;
                }
            }
            if (!file.exists()) {
                String path = file.getAbsolutePath();
                int slashPos = path.lastIndexOf(File.separator);
                if (slashPos != -1) {
                    path = path.substring(0, slashPos);
                }
                File tmp = new File(path);
                if (!tmp.isDirectory()) {
                    tmp.mkdirs();
                }
                try {
                    file.createNewFile();
                } catch (IOException e) {
                    return false;
                }
            }
            if (scope.getContext().hasBean("keyframe.cache")) {
                IKeyFrameMetaCache keyFrameCache = (IKeyFrameMetaCache) scope.getContext().getBean("keyframe.cache");
                keyFrameCache.removeKeyFrameMeta(file);
            }
            if (scope.getContext().hasBean("fileConsumer")) {
                recordingConsumer = (FileConsumer) scope.getContext().getBean("fileConsumer");
                recordingConsumer.setScope(scope);
                recordingConsumer.setFile(file);
            } else {
                recordingConsumer = new FileConsumer(scope, file);
            }
            if (isAppend) {
                recordingConsumer.setMode("append");
            } else {
                recordingConsumer.setMode("record");
            }
            setFileName(file.getName());
            scheduler = (QuartzSchedulingService) scope.getParent().getContext().getBean(QuartzSchedulingService.BEAN_NAME);
            recording.set(true);
        } else {

        }
        return recording.get();
    }

init函数首先根据Scope和name通过getRecordFile函数获取文件,代码如下,

    public static File getRecordFile(IScope scope, String name) {
        IStreamFilenameGenerator generator = (IStreamFilenameGenerator) ScopeUtils.getScopeService(scope, IStreamFilenameGenerator.class, DefaultStreamFilenameGenerator.class);
        String fileName = generator.generateFilename(scope, name, ".flv", GenerationType.RECORD);
        File file = null;
        if (generator.resolvesToAbsolutePath()) {
            file = new File(fileName);
        } else {
            Resource resource = scope.getContext().getResource(fileName);
            if (resource.exists()) {
                try {
                    file = resource.getFile();
                } catch (IOException ioe) {

                }
            } else {
                String appScopeName = ScopeUtils.findApplication(scope).getName();
                file = new File(String.format("%s/webapps/%s/%s", System.getProperty("red5.root"), appScopeName, fileName));
            }
        }
        return file;
    }

getRecordFile函数首先通过ScopeUtils的getScopeService函数构造一个DefaultStreamFilenameGenerator,并通过其generateFilename函数生成文件名,

    public String generateFilename(IScope scope, String name, GenerationType type) {
        return generateFilename(scope, name, null, type);
    }

    public String generateFilename(IScope scope, String name, String extension, GenerationType type) {
        String result = getStreamDirectory(scope) + name;
        if (extension != null && !extension.equals("")) {
            result += extension;
        }
        return result;
    }

getStreamDirectory获取Scope对应的路径名,因此最后的文件名为”Scope路径名+name+extension”。

回到getRecordFile函数中,resolvesToAbsolutePath函数默认返回false,因此直接看接下来的else部分,因为是第一次创建,因此getResource返回的Resource不存在,因此最后会创建一个File,用来存储流的数据。

回到RecordingListener的init函数中,传入的参数isAppend为false,因此这里会删除同名File,紧接下来就会通过createNewFile创建该File。

再往下,init函数会根据red5-common.xml配置文件创建CachingFileKeyFrameMetaCache,并

    public void removeKeyFrameMeta(File file) {
        rwLock.writeLock().lock();
        try {
            String canonicalPath = file.getCanonicalPath();
            inMemoryMetaCache.remove(canonicalPath);
        } catch (IOException e) {
        } finally {
            rwLock.writeLock().unlock();
        }
        super.removeKeyFrameMeta(file);
    }
    public void removeKeyFrameMeta(File file) {
        String filename = String.format("%s.meta", file.getAbsolutePath());
        File metadataFile = new File(filename);
        if (metadataFile.exists()) {
            if (metadataFile.delete()) {

            } else {
                metadataFile.deleteOnExit();
            }
        } else {

        }
    }

这里其实就是删除流文件对应的信息文件。

回到init函数中,再往下创建了一个FileConsumer并设置其模式为record,最后返回true。

回到saveAs函数中,RecordingListener初始化完成后,首先通过getCodecInfo获得前面在ClientBroadcastStream的start函数中创建的StreamCodecInfo,但是接下来的getVideoCodec和getAudioCodec返回空,所以直接跳到最后看,首先通过addStreamListener将刚刚创建的RecordingListener函数设置进ClientBroadcastStream的listeners中,

    public void addStreamListener(IStreamListener listener) {
        listeners.add(listener);
    }

接下来就调用RecordingListener的start函数了,

    public void start() {
        eqj = new EventQueueJob();
        eventQueueJobName = scheduler.addScheduledJob(3000, eqj);
    }

start函数很简单,就是启用一个线程执行EventQueueJob,用来处理接收到的事件。

下一章开始分析客户端如何发送流给服务器。

时间: 2024-10-25 05:26:29

red5源码分析---10的相关文章

red5源码分析---12

red5源码分析-服务器处理视频数据 接着<red5源码分析-11>,本章假设客户端发来的是视频数据,下面就分析服务器如何处理这些数据的. 根据前面几章的分析,基于mina框架,数据到达服务器后,最终会到达RTMPHandler的messageReceived函数,messageReceived定义在RTMPHandler的父类BaseRTMPHandler中, public void messageReceived(RTMPConnection conn, Packet packet) th

Solr4.8.0源码分析(10)之Lucene的索引文件(3)

Solr4.8.0源码分析(10)之Lucene的索引文件(3) 1. .si文件 .si文件存储了段的元数据,主要涉及SegmentInfoFormat.java和Segmentinfo.java这两个文件.由于本文介绍的Solr4.8.0,所以对应的是SegmentInfoFormat的子类Lucene46SegmentInfoFormat. 首先来看下.si文件的格式 头部(header) 版本(SegVersion) doc个数(SegSize) 是否符合文档格式(IsCompoundF

red5源码分析---6

red5源码分析-客户端和服务器的命令处理 在<red5源码分析-5>中可以知道,在RTMP握手完毕后,客户端会向服务器发送connect命令,connect命令的主要作用就是要和red5服务器上的某个Scope相连接,连接完成后,会向客户端发送带宽协调的指令,ping指令,和一个带宽检测指令.下面先分析ping指令. ping指令 服务端代码 这里先贴一下在服务器将客户端和某个Scope相连后发出的ping指令代码, ... conn.ping(new Ping(Ping.STREAM_BE

red5源码分析---9

red5源码分析-客户端publish流 接着上一章的分析结果,参考<red5源码分析-7>的分析结论,当服务器返回steamId后,客户端会执行BaseRTMPClientHandler的onCommand函数,onCommand函数会根据返回的方法名"_result"开始执行handlePendingCallResult函数,handlePendingCallResult会获取之前注册的回调函数,根据<red5源码分析-7>,该回调函数就为CreateStr

red5源码分析---8

red5源码分析-服务器处理createStream命令 服务器接到createStream命令后,经过过滤器层层处理,最后会调用BaseRTMPHandler的messageReceived函数, public void messageReceived(RTMPConnection conn, Packet packet) throws Exception { if (conn != null) { IRTMPEvent message = null; try { message = pack

red5源码分析---7

red5源码分析-客户端处理connect命令并发送createStream命令 在<red5源码分析-5>中提到过,当客户端发送connect命令后,服务器经过处理会将其connect命令返回,不同的是服务器返回的结果包含了一些连接后需要发送给客户端的信息,包括服务器版本.模式等等.当返回的信息经过服务器的发送过滤器RTMPMinaProtocolEncoder时,会调用其中的RTMPProtocolEncoder的encodeCommand函数,下面来看其中的一段代码, protected

spark core源码分析10 Task的运行

这一节介绍具体task的运行以及最终结果的处理 看线程运行的run方法,见代码注释 override def run(): Unit = { val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager) val deserializeStartTime = System.currentTimeMillis() Thread.currentThread.setContextClassLoader(replClass

SDK源码分析 10

1 #include <windows.h> 2 3 LRESULT CALLBACK WndProc(HWND, UINT, WPARAM, LPARAM); 4 5 //定义结构体 一个是 int 一个是字符串 6 struct 7 { 8 int iStyle; 9 TCHAR *szText; 10 } 11 button[] = 12 { 13 BS_PUSHBUTTON, TEXT("PUSHBUTTON"), 14 BS_DEFPUSHBUTTON, TEXT

Mesos源码分析

Mesos源码分析(1): Mesos的启动过程总论 Mesos源码分析(2): Mesos Master的启动之一 Mesos源码分析(3): Mesos Master的启动之二 Mesos源码分析(4) Mesos Master的启动之三 Mesos源码分析(5): Mesos Master的启动之四 Mesos源码分析(6): Mesos Master的初始化 Mesos源码分析(7): Mesos-Slave的启动 Mesos源码分析(8): Mesos-Slave的初始化 Mesos源