一、说明
此客户端使用python3编写
此客户端实现RTSP的OPTIONS, DESCRIBE, SETUP, PLAY, GET_PARAMETER, TEARDOWN方法,未实现SET_PARAMETER方法(因为没有拦截到该请求的数据包,不清楚怎样才能生成这个请求)
此客户端涉及的是RTSP最常用的以上7种请求(RFC 2326另外还定义了ANNOUNCE, PAUSE, RECORD, REDIRECT方法),而且基本是对同一URI依次执行这些请求;或者说常见的RTSP交互就是“7种请求”+“1个URL”,如果想做渗透测试也主要是针对这“7+1”做参数溢出测试,可测的内容不多
此客户端自动依次执行请求:OPTIONS--DESCRIBE(2次)--SETUP(2次)--PLAY--GET_PARAMETER(5次)--TEARDOWN
RTSP有Basic和Digest两种验证方法,此客户端实现Digest方法
总体执行效果如下图所示
二、客户端源代码
import hashlib
import socket
import time

# Connection settings and protocol constants shared by every request builder.
var_dict = {
    'server_username': 'admin',
    'server_password': 'admin',
    'server_ip': '192.168.220.128',
    'server_port': 554,
    'server_url': '/chIP=1&streamType=main/',
    'cseq': 2,
    'user_agent': 'LibVLC/3.0.2 (LIVE555 Streaming Media v2016.11.28)',
    'buffer_len': 1024
}


def gen_response_value(url, public_method, realm, nonce):
    """Return the Digest ``response`` value for one request.

    The value is MD5(HA1:nonce:HA2) with HA1 = MD5(username:realm:password)
    and HA2 = MD5(method:url); the credentials come from ``var_dict``.

    Args:
        url: URI placed in the ``uri`` field of the Authorization header.
        public_method: RTSP method name, e.g. ``'DESCRIBE'``.
        realm: realm string taken from the server's challenge.
        nonce: nonce string taken from the server's challenge.

    Returns:
        The hexadecimal MD5 digest as a string.
    """
    credentials = (var_dict['server_username'] + ':' + realm + ':'
                   + var_dict['server_password'])
    ha1 = hashlib.md5(credentials.encode()).hexdigest()
    ha2 = hashlib.md5((public_method + ':' + url).encode()).hexdigest()
    return hashlib.md5((ha1 + ':' + nonce + ':' + ha2).encode()).hexdigest()


def _request_url(suffix=''):
    """Return the rtsp:// URL built from ``var_dict`` plus an optional suffix."""
    return ('rtsp://' + var_dict['server_ip'] + ':'
            + str(var_dict['server_port']) + var_dict['server_url'] + suffix)


def _digest_authorization(url, method, realm, nonce):
    """Return the ``Authorization: Digest ...`` header line (no line ending)."""
    response_value = gen_response_value(url, method, realm, nonce)
    return ('Authorization: Digest username="' + var_dict['server_username']
            + '", realm="' + realm + '", nonce="' + nonce
            + '", uri="' + url + '", response="' + response_value + '"')


def _build_request(method, cseq_offset, extra_headers=(), url_suffix='',
                   authorization=None):
    """Assemble one RTSP request as text.

    Header order is fixed: request line, CSeq, optional Authorization,
    User-Agent, then ``extra_headers`` in the order given, followed by the
    blank line that terminates the header section.
    """
    lines = [
        method + ' ' + _request_url(url_suffix) + ' RTSP/1.0',
        'CSeq: ' + str(var_dict['cseq'] + cseq_offset),
    ]
    if authorization is not None:
        lines.append(authorization)
    lines.append('User-Agent: ' + var_dict['user_agent'])
    lines.extend(extra_headers)
    return '\r\n'.join(lines) + '\r\n\r\n'


def gen_options_header():
    """Return the OPTIONS request (CSeq = base)."""
    return _build_request('OPTIONS', 0)


def gen_describe_header():
    """Return the unauthenticated DESCRIBE request (CSeq = base + 1)."""
    return _build_request('DESCRIBE', 1, ['Accept: application/sdp'])


def gen_describe_auth_header(url, realm, nonce):
    """Return the DESCRIBE request carrying Digest credentials (CSeq = base + 2)."""
    return _build_request(
        'DESCRIBE', 2, ['Accept: application/sdp'],
        authorization=_digest_authorization(url, 'DESCRIBE', realm, nonce))


def gen_setup_header(url, realm, nonce):
    """Return the SETUP request for ``/trackID=0`` (CSeq = base + 3).

    The digest is computed over ``url`` (without the track suffix), exactly
    as the value sent in the ``uri`` field.
    """
    return _build_request(
        'SETUP', 3,
        ['Transport: RTP/AVP;unicast;client_port=50166-50167'],
        url_suffix='/trackID=0',
        authorization=_digest_authorization(url, 'SETUP', realm, nonce))


def gen_setup_session_header(url, realm, nonce, session):
    """Return the SETUP request for ``/trackID=1`` within ``session`` (CSeq = base + 4)."""
    return _build_request(
        'SETUP', 4,
        ['Transport: RTP/AVP;unicast;client_port=50168-50169',
         'Session: ' + session],
        url_suffix='/trackID=1',
        authorization=_digest_authorization(url, 'SETUP', realm, nonce))


def gen_play_header(url, realm, nonce, session):
    """Return the PLAY request starting at npt=0 (CSeq = base + 5)."""
    return _build_request(
        'PLAY', 5,
        ['Session: ' + session, 'Range: npt=0.000-'],
        authorization=_digest_authorization(url, 'PLAY', realm, nonce))


def gen_get_parameter_header(url, realm, nonce, session, count):
    """Return a GET_PARAMETER request (CSeq = base + 6 + count).

    ``count`` may be an int or a numeric string; it is converted with ``int``.
    """
    return _build_request(
        'GET_PARAMETER', 6 + int(count),
        ['Session: ' + session],
        authorization=_digest_authorization(url, 'GET_PARAMETER', realm, nonce))


def gen_teardown_header(url, realm, nonce, session):
    """Return the TEARDOWN request (CSeq = base + 11)."""
    return _build_request(
        'TEARDOWN', 11,
        ['Session: ' + session],
        authorization=_digest_authorization(url, 'TEARDOWN', realm, nonce))


def _extract_quoted_param(message, name):
    """Return the first double-quoted value following ``name``, or None.

    Unlike a fixed-offset search, this also handles an empty value (``""``).
    """
    name_pos = message.find(name)
    if name_pos == -1:
        return None
    open_quote = message.find('"', name_pos)
    if open_quote == -1:
        return None
    close_quote = message.find('"', open_quote + 1)
    if close_quote == -1:
        return None
    return message[open_quote + 1:close_quote]


def _extract_session_id(message):
    """Return the id from the ``Session`` header, or None if it is absent.

    Anything after a ``;`` (such as ``timeout=60``) is dropped; a header
    without ``;`` is returned whole.
    """
    for line in message.split('\r\n'):
        name, sep, value = line.partition(':')
        if sep and name.strip().lower() == 'session':
            return value.split(';', 1)[0].strip()
    return None


def _status_line(message):
    """Return the first line of a response."""
    return message.split('\r\n')[0]


def _recv_response(sock):
    """Read one complete RTSP response from ``sock`` and return it as text.

    Reads until the blank line ending the headers, then keeps reading until
    ``Content-Length`` body bytes have arrived, so a body larger than one
    ``buffer_len`` chunk is not left in the socket for the next request.
    """
    data = b''
    while b'\r\n\r\n' not in data:
        chunk = sock.recv(var_dict['buffer_len'])
        if not chunk:  # peer closed the connection
            return data.decode(errors='replace')
        data += chunk

    head, _, body = data.partition(b'\r\n\r\n')
    content_length = 0
    for line in head.split(b'\r\n')[1:]:
        name, sep, value = line.partition(b':')
        if sep and name.strip().lower() == b'content-length':
            try:
                content_length = int(value.strip())
            except ValueError:
                content_length = 0
            break

    while len(body) < content_length:
        chunk = sock.recv(var_dict['buffer_len'])
        if not chunk:
            break
        body += chunk
    return (head + b'\r\n\r\n' + body).decode(errors='replace')


def _exchange(sock, request):
    """Send ``request`` in full and return the server's response text."""
    sock.sendall(request.encode())
    return _recv_response(sock)


def _run_session(sock, url):
    """Run OPTIONS, DESCRIBE x2, SETUP x2, PLAY, GET_PARAMETER x5, TEARDOWN.

    Stops at the first step after OPTIONS that does not succeed, printing
    the status line of the offending response.
    """
    print('now start to check options operation')
    response = _exchange(sock, gen_options_header())
    if '200 OK' in response:
        print('OPTIONS request is OK')
    else:
        print('OPTIONS request is BAD')

    # The first DESCRIBE is expected to be rejected with a Digest challenge.
    response = _exchange(sock, gen_describe_header())
    if '401 Unauthorized' not in response:
        print('first DESCRIBE request occur error: ')
        print(_status_line(response))
        return
    print('first DESCRIBE is ok,now we will execute second DESCRIBE for auth')
    realm_value = _extract_quoted_param(response, 'realm')
    nonce_value = _extract_quoted_param(response, 'nonce')
    if realm_value is None or nonce_value is None:
        print('first DESCRIBE request occur error: ')
        print('no realm/nonce found in the 401 response')
        return

    response = _exchange(
        sock, gen_describe_auth_header(url, realm_value, nonce_value))
    if '200 OK' not in response:
        print('second DESCRIBE request occur error: ')
        print(_status_line(response))
        return
    print('second DESCRIBE is ok,now we will execute first SETUP for session')

    response = _exchange(sock, gen_setup_header(url, realm_value, nonce_value))
    if '200 OK' not in response:
        print('first SETUP request occur error: ')
        print(_status_line(response))
        return
    print('first SETUP is ok,now we will execute second SETUP')
    session_value = _extract_session_id(response)
    if not session_value:
        print('first SETUP request occur error: ')
        print('no Session header found in the SETUP response')
        return

    response = _exchange(
        sock,
        gen_setup_session_header(url, realm_value, nonce_value, session_value))
    if '200 OK' not in response:
        print('second SETUP request occur error: ')
        print(_status_line(response))
        return
    print('second SETUP is ok, now we wil execute PLAY')

    response = _exchange(
        sock, gen_play_header(url, realm_value, nonce_value, session_value))
    if '200 OK' not in response:
        print('PLAY request occur error: ')
        print(_status_line(response))
        return
    print('PLAY is ok, we will execute GET_PARAMETER every 10 seconds and 5 times total')

    for i in range(5):
        response = _exchange(
            sock,
            gen_get_parameter_header(
                url, realm_value, nonce_value, session_value, i))
        print(str(i) + '*10:' + _status_line(response))
        time.sleep(10)

    print('now we will execute TEARDOWN to disconnect with server')
    response = _exchange(
        sock, gen_teardown_header(url, realm_value, nonce_value, session_value))
    print(response)
    print('program execute finished, thank you')


def main():
    """Connect to the configured server and run the request sequence.

    The socket is closed on every exit path, including when a timeout or
    other socket error propagates.
    """
    url = _request_url()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_send:
        socket_send.settimeout(5)
        socket_send.connect((var_dict['server_ip'], var_dict['server_port']))
        _run_session(socket_send, url)


if __name__ == '__main__':
    main()
参考:
https://www.cnblogs.com/MikeZhang/archive/2012/10/29/rtspTcpClient_DSS_20121029.html
https://blog.csdn.net/joeblackzqq/article/details/22383005
https://docs.python.org/3/library/index.html
原文地址:https://www.cnblogs.com/lsdb/p/9090198.html
时间: 2024-10-10 10:27:32