red5源码分析—客户端publish流
接着上一章的分析结果,参考《red5源码分析—7》的分析结论,当服务器返回steamId后,客户端会执行BaseRTMPClientHandler的onCommand函数,onCommand函数会根据返回的方法名”_result”开始执行handlePendingCallResult函数,handlePendingCallResult会获取之前注册的回调函数,根据《red5源码分析—7》,该回调函数就为CreateStreamCallBack,获取回调函数后,就会执行该回调函数的resultReceived函数,下面来看,
public void resultReceived(IPendingServiceCall call) {
Number streamId = (Number) call.getResult();
if (conn != null && streamId != null) {
NetStream stream = new NetStream(streamEventDispatcher);
stream.setConnection(conn);
stream.setStreamId(streamId);
conn.addClientStream(stream);
NetStreamPrivateData streamData = new NetStreamPrivateData();
streamData.outputStream = conn.createOutputStream(streamId);
streamData.connConsumer = new ConnectionConsumer(conn, streamData.outputStream.getVideo(), streamData.outputStream.getAudio(), streamData.outputStream.getData());
streamDataMap.put(streamId, streamData);
}
wrapped.resultReceived(call);
}
这里首先取出服务器分配的streamId,然后构造一个NetStream并进行相应的设置,NetStream用来处理后面有关流的事件,后面会分析到。接下来通过addClientStream向RTMPMinaConnection设置刚刚创建的NetStream,定义在其父类RTMPConnection中,
public void addClientStream(IClientStream stream) {
if (reservedStreams.add(stream.getStreamId().doubleValue())) {
registerStream(stream);
} else {
}
}
addClientStream首先向reservedStreams注册streamId,然后通过registerStream注册,
private boolean registerStream(IClientStream stream) {
if (streams.putIfAbsent(stream.getStreamId().doubleValue(), stream) == null) {
usedStreams.incrementAndGet();
return true;
}
return false;
}
这里就是根据streamId注册该stream。
回到resultReceived中,接下来构造NetStreamPrivateData,接着通过createOutputStream函数创建输出流,
public OutputStream createOutputStream(Number streamId) {
int channelId = getChannelIdForStreamId(streamId);
final Channel data = getChannel(channelId++);
final Channel video = getChannel(channelId++);
final Channel audio = getChannel(channelId++);
return new OutputStream(video, audio, data);
}
这里就是构造三个Channel,分别用来发送数据、音频和视频,然后构造OutputStream。
再回到resultReceived中,接下来创建ConnectionConsumer,并通过streamId设置进streamDataMap中,最后执行另一个回调函数resultReceived,
public void resultReceived(IPendingServiceCall call) {
Object result = call.getResult();
if (result instanceof ObjectMap) {
if ("connect".equals(call.getServiceMethodName())) {
createStream(this);
}
} else {
if ("createStream".equals(call.getServiceMethodName())) {
if (result instanceof Integer) {
Integer streamIdInt = (Integer) result;
int streamId = streamIdInt.intValue();
publish(streamId, "testgio2", "live", this);
//invoke("getRoomsInfo", this);
} else {
disconnect();
}
} else if ("getRoomsInfo".equals(call.getServiceMethodName())) {
ArrayList<String> list = (ArrayList<String>) result;
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i));
}
}
}
}
相比前几章,这里对resultReceived的createStream对应的方法部分作了相应的改写,注释了”getRoomsInfo”部分。
publish定义在BaseRTMPClientHandler中,
public void publish(Number streamId, String name, String mode, INetStreamEventHandler handler) {
if (handler != null) {
NetStreamPrivateData streamData = streamDataMap.get(streamId);
if (streamData != null) {
streamData.handler = handler;
} else {
}
}
final Object[] params = new Object[2];
params[0] = name;
params[1] = mode;
PendingCall pendingCall = new PendingCall("publish", params);
conn.invoke(pendingCall, getChannelForStreamId(streamId));
}
这里首先设置了事件监听器,然后构造PendingCall并发送给服务器了,发送的命令为”publish”,参数分别是name和mode。
下一章开始分析服务器是如何处理publish命令的。