import datetime import json import logging from itertools import product # 所有的下注选项 OPTIONS = [101, 102, 103, 301, 302, 303, 304, 305] # 下注选项 class BetOption: def __init__(self, option, odds, limit, total_bet=0): self.option = option if odds == 0: self.odds = 1 else: self.odds = odds self.limitRed = limit self.total_bet = total_bet def __str__(self): return (f"BetOption(option={self.option}, odds={self.odds}, " f"limitRed={self.limitRed}, totalBet={self.total_bet})") # 用户信息 class User: # 对应option的下注金额 def __init__(self, bet_count, uid, amount): self.bet_count = bet_count self.uid = uid self.betAmount = amount self.finance = 0 self.betOptionAmounts = {option: 0 for option in OPTIONS} class UserEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, User): return {"uid": obj.uid, "bet_count": obj.bet_count, "betAmount": obj.betAmount, "finance": obj.finance} # 让基类的 default 方法抛出 TypeError return json.JSONEncoder.default(self, obj) def init_round_option_info(): global round_option_info round_option_info = {option: BetOption(option, 0, 0) for option in OPTIONS} # 用于存储User对象的用户信息和次数,key为用户ID,value为User对象 round_user_betinfo = {} round_option_info = {} # 本轮选项赔率和下注金额 init_round_option_info() # 庄家的钱 banker_finance = 0 # 虚拟一个用户,用来记录同步下来的下注数据,不然结算会异常,但是这样统计个人数据的时候,就会有点出入, # 没办法,因为会被定时踢出 DATA_SYN_NOTIFY_UID = -9999 async def deal_message(message, broadcast_func): try: msg_type, data = parse_message(message) if msg_type == "BetNotify": await betNotify(data) elif msg_type == "GameDataSynNotify": await gameDataSynNotify(data) elif msg_type == "ShowOddsNotify": await ShowOddsNotify(data) elif msg_type == "KickNotify": await kickNotify(data, broadcast_func) elif msg_type == "GameRoundEndNotify": await gameRoundEndNotify(data) elif msg_type == "MergeAutoBetNotify": await MergeAutoBetNotify(data) else: logging.info(f"Received message: {message}") except Exception as e: logging.info(f"Received message: {message}") async def gameRoundEndNotify(data): global banker_finance time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"{time}:收到结束下注GameRoundEndNotify") # 开始结算 # 本轮收入 current_round_income = 0 # 本轮总下注金额 current_round_bet_amount = 0 for option in round_option_info.values(): current_round_bet_amount += option.total_bet current_round_income = current_round_bet_amount # 开奖结果 option_results = data["optionResult"] win_option = [x["option"] for x in option_results if x["result"] == 1] print(f"本轮中奖选项: {win_option}") # 庄家结算 current_round_income = await bankerSettlement(current_round_income, option_results) # 玩家结算 await playerSettlement(option_results) await win_option_combo_analysis(current_round_bet_amount, win_option) banker_finance += current_round_income print( f"庄家金币: {round(banker_finance / 100, 2)}¥ 本轮总下注{round(current_round_bet_amount / 100, 2)}¥ 收入: {round(current_round_income / 100, 2)}¥") # 格式化一下,避免数字位数导致各种缩进 with open("庄家盈亏记录.txt", "a", encoding="utf-8") as f: finalxx = f"{time}:庄家金币: {banker_finance / 100}¥ 本轮总下注{current_round_bet_amount / 100}¥ 收入: {current_round_income / 100}¥" print(finalxx) f.write(finalxx + "\n") # 每把结束存储数据,然后清空数据 with open("用户记录.txt", "a", encoding="utf-8") as f: # 将user_dict 转成json字符串 f.write(f"{time}:{json.dumps(round_user_betinfo, cls=UserEncoder)}\n") round_user_betinfo.clear() init_round_option_info() # 计算所有组合的可能性及其赔付金额 async def win_option_combo_analysis(current_round_bet_amount, win_option): # 统计所有结果的可能性 try: win_npc = {key: value for key, value in round_option_info.items() if key in [101, 102, 103]} win_card = {key: value for key, value in round_option_info.items() if key in [301, 302, 303, 304, 305]} # 计算所有组合的可能性及其赔付金额 combinations = list(product(win_npc.keys(), win_card.keys())) # 计算每个组合的赔付金额 combination_payouts = [] for npc_key, card_key in combinations: npc_info = win_npc[npc_key] card_info = win_card[card_key] payout = (npc_info.odds * npc_info.total_bet) + (card_info.odds * card_info.total_bet) combination_payouts.append({ "combo": f"{npc_key}x{card_key}", "payout": payout, "open": npc_key in win_option and card_key in win_option }) # 按赔付金额排序 sorted_combinations = sorted(combination_payouts, key=lambda x: x["payout"], reverse=True) # 输出排序后的组合和对应的赔付金额 with open("庄家盈亏记录.txt", "a", encoding="utf-8") as f: label = f"{'标记':<4} {'组合':<10} {'赔付':<15} {'收入':<10}\n" print(label) f.write(label) for item in sorted_combinations: combo_key = item["combo"] payout = item["payout"] income = round((current_round_bet_amount - payout) / 100, 2) badge = "√" if item["open"] else "×" combos = f"{badge:<4} {combo_key:<10} {round(payout / 100, 2):<15}¥ {income:<10}¥\n" print(combos) f.write(combos) except Exception as e: print(e) # 庄家结算统计方法 async def bankerSettlement(current_round_income, option_results): for open_result in option_results: # 开奖结果 option = open_result["option"] result = open_result["result"] # 这个是下注的选项信息 option_info_option_ = round_option_info[option] odds_rate = option_info_option_.odds if result == 1: # 证明这个选项中奖了 # 庄家结算 final_out = option_info_option_.total_bet * odds_rate current_round_income -= final_out print( f"选项{option}中奖了, 赔率{odds_rate}, 该选项总下注{round(option_info_option_.total_bet / 100, 2)}¥, 赔付了{round(final_out / 100, 2)}¥") return current_round_income # 玩家结算统计方法 async def playerSettlement(option_results): # 玩家结算 try: for user in round_user_betinfo.values(): for option in user.betOptionAmounts: open_result = next(filter(lambda x: x["option"] == option, option_results)) odds_rate = round_option_info[option].odds if open_result is not None and open_result["result"] == 1: user.finance += user.betOptionAmounts[option] * odds_rate else: user.finance -= user.betOptionAmounts[option] except Exception as e: pass async def kickNotify(data, broadcast_func): print(f"被踢下线了: {data}") await broadcast_func('''CMD: setTimeout(function(){ let btnDownNode = cc.find("Layer/PushNotice_panel/close_button"); if (btnDownNode && btnDownNode.getComponent(cc.Button)) { btnDownNode.getComponent(cc.Button).clickEvents[0].emit([btnDownNode]); } else { console.error("节点未找到或节点上没有cc.Button组件"); } },3000); setTimeout(function(){ let btnDownNode = cc.find("Canvas/container/content/smallGame/createScrollView/view/cotent/row1/pokerNode/porkermaster/allHand"); if (btnDownNode && btnDownNode.getComponent(cc.Button)) { btnDownNode.getComponent(cc.Button).clickEvents[0].emit([btnDownNode]); } else { console.error("没有找到扑克大师游戏节点"); } // 点击扑克大师游戏线路1 let pokerMasterGameLine1 = cc.find("Canvas/container/content/smallGame/New Node/listPokerMasterContent/main/view/content/item/main"); if (pokerMasterGameLine1 && pokerMasterGameLine1.getComponent(cc.Button)) { pokerMasterGameLine1.getComponent(cc.Button).clickEvents[0].emit([pokerMasterGameLine1]); } else { console.error("未找到扑克大师游戏游戏节点1"); } },5000); ''') def ShowOddsNotify(data): option_odds = data["option_odds"] for optionInfo in option_odds: option_ = optionInfo["option"] odds_ = optionInfo["odds"] / 100 limit_red_ = optionInfo["limitRed"] # 打印信息 print(f"选项: {option_}, 赔率: {odds_}, 限红: {limit_red_}, 总下注: {0}") round_option_info[option_] = BetOption(option_, odds_, limit_red_, 0) async def gameDataSynNotify(data): # print(f"同步游戏数据: {data}") sync_user = User(1, DATA_SYN_NOTIFY_UID, 0) for optionInfo in data["optionInfo"]: option_ = optionInfo["option"] odds_ = optionInfo["odds"] / 100 limit_red_ = optionInfo["limitRed"] total_bet_ = optionInfo["totalBet"] sync_user.betAmount += total_bet_ # 打印信息 print(f"同步下注选项: {option_}, 赔率: {odds_}, 限红: {limit_red_}, 总下注: {total_bet_}") round_option_info[option_] = BetOption(option_, odds_, limit_red_, total_bet_) # 虚拟用户,用来记录同步下来的下注数据,后续用于赔付 sync_user.betOptionAmounts[option_] = total_bet_ round_user_betinfo[DATA_SYN_NOTIFY_UID] = sync_user # 同步的数据应该包含了所有的下注选项,所以这里直接清空,但是这样统计个人数据的时候,就会有点出入 round_user_betinfo.clear() print(f"同步后下注信息:", list(map(lambda x: str(x), round_option_info.values()))) async def MergeAutoBetNotify(data): print("收到合并通知") bet_notifys = data["notify"] for bet_notify in bet_notifys: await betNotify(bet_notify) async def betNotify(data): # {"uid":4174549,"detail":{"option":302,"betAmount":1000,"auto":false,"is_shot":false,"win_amt":0,"odds":0,"betGameCoin":0},"curCoin":767943,"selfBet":1000,"totalBet":67000,"curUsdt":0} # print(f"下注: {data}") # 获取用户ID uid = data["uid"] option = data["detail"]["option"] bet_amount = data["detail"]["betAmount"] # 获取用户信息 user = round_user_betinfo.get(uid) if user is None: # 如果用户信息不存在,创建一个新的User对象 user = User(0, "", 0) round_user_betinfo[uid] = user # 更新用户信息 user.bet_count += 1 user.uid = uid user.betAmount += bet_amount # 更新本轮下注选项的金额 user.betOptionAmounts[option] += bet_amount # 更新本轮全部选项的金额 round_option_info[option].total_bet += bet_amount print(f"用户{uid}下注次数: {user.bet_count}, 总金额: {user.betAmount}") round_user_betinfo[uid] = user def parse_message(message): # 分割消息类型和JSON字符串 msg_type, json_str = message.split(':', 1) # 将JSON字符串转换为字典 data = json.loads(json_str) # 返回解析结果 return msg_type, data