123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- import asyncio
- import datetime
- import json
- import re
- import websockets
- import logging
- import threading
- logging.basicConfig(level=logging.INFO)
- ip = "192.168.137.1"
- port = 9999
- connected_clients = set()
- def parse_message(message):
- # 分割消息类型和JSON字符串
- msg_type, json_str = message.split(':', 1)
- # 将JSON字符串转换为字典
- data = json.loads(json_str)
- # 返回解析结果
- return msg_type, data
- class User:
- def __init__(self, bet_count, uid, betAmount):
- self.bet_count = bet_count
- self.uid = uid
- self.betAmount = betAmount
- class UserEncoder(json.JSONEncoder):
- def default(self, obj):
- if isinstance(obj, User):
- return {"uid": obj.uid, "bet_count": obj.bet_count, "betAmount": obj.betAmount}
- # 让基类的 default 方法抛出 TypeError
- return json.JSONEncoder.default(self, obj)
- # 用于存储User对象的用户信息和次数,key为用户ID,value为User对象
- user_dict = {}
- async def deal_message(message):
- try:
- msg_type, data = parse_message(message)
- if msg_type == "BetNotify":
- # {"uid":4174549,"detail":{"option":302,"betAmount":1000,"auto":false,"is_shot":false,"win_amt":0,"odds":0,"betGameCoin":0},"curCoin":767943,"selfBet":1000,"totalBet":67000,"curUsdt":0}
- print(f"下注: {data}")
- # 解析日期和小时
- now = datetime.datetime.now()
- date = now.strftime("%Y-%m-%d")
- hour = now.strftime("%H")
- minute = now.strftime("%M")
- # 获取用户ID
- uid = data["uid"]
- bet_amount = data["detail"]["betAmount"]
- # 获取用户信息
- user = user_dict.get(uid)
- if user is None:
- # 如果用户信息不存在,创建一个新的User对象
- user = User(0, "", 0)
- user_dict[uid] = user
- # 更新用户信息
- user.bet_count += 1
- user.uid = uid
- user.betAmount += bet_amount / 100
- print(f"用户{uid}下注次数: {user.bet_count}, 总金额: {user.betAmount}")
- user_dict[uid] = user
- elif msg_type == "GameDataSynNotify":
- print(f"同步游戏数据: {data}")
- elif msg_type == "KickNotify":
- print(f"被踢下线了: {data}")
- await broadcast('''CMD:
- setTimeout(function(){
- let btnDownNode = cc.find("Layer/PushNotice_panel/close_button");
- if (btnDownNode && btnDownNode.getComponent(cc.Button)) {
- btnDownNode.getComponent(cc.Button).clickEvents[0].emit([btnDownNode]);
- } else {
- console.error("节点未找到或节点上没有cc.Button组件");
- }
- },3000);
- setTimeout(function(){
- let btnDownNode = cc.find("Canvas/container/content/smallGame/createScrollView/view/cotent/row1/pokerNode/porkermaster/allHand");
- if (btnDownNode && btnDownNode.getComponent(cc.Button)) {
- btnDownNode.getComponent(cc.Button).clickEvents[0].emit([btnDownNode]);
- } else {
- console.error("节点未找到或节点上没有cc.Button组件");
- }
- },6000);
- ''')
- elif msg_type == "StopBetNotify":
- # 每把结束存储数据,然后清空数据
- with open("user_bet.log", "a") as f:
- time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- # 将user_dict 转成json字符串
- f.write(f"{time}:{json.dumps(user_dict, cls=UserEncoder)}\n")
- user_dict.clear()
- else:
- logging.info(f"Received message: {message}")
- except json.JSONDecodeError as e:
- logging.error(f"Error decoding JSON: {e}")
- except Exception as e:
- logging.info(f"Received message: {message}")
- async def echo(websocket, path):
- connected_clients.add(websocket)
- try:
- async for message in websocket:
- # with open("ws.log", "a") as f:
- # time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- # f.write(f"{time}:{message}\n")
- await deal_message(message)
- await websocket.send("Server says: 你好,服务端收到,连接建立成功")
- except websockets.ConnectionClosed as e:
- logging.warning(f"Connection closed: {e.reason}")
- except Exception as e:
- logging.error(f"Error: {e}", exc_info=True)
- finally:
- connected_clients.remove(websocket)
- async def broadcast(message):
- if connected_clients:
- await asyncio.wait([client.send(message) for client in connected_clients])
- else:
- print("No clients are connected.")
- def start_cli(loop):
- while True:
- # Enter command (1: Send custom message, 2: Send JS script from file): 翻译成中文并且print
- print("输入命令 (1: 发送自定义消息, 2: 发送JS脚本):")
- command = input("请输入:")
- if command == "1":
- message = input("Enter message to send: ")
- # 在当前运行的事件循环中安排broadcast任务
- loop.call_soon_threadsafe(asyncio.create_task, broadcast(message))
- elif command == "2":
- sendJsCommand(loop)
- else:
- print("Unknown command")
- def sendJsCommand(loop):
- with open("script.js", "r", encoding="utf-8") as file:
- script_content = file.read()
- # 去除script中注释的行注释或者块注释
- script_content = re.sub(r"//.*?$", "", script_content, flags=re.MULTILINE) # 去除行注释
- script_content = re.sub(r"/\*.*?\*/", "", script_content, flags=re.DOTALL) # 去除块注释
- # 可选:移除因为删除注释而产生的多余空行
- script_content = re.sub(r"\n\s*\n", "\n", script_content, flags=re.MULTILINE)
- # 在当前运行的事件循环中安排broadcast任务
- loop.call_soon_threadsafe(asyncio.create_task, broadcast(script_content))
- async def main():
- server = await websockets.serve(echo, ip, port)
- try:
- await server.wait_closed()
- except KeyboardInterrupt:
- logging.info("Server stopped by KeyboardInterrupt")
- if __name__ == '__main__':
- now = datetime.datetime.now()
- cache_date = now.strftime("%Y-%m-%d")
- cache_hour = now.strftime("%H")
- loop = asyncio.get_event_loop()
- # 在另一个线程中启动 CLI
- cli_thread = threading.Thread(target=start_cli, args=(loop,), daemon=True)
- cli_thread.start()
- try:
- loop.run_until_complete(main())
- except Exception as e:
- logging.error(f"Server error: {e}", exc_info=True)
- finally:
- loop.close()
|