red5源码分析—服务器处理publish命令
和前几章的分析一样,服务器接收到客户端发来的publish命令后,最终会执行RTMPHandler的onCommand函数,再参考《red5源码分析—8》的分析,最终会调用StreamService的publish方法,代码如下
public void publish(String name, String mode) {
Map<String, String> params = null;
if (name != null && name.contains("?")) {
params = new HashMap<String, String>();
String tmp = name;
if (name.charAt(0) != ‘?‘) {
tmp = name.split("\\?")[1];
} else if (name.charAt(0) == ‘?‘) {
tmp = name.substring(1);
}
String[] kvs = tmp.split("&");
for (String kv : kvs) {
String[] split = kv.split("=");
params.put(split[0], split[1]);
}
name = name.substring(0, name.indexOf("?"));
}
IConnection conn = Red5.getConnectionLocal();
if (conn instanceof IStreamCapableConnection) {
IScope scope = conn.getScope();
IStreamCapableConnection streamConn = (IStreamCapableConnection) conn;
Number streamId = conn.getStreamId();
if (StringUtils.isEmpty(name)) {
return;
}
IStreamSecurityService security = (IStreamSecurityService) ScopeUtils.getScopeService(scope, IStreamSecurityService.class);
if (security != null) {
Set<IStreamPublishSecurity> handlers = security.getStreamPublishSecurity();
for (IStreamPublishSecurity handler : handlers) {
if (!handler.isPublishAllowed(scope, name, mode)) {
return;
}
}
}
IBroadcastScope bsScope = getBroadcastScope(scope, name);
if (bsScope != null && !bsScope.getProviders().isEmpty()) {
return;
}
IClientStream stream = streamConn.getStreamById(streamId);
if (stream != null && !(stream instanceof IClientBroadcastStream)) {
return;
}
boolean created = false;
if (stream == null) {
stream = streamConn.newBroadcastStream(streamId);
created = true;
}
IClientBroadcastStream bs = (IClientBroadcastStream) stream;
try {
bs.setPublishedName(name);
if (params != null) {
bs.setParameters(params);
}
IContext context = conn.getScope().getContext();
IProviderService providerService = (IProviderService) context.getBean(IProviderService.BEAN_NAME);
if (providerService.registerBroadcastStream(conn.getScope(), name, bs)) {
bsScope = getBroadcastScope(conn.getScope(), name);
bsScope.setClientBroadcastStream(bs);
if (conn instanceof BaseConnection) {
((BaseConnection) conn).registerBasicScope(bsScope);
}
}
if (IClientStream.MODE_RECORD.equals(mode)) {
bs.start();
bs.saveAs(name, false);
} else if (IClientStream.MODE_APPEND.equals(mode)) {
bs.start();
bs.saveAs(name, true);
} else {
bs.start();
}
bs.startPublishing();
} catch (IOException e) {
sendNSFailed(streamConn, StatusCodes.NS_RECORD_NOACCESS, "The file could not be created/written to.", name, streamId);
bs.close();
if (created) {
streamConn.deleteStreamById(streamId);
}
} catch (Exception e) {
}
}
}
publish函数首先对传入的参数name进行处理,从name中提取出请求参数并保存在params中,将name去除参数部分,这个name就可以唯一表示一个publish流;publish函数接下来执行ScopeUtils的getScopeService方法构造StreamSecurityService用来对安全方面进行验证,这里不管它;然后通过getBroadcastScope检查对应的name下是否已经创建过BroadcastScope,一个BoradcastScope表示某个Scope下的某种类型的Scope,代码如下
public IBroadcastScope getBroadcastScope(IScope scope, String name) {
return scope.getBroadcastScope(name);
}
public IBroadcastScope getBroadcastScope(String name) {
return (IBroadcastScope) children.getBasicScope(ScopeType.BROADCAST, name);
}
如果对应的name下已经创建过BroadcastScope并且已被占用,getBroadcastScope就直接返回。
回到publish中,再往下就根据streamId检查RTMPMinaConnection连接里对应的IClientStream是否也被占用,如果被占用也直接返回。这里假设BroadcastScope和IClientStream都没有创建或被占用,因此接下来通过newBroadcastStream创建stream,newBroadcastStream定义在RTMPConnection中,
public IClientBroadcastStream newBroadcastStream(Number streamId) {
if (isValidStreamId(streamId)) {
ClientBroadcastStream cbs = (ClientBroadcastStream) scope.getContext().getBean("clientBroadcastStream");
customizeStream(streamId, cbs);
if (!registerStream(cbs)) {
cbs = null;
}
return cbs;
}
return null;
}
newBroadcastStream函数首先通过isValidStreamId检查streamId的合法性,以及是否在该Id下已经创建了流。如果合法,就通过Spring创建ClientBroadcastStream,然后通过customizeStream设置ClientBroadcastStream对应的连接、Scope、name以及streamId,
private void customizeStream(Number streamId, AbstractClientStream stream) {
Integer buffer = streamBuffers.get(streamId.doubleValue());
if (buffer != null) {
stream.setClientBufferDuration(buffer);
}
stream.setName(createStreamName());
stream.setConnection(this);
stream.setScope(this.getScope());
stream.setStreamId(streamId);
}
newBroadcastStream接下来执行registerStream向RTMPMinaConnection注册刚刚创建的流ClientBroadcastStream,
private boolean registerStream(IClientStream stream) {
if (streams.putIfAbsent(stream.getStreamId().doubleValue(), stream) == null) {
usedStreams.incrementAndGet();
return true;
}
return false;
}
回到publish中,再往下对刚刚创建的ClientBroadcastStream进行基本的设置。然后就通过Spring获得ProviderService,并调用其registerBroadcastStream注册刚刚创建的ClientBroadcastStream,代码如下
public boolean registerBroadcastStream(IScope scope, String name, IBroadcastStream bs) {
IBroadcastScope broadcastScope = scope.getBroadcastScope(name);
if (broadcastScope == null) {
broadcastScope = new BroadcastScope(scope, name);
if (scope.addChildScope(broadcastScope)) {
} else {
}
}
if (broadcastScope != null && bs instanceof IClientBroadcastStream) {
broadcastScope.setClientBroadcastStream((IClientBroadcastStream) bs);
}
return broadcastScope.subscribe(bs.getProvider(), null);
}
registerBroadcastStream函数首先会创建一个BroadcastScope,其构造函数如下,
public BroadcastScope(IScope parent, String name) {
super(parent, ScopeType.BROADCAST, name, false);
pipe = new InMemoryPushPushPipe(this);
keepOnDisconnect = true;
}
注意这里会创建一个管道InMemoryPushPushPipe,后面的章节会分析到这个管道。
registerBroadcastStream函数接下来会调用addChildScope将刚刚创建的BroadcastScope注册到其父Scope上,该函数之前的章节分析过了;接着会将对应的stream流设置进BroadcastScope里;最后通过subscribe注册自身成为Provider,BroadcastScope里的Provider表示一个流的提供者,getProvider返回的是ClientBroadcastStream自身,subscribe定义如下,
public boolean subscribe(IProvider provider, Map<String, Object> paramMap) {
return !removed && pipe.subscribe(provider, paramMap);
}
pipe就是刚刚在BroadcastScope构造函数中创建的InMemoryPushPushPipe,其subscribe函数如下,
public boolean subscribe(IProvider provider, Map<String, Object> paramMap) {
boolean success = super.subscribe(provider, paramMap);
if (success) {
fireProviderConnectionEvent(provider, PipeConnectionEvent.PROVIDER_CONNECT_PUSH, paramMap);
}
return success;
}
InMemoryPushPushPipe的父类为AbstractPipe,其subscribe如下,
public boolean subscribe(IProvider provider, Map<String, Object> paramMap) {
boolean success = providers.addIfAbsent(provider);
if (success && provider instanceof IPipeConnectionListener) {
listeners.addIfAbsent((IPipeConnectionListener) provider);
}
return success;
}
AbstractPipe的subscribe函数就是向InMemoryPushPushPipe的providers注册Provider,即流的提供者,并且添加监听器,因为ClientBroadcastStream自身实现了IPipeConnectionListener接口。
回到InMemoryPushPushPipe的subscribe函数中,接下来通过fireProviderConnectionEvent触发监听回调函数,fireProviderConnectionEvent的代码如下,
protected void fireProviderConnectionEvent(IProvider provider, int type, Map<String, Object> paramMap) {
PipeConnectionEvent event = new PipeConnectionEvent(this);
event.setProvider(provider);
event.setType(type);
event.setParamMap(paramMap);
firePipeConnectionEvent(event);
}
这里构造了PipeConnectionEvent,即代表管道连接的事件,接着通过firePipeConnectionEvent触发监听器,
protected void firePipeConnectionEvent(PipeConnectionEvent event) {
for (IPipeConnectionListener element : listeners) {
try {
element.onPipeConnectionEvent(event);
} catch (Throwable t) {
}
}
if (taskExecutor == null) {
taskExecutor = Executors.newCachedThreadPool(new CustomizableThreadFactory("Pipe-"));
}
for (Runnable task : event.getTaskList()) {
try {
taskExecutor.execute(task);
} catch (Throwable t) {
}
}
event.getTaskList().clear();
}
首先通过onPipeConnectionEvent触发监听器事件,接着启动线程处理各个task。因为前面的subscribe函数中注册了监听器,即ClientBroadcastStream自身,这里的onPipeConnectionEvent定义在ClientBroadcastStream中,
public void onPipeConnectionEvent(PipeConnectionEvent event) {
switch (event.getType()) {
case PipeConnectionEvent.PROVIDER_CONNECT_PUSH:
if (event.getProvider() == this && event.getSource() != connMsgOut && (event.getParamMap() == null || !event.getParamMap().containsKey("record"))) {
this.livePipe = (IPipe) event.getSource();
for (IConsumer consumer : this.livePipe.getConsumers()) {
subscriberStats.increment();
}
}
break;
case PipeConnectionEvent.PROVIDER_DISCONNECT:
...
break;
case PipeConnectionEvent.CONSUMER_CONNECT_PUSH:
...
break;
case PipeConnectionEvent.CONSUMER_DISCONNECT:
...
break;
default:
}
}
因为前面设置的PipeConnectionEvent的事件类型为PROVIDER_CONNECT_PUSH,因此只看这一部分代码,onPipeConnectionEvent函数其实只是设置了成员变量livePipe,其实也是InMemoryPushPushPipe自身。
回到publish中,假设registerBroadcastStream注册成功,接下来通过getBroadcastScope获得前面创建的BroadcastScope并设置对应的ClientBroadcastStream;接着调用registerBasicScope设置刚刚创建的BroadcastScope,registerBasicScope定义在BaseConnection中,
public void registerBasicScope(IBroadcastScope basicScope) {
basicScopes.add(basicScope);
basicScope.addEventListener(this);
}
再回到publish中,假设前面客户端的请求中参数mode是MODE_LIVE,因此调用ClientBroadcastStream的start函数和startPublishing函数,下面分别分析。
start函数
start定义在ClientBroadcastStream中,
public void start() {
checkVideoCodec = true;
checkAudioCodec = true;
firstPacketTime = -1;
latestTimeStamp = -1;
bytesReceived = 0;
IConsumerService consumerManager = (IConsumerService) getScope().getContext().getBean(IConsumerService.KEY);
connMsgOut = consumerManager.getConsumerOutput(this);
if (connMsgOut != null && connMsgOut.subscribe(this, null)) {
setCodecInfo(new StreamCodecInfo());
creationTime = System.currentTimeMillis();
closed = false;
} else {
}
}
在进行一些成员变量的初始化后,start函数通过Spring获得ConsumerService,并调用其getConsumerOutput函数创建另一个管道并作相应的设置,
public IMessageOutput getConsumerOutput(IClientStream stream) {
IStreamCapableConnection streamConn = stream.getConnection();
if (streamConn != null && streamConn instanceof RTMPConnection) {
RTMPConnection conn = (RTMPConnection) streamConn;
OutputStream o = conn.createOutputStream(stream.getStreamId());
IPipe pipe = new InMemoryPushPushPipe();
pipe.subscribe(new ConnectionConsumer(conn, o.getVideo(), o.getAudio(), o.getData()), null);
return pipe;
}
return null;
}
getConsumerOutput首先通过ClientBroadcastStream获得RTMPMinaConnection连接,并执行其createOutputStream创建输出流,该函数前面的章节已经分析过了,这里再看一次,
public OutputStream createOutputStream(Number streamId) {
int channelId = getChannelIdForStreamId(streamId);
final Channel data = getChannel(channelId++);
final Channel video = getChannel(channelId++);
final Channel audio = getChannel(channelId++);
return new OutputStream(video, audio, data);
}
createOutputStream通过获得数据、音频、视频的Channel构造了一个OutputStream并返回。
回到getConsumerOutput函数中,接下来构造了一个ConnectionConsumer,然后创建了另一个管道InMemoryPushPushPipe并执行该管道的subscribe函数,
public boolean subscribe(IConsumer consumer, Map<String, Object> paramMap) {
if (consumer instanceof IPushableConsumer) {
boolean success = super.subscribe(consumer, paramMap);
if (success) {
fireConsumerConnectionEvent(consumer, PipeConnectionEvent.CONSUMER_CONNECT_PUSH, paramMap);
}
return success;
} else {
}
}
前面已经分析了一遍subscribe函数,但是这里传入的参数不是IProvider,而是IConsumer,继续看其父类的subscribe函数,
public boolean subscribe(IConsumer consumer, Map<String, Object> paramMap) {
boolean success = consumers.addIfAbsent(consumer);
if (success && consumer instanceof IPipeConnectionListener) {
listeners.addIfAbsent((IPipeConnectionListener) consumer);
}
return success;
}
该函数只是添加了Consumer以及监听器。
继续回到子类InMemoryPushPushPipe的subscribe函数中,假设执行成功,就调用fireConsumerConnectionEvent触发监听事件了,
protected void fireConsumerConnectionEvent(IConsumer consumer, int type, Map<String, Object> paramMap) {
PipeConnectionEvent event = new PipeConnectionEvent(this);
event.setConsumer(consumer);
event.setType(type);
event.setParamMap(paramMap);
firePipeConnectionEvent(event);
}
这里依然构造了PipeConnectionEvent事件,并调用firePipeConnectionEvent开始处理,
protected void firePipeConnectionEvent(PipeConnectionEvent event) {
for (IPipeConnectionListener element : listeners) {
try {
element.onPipeConnectionEvent(event);
} catch (Throwable t) {
}
}
if (taskExecutor == null) {
taskExecutor = Executors.newCachedThreadPool(new CustomizableThreadFactory("Pipe-"));
}
for (Runnable task : event.getTaskList()) {
try {
taskExecutor.execute(task);
} catch (Throwable t) {
}
}
event.getTaskList().clear();
}
首先通过onPipeConnectionEvent触发监听器事件,接着启动线程处理各个task。这里的onPipeConnectionEvent定义在ClientBroadcastStream中,
public void onPipeConnectionEvent(PipeConnectionEvent event) {
switch (event.getType()) {
case PipeConnectionEvent.PROVIDER_CONNECT_PUSH:
...
break;
case PipeConnectionEvent.PROVIDER_DISCONNECT:
...
break;
case PipeConnectionEvent.CONSUMER_CONNECT_PUSH:
IPipe pipe = (IPipe) event.getSource();
if (this.livePipe == pipe) {
notifyChunkSize();
}
subscriberStats.increment();
break;
case PipeConnectionEvent.CONSUMER_DISCONNECT:
...
break;
default:
}
}
和前面不同的是,这里触发的是PipeConnectionEvent.CONSUMER_CONNECT_PUSH事件,但其实什么也没做。
值得注意的subscribe函数会回调刚刚创建的ConnectionConsumer的onPipeConnectionEvent函数,但其实什么都没做,代码如下,
public void onPipeConnectionEvent(PipeConnectionEvent event) {
switch (event.getType()) {
case PipeConnectionEvent.PROVIDER_DISCONNECT:
closeChannels();
break;
default:
}
}
回到ClientBroadcastStream的start函数中,返回的InMemoryPushPushPipe保存在connMsgOut中,接下来又执行了一遍subscribe,继续往InMemoryPushPushPipe注册Consumer,这里的回调函数就定义在ClientBroadcastStream中,刚刚也已经分析过了。假设注册成功,接下来就创建一个StreamCodecInfo并进行相应的设置,后面的章节会分析到。
startPublishing函数
startPublishing定义在ClientBroadcastStream中,代码如下,
public void startPublishing() {
sendStartNotifications(Red5.getConnectionLocal());
if (automaticRecording) {
try {
saveAs(publishedName, false);
} catch (Exception e) {
}
}
}
首先通过sendStartNotifications发送流即将开始的通知,代码如下,
private void sendStartNotifications(IEventListener source) {
if (sendStartNotification) {
sendStartNotification = false;
if (source instanceof IConnection) {
IScope scope = ((IConnection) source).getScope();
if (scope.hasHandler()) {
final Object handler = scope.getHandler();
if (handler instanceof IStreamAwareScopeHandler) {
if (recordingListener != null && recordingListener.get().isRecording()) {
((IStreamAwareScopeHandler) handler).streamRecordStart(this);
} else {
try {
File file = getRecordFile(scope, publishedName);
if (file != null && file.exists()) {
if (!file.delete()) {
}
}
} catch (Exception e) {
}
((IStreamAwareScopeHandler) handler).streamPublishStart(this);
}
}
}
}
sendPublishStartNotify();
if (recordingListener != null && recordingListener.get().isRecording()) {
sendRecordStartNotify();
}
notifyBroadcastStart();
}
}
sendStartNotification在ClientBroadcastStream创建时就默认为true,因为sendStartNotifications只执行一次,因此进入后首先将sendStartNotification设置为false。
传入的参数source实际上就是RTMPMinaConnection,通过getScope获得其对应的Scope后,再通过getHandler获得对应的handler,实际上就是CoreHandler,并没有实现IStreamAwareScopeHandler接口。
再往下通过sendPublishStartNotify向管道推送消息,
private void sendPublishStartNotify() {
Status publishStatus = new Status(StatusCodes.NS_PUBLISH_START);
publishStatus.setClientid(getStreamId());
publishStatus.setDetails(getPublishedName());
StatusMessage startMsg = new StatusMessage();
startMsg.setBody(publishStatus);
pushMessage(startMsg);
}
这里主要创建了状态为StatusCodes.NS_PUBLISH_START的Status,包装为消息后,然后通过pushMessage发送该消息,
protected void pushMessage(StatusMessage msg) {
if (connMsgOut != null) {
try {
connMsgOut.pushMessage(msg);
} catch (IOException err) {
}
} else {
}
}
connMsgOut就是前面在ClientBroadcastStream中创建的InMemoryPushPushPipe,来看它的pushMessage函数,
public void pushMessage(IMessage message) throws IOException {
for (IConsumer consumer : consumers) {
try {
IPushableConsumer pcon = (IPushableConsumer) consumer;
if (message instanceof RTMPMessage) {
RTMPMessage rtmpMessage = (RTMPMessage) message;
IRTMPEvent body = rtmpMessage.getBody();
int time = body.getTimestamp();
pcon.pushMessage(this, message);
body.setTimestamp(time);
} else {
pcon.pushMessage(this, message);
}
} catch (Throwable t) {
}
}
}
刚函数获取该管道注册过的IConsumer,跟着前面的分析,这里传入的message并没有继承或实现RTMPMessage,因此直接调用每个IConsumer的pushMessage函数。
根据本章前面的分析结果,之前一共注册了两个IConsumer,一个是在ClientBroadcastStream的getConsumerOutput函数中创建的ConnectionConsumer,另一个也是在ClientBroadcastStream的start函数中注册的自身,因为ClientBroadcastStream实现的IPushableConsumer接口继承自IConsumer,下面分别来看它们的pushMessage函数,
ConnectionConsumer的pushMessage函数的代码如下,
public void pushMessage(IPipe pipe, IMessage message) {
if (message instanceof ResetMessage) {
} else if (message instanceof StatusMessage) {
StatusMessage statusMsg = (StatusMessage) message;
data.sendStatus(statusMsg.getBody());
} else if (message instanceof RTMPMessage) {
...
} else {
}
}
因为传入的message就是StatusMessage,因此这里通过ConnectionConsumer的数据Channel(data)发送出去了,sendStatus定义在Channel中,代码如下,
public void sendStatus(Status status) {
if (connection != null) {
final boolean andReturn = !status.getCode().equals(StatusCodes.NS_DATA_START);
final Invoke event = new Invoke();
if (andReturn) {
final PendingCall call = new PendingCall(null, CALL_ON_STATUS, new Object[] { status });
if (status.getCode().equals(StatusCodes.NS_PLAY_START)) {
...
}
event.setCall(call);
} else {
...
}
write(event, connection.getStreamIdForChannelId(id));
}
}
根据前面的分析,传入的statusCode是NS_PUBLISH_START,因此andReturn为true,sendStatus设置PendingCall后,就直接调用write将数据发送给客户端了。
ClientBroadcastStream的pushMessage函数为空。
回到sendStartNotifications函数中,接下来通过notifyBroadcastStart触发别的监听器,但其实什么也没做。
再回到startPublishing函数中,接下来调用saveAs继续处理,
public void saveAs(String name, boolean isAppend) throws IOException {
IStreamCapableConnection conn = getConnection();
if (recordingListener == null) {
IRecordingListener listener = new RecordingListener();
if (listener.init(conn, name, isAppend)) {
IStreamCodecInfo codecInfo = getCodecInfo();
if (codecInfo instanceof StreamCodecInfo) {
StreamCodecInfo info = (StreamCodecInfo) codecInfo;
IVideoStreamCodec videoCodec = info.getVideoCodec();
if (videoCodec != null) {
IoBuffer config = videoCodec.getDecoderConfiguration();
if (config != null) {
VideoData videoConf = new VideoData(config.asReadOnlyBuffer());
try { listener.getFileConsumer().setVideoDecoderConfiguration(videoConf);
} finally {
videoConf.release();
}
}
} else {
}
IAudioStreamCodec audioCodec = info.getAudioCodec();
if (audioCodec != null) {
IoBuffer config = audioCodec.getDecoderConfiguration();
if (config != null) {
AudioData audioConf = new AudioData(config.asReadOnlyBuffer());
try { listener.getFileConsumer().setAudioDecoderConfiguration(audioConf);
} finally {
audioConf.release();
}
}
} else {
}
}
recordingListener = new WeakReference<IRecordingListener>(listener);
addStreamListener(listener);
listener.start();
} else {
}
} else {
}
}
saveAs函数首先创建了一个RecordingListener,并调用其init函数进行初始化,init函数的代码如下,
public boolean init(IConnection conn, String name, boolean isAppend) {
return init(conn.getScope(), name, isAppend);
}
public boolean init(IScope scope, String name, boolean isAppend) {
File file = getRecordFile(scope, name);
if (file != null) {
if (!isAppend) {
if (file.exists()) {
if (!file.delete()) {
return false;
}
}
} else {
if (file.exists()) {
appending = true;
} else {
isAppend = false;
}
}
if (!file.exists()) {
String path = file.getAbsolutePath();
int slashPos = path.lastIndexOf(File.separator);
if (slashPos != -1) {
path = path.substring(0, slashPos);
}
File tmp = new File(path);
if (!tmp.isDirectory()) {
tmp.mkdirs();
}
try {
file.createNewFile();
} catch (IOException e) {
return false;
}
}
if (scope.getContext().hasBean("keyframe.cache")) {
IKeyFrameMetaCache keyFrameCache = (IKeyFrameMetaCache) scope.getContext().getBean("keyframe.cache");
keyFrameCache.removeKeyFrameMeta(file);
}
if (scope.getContext().hasBean("fileConsumer")) {
recordingConsumer = (FileConsumer) scope.getContext().getBean("fileConsumer");
recordingConsumer.setScope(scope);
recordingConsumer.setFile(file);
} else {
recordingConsumer = new FileConsumer(scope, file);
}
if (isAppend) {
recordingConsumer.setMode("append");
} else {
recordingConsumer.setMode("record");
}
setFileName(file.getName());
scheduler = (QuartzSchedulingService) scope.getParent().getContext().getBean(QuartzSchedulingService.BEAN_NAME);
recording.set(true);
} else {
}
return recording.get();
}
init函数首先根据Scope和name通过getRecordFile函数获取文件,代码如下,
public static File getRecordFile(IScope scope, String name) {
IStreamFilenameGenerator generator = (IStreamFilenameGenerator) ScopeUtils.getScopeService(scope, IStreamFilenameGenerator.class, DefaultStreamFilenameGenerator.class);
String fileName = generator.generateFilename(scope, name, ".flv", GenerationType.RECORD);
File file = null;
if (generator.resolvesToAbsolutePath()) {
file = new File(fileName);
} else {
Resource resource = scope.getContext().getResource(fileName);
if (resource.exists()) {
try {
file = resource.getFile();
} catch (IOException ioe) {
}
} else {
String appScopeName = ScopeUtils.findApplication(scope).getName();
file = new File(String.format("%s/webapps/%s/%s", System.getProperty("red5.root"), appScopeName, fileName));
}
}
return file;
}
getRecordFile函数首先通过ScopeUtils的getScopeService函数构造一个DefaultStreamFilenameGenerator,并通过其generateFilename函数生成文件名,
public String generateFilename(IScope scope, String name, GenerationType type) {
return generateFilename(scope, name, null, type);
}
public String generateFilename(IScope scope, String name, String extension, GenerationType type) {
String result = getStreamDirectory(scope) + name;
if (extension != null && !extension.equals("")) {
result += extension;
}
return result;
}
getStreamDirectory获取Scope对应的路径名,因此最后的文件名为”Scope路径名+name+extension”。
回到getRecordFile函数中,resolvesToAbsolutePath函数默认返回false,因此直接看接下来的else部分,因为是第一次创建,因此getResource返回的Resource不存在,因此最后会创建一个File,用来存储流的数据。
回到RecordingListener的init函数中,传入的参数isAppend为false,因此这里会删除同名File,紧接下来就会通过createNewFile创建该File。
再往下,init函数会根据red5-common.xml配置文件创建CachingFileKeyFrameMetaCache,并
public void removeKeyFrameMeta(File file) {
rwLock.writeLock().lock();
try {
String canonicalPath = file.getCanonicalPath();
inMemoryMetaCache.remove(canonicalPath);
} catch (IOException e) {
} finally {
rwLock.writeLock().unlock();
}
super.removeKeyFrameMeta(file);
}
public void removeKeyFrameMeta(File file) {
String filename = String.format("%s.meta", file.getAbsolutePath());
File metadataFile = new File(filename);
if (metadataFile.exists()) {
if (metadataFile.delete()) {
} else {
metadataFile.deleteOnExit();
}
} else {
}
}
这里其实就是删除流文件对应的信息文件。
回到init函数中,再往下创建了一个FileConsumer并设置其模式为record,最后返回true。
回到saveAs函数中,RecordingListener初始化完成后,首先通过getCodecInfo获得前面在ClientBroadcastStream的start函数中创建的StreamCodecInfo,但是接下来的getVideoCodec和getAudioCodec返回空,所以直接跳到最后看,首先通过addStreamListener将刚刚创建的RecordingListener函数设置进ClientBroadcastStream的listeners中,
public void addStreamListener(IStreamListener listener) {
listeners.add(listener);
}
接下来就调用RecordingListener的start函数了,
public void start() {
eqj = new EventQueueJob();
eventQueueJobName = scheduler.addScheduledJob(3000, eqj);
}
start函数很简单,就是启用一个线程执行EventQueueJob,用来处理接收到的事件。
下一章开始分析客户端如何发送流给服务器。