"""WebSocket relay server with a small stdin-driven CLI.

Clients connect over WebSocket and push ``"<type>:<json>"`` notifications.
Bet notifications are tallied per user for the current round and appended to
``user_bet.log`` when the round stops. A background CLI thread lets the
operator broadcast free-form messages or a JavaScript file to every client.
"""
import asyncio
import datetime
import json
import logging
import re
import threading

import websockets

logging.basicConfig(level=logging.INFO)

ip = "192.168.137.1"
port = 9999
connected_clients = set()

# Script broadcast to every client after a KickNotify. It schedules two
# delayed button clicks on the client side (after 3 s and after 6 s).
KICK_RECOVERY_SCRIPT = (
    'CMD: setTimeout(function(){ '
    'let btnDownNode = cc.find("Layer/PushNotice_panel/close_button"); '
    'if (btnDownNode && btnDownNode.getComponent(cc.Button)) { '
    'btnDownNode.getComponent(cc.Button).clickEvents[0].emit([btnDownNode]); '
    '} else { '
    'console.error("节点未找到或节点上没有cc.Button组件"); '
    '} \n'
    '},3000); '
    'setTimeout(function(){ '
    'let btnDownNode = cc.find("Canvas/container/content/smallGame/createScrollView/view/cotent/row1/pokerNode/porkermaster/allHand"); '
    'if (btnDownNode && btnDownNode.getComponent(cc.Button)) { '
    'btnDownNode.getComponent(cc.Button).clickEvents[0].emit([btnDownNode]); '
    '} else { '
    'console.error("节点未找到或节点上没有cc.Button组件"); '
    '} },6000); '
)


def parse_message(message):
    """Split a ``"<type>:<json>"`` frame into its type and decoded payload.

    Args:
        message: Text frame of the form ``"<type>:<json>"``.

    Returns:
        A ``(msg_type, data)`` tuple where ``data`` is the decoded JSON value.

    Raises:
        ValueError: If ``message`` contains no ``':'`` separator.
        json.JSONDecodeError: If the part after the first ``':'`` is not
            valid JSON.
    """
    # Split only on the first ':' because the JSON payload contains colons.
    msg_type, json_str = message.split(':', 1)
    return msg_type, json.loads(json_str)


class User:
    """Per-round betting tally for a single user."""

    def __init__(self, bet_count, uid, betAmount):
        self.bet_count = bet_count
        self.uid = uid
        self.betAmount = betAmount


class UserEncoder(json.JSONEncoder):
    """JSON encoder that serializes :class:`User` objects as plain dicts."""

    def default(self, obj):
        if isinstance(obj, User):
            return {
                "uid": obj.uid,
                "bet_count": obj.bet_count,
                "betAmount": obj.betAmount,
            }
        # Let the base class raise TypeError for unsupported types.
        return super().default(obj)


# Per-round tallies: key is the user ID, value is that user's User object.
user_dict = {}


def _record_bet(data):
    """Add one BetNotify payload to the sender's running tally."""
    uid = data["uid"]
    bet_amount = data["detail"]["betAmount"]

    user = user_dict.get(uid)
    if user is None:
        user = User(0, uid, 0)
        user_dict[uid] = user

    user.bet_count += 1
    # NOTE(review): the /100 presumably converts minor units to major
    # units -- confirm against the game protocol.
    user.betAmount += bet_amount / 100
    print(f"用户{uid}下注次数: {user.bet_count}, 总金额: {user.betAmount}")


def _flush_bets():
    """Append the current tallies to ``user_bet.log`` and reset them."""
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open("user_bet.log", "a", encoding="utf-8") as f:
        f.write(f"{timestamp}:{json.dumps(user_dict, cls=UserEncoder)}\n")
    # Only reached when the write succeeded, so a failed write loses nothing.
    user_dict.clear()


async def deal_message(message):
    """Parse one incoming frame and act on its notification type.

    Frames that do not follow the ``"<type>:<json>"`` shape are logged and
    ignored. Errors raised while handling a well-formed frame are logged with
    a traceback; they never propagate, so the connection stays open.

    Args:
        message: Raw frame received from a client.
    """
    try:
        msg_type, data = parse_message(message)
    except json.JSONDecodeError as e:
        logging.error(f"Error decoding JSON: {e}")
        return
    except (ValueError, TypeError, AttributeError):
        # No ':' separator, or a non-text frame: not a protocol notification.
        logging.info(f"Received message: {message}")
        return

    try:
        if msg_type == "BetNotify":
            # Example payload:
            # {"uid":4174549,"detail":{"option":302,"betAmount":1000,
            #  "auto":false,"is_shot":false,"win_amt":0,"odds":0,
            #  "betGameCoin":0},"curCoin":767943,"selfBet":1000,
            #  "totalBet":67000,"curUsdt":0}
            print(f"下注: {data}")
            _record_bet(data)
        elif msg_type == "GameDataSynNotify":
            print(f"同步游戏数据: {data}")
        elif msg_type == "KickNotify":
            print(f"被踢下线了: {data}")
            await broadcast(KICK_RECOVERY_SCRIPT)
        elif msg_type == "StopBetNotify":
            # Persist the round's tallies at the end of each round, then
            # start the next round from an empty table.
            _flush_bets()
        else:
            logging.info(f"Received message: {message}")
    except Exception:
        # Previously such failures were reported as an ordinary "Received
        # message" line, which hid real bugs (e.g. a missing "uid" key).
        logging.exception(f"Failed to handle message: {message}")


async def echo(websocket, path=None):
    """Connection handler: register the client and process its frames.

    Args:
        websocket: The client connection.
        path: Request path. Optional because newer ``websockets`` releases
            invoke the handler with the connection only.
    """
    connected_clients.add(websocket)
    try:
        async for message in websocket:
            await deal_message(message)
            await websocket.send("Server says: 你好,服务端收到,连接建立成功")
    except websockets.ConnectionClosed as e:
        logging.warning(f"Connection closed: {e.reason}")
    except Exception as e:
        logging.error(f"Error: {e}", exc_info=True)
    finally:
        connected_clients.discard(websocket)


async def broadcast(message):
    """Send ``message`` to every currently connected client.

    Send failures for individual clients are logged and do not prevent
    delivery to the remaining clients.
    """
    if not connected_clients:
        print("No clients are connected.")
        return

    # Snapshot: the set can change while the sends are awaited.
    clients = list(connected_clients)
    # asyncio.wait() no longer accepts bare coroutines (TypeError on 3.11+).
    results = await asyncio.gather(
        *(client.send(message) for client in clients),
        return_exceptions=True,
    )
    for result in results:
        if isinstance(result, Exception):
            logging.warning(f"Failed to send to a client: {result}")


def start_cli(loop):
    """Run the blocking operator prompt; intended for a background thread.

    Args:
        loop: The event loop the server runs on; broadcasts are scheduled
            onto it from this thread.
    """
    while True:
        print("输入命令 (1: 发送自定义消息, 2: 发送JS脚本):")
        try:
            command = input("请输入:")
        except EOFError:
            # stdin was closed; there is nothing more to read.
            logging.info("CLI input closed; command prompt stopped")
            return

        if command == "1":
            try:
                message = input("Enter message to send: ")
            except EOFError:
                logging.info("CLI input closed; command prompt stopped")
                return
            # Schedule the broadcast on the server's event loop.
            asyncio.run_coroutine_threadsafe(broadcast(message), loop)
        elif command == "2":
            sendJsCommand(loop)
        else:
            print("Unknown command")


def _strip_js_comments(script_content):
    """Remove line comments, block comments and blank lines from JS source.

    NOTE: this is purely regex based and not string-literal aware, so text
    such as ``"http://host"`` inside a JS string is truncated at the ``//``.
    """
    # Remove line comments.
    script_content = re.sub(r"//.*?$", "", script_content, flags=re.MULTILINE)
    # Remove block comments.
    script_content = re.sub(r"/\*.*?\*/", "", script_content, flags=re.DOTALL)
    # Collapse the blank lines left behind by the removed comments.
    return re.sub(r"\n\s*\n", "\n", script_content, flags=re.MULTILINE)


def sendJsCommand(loop):
    """Broadcast ``script.js`` (with comments stripped) to all clients.

    Args:
        loop: The event loop the server runs on.
    """
    try:
        with open("script.js", "r", encoding="utf-8") as file:
            script_content = file.read()
    except OSError as e:
        # Keep the CLI alive when the script is missing or unreadable.
        logging.error(f"Cannot read script.js: {e}")
        return

    # Schedule the broadcast on the server's event loop.
    asyncio.run_coroutine_threadsafe(
        broadcast(_strip_js_comments(script_content)), loop
    )


async def main():
    """Start the WebSocket server and block until it is closed."""
    server = await websockets.serve(echo, ip, port)
    try:
        await server.wait_closed()
    except KeyboardInterrupt:
        logging.info("Server stopped by KeyboardInterrupt")


if __name__ == '__main__':
    # NOTE(review): cache_date / cache_hour are not read anywhere in this
    # file; kept as-is in case something else relies on them -- confirm.
    now = datetime.datetime.now()
    cache_date = now.strftime("%Y-%m-%d")
    cache_hour = now.strftime("%H")

    # asyncio.get_event_loop() is deprecated when no loop is running, so
    # create the loop explicitly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Run the CLI in a separate thread so input() does not block the loop.
    cli_thread = threading.Thread(target=start_cli, args=(loop,), daemon=True)
    cli_thread.start()

    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        # Ctrl+C normally surfaces here rather than inside main().
        logging.info("Server stopped by KeyboardInterrupt")
    except Exception as e:
        logging.error(f"Server error: {e}", exc_info=True)
    finally:
        loop.close()