red5源码分析—服务器处理createStream命令
服务器接到createStream命令后,经过过滤器层层处理,最后会调用BaseRTMPHandler的messageReceived函数,
public void messageReceived(RTMPConnection conn, Packet packet) throws Exception {
if (conn != null) {
IRTMPEvent message = null;
try {
message = packet.getMessage();
final Header header = packet.getHeader();
final Number streamId = header.getStreamId();
final Channel channel = conn.getChannel(header.getChannelId());
final IClientStream stream = conn.getStreamById(streamId);
conn.setStreamId(streamId);
conn.messageReceived();
message.setSource(conn);
final byte headerDataType = header.getDataType();
switch (headerDataType) {
...
case TYPE_INVOKE:
case TYPE_FLEX_MESSAGE:
onCommand(conn, channel, header, (Invoke) message);
IPendingServiceCall call = ((Invoke) message).getCall();
if (message.getHeader().getStreamId().intValue() != 0 && call.getServiceName() == null && StreamAction.PUBLISH.equals(call.getServiceMethodName())) {
if (stream != null) {
((IEventDispatcher) stream).dispatchEvent(message);
}
}
break;
...
default:
}
if (message instanceof Unknown) {
}
} catch (Throwable t) {
}
...
}
}
createStream命令的消息属于TYPE_INVOKE消息,最后会调用onCommand函数,onCommand定义在RTMPHandler中,在《red5源码分析—5》中已经介绍了一部分OnCommand的代码,现在看另一部分,
protected void onCommand(RTMPConnection conn, Channel channel, Header source, ICommand command) {
final IServiceCall call = command.getCall();
final String action = call.getServiceMethodName();
if ("_result".equals(action) || "_error".equals(action)) {
handlePendingCallResult(conn, (Invoke) command);
return;
}
boolean disconnectOnReturn = false;
boolean connected = conn.isConnected();
if (connected) {
if (call.getServiceName() == null) {
StreamAction streamAction = StreamAction.getEnum(action);
if (dispatchStreamActions) {
try {
conn.getScope().getHandler().handleEvent(new StreamActionEvent(streamAction));
} catch (Exception ex) {
}
}
switch (streamAction) {
case DISCONNECT:
conn.close();
break;
case CREATE_STREAM:
case INIT_STREAM:
case CLOSE_STREAM:
case RELEASE_STREAM:
case DELETE_STREAM:
case PUBLISH:
case PLAY:
case PLAY2:
case SEEK:
case PAUSE:
case PAUSE_RAW:
case RECEIVE_VIDEO:
case RECEIVE_AUDIO:
IStreamService streamService = (IStreamService) ScopeUtils.getScopeService(conn.getScope(), IStreamService.class, StreamService.class);
try {
if (invokeCall(conn, call, streamService)) {
} else {
}
} catch (Throwable err) {
...
}
break;
default:
invokeCall(conn, call);
}
} else {
invokeCall(conn, call);
}
} else {
...
}
if (command instanceof Invoke) {
if ((source.getStreamId().intValue() != 0) && (call.getStatus() == Call.STATUS_SUCCESS_VOID || call.getStatus() == Call.STATUS_SUCCESS_NULL)) {
return;
}
boolean sendResult = true;
if (call instanceof IPendingServiceCall) {
IPendingServiceCall psc = (IPendingServiceCall) call;
Object result = psc.getResult();
if (result instanceof DeferredResult) {
DeferredResult dr = (DeferredResult) result;
dr.setServiceCall(psc);
dr.setChannel(channel);
dr.setTransactionId(command.getTransactionId());
conn.registerDeferredResult(dr);
sendResult = false;
}
}
if (sendResult) {
Invoke reply = new Invoke();
reply.setCall(call);
reply.setTransactionId(command.getTransactionId());
channel.write(reply);
if (disconnectOnReturn) {
conn.close();
}
}
}
}
OnCommand的前后两部分代码都在《red5源码分析—5》中看过了,不同之处是这里已经与某个服务器的Scope连接了,因此isConnected返回true。再往下就是调用ScopeUtils的getScopeService获得StreamService,getScopeService的定义如下
public static Object getScopeService(IScope scope, Class<?> intf, Class<?> defaultClass, boolean checkHandler) {
String attr = IPersistable.TRANSIENT_PREFIX + SERVICE_CACHE_PREFIX + intf.getCanonicalName();
if (scope.hasAttribute(attr)) {
return scope.getAttribute(attr);
}
Object handler = null;
if (checkHandler) {
IScope current = scope;
while (current != null) {
IScopeHandler scopeHandler = current.getHandler();
if (intf.isInstance(scopeHandler)) {
handler = scopeHandler;
break;
}
if (!current.hasParent()) {
break;
}
current = current.getParent();
}
}
if (handler == null && IScopeService.class.isAssignableFrom(intf)) {
Field key = null;
Object serviceName = null;
try {
key = intf.getField("BEAN_NAME");
serviceName = key.get(null);
if (serviceName instanceof String) {
handler = getScopeService(scope, (String) serviceName, defaultClass);
}
} catch (Exception e) {
}
}
if (handler == null && defaultClass != null) {
try {
handler = defaultClass.newInstance();
} catch (Exception e) {
}
}
scope.setAttribute(attr, handler);
return handler;
}
首先这里会尝试从Scope的缓存中获取已经实例化的StreamService,如果不行就从该Scope开始一直向其父类寻找其中的handler是不是(根据以前的章节可知道,这时这里基本获得的handler是CoreHandler),再往下就是查看BEAN_NAME,并尝试通过Spring获得或初始化该类,如果还不行,最后就直接通过Java实例化了。
回到onCommand中,假设获得了StreamService,下面就通过invokeCall开始处理了,
private boolean invokeCall(RTMPConnection conn, IServiceCall call, Object service) {
final IScope scope = conn.getScope();
final IContext context = scope.getContext();
return context.getServiceInvoker().invoke(call, service);
}
根据前面几章的分析,这里获得的context为org.red5.server.Context,根据red5-default.xml的配置,getServiceInvoker返回org.red5.server.service.ServiceInvoker,因此下面来看它的invoke函数,
public boolean invoke(IServiceCall call, Object service) {
IConnection conn = Red5.getConnectionLocal();
String methodName = call.getServiceMethodName();
if (methodName.charAt(0) == ‘@‘) {
methodName = methodName.substring(1);
}
Object[] args = call.getArguments();
Object[] argsWithConnection;
if (args != null) {
argsWithConnection = new Object[args.length + 1];
argsWithConnection[0] = conn;
for (int i = 0; i < args.length; i++) {
argsWithConnection[i + 1] = args[i];
}
} else {
argsWithConnection = new Object[] { conn };
}
Object[] methodResult = null;
methodResult = ReflectionUtils.findMethodWithExactParameters(service, methodName, argsWithConnection);
if (methodResult.length == 0 || methodResult[0] == null) {
methodResult = ReflectionUtils.findMethodWithExactParameters(service, methodName, args);
if (methodResult.length == 0 || methodResult[0] == null) {
methodResult = ReflectionUtils.findMethodWithListParameters(service, methodName, argsWithConnection);
if (methodResult.length == 0 || methodResult[0] == null) {
methodResult = ReflectionUtils.findMethodWithListParameters(service, methodName, args);
if (methodResult.length == 0 || methodResult[0] == null) {
call.setStatus(Call.STATUS_METHOD_NOT_FOUND);
if (args != null && args.length > 0) {
call.setException(new MethodNotFoundException(methodName, args));
} else {
call.setException(new MethodNotFoundException(methodName));
}
return false;
}
}
}
}
Object result = null;
Method method = (Method) methodResult[0];
Object[] params = (Object[]) methodResult[1];
try {
if (method.isAnnotationPresent(DeclarePrivate.class)) {
throw new NotAllowedException("Access denied, method is private");
}
final DeclareProtected annotation = method.getAnnotation(DeclareProtected.class);
if (annotation != null) {
if (!conn.getClient().hasPermission(conn, annotation.permission())) {
throw new NotAllowedException("Access denied, method is protected");
}
}
if (method.getReturnType().equals(Void.TYPE)) {
method.invoke(service, params);
call.setStatus(Call.STATUS_SUCCESS_VOID);
} else {
result = method.invoke(service, params);
call.setStatus(result == null ? Call.STATUS_SUCCESS_NULL : Call.STATUS_SUCCESS_RESULT);
}
if (call instanceof IPendingServiceCall) {
((IPendingServiceCall) call).setResult(result);
}
} catch (Exception e) {
...
return false;
}
return true;
}
这里为方便阅读,对异常部分作了改写。这段代码虽然稍微长点,但是其实做的事情很简单,就是从StreamService中获取方法,并调用该方法,具体的代码可以自己分析了,因为不是只找一次,是一次一次放宽了条件来找,找到方法后,就调用其invoke执行方法,并处理返回值。接下来就是调用StreamService的createStream方法,
public Number createStream() {
IConnection conn = Red5.getConnectionLocal();
if (conn instanceof IStreamCapableConnection) {
try {
Number streamId = ((IStreamCapableConnection) conn).reserveStreamId();
}
return streamId;
} catch (IndexOutOfBoundsException e) {
return -1;
}
}
return -1;
}
这里会简单调用reserveStreamId获得一个随机的streamId并返回,
public Number reserveStreamId() {
double d = 1.0d;
for (; d < MAX_RESERVED_STREAMS; d++) {
if (reservedStreams.add(d)) {
break;
}
}
if (d == MAX_RESERVED_STREAMS) {
throw new IndexOutOfBoundsException("Unable to reserve new stream");
}
return d;
}
回到前面invoke方法中,最后Call的result里保存了刚刚分配的流的ID,并且状态为STATUS_SUCCESS_RESULT,然后返回给客户端。从这里也可以看出,createStream只负责分配一个ID,没有任何文件的创建、Socket的创建等等的功能。