群聊
"""Group chat demo: relay each WebSocket message to every other connected client."""
from flask import Flask, request, render_template
from geventwebsocket.exceptions import WebSocketError
from geventwebsocket.handler import WebSocketHandler
from gevent.pywsgi import WSGIServer
from geventwebsocket.websocket import WebSocket

app = Flask(__name__)  # type: Flask

# Every WebSocket currently attached to the /ws endpoint.
user_socket_list = []


@app.route("/ws")
def ws():
    """Register the caller's WebSocket and broadcast whatever it sends.

    The socket is read from the ``wsgi.websocket`` key of the WSGI environ.
    Each received message is forwarded to all sockets in
    ``user_socket_list`` except the sender's own.

    Returns:
        An empty response once the client has disconnected, or a 400
        response when the request carries no WebSocket.
    """
    user_socket = request.environ.get("wsgi.websocket")  # type: WebSocket
    if not user_socket:
        # Plain HTTP request: a Flask view must still return a response.
        return "WebSocket connection required", 400

    user_socket_list.append(user_socket)
    print(len(user_socket_list), user_socket_list)

    try:
        while True:
            msg = user_socket.receive()
            if msg is None:
                # receive() hands back None once the peer has closed; stop
                # here instead of broadcasting None to everybody else.
                break
            print(msg)

            # Iterate over a snapshot: other handlers may add or remove
            # sockets while send() yields to them.
            for usocket in list(user_socket_list):
                if usocket is user_socket:
                    continue
                try:
                    usocket.send(msg)
                except WebSocketError:
                    # That peer is gone; its own handler removes it from
                    # the list, so just carry on with the remaining ones.
                    continue
    except WebSocketError:
        # The sender's connection broke mid-read; fall through to cleanup.
        pass
    finally:
        # Drop the socket so later broadcasts do not target a dead peer.
        if user_socket in user_socket_list:
            user_socket_list.remove(user_socket)

    return ""


@app.route("/")
def index():
    """Serve the chat page."""
    return render_template("ws.html")


if __name__ == "__main__":
    # Run under gevent's WSGIServer with WebSocketHandler instead of
    # app.run(): ws() depends on "wsgi.websocket" being present in environ.
    http_serv = WSGIServer(("0.0.0.0", 5000), app, handler_class=WebSocketHandler)
    http_serv.serve_forever()

# ****************************** ws.html ******************************
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta http-equiv="x-ua-compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Title</title>
</head>
<body>
<p>发送内容:<input type="text" id="message"><button onclick="send_msg()">发送消息</button></p>
<div id="msg_list" style="width: 500px;"></div>

<!-- The script lives inside <body>: content after </body> is invalid HTML. -->
<script type="application/javascript">
    // Connect back to the host that served this page so the demo also works
    // from another machine; fall back to the original local address when the
    // page has no host (e.g. opened straight from disk).
    var ws_scheme = window.location.protocol === "https:" ? "wss://" : "ws://";
    var ws_host = window.location.host || "127.0.0.1:5000";
    var ws = new WebSocket(ws_scheme + ws_host + "/ws");

    // Append one line of text to the message list. innerText is used so that
    // received content is never interpreted as HTML.
    function append_line(text, align_right) {
        var ptag = document.createElement("p");
        if (align_right) {
            ptag.style.cssText = "text-align: right;";
        }
        ptag.innerText = text;
        document.getElementById("msg_list").appendChild(ptag);
    }

    // Messages relayed from other clients are shown left-aligned.
    ws.onmessage = function (ws_status) {
        console.log(ws_status.data);
        append_line(ws_status.data, false);
    };

    // Send the input box content; our own messages are shown right-aligned.
    function send_msg() {
        var msg = document.getElementById("message").value;
        if (ws.readyState !== WebSocket.OPEN) {
            // Do not echo a message locally that cannot actually be sent.
            console.log("WebSocket is not open; message not sent");
            return;
        }
        append_line(msg, true);
        ws.send(msg);
    }
</script>
</body>
</html>
单聊
"""Private chat demo: route each WebSocket message to one named recipient."""
import json

from flask import Flask, request, render_template
from geventwebsocket.exceptions import WebSocketError
from geventwebsocket.handler import WebSocketHandler
from gevent.pywsgi import WSGIServer
from geventwebsocket.websocket import WebSocket

app = Flask(__name__)  # type: Flask

# Maps the nickname taken from the URL to that user's WebSocket.
user_socket_dict = {}


@app.route("/ws/<user>")
def ws(user):
    """Register ``user``'s WebSocket and deliver the messages it sends.

    Each incoming message is parsed as a JSON object; its ``to_user`` key
    selects the recipient socket from ``user_socket_dict`` and its ``msg``
    key is the text. The forwarded ``from_user`` is always the nickname
    from the URL, not the value supplied in the message body.

    Args:
        user: Nickname captured from the ``/ws/<user>`` URL.

    Returns:
        An empty response once the client has disconnected, or a 400
        response when the request carries no WebSocket.
    """
    user_socket = request.environ.get("wsgi.websocket")  # type: WebSocket
    if not user_socket:
        # Plain HTTP request: a Flask view must still return a response.
        return "WebSocket connection required", 400

    user_socket_dict[user] = user_socket
    print(len(user_socket_dict), user_socket_dict)

    try:
        while True:
            msg = user_socket.receive()
            if msg is None:
                # receive() hands back None once the peer has closed;
                # json.loads(None) would raise TypeError.
                break
            print(msg)

            try:
                msg_dict = json.loads(msg)
            except ValueError:
                # Not valid JSON: ignore it rather than kill the connection.
                continue
            if not isinstance(msg_dict, dict):
                continue

            to_user = msg_dict.get("to_user")
            to_usocket = user_socket_dict.get(to_user)
            if to_usocket is None:
                # Unknown or offline recipient: nothing to deliver to.
                continue

            try:
                to_usocket.send(json.dumps({
                    "from_user": user,
                    "to_user": to_user,
                    "msg": msg_dict.get("msg"),
                }))
            except WebSocketError:
                # Recipient's connection is dead; forget it unless that
                # nickname has already reconnected with a new socket.
                if user_socket_dict.get(to_user) is to_usocket:
                    user_socket_dict.pop(to_user, None)
    except WebSocketError:
        # The sender's connection broke mid-read; fall through to cleanup.
        pass
    finally:
        # Remove only our own registration: the same nickname may have
        # reconnected and replaced the entry in the meantime.
        if user_socket_dict.get(user) is user_socket:
            del user_socket_dict[user]

    return ""


@app.route("/")
def index():
    """Serve the chat page."""
    return render_template("wsd.html")


if __name__ == "__main__":
    # Run under gevent's WSGIServer with WebSocketHandler instead of
    # app.run(): ws() depends on "wsgi.websocket" being present in environ.
    http_serv = WSGIServer(("0.0.0.0", 5000), app, handler_class=WebSocketHandler)
    http_serv.serve_forever()
*********************************************************************************************************************************************************************************************
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta http-equiv="x-ua-compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Title</title>
</head>
<body>
<p>您的昵称:<input type="text" id="nick"><button onclick="openws()">进入聊天室</button></p>
<p>给<input type="text" id="to_user">发送:<input type="text" id="message"><button onclick="send_msg()">发送消息</button></p>
<div id="msg_list" style="width: 500px;"></div>

<!-- The script lives inside <body>: content after </body> is invalid HTML. -->
<script type="application/javascript">
    // Created by openws() once the user has chosen a nickname.
    var ws = null;

    // Append one line of text to the message list. innerText is used so that
    // received content is never interpreted as HTML.
    function append_line(text, align_right) {
        var ptag = document.createElement("p");
        if (align_right) {
            ptag.style.cssText = "text-align: right;";
        }
        ptag.innerText = text;
        document.getElementById("msg_list").appendChild(ptag);
    }

    // Open the socket for the nickname typed into the "nick" input.
    function openws() {
        var nick = document.getElementById("nick").value;
        if (!nick) {
            // An empty nickname would request "/ws/", which is not the
            // "/ws/<nickname>" address this page connects to.
            return;
        }
        if (ws !== null) {
            // Entering again: close the previous connection first.
            ws.close();
        }

        // Connect back to the host that served this page; fall back to the
        // original local address when the page has no host.
        var ws_scheme = window.location.protocol === "https:" ? "wss://" : "ws://";
        var ws_host = window.location.host || "127.0.0.1:5000";
        // The nickname becomes a URL path segment, so it must be encoded.
        ws = new WebSocket(ws_scheme + ws_host + "/ws/" + encodeURIComponent(nick));

        // Incoming messages are JSON objects with from_user and msg fields.
        ws.onmessage = function (ws_status) {
            console.log(ws_status.data);
            var msg_obj = JSON.parse(ws_status.data);
            append_line(msg_obj.from_user + " : " + msg_obj.msg, false);
        };
    }

    // Send the typed message to the user named in the "to_user" input.
    function send_msg() {
        if (ws === null || ws.readyState !== WebSocket.OPEN) {
            // Not connected yet (or no longer): calling ws.send() would
            // throw on null, and the message must not be echoed as sent.
            console.log("WebSocket is not open; message not sent");
            return;
        }

        var msg = document.getElementById("message").value;
        var from_user = document.getElementById("nick").value;
        var to_user = document.getElementById("to_user").value;

        append_line(msg + " : " + from_user, true);

        var msg_obj = {
            msg: msg,
            from_user: from_user,
            to_user: to_user
        };
        ws.send(JSON.stringify(msg_obj));
    }
</script>
</body>
</html>
加密
加密(实际是把消息封装成 WebSocket 数据帧,并非真正的加密):
import struct


def build_text_frame(payload):
    """Wrap *payload* in an unmasked, single-fragment WebSocket text frame.

    The first byte is always 0x81 (FIN bit set, text opcode). The payload
    length is then written in the shortest of the three forms:

    * below 126: the length itself in one byte;
    * up to 65535: the marker 126 followed by a 16-bit big-endian length;
    * larger: the marker 127 followed by a 64-bit big-endian length.

    Args:
        payload: The already-encoded message bytes.

    Returns:
        The frame header followed by the payload, as bytes.
    """
    length = len(payload)
    header = b"\x81"

    if length < 126:
        header += struct.pack("B", length)
    elif length <= 0xFFFF:
        # Any length that fits in 16 bits uses this form, not only 126.
        header += struct.pack("!BH", 126, length)
    else:
        header += struct.pack("!BQ", 127, length)

    return header + payload


msg_bytes = "hello".encode("utf8")
msg = build_text_frame(msg_bytes)

print(msg)
解密
解密(实际是解析客户端发来的 WebSocket 数据帧,并用掩码还原内容):
def split_frame(frame):
    """Split a raw WebSocket frame into length field, mask key and payload.

    The low 7 bits of the second byte select the header layout:

    * 127: bytes 3-10 hold the payload length, the mask key follows;
    * 126: bytes 3-4 hold the payload length, the mask key follows;
    * 125 or less: that number is the payload length and the mask key
      starts at byte 3.

    The 4-byte mask key is only present when the top bit of the second
    byte is set; otherwise the returned mask is empty.

    Args:
        frame: The raw frame bytes, starting at the first header byte.

    Returns:
        A tuple ``(payload, extend_payload_len, mask, data)``: the 7-bit
        length field, the raw extended-length bytes (``None`` for the
        short form), the mask key, and the still-masked payload bytes cut
        to the declared length.

    Raises:
        ValueError: If *frame* ends before its header is complete.
    """
    if len(frame) < 2:
        raise ValueError("frame is shorter than the 2-byte header")

    # Low 7 bits of the second byte: the length, or a 126/127 marker.
    payload = frame[1] & 127
    if payload == 127:
        offset = 10
    elif payload == 126:
        offset = 4
    else:
        offset = 2

    if payload >= 126:
        extend_payload_len = frame[2:offset]
        length = int.from_bytes(extend_payload_len, "big")
    else:
        extend_payload_len = None
        length = payload

    # The top bit of the second byte says whether a mask key is present.
    if frame[1] & 0x80:
        mask = frame[offset:offset + 4]
        offset += 4
    else:
        mask = b""

    if len(frame) < offset:
        raise ValueError("frame ends inside its header")

    # Honour the declared length so bytes after this frame are not decoded.
    data = frame[offset:offset + length]
    return payload, extend_payload_len, mask, data


def unmask_payload(data, mask):
    """XOR each byte of *data* with the mask key, cycling over its 4 bytes.

    An empty *mask* means the payload was not masked; it is returned as-is.
    """
    if not mask:
        return bytearray(data)
    return bytearray(byte ^ mask[i % 4] for i, byte in enumerate(data))


# Sample frame: 0x81 header byte, 0x83 (mask bit set, length 3), then the
# 4-byte mask key and 3 masked payload bytes.
hashstr = b"\x81\x83\xceH\xb6\x85\xffz\x85"

payload, extend_payload_len, mask, decoded = split_frame(hashstr)
print(payload)

str_byte = unmask_payload(decoded, mask)

print(str_byte.decode("utf8"))
原文地址:https://www.cnblogs.com/xdlzs/p/10170954.html
时间: 2024-11-12 19:45:47