简明扼要的说下, 就两个线程,一个 负责收数据,一个负责发心跳包。
步骤如下,
进程1,调用 发包函数,发送连接请求,然后再发送 获取弹幕类型请求,就一直循环接收数据。
进程2,循环函数,每隔45秒向服务器发一次心跳包。
因为斗鱼自己定义了 包头,,所以来在发包之前,先发送包数据。12个字节,
消息头部:消息长度 4字节 +消息类型4字节+加密字段2字节(默认为0)+保留字段2字节(默认为0)
然后就要把要发的内容 加上 “\0”,utf-8 编码后就能发送了
完整的 消息是:包头 + 内容 +”\0”;
上Python代码:
main.py
import socket import time import threading import multiprocessing from barrage_func import * # 导入自定义方法 SERVER_DOMAIN = "openbarrage.douyutv.com" # 弹幕服务器 域名 SERVER_PORT = 8601; # 弹幕服务器 端口 ROOM_ID = 288016; #房间ID global FIX_TAIL #拼接处理后被丢弃的数据,防止弹幕丢失 FIX_TAIL = "" global gl_client #全局socket gl_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def init_socket(): global gl_client host_ip = socket.gethostbyname(SERVER_DOMAIN) gl_client.connect((host_ip, SERVER_PORT)) def sendDate(client,data): data = data + ‘\0‘ #斗鱼独创序列化文本数据,结尾必须为‘\0‘ data_length = length = len(data)+8 #斗鱼协议在尾部加了 消长度4字节,消息类型2字节(689),加密字段1字节,保留字段1字节, code = 689 # 消息类型 # 消息头部:消息长度+消息类型+加密字段(默认为0)+保留字段(默认为0) head = data_length.to_bytes(4, ‘little‘) + data_length.to_bytes(4, ‘little‘) + code.to_bytes(2,‘little‘)+ (0).to_bytes(2,‘little‘) # head = int.to_bytes(data_length, 4, ‘little‘) + int.to_bytes(data_length, 4, ‘little‘) + int.to_bytes(code, 4,‘little‘) client.sendall(head) # 发送头部部分 msg = (data).encode(‘utf-8‘) # 使用utf-8编码 数据部分 client.sendall(bytes(msg)) # 发送数据部分 def getdanmu(client): login = ‘type@=loginreq/roomid@=%s/‘ % ROOM_ID sendDate(client,login) joingroup = ‘type@=joingroup/rid@=%s/gid@=-9999/‘ % ROOM_ID sendDate(client,joingroup) while True: try: part_body = client.recv(1024,socket.MSG_WAITALL) if not part_body: #如果 服务器发送终止连接b‘‘,则终止会话 break msg_str = part_body.decode(encoding="utf-8", errors="ignore") get_type(msg_str) except Exception as e: print("getdanmu未知错误: %s" % e) continue def get_type(msg_str): global FIX_TAIL msg_str = FIX_TAIL + msg_str msg_arr = msg_str.split("type@=") FIX_TAIL = msg_arr.pop() for value in msg_arr: type_temp = value.split("/") if len(type_temp) >= 2: type_name = type_temp[0] if type_name == "chatmsg": chatmsg =BRRAGE_FUC.get_chatmsg(value) #获取弹幕类 print("["+chatmsg.nn+"]: "+chatmsg.txt) # pass elif type_name == "dgb": dgb = BRRAGE_FUC.get_Dbg(value) #获取礼物类 print("感谢[{}] ,赠送的 {} 个 ‘{}‘".format(dgb.nn,int(dgb.gfcnt) * int(dgb.hits),dgb.gfid)) # pass elif type_name == "uenter": uenter=BRRAGE_FUC.get_uenter(value) #获取进入房间类 print("欢迎 ["+ uenter.nn+"] " + "进入直播间") # pass elif type_name == "spbc": spbc = BRRAGE_FUC.get_spbc(value) # 获取房间广播类 print("{} 房间,[{}]赠送给[{}] {} 个 ‘{}‘".format(spbc.drid,spbc.sn,spbc.dn,spbc.gc, spbc.gn)) def keep_alive(client): ‘‘‘ 客户端每隔 45 秒发送心跳信息给弹幕服务器 ‘‘‘ while True: alive_msg = "type@=mrkl/" #新版本 # alive_msg = "type@=keeplive/tick@=%s/" % int(time.time()) #旧版本 sendDate(client,alive_msg) time.sleep(20) if __name__ == ‘__main__‘: init_socket() p1 = multiprocessing.Process(target=getdanmu, args=(gl_client,)) p2 = multiprocessing.Process(target=keep_alive, args=(gl_client,)) p1.start() p2.start()
这里引用了 2个文件,
1个是定义了4个类发言弹幕Brrage_Msg(),赠送礼物Brrage_Dgb(),用户进入房间Brrage_Enter(),广播消息Brrage_Spbc ()
1 个是 写了静态方法BRRAGE_FUC 对上面的类进行 赋值
barrage_func.py
import time from barrage_info import * class BRRAGE_FUC(object): ‘‘‘ 常被调用的静态方法 ‘‘‘ #提取发言弹幕 @staticmethod def get_chatmsg(msg): brrage_msg =Brrage_Msg() #获取当时时间 eg: ‘2019-02-16 18:50:02‘ brrage_msg.time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) key_value_list = msg.split("/") for key_value_temp in key_value_list: key_value=key_value_temp.split("@=",1) if len(key_value)==2: if key_value[0]=="rid": brrage_msg.rid =str(key_value[1]) elif key_value[0]=="uid": brrage_msg.uid =str(key_value[1]) elif key_value[0]=="nn": brrage_msg.nn =str(key_value[1]) elif key_value[0]=="txt": brrage_msg.txt=str(key_value[1]) elif key_value[0]=="cid": brrage_msg.cid =str(key_value[1]) elif key_value[0]=="nl": brrage_msg.nl =str(key_value[1]) elif key_value[0]=="level": brrage_msg.level =str(key_value[1]) elif key_value[0]=="bnn": brrage_msg.bnn =str(key_value[1]) elif key_value[0]=="bl": brrage_msg.bl =str(key_value[1]) elif key_value[0]=="brid": brrage_msg.brid =str(key_value[1]) return brrage_msg #提取送礼物弹幕 @staticmethod def get_Dbg(msg): brrage_dgb = Brrage_Dgb() # 获取当时时间 eg: ‘2019-02-16 18:50:02‘ brrage_dgb.time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) key_value_list = msg.split("/") for key_value_temp in key_value_list: key_value = key_value_temp.split("@=",1) if len(key_value) == 2: if key_value[0] == "rid": brrage_dgb.rid = key_value[1] elif key_value[0] == "uid": brrage_dgb.uid = key_value[1] elif key_value[0] == "nn": brrage_dgb.nn = key_value[1] elif key_value[0] == "sn": brrage_dgb.sn = key_value[1] elif key_value[0] == "gfid": brrage_dgb.gfid = key_value[1] elif key_value[0] == "gfcnt": brrage_dgb.gfcnt = key_value[1] elif key_value[0] == "hits": brrage_dgb.hits = key_value[1] return brrage_dgb #提取用户进房通知弹幕 @staticmethod def get_uenter(msg): brrage_enter = Brrage_Enter() key_value_list = msg.split("/") for key_value_temp in key_value_list: key_value = key_value_temp.split("@=",1) if len(key_value) == 2: if key_value[0] == "rid": brrage_enter.rid = key_value[1] if key_value[0] == "uid": brrage_enter.uid = key_value[1] if key_value[0] == "nn": brrage_enter.nn = key_value[1] if key_value[0] == "nl": brrage_enter.nl = key_value[1] return brrage_enter #飞机、火箭 广播消息 @staticmethod def get_spbc(msg): brrage_spbc = Brrage_Spbc() key_value_list = msg.split("/") for key_value_temp in key_value_list: key_value = key_value_temp.split("@=",1) if len(key_value) == 2: if key_value[0] == "rid": brrage_spbc.id = key_value[1] if key_value[0] == "drid": brrage_spbc.drid = key_value[1] if key_value[0] == "uid": brrage_spbc.uid = key_value[1] if key_value[0] == "sn": brrage_spbc.sn = key_value[1] if key_value[0] == "dn": brrage_spbc.dn = key_value[1] if key_value[0] == "gn": brrage_spbc.gn = key_value[1] if key_value[0] == "gc": brrage_spbc.gc = key_value[1] if key_value[0] == "gb": brrage_spbc.gb = key_value[1] if key_value[0] == "gfid": brrage_spbc.gfid = key_value[1] return brrage_spbc
barrage_info.py
class Brrage_Base(object): rid = "0" # 房间号 uid = "0" # 用户id nn = "nn" # 用户昵称 time = "0000-00-00 00:00:00" # 时间 class Brrage_Msg(Brrage_Base): """表示为“弹幕”消息,type固定为 chatmsg""" def __init__(self): self.txt="txt" #弹幕文本内容 self.cid="" #弹幕唯一 ID self.nl=0 #贵族等级 self.level =0 #用户等级 self.bnn = "" # 徽章昵称 self.bl = 0 # 徽章等级 self.brid=0 #徽章房间 id class Brrage_Dgb(Brrage_Base): ‘‘‘表示为“赠送礼物”消息,type固定为 dgb ‘‘‘ def __init__(self): self.gfid=0 #礼物 id self.gfcnt =1 #礼物个数:默认值 1 self.hits=1 #礼物连击次数:默认值 1(表示 1 连击) class Brrage_Enter(Brrage_Base): ‘‘‘ 表示为“用户进房通知”消息,type固定为 uenter ‘‘‘ def __init__(self): self.nl = 0 # 贵族等级 class Brrage_Spbc(Brrage_Base): ‘‘‘ 房间内礼物广播,type固定为 spbc‘‘‘ def __init__(self): self.drid = 0 #赠送房间 rid ,默认为0 self.sn = "" # 赠送者昵称 self.dn = "" # 受赠者昵称 self.gn = "" # 礼物名称 self.gc = 1 # 礼物数量 # self.gs = "" # 广播样式 self.gb = 1 # 是否有礼包(0-无礼包,1-有礼包) # self.es = 1 # 广播展现样式(1-火箭,2-飞机) self.gfid = 1 #礼物 id
运行 main.py 看效果,,比 官方弹幕 还 全,,官方应该 对 tcp 粘包没处理好。
下面的是C#的代码,原理都一样
using System; using System.Net; using System.Net.Sockets; using System.Text; using System.Text.RegularExpressions; using System.Threading; namespace danmu { class Program { private static string SERVER_DOMAIN = "openbarrage.douyutv.com"; private static int SERVER_PORT = 8601; private static int ROOM_ID = 288016; private static string FIX_TAIL = String.Empty; //拼接处理后被丢弃的数据,防止弹幕丢失 class BrrageMsg { public string Name = String.Empty; public string Txt = String.Empty; } static void Main(string[] args) { try { Socket tcpClient = InitTcp(SERVER_DOMAIN, SERVER_PORT); Thread getDanmuThread = new Thread(GetDanmu); getDanmuThread.Start(tcpClient); Thread keepAliveThread = new Thread(KeepAlive); keepAliveThread.Start(tcpClient); } catch (Exception ex) { Console.WriteLine(ex.ToString()); } } static Socket InitTcp(string host, int port) { IPHostEntry hostInfo = Dns.GetHostEntry(host); IPAddress ipAddress = hostInfo.AddressList[0]; //域名转IP IPEndPoint ipe = new IPEndPoint(ipAddress, port); Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); s.Connect(ipe); return s; } static byte[] DataToBytes(string data) { string dantaNew = data + "\0"; byte[] bodyDataByte = Encoding.UTF8.GetBytes(dantaNew); byte[] cType = BitConverter.GetBytes(689); int dataLength = dantaNew.Length + cType.Length + 8; byte[] dataLengthByte = BitConverter.GetBytes(dataLength); byte[] dataLengthByte2 = BitConverter.GetBytes(dataLength); byte[] result = new byte[dataLength + 4]; Array.Copy(dataLengthByte, 0, result, 0, 4); Array.Copy(dataLengthByte2, 0, result, 4, 4); Array.Copy(cType, 0, result, 8, 4); Array.Copy(bodyDataByte, 0, result, 12, bodyDataByte.Length); byte[] source = new byte[result.Length]; Array.Copy(result, 0, source, 0, result.Length); return result; } static void GetDanmu(object obj) { Socket tcpClient = (Socket)obj; string login = "type@=loginreq/roomid@=" + ROOM_ID + "/"; byte[] loginBytes = DataToBytes(login); tcpClient.Send(loginBytes); string joingroup = "type@=joingroup/rid@=" + ROOM_ID + "/gid@=-9999/"; byte[] joingroupBytes = DataToBytes(joingroup); tcpClient.Send(joingroupBytes); string recvStr = ""; byte[] recvBytes = new byte[1024]; int bytes; while (true) { bytes = tcpClient.Receive(recvBytes, recvBytes.Length, 0);//从服务器端接受返回信息 recvStr = Encoding.UTF8.GetString(recvBytes, 0, bytes); ShowMsg(recvStr); } } static BrrageMsg GetMsgType(string[] msgType) { BrrageMsg brrageMsg = new BrrageMsg(); foreach (string keyValueTemp in msgType) { string[] keyValue = Regex.Split(keyValueTemp, "@=", RegexOptions.IgnoreCase); if (keyValue.Length >= 2) { string key = keyValue[0]; string[] textArr = new string[keyValue.Length - 1]; Array.Copy(keyValue, 1, textArr, 0, keyValue.Length - 1); string value = String.Join("@", textArr); if (key =="nn") { brrageMsg.Name = value; } if ((key == "txt")) { brrageMsg.Txt = value; } } } return brrageMsg; } static void ShowMsg(string msg) { msg = FIX_TAIL + msg; string[] chatmsgArray = Regex.Split(msg, "type@=", RegexOptions.IgnoreCase); FIX_TAIL = chatmsgArray[chatmsgArray.Length - 1]; //截取最后的丢弃数据,放在下个包的开头,防止数据丢失 string[] newChatmsgArrayArr = new string[chatmsgArray.Length - 1]; Array.Copy(chatmsgArray, 0, newChatmsgArrayArr, 0, chatmsgArray.Length - 1); foreach (string t in newChatmsgArrayArr) { string[] msgType = t.Split(‘/‘); if (msgType.Length >= 2) { string type = msgType[0]; if (type == "chatmsg") { BrrageMsg brrageMsg=GetMsgType(msgType); string result = String.Format("[{0}]: {1}", brrageMsg.Name, brrageMsg.Txt); Console.WriteLine(result); } } } } static void KeepAlive(object obj) { Socket tcpClient = (Socket)obj; byte[] aliveMsg = DataToBytes("type@=mrkl/"); while (true) { tcpClient.Send(aliveMsg); Thread.Sleep(40000); } } } }
原文地址:https://www.cnblogs.com/likehc/p/10427130.html
时间: 2024-10-13 23:58:38