Spaces:
Running
Running
| """ | |
| Flask Backend for Love Live Card Game Web UI | |
| """ | |
| import json | |
| import os | |
| import random | |
| import sys | |
| import threading | |
| import uuid | |
| from datetime import datetime | |
| from typing import Any | |
| import numpy as np | |
| from flask import Flask, jsonify, request, send_from_directory | |
| from flask.json.provider import DefaultJSONProvider | |
| # Ensure project root is in sys.path for absolute imports | |
| if getattr(sys, "frozen", False): | |
| PROJECT_ROOT = sys._MEIPASS # type: ignore | |
| CURRENT_DIR = os.path.join(PROJECT_ROOT, "backend") | |
| else: | |
| CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_DIR, "..")) | |
| if PROJECT_ROOT not in sys.path: | |
| sys.path.insert(0, PROJECT_ROOT) | |
| # Rust Engine | |
| import engine_rust | |
| try: | |
| from ai.headless_runner import RandomAgent, create_easy_cards | |
| from ai.headless_runner import SmartHeuristicAgent as SmartAgent | |
| AI_AVAILABLE = True | |
| except ImportError: | |
| print("Warning: AI modules not found. AI features will be disabled.") | |
| AI_AVAILABLE = False | |
| class RandomAgent: pass | |
| class SmartAgent: pass | |
| def create_easy_cards(): return None, None | |
| from engine.game.data_loader import CardDataLoader | |
| from engine.game.desc_utils import get_action_desc | |
| from engine.game.enums import Phase | |
| from engine.game.game_state import GameState | |
| from engine.game.replay_manager import inflate_history, optimize_history | |
| from engine.game.serializer import serialize_state | |
| from engine.game.state_utils import create_uid | |
| try: | |
| from rust_serializer import RustGameStateSerializer | |
| except ImportError: | |
| from backend.rust_serializer import RustGameStateSerializer | |
| INSTANCE_SHIFT = 20 | |
| BASE_ID_MASK = 0xFFFFF | |
| # --- MODULE DIRECTORIES --- | |
| ENGINE_DIR = os.path.join(PROJECT_ROOT, "engine") | |
| AI_DIR = os.path.join(PROJECT_ROOT, "ai") | |
| TOOLS_DIR = os.path.join(PROJECT_ROOT, "tools") | |
| DATA_DIR = os.path.join(PROJECT_ROOT, "data") | |
| # Tools imports (optional) | |
| try: | |
| from tools.deck_extractor import extract_deck_data | |
| except ImportError: | |
| print("Warning: Could not import deck_extractor from tools.") | |
| def extract_deck_data(content, db): | |
| return [], [], {}, ["Importer not found"] | |
| # Static folder is now in frontend/web_ui | |
| FRONTEND_DIR = os.path.join(PROJECT_ROOT, "frontend") | |
| WEB_UI_DIR = os.path.join(FRONTEND_DIR, "web_ui") | |
| IMG_DIR = os.path.join(FRONTEND_DIR, "img") # Images seem to be in frontend/img | |
| # Note: frontend/web_ui has its own js/css folders which index.html likely uses | |
| app = Flask(__name__, static_folder=WEB_UI_DIR) | |
| class NumpyJSONProvider(DefaultJSONProvider): | |
| def default(self, obj): | |
| if isinstance(obj, np.integer): | |
| return int(obj) | |
| elif isinstance(obj, np.floating): | |
| return float(obj) | |
| elif isinstance(obj, np.bool_): | |
| return bool(obj) | |
| elif isinstance(obj, np.ndarray): | |
| return obj.tolist() | |
| return super().default(obj) | |
| app.json = NumpyJSONProvider(app) | |
| def serve_img(filename): | |
| # Sanitize and normalize the filename | |
| filename = filename.replace("\\", "/").lstrip("/") | |
| # Check if this is a card image request | |
| if filename.startswith("cards/") or filename.startswith("cards_webp/"): | |
| # Remove old nested 'cards/' prefix if it's there | |
| pure_filename = os.path.basename(filename) | |
| webp_path = os.path.join(IMG_DIR, "cards_webp", pure_filename) | |
| # Priority 1: Flat WebP folder | |
| if os.path.exists(webp_path) and os.path.isfile(webp_path): | |
| return send_from_directory(os.path.join(IMG_DIR, "cards_webp"), pure_filename) | |
| # Priority 2: Try falling back to original nested PNGs for backward compatibility/backup | |
| # (This is mostly for non-compiled access or manual links) | |
| pass | |
| # Define possible search directories relative to PROJECT_ROOT | |
| search_dirs = [ | |
| os.path.join(IMG_DIR, "cards_webp"), # Flattened WebP first | |
| IMG_DIR, # frontend/img | |
| os.path.join(IMG_DIR, "texticon"), # frontend/img/texticon | |
| os.path.join(WEB_UI_DIR, "img"), # frontend/web_ui/img | |
| FRONTEND_DIR # Allow direct frontend access if needed | |
| ] | |
| for base_dir in search_dirs: | |
| full_path = os.path.join(base_dir, filename) | |
| if os.path.exists(full_path) and os.path.isfile(full_path): | |
| return send_from_directory(base_dir, filename) | |
| # Fallback for .webp requesting .png or vice-versa | |
| if filename.endswith(".webp"): | |
| png_fallback = filename[:-5] + ".png" | |
| full_png_path = os.path.join(base_dir, png_fallback) | |
| if os.path.exists(full_png_path) and os.path.isfile(full_png_path): | |
| return send_from_directory(base_dir, png_fallback) | |
| # Extra fallback for common icons if they are misplaced | |
| if filename == "icon_blade.png" or "icon_blade" in filename: | |
| # Try to find it anywhere in frontend/img | |
| for root, dirs, files in os.walk(IMG_DIR): | |
| if "icon_blade.png" in files: | |
| return send_from_directory(root, "icon_blade.png") | |
| print(f"DEBUG_IMG_404: Could not find {filename} in {search_dirs}") | |
| return "Image not found", 404 | |
| def serve_icon_root(): | |
| return serve_img("icon_blade.png") | |
| # ai_agent = SmartHeuristicAgent() | |
| ai_agent = SmartAgent() # Use original heuristic AI | |
| # Global game state | |
| # Room Registry | |
| ROOMS: dict[str, dict[str, Any]] = {} | |
| game_lock = threading.Lock() | |
| # Rust Card DB (Global Singleton for performance) | |
| RUST_DB = None | |
| try: | |
| compiled_data_path = os.path.join(DATA_DIR, "cards_compiled.json") | |
| with open(compiled_data_path, "r", encoding="utf-8") as f: | |
| RUST_DB = engine_rust.PyCardDatabase(f.read()) | |
| except Exception as e: | |
| print(f"Warning: Failed to load RUST_DB from {compiled_data_path}: {e}") | |
| # Python DBs (for metadata/serialization) | |
| member_db: dict[int, Any] = {} | |
| live_db: dict[int, Any] = {} | |
| energy_db: dict[int, Any] = {} | |
| rust_serializer = None # Initialized after data load | |
| game_history: list[dict] = [] # Global replay history (might need per-room later) | |
| # Legacy custom deck globals (used by init_game) | |
| custom_deck_p0: list[str] | None = None | |
| custom_deck_p1: list[str] | None = None | |
| custom_energy_deck_p0: list[str] | None = None | |
| custom_energy_deck_p1: list[str] | None = None | |
| def load_game_data(): | |
| """Load card data into global databases.""" | |
| global member_db, live_db, energy_db, rust_serializer | |
| try: | |
| cards_path = os.path.join(DATA_DIR, "cards.json") | |
| print(f"Loading card data from: {cards_path}") | |
| loader = CardDataLoader(cards_path) | |
| m, l, e = loader.load() | |
| member_db.update(m) | |
| live_db.update(l) | |
| energy_db.update(e) | |
| # Initialize rust_serializer | |
| rust_serializer = RustGameStateSerializer(member_db, live_db, energy_db) | |
| # Build mapping | |
| build_card_no_mapping() | |
| print(f"Data loaded: {len(member_db)} Members, {len(live_db)} Lives, {len(energy_db)} Energy") | |
| print(f"DEBUG PATHS: PROJECT_ROOT={PROJECT_ROOT}") | |
| print(f"DEBUG PATHS: FRONTEND_DIR={FRONTEND_DIR}") | |
| print(f"DEBUG PATHS: WEB_UI_DIR={WEB_UI_DIR}") | |
| print(f"DEBUG PATHS: IMG_DIR={IMG_DIR}") | |
| except Exception as ex: | |
| print(f"CRITICAL ERROR loading card data: {ex}") | |
| import sys | |
| sys.exit(1) | |
| # Load data immediately on import | |
| def get_room_id() -> str: | |
| """Extract room_id from request header or query param.""" | |
| # Priority: Header > Query Param > Default "SINGLE_PLAYER" | |
| rid = request.headers.get("X-Room-Id") or request.args.get("room_id") | |
| if not rid: | |
| # Debug why no ID found | |
| # print(f"DEBUG: No X-Room-Id or room_id param. Headers: {request.headers}", file=sys.stderr) | |
| rid = "SINGLE_PLAYER" | |
| return rid | |
| def get_player_idx(): | |
| """Extract player perspective from X-Player-Idx header or viewer query param.""" | |
| # Try query param 'viewer' first (commonly used by frontend) | |
| viewer = request.args.get("viewer") | |
| if viewer is not None: | |
| try: | |
| return int(viewer) | |
| except (ValueError, TypeError): | |
| pass | |
| # Fallback to header | |
| try: | |
| return int(request.headers.get("X-Player-Idx", 0)) | |
| except (ValueError, TypeError): | |
| return 0 | |
| def get_room(room_id: str) -> dict[str, Any] | None: | |
| """Get room data safely.""" | |
| with game_lock: | |
| room = ROOMS.get(room_id) | |
| if room: | |
| room["last_active"] = datetime.now() | |
| return room | |
| # Reverse mapping: card_no string -> internal integer ID | |
| card_no_to_id: dict[str, int] = {} | |
| def build_card_no_mapping(): | |
| """Build reverse lookup from card_no string to internal ID using compiled data. | |
| Ensures consistency with the Rust engine's internal ID assignments. | |
| """ | |
| global card_no_to_id | |
| card_no_to_id = {} | |
| try: | |
| compiled_path = os.path.join(DATA_DIR, "cards_compiled.json") | |
| if not os.path.exists(compiled_path): | |
| print(f"Warning: {compiled_path} not found. Mapping will be empty.") | |
| return | |
| with open(compiled_path, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| # Build mapping from dbs | |
| count = 0 | |
| for db_name in ["member_db", "live_db", "energy_db"]: | |
| db = data.get(db_name, {}) | |
| for internal_id, card_data in db.items(): | |
| card_no = card_data.get("card_no") | |
| if card_no: | |
| # Convert string key to integer ID | |
| card_no_to_id[card_no] = int(internal_id) | |
| count += 1 | |
| print(f"Built card_no_to_id mapping from compiled data: {count} entries") | |
| except Exception as e: | |
| print(f"Error building mapping from compiled data: {e}") | |
| # Load data immediately on import | |
| load_game_data() | |
| # Initialize mapping on startup | |
| build_card_no_mapping() | |
| def convert_deck_strings_to_ids(deck_strings): | |
| """Convert list of card_no strings to internal IDs (Unique Instance IDs).""" | |
| ids = [] | |
| counts = {} | |
| for card_no in deck_strings: | |
| if card_no in card_no_to_id: | |
| base_id = card_no_to_id[card_no] | |
| count = counts.get(base_id, 0) | |
| uid = create_uid(base_id, count) | |
| counts[base_id] = count + 1 | |
| ids.append(uid) | |
| else: | |
| print(f"Warning: Unknown card_no '{card_no}', skipping.") | |
| return ids | |
| def save_replay(gs: GameState | None = None): | |
| """Save the provided game state's history to a file.""" | |
| if gs is None or not gs.rule_log: | |
| return | |
| try: | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| os.makedirs("replays", exist_ok=True) | |
| filename = f"replays/replay_{timestamp}.json" | |
| filename_opt = f"replays/replay_{timestamp}_opt.json" | |
| # Use historical states from rule_log or history if we maintain one | |
| # For now, we assume GS has what we need or we pass history | |
| history = [] # In this engine, standard replays are often built from logs or incremental states | |
| # 1. Save Standard Replay (Compatible) | |
| data = { | |
| "game_id": 0, | |
| "timestamp": timestamp, | |
| "winner": gs.winner if gs else -1, | |
| "states": history, | |
| } | |
| with open(filename, "w", encoding="utf-8") as f: | |
| json.dump(data, f, ensure_ascii=False) | |
| print(f"Replay saved to {filename}") | |
| # 2. Save Optimized Replay (Dict Encoded) | |
| try: | |
| print("Optimizing replay...") | |
| # Gather Level 3 Context | |
| deck_info = None | |
| if gs: | |
| deck_info = { | |
| "p0_deck": list(getattr(gs.players[0], "initial_deck_indices", [])), | |
| "p1_deck": list(getattr(gs.players[1], "initial_deck_indices", [])), | |
| } | |
| opt_data = optimize_history( | |
| history, | |
| member_db, | |
| live_db, | |
| energy_db, | |
| exclude_db_cards=True, | |
| # seed=current_seed, | |
| # action_log=action_log, | |
| deck_info=deck_info, | |
| ) | |
| final_opt = { | |
| "game_id": 0, | |
| "timestamp": timestamp, | |
| "winner": gs.winner if gs else -1, | |
| } | |
| # Merge optimization data | |
| if "level" in opt_data and opt_data["level"] == 3: | |
| final_opt.update(opt_data) # seed, decks, action_log | |
| print("Level 3 Optimization Active (Action Log)") | |
| else: | |
| final_opt["states"] = opt_data["states"] | |
| with open(filename_opt, "w", encoding="utf-8") as f: | |
| json.dump(final_opt, f, ensure_ascii=False) | |
| # Calculate savings | |
| size_std = os.path.getsize(filename) | |
| size_opt = os.path.getsize(filename_opt) | |
| savings = (1 - size_opt / size_std) * 100 | |
| print(f"Optimized replay saved to {filename_opt}") | |
| print(f"Compression: {size_std / 1024:.1f}KB -> {size_opt / 1024:.1f}KB ({savings:.1f}% savings)") | |
| except Exception as e: | |
| print(f"Failed to save optimized replay: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| except Exception as e: | |
| print(f"Failed to save replay: {e}") | |
| game_history = [] # For replay recording | |
| action_log = [] # For action-based replay | |
| current_seed = 0 # For deterministic replay | |
| def init_game(deck_type="normal"): | |
| global game_state, member_db, live_db, energy_db, game_history, current_seed, action_log | |
| # Ensure true randomness for each game | |
| import time | |
| real_seed = int(time.time() * 1000) % (2**31) | |
| current_seed = real_seed | |
| random.seed(real_seed) | |
| # Store action history separately for Level 3 Replay | |
| global action_log | |
| action_log = [] | |
| # DATA PATH: data/cards.json | |
| cards_path = os.path.join(DATA_DIR, "cards.json") | |
| loader = CardDataLoader(cards_path) | |
| member_db, live_db, energy_db = loader.load() | |
| # CRITICAL: Populate GameState static DBs so validations work | |
| # Use initialize_class_db to ensure proper wrapping with MaskedDB | |
| GameState.initialize_class_db(member_db, live_db) | |
| GameState.energy_db = energy_db | |
| # Initialize JIT arrays for performance | |
| GameState._init_jit_arrays() | |
| # Build reverse mapping for custom deck support | |
| build_card_no_mapping() | |
| # Pre-calculate Start Deck card IDs | |
| # Load raw JSON to check product field for filtering | |
| cards_path = os.path.join(DATA_DIR, "cards.json") | |
| with open(cards_path, "r", encoding="utf-8") as f: | |
| json.load(f) | |
| for _cid, _m in member_db.items(): | |
| # Find raw key by matching name/cost/type? Or better, DataLoader should store product. | |
| # Since DataLoader doesn't verify product yet, we'll try to guess or just use ALL valid cards | |
| # that are from Start Deck (usually ID < 100 for this mock loader or by string ID). | |
| # Actually, let's just use ALL loaded members/lives for 'normal' and specific ones for 'starter'. | |
| # For 'start_deck', we can filter by card string ID prefix 'PL!-sd1' or 'LL-E'. | |
| # But 'member_db' keys are integers 0..N. We need a way to link back. | |
| # The loader assigns IDs sequentially. | |
| # Let's just build a random valid deck from ALL cards for now, | |
| # unless 'easy' mode. | |
| pass | |
| # If deck_type is 'easy', we use the simple mock cards for logic testing. | |
| # If deck_type is 'normal' or 'starter', we use REAL cards. | |
| if deck_type == "easy": | |
| easy_m, easy_l = create_easy_cards() | |
| member_db[easy_m.card_id] = easy_m | |
| live_db[easy_l.card_id] = easy_l | |
| game_state = GameState() | |
| # Setup players | |
| for pidx, p in enumerate(game_state.players): | |
| # Check for custom deck first | |
| custom_deck = custom_deck_p0 if pidx == 0 else custom_deck_p1 | |
| if custom_deck: | |
| # Use custom deck | |
| p.main_deck = convert_deck_strings_to_ids(custom_deck) | |
| random.shuffle(p.main_deck) # Shuffle custom deck for variety | |
| print(f"Player {pidx}: Using custom deck ({len(p.main_deck)} cards, shuffled)") | |
| elif deck_type == "easy": | |
| # Use Easy Cards (888/999) but mapped to real images | |
| p.main_deck = [888] * 48 + [999] * 12 | |
| else: | |
| # NORMAL / STARTER MODE: Build a valid deck | |
| # Rule: Max 4 copies of same card number. | |
| # Total: 48 Members + 12 Lives (Total 60 in main deck per game_state spec) | |
| p.main_deck = [] | |
| # 1. Select Members (48) | |
| available_members = list(member_db.keys()) | |
| if available_members: | |
| # Shuffle availability to vary decks | |
| random.shuffle(available_members) | |
| member_bucket = [] | |
| for mid in available_members: | |
| # Add 4 copies of each until we have enough | |
| # Use create_uid for unique instance IDs | |
| for i in range(4): | |
| uid = create_uid(mid, i) | |
| member_bucket.append(uid) | |
| if len(member_bucket) >= 150: # Optimization: Don't build massive list | |
| break | |
| # Pick 48 from the bucket | |
| if len(member_bucket) < 48: | |
| # Fallback if DB too small | |
| while len(member_bucket) < 48: | |
| member_bucket.extend(available_members) | |
| # Ensure we don't accidentally pick >4 if we just slice | |
| # Actually, simply taking the first 48 from our constructed bucket (which has 4 of each distinct card) | |
| # guarantees validity if we shuffle the CARDS/TYPES, not the final list. | |
| # Steps: | |
| # 1. Shuffle types. | |
| # 2. Add 4 of Type A, 4 of Type B... | |
| # 3. Take first 48 cards. | |
| p.main_deck.extend(member_bucket[:48]) | |
| # 2. Select Lives (12) | |
| available_lives = list(live_db.keys()) | |
| if available_lives: | |
| random.shuffle(available_lives) | |
| live_bucket = [] | |
| for lid in available_lives: | |
| # live_bucket.extend([lid] * 4) | |
| for i in range(4): | |
| uid = create_uid(lid, i) | |
| live_bucket.append(uid) | |
| if len(live_bucket) >= 50: | |
| break | |
| if len(live_bucket) < 12: | |
| while len(live_bucket) < 12: | |
| live_bucket.extend(available_lives) | |
| p.main_deck.extend(live_bucket[:12]) | |
| random.shuffle(p.main_deck) | |
| # Energy Deck (12 cards) | |
| # Use actual Energy Card ID if available (2000+) | |
| if energy_db: | |
| eid = list(energy_db.keys())[0] # Take first energy card type found | |
| p.energy_deck = [eid] * 12 | |
| else: | |
| p.energy_deck = [40000] * 12 # Fallback | |
| # Custom Energy Deck Override | |
| custom_energy = custom_energy_deck_p0 if pidx == 0 else custom_energy_deck_p1 | |
| if custom_energy: | |
| p.energy_deck = convert_deck_strings_to_ids(custom_energy) | |
| print(f"Player {pidx}: Using custom energy deck ({len(p.energy_deck)} cards)") | |
| # Explicit shuffle before drawing | |
| random.shuffle(p.main_deck) | |
| if game_state.players.index(p) == 0: | |
| print(f"DEBUG: P0 Deck Shuffled. Top 5: {p.main_deck[-5:]}") | |
| # Initial draw (6 cards - standard Mulligan start) | |
| for _ in range(6): | |
| if p.main_deck: | |
| p.hand.append(p.main_deck.pop()) | |
| p.hand_added_turn.append(game_state.turn_number) | |
| # Initial energy: 3 cards (Rule 6.2.1.7) | |
| for _ in range(3): | |
| if p.energy_deck: | |
| p.energy_zone.append(p.energy_deck.pop(0)) | |
| # Randomly determine first player | |
| game_state.first_player = random.randint(0, 1) | |
| # For Mulligan Phase (P1/Index 0), Current Player MUST be 0 | |
| # The 'first_player' variable determines who acts first in ACTIVE phase (Round 1) | |
| game_state.current_player = 0 | |
| # Start in MULLIGAN phase | |
| game_state.phase = Phase.MULLIGAN_P1 | |
| def create_room_internal( | |
| room_id: str, | |
| mode: str = "pve", | |
| deck_type: str = "normal", | |
| public: bool = False, | |
| custom_decks: dict = None, | |
| ) -> dict[str, Any]: | |
| """Helper to initialize a room using the RUST engine.""" | |
| print( | |
| f"DEBUG: Creating Rust Room {room_id} (Mode: {mode}, Deck: {deck_type}, Public: {public}, CustomDecks: {bool(custom_decks)})" | |
| ) | |
| if RUST_DB is None: | |
| raise Exception("RUST_DB not initialized") | |
| gs = engine_rust.PyGameState(RUST_DB) | |
| # helper for deck generation | |
| def get_random_decks(): | |
| m_ids = list(member_db.keys()) | |
| l_ids = list(live_db.keys()) | |
| random.shuffle(m_ids) | |
| random.shuffle(l_ids) | |
| main_ids = [] | |
| for mid in m_ids[:15]: | |
| main_ids.extend([mid] * 4) | |
| for lid in l_ids[:4]: | |
| main_ids.extend([lid] * 4) | |
| random.shuffle(main_ids) | |
| e_deck = [list(energy_db.keys())[0]] * 12 if energy_db else [40000] * 12 | |
| l_deck = l_ids[ | |
| :3 | |
| ] # Actually the Rust engine treats lives as a separate param in some versions or part of deck? | |
| # Checked engine_rust/src/py_bindings.rs: initialize_game needs p0_deck, p1_deck, p0_energy, p1_energy, p0_lives, p1_lives | |
| return main_ids[:60], e_deck, l_ids[:3] | |
| # Defaults | |
| p0_m, p0_e, p0_l = get_random_decks() | |
| p1_m, p1_e, p1_l = get_random_decks() | |
| # Override with custom decks if provided | |
| final_custom_decks = {0: {"main": [], "energy": []}, 1: {"main": [], "energy": []}} | |
| if custom_decks: | |
| final_custom_decks.update(custom_decks) | |
| for pid in [0, 1]: | |
| cdeck = custom_decks.get(str(pid)) or custom_decks.get(pid) | |
| if cdeck and cdeck.get("main"): | |
| # Convert strings to IDs | |
| main_ids = convert_deck_strings_to_ids(cdeck["main"]) | |
| random.shuffle(main_ids) | |
| # Extract Live cards for the initial Live Zone (3 cards) | |
| # Note: cid is a UID, so we must mask it to compare with live_db keys | |
| live_ids = [cid for cid in main_ids if (cid & BASE_ID_MASK) in live_db] | |
| if len(main_ids) > 0: | |
| if pid == 0: | |
| p0_m = main_ids | |
| if len(live_ids) >= 3: | |
| p0_l = live_ids[:3] # Pick first 3 as starting lives | |
| elif len(live_ids) > 0: | |
| p0_l = live_ids # Use whatever lives are available | |
| else: | |
| p1_m = main_ids | |
| if len(live_ids) >= 3: | |
| p1_l = live_ids[:3] | |
| elif len(live_ids) > 0: | |
| p1_l = live_ids | |
| # Energy | |
| if cdeck.get("energy"): | |
| e_ids = convert_deck_strings_to_ids(cdeck["energy"]) | |
| if pid == 0: | |
| p0_e = e_ids | |
| else: | |
| p1_e = e_ids | |
| # Warning: We are not extracting initial lives from main deck for p0_l/p1_l if custom. | |
| # The engine probably draws them? | |
| # If `p0_l` is required, we should pick random 3 from lives in deck or DB? | |
| # For now, let's keep random lives for the Live Zone if not specified, or just reuse random ones. | |
| gs.initialize_game(p0_m, p1_m, p0_e, p1_e, p0_l, p1_l) | |
| return { | |
| "state": gs, | |
| "mode": mode, | |
| "public": public, | |
| "created_at": datetime.now(), | |
| "last_active": datetime.now(), | |
| "ai_agent": None, # MCTS is built-in | |
| "custom_decks": final_custom_decks, | |
| "sessions": {}, | |
| "engine": "rust", | |
| } | |
| def join_room_logic(room_id: str) -> dict[str, Any]: | |
| """ | |
| Logic to add a user session to a room. | |
| Returns {"session_id": str, "player_id": int} | |
| """ | |
| if room_id not in ROOMS: | |
| return {"error": "Room not found"} | |
| room = ROOMS[room_id] | |
| sessions = room["sessions"] | |
| # Simple assignment logic: | |
| # If 0 is free, take 0. | |
| # If 1 is free, take 1. | |
| # Else, maybe return spectator? For now, just return -1 or error. | |
| # Check current players | |
| taken_pids = set(sessions.values()) | |
| new_pid = -1 | |
| if 0 not in taken_pids: | |
| new_pid = 0 | |
| elif 1 not in taken_pids: | |
| new_pid = 1 | |
| else: | |
| # Both full. Spectator? | |
| new_pid = -1 | |
| # For spectator, maybe we still give a session but with pid -1? | |
| session_id = str(uuid.uuid4()) | |
| sessions[session_id] = new_pid | |
| return {"session_id": session_id, "player_id": new_pid} | |
| # --- ROOM MANAGEMENT API --- | |
| def create_new_room(): | |
| print("DEBUG: Entered create_new_room endpoint", file=sys.stderr) | |
| try: | |
| data = request.json or {} | |
| except Exception as e: | |
| print(f"DEBUG: Failed to parse JSON: {e}", file=sys.stderr) | |
| data = {} | |
| mode = data.get("mode", "pve") | |
| is_public = data.get("public", False) | |
| custom_decks = data.get("decks", None) # Optional initial decks | |
| # Generate 4-char code | |
| import string | |
| chars = string.ascii_uppercase + string.digits | |
| while True: | |
| room_id = "".join(random.choices(chars, k=4)) | |
| if room_id not in ROOMS: | |
| break | |
| print(f"DEBUG: Generated room_id {room_id}, acquiring lock...", file=sys.stderr) | |
| res = {} | |
| with game_lock: | |
| print("DEBUG: Lock acquired. Creating room internal...", file=sys.stderr) | |
| ROOMS[room_id] = create_room_internal(room_id, mode, public=is_public, custom_decks=custom_decks) | |
| print("DEBUG: Room created internally. Joining creator...", file=sys.stderr) | |
| # Auto-join creator | |
| join_res = join_room_logic(room_id) | |
| print("DEBUG: Returning response.", file=sys.stderr) | |
| return jsonify({"success": True, "room_id": room_id, "mode": mode, "session": join_res}) | |
| def list_public_rooms(): | |
| """Return a list of public rooms.""" | |
| public_rooms = [] | |
| with game_lock: | |
| for rid, room in ROOMS.items(): | |
| if room.get("public", False): | |
| # Calculate player count | |
| sessions = room.get("sessions", {}) | |
| player_count = len(set(sessions.values())) # Approximate, might need better logic if spectators exist | |
| # Or just count occupied slots (0 and 1) | |
| occupied_slots = 0 | |
| taken_pids = set(sessions.values()) | |
| if 0 in taken_pids: | |
| occupied_slots += 1 | |
| if 1 in taken_pids: | |
| occupied_slots += 1 | |
| # Basic Info | |
| gs = room.get("state") | |
| turn = gs.turn_number if gs else 0 | |
| phase = str(gs.phase) if gs else "?" | |
| public_rooms.append( | |
| { | |
| "room_id": rid, | |
| "mode": room.get("mode", "pve"), | |
| "players": occupied_slots, | |
| "turn": turn, | |
| "phase": phase, | |
| "created_at": room.get("created_at", datetime.now()).isoformat(), | |
| } | |
| ) | |
| # Sort by creation time desc | |
| public_rooms.sort(key=lambda x: x["created_at"], reverse=True) | |
| return jsonify({"success": True, "rooms": public_rooms}) | |
| def join_room(): | |
| print("DEBUG: Entered join_room", file=sys.stderr) | |
| data = request.json or {} | |
| room_id = data.get("room_id", "").upper().strip() | |
| print(f"DEBUG: Entered join_room for ID: '{room_id}'", file=sys.stderr) | |
| with game_lock: | |
| if room_id in ROOMS: | |
| mode = ROOMS[room_id]["mode"] | |
| print(f"DEBUG: Found room {room_id}, mode={mode}", file=sys.stderr) | |
| # Assign a session/seat to the joining player | |
| join_res = join_room_logic(room_id) | |
| if "error" in join_res: | |
| return jsonify({"success": False, "error": join_res["error"]}), 400 | |
| return jsonify( | |
| { | |
| "success": True, | |
| "room_id": room_id, | |
| "mode": mode, | |
| "session_id": join_res.get("session_id"), | |
| "player_id": join_res.get("player_id"), | |
| } | |
| ) | |
| return jsonify({"success": False, "error": "Room not found"}), 404 | |
| def index(): | |
| return send_from_directory(WEB_UI_DIR, "index.html") | |
| def game_board(): | |
| return send_from_directory(WEB_UI_DIR, "game_board.html") # Assuming it exists there | |
| def serve_js(filename): | |
| return send_from_directory(os.path.join(WEB_UI_DIR, "js"), filename) | |
| def serve_css(filename): | |
| return send_from_directory(os.path.join(WEB_UI_DIR, "css"), filename) | |
| def serve_icon(): | |
| # If icon is in root or img, adjust. Assuming img for now or checking existence. | |
| # Fallback to IMG_DIR or WEB_UI_DIR | |
| return send_from_directory(IMG_DIR, "icon_blade.png") | |
| def serve_deck_builder(): | |
| return send_from_directory(WEB_UI_DIR, "deck_builder.html") | |
| def serve_data(filename): | |
| return send_from_directory(DATA_DIR, filename) | |
| import threading | |
| import time | |
| # Threading setup | |
| game_lock = threading.RLock() # Re-entrant lock to prevent self-deadlock | |
| game_thread = None | |
| def background_game_loop(): | |
| """ | |
| Runs the game logic (AI and auto-phases) for ALL active rooms. | |
| """ | |
| print("Background Game Loop Started (Multi-Room)", file=sys.stderr) | |
| while True: | |
| try: | |
| # print("DEBUG: Background Loop acquiring lock...", file=sys.stderr) | |
| with game_lock: | |
| # Iterate over a copy of keys to avoid modification issues if needed | |
| active_room_ids = list(ROOMS.keys()) | |
| for rid in active_room_ids: | |
| # print(f"DEBUG: Processing room {rid}...", file=sys.stderr) | |
| room = ROOMS.get(rid) | |
| if not room: | |
| continue | |
| gs = room["state"] | |
| game_mode = room["mode"] | |
| ai_agent = room["ai_agent"] | |
| if not gs.is_terminal(): | |
| # 1. Auto-Advance Phases | |
| if gs.phase in ( | |
| Phase.ACTIVE, | |
| Phase.ENERGY, | |
| Phase.DRAW, | |
| Phase.PERFORMANCE_P1, | |
| Phase.PERFORMANCE_P2, | |
| ): | |
| # Safe attribute access for Rust engine compatibility | |
| p_choices = getattr(gs, "pending_choices", []) | |
| p_effects = getattr(gs, "pending_effects", []) | |
| if not (p_choices or p_effects): | |
| res = gs.step(0) | |
| if res is not None: | |
| room["state"] = res | |
| gs = res | |
| elif gs.current_player == 1 and game_mode == "pve": | |
| is_continue_choice = False | |
| if gs.pending_choices and gs.pending_choices[0][0].startswith("CONTINUE"): | |
| is_continue_choice = True | |
| if gs.phase == Phase.LIVE_RESULT and is_continue_choice: | |
| # Wait for Human | |
| pass | |
| else: | |
| if gs.phase in (Phase.MULLIGAN_P1, Phase.MULLIGAN_P2): | |
| aid = 0 | |
| res = gs.step(aid) | |
| if res is not None: | |
| room["state"] = res | |
| else: | |
| if room.get("engine") == "rust": | |
| # Use Greedy (1-ply) AI for Rust engine in PVE to maximize responsiveness | |
| gs.step_opponent_greedy() | |
| else: | |
| aid = ai_agent.choose_action(gs, 1) | |
| res = gs.step(aid) | |
| if res is not None: | |
| room["state"] = res | |
| time.sleep(0.1) | |
| except Exception as e: | |
| print(f"Error in game loop: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| time.sleep(1.0) | |
| def get_state(): | |
| room_id = get_room_id() | |
| session_token = request.headers.get("X-Session-Token") | |
| with game_lock: | |
| # Development convenience: Auto-create room if missing IF it's "SINGLE_PLAYER" | |
| if room_id == "SINGLE_PLAYER" and room_id not in ROOMS: | |
| ROOMS[room_id] = create_room_internal(room_id) | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found or expired"}), 404 | |
| gs = room["state"] | |
| mode = room["mode"] | |
| viewer_idx = get_player_idx() | |
| if room.get("engine") == "rust": | |
| s_state = rust_serializer.serialize_state(gs, viewer_idx=viewer_idx, mode=mode, is_pvp=(mode == "pvp")) | |
| else: | |
| s_state = serialize_state( | |
| gs, | |
| viewer_idx=viewer_idx, | |
| is_pvp=(mode == "pvp" and request.headers.get("X-Player-Idx") is None), | |
| mode=mode, | |
| ) | |
| # Meta info about decks | |
| cdecks = room.get("custom_decks", {}) | |
| meta = { | |
| "p0_deck_set": bool(cdecks.get(0, {}).get("main") or cdecks.get("0", {}).get("main")), | |
| "p1_deck_set": bool(cdecks.get(1, {}).get("main") or cdecks.get("1", {}).get("main")), | |
| "mode": mode, | |
| } | |
| return jsonify({"success": True, "state": s_state, "meta": meta}) | |
| def set_deck(): | |
| """Accept a custom deck for a player in a specific room.""" | |
| data = request.json | |
| player_id = data.get("player", 0) | |
| deck_ids = data.get("deck", []) # List of card_no strings | |
| energy_ids = data.get("energy_deck", []) | |
| room_id = get_room_id() | |
| with game_lock: | |
| room = get_room(room_id) | |
| # For setting deck, we might want to allow it even if room doesn't exist yet? | |
| # But conceptually, you create a room, then set deck, then reset/start. | |
| if not room: | |
| # Auto-create for dev workflow | |
| ROOMS[room_id] = create_room_internal(room_id) | |
| room = ROOMS[room_id] | |
| room["custom_decks"][player_id] = {"main": deck_ids, "energy": energy_ids} | |
| return jsonify( | |
| { | |
| "status": "ok", | |
| "player": player_id, | |
| "deck_size": len(deck_ids), | |
| "message": f"Deck set for Player {player_id + 1} in Room {room_id}. Reset game to apply.", | |
| } | |
| ) | |
| def upload_deck(): | |
| """Accept a raw deck file content (decktest.txt style) and load it.""" | |
| data = request.json | |
| content = data.get("content", "") | |
| player_id = data.get("player", 0) | |
| room_id = get_room_id() | |
| # Parse content | |
| try: | |
| if content.strip().startswith("{") or content.strip().startswith("["): | |
| # JSON format | |
| deck_data = json.loads(content) | |
| # Support both simple list and object | |
| if isinstance(deck_data, list): | |
| main_deck = deck_data | |
| energy_deck = [] # JSON list implies only main deck usually? | |
| elif "main" in deck_data: | |
| main_deck = deck_data["main"] | |
| energy_deck = deck_data.get("energy", []) | |
| else: | |
| return jsonify( | |
| {"success": False, "error": "Invalid JSON deck format. Expected list or object with 'main' key."} | |
| ) | |
| else: | |
| # HTML/Text format | |
| card_db = {} | |
| try: | |
| cards_path = os.path.join(DATA_DIR, "cards.json") | |
| with open(cards_path, "r", encoding="utf-8") as f: | |
| card_db = json.load(f) | |
| except Exception as e: | |
| return jsonify({"success": False, "error": f"Failed to load card DB for validation: {e}"}) | |
| main_deck, energy_deck, _, errors = extract_deck_data(content, card_db) # Pass DB for validation | |
| if errors: | |
| return jsonify({"success": False, "error": "Validation Errors:\n" + "\n".join(errors)}) | |
| except json.JSONDecodeError: | |
| return jsonify({"success": False, "error": "Invalid JSON format."}) | |
| except Exception as e: | |
| print(f"Deck parsing error: {e}") | |
| return jsonify({"success": False, "error": str(e)}) | |
| if not main_deck and not energy_deck: | |
| return jsonify({"success": False, "error": "No cards found in file."}) | |
| with game_lock: | |
| room = get_room(room_id) | |
| if not room: | |
| ROOMS[room_id] = create_room_internal(room_id) | |
| room = ROOMS[room_id] | |
| room["custom_decks"][player_id] = {"main": main_deck, "energy": energy_deck} | |
| # Auto-apply? | |
| # Re-init room with "custom" logic? | |
| # For now, let's just create a new room state using these decks immediately for convenience | |
| # But we need to respect the loop. | |
| # Actually existing logic calls init_game(deck_type="custom"). | |
| # We'll just trigger a reset logic manually | |
| # This duplicates logic in reset() but scoped to this room + custom deck applied. | |
| # For simplicity, we just store it. User must click "Reset" or we call reset internal? | |
| # The frontend usually expects upload to just work. | |
| pass | |
| # Trigger Reset via API logic simulation or just return success and let caller Reset? | |
| # Existing behavior: calls init_game("custom"). | |
| # So we should probably do the same: reset the room's state using these custom decks. | |
| # We can reuse the create_room_internal logic if we modify it to accept custom decks directly? | |
| # Or just rely on the room["custom_decks"] being set. | |
| # Let's call reset internal logic here? | |
| # Better: Update endpoints first, then we can verify flow. | |
| # For now, we assume user clicks Reset or we simulate it. | |
| # Actually, let's just return success. The frontend typically reloads or resets. | |
| return jsonify( | |
| { | |
| "success": True, | |
| "main_count": len(main_deck), | |
| "energy_count": len(energy_deck), | |
| "room_id": room_id, | |
| "message": f"Deck Loaded! ({len(main_deck)} Main, {len(energy_deck)} Energy). Please Reset.", | |
| } | |
| ) | |
| def get_test_deck_api(): | |
| """Read deck files from ai/decks/ directory and return card list.""" | |
| from engine.game.deck_utils import extract_deck_data | |
| deck_name = request.args.get("deck", "") # Optional deck name parameter | |
| # Path to ai/decks directory | |
| # Use PROJECT_ROOT for reliability | |
| ai_decks_dir = os.path.join(PROJECT_ROOT, "ai", "decks") | |
| if not os.path.exists(ai_decks_dir): | |
| # Fallback: try CWD relative | |
| ai_decks_dir = os.path.abspath(os.path.join("ai", "decks")) | |
| if not os.path.exists(ai_decks_dir): | |
| return jsonify({"success": False, "error": "ai/decks directory not found"}) | |
| # List available decks (excluding verify script) | |
| available_decks = [] | |
| for f in os.listdir(ai_decks_dir): | |
| if f.endswith(".txt") and not f.startswith("verify"): | |
| available_decks.append(f.replace(".txt", "")) | |
| # If no deck specified, return list of available decks | |
| if not deck_name: | |
| # Default to aqours_cup for "Load Test Deck" button compatibility | |
| deck_name = "aqours_cup" | |
| message = "Defaulting to 'aqours_cup'. Specify ?deck=NAME to load a specific deck." | |
| else: | |
| message = f"Loaded '{deck_name}'" | |
| # Find matching deck file | |
| deck_file = os.path.join(ai_decks_dir, f"{deck_name}.txt") | |
| if not os.path.exists(deck_file): | |
| return jsonify({"success": False, "error": f"Deck '{deck_name}' not found", "available_decks": available_decks}) | |
| try: | |
| with open(deck_file, "r", encoding="utf-8") as f: | |
| content = f.read() | |
| # Load card DB for parsing | |
| card_db_path = os.path.join(CURRENT_DIR, "..", "data", "cards.json") | |
| card_db = {} | |
| if os.path.exists(card_db_path): | |
| with open(card_db_path, "r", encoding="utf-8") as f_db: | |
| card_db = json.load(f_db) | |
| # Use the unified parser | |
| main_deck, energy_deck, type_counts, errors = extract_deck_data(content, card_db) | |
| return jsonify( | |
| { | |
| "success": True, | |
| "deck_name": deck_name, | |
| "content": main_deck, # For compatibility with older frontend | |
| "main_deck": main_deck, | |
| "energy_deck": energy_deck, | |
| "available_decks": available_decks, | |
| "message": f"{message} ({len(main_deck)} Main, {len(energy_deck)} Energy)", | |
| "errors": errors, | |
| } | |
| ) | |
| except Exception as e: | |
| return jsonify({"success": False, "error": str(e)}) | |
| def validate_cards(): | |
| """Validate card IDs against the database and provide type breakdown.""" | |
| data = request.json | |
| card_ids = data.get("card_ids", []) | |
| card_counts = data.get("card_counts", {}) # Optional: {card_id: quantity} | |
| # Ensure mapping is built | |
| if not card_no_to_id: | |
| print("DEBUG: validation - mapping empty, rebuilding...", flush=True) | |
| build_card_no_mapping() | |
| print(f"DEBUG: validation - map size: {len(card_no_to_id)}", flush=True) | |
| test_key = "PL!SP-bp1-004-R" | |
| if test_key in card_no_to_id: | |
| print(f"DEBUG: validation - found {test_key}: {card_no_to_id[test_key]}", flush=True) | |
| else: | |
| print(f"DEBUG: validation - {test_key} NOT FOUND in map!", flush=True) | |
| known = [] | |
| unknown = [] | |
| card_info = {} # card_id -> {type, name, internal_id} | |
| # Type counters | |
| member_count = 0 | |
| live_count = 0 | |
| energy_count = 0 | |
| for card_id in card_ids: | |
| # print(f"DEBUG: Checking {card_id}", flush=True) | |
| qty = card_counts.get(card_id, 1) | |
| if card_id in card_no_to_id: | |
| internal_id = card_no_to_id[card_id] | |
| known.append(card_id) | |
| # Determine type and get name | |
| if internal_id in member_db: | |
| card_info[card_id] = {"type": "Member", "name": member_db[internal_id].name} | |
| member_count += qty | |
| elif internal_id in live_db: | |
| card_info[card_id] = {"type": "Live", "name": live_db[internal_id].name} | |
| live_count += qty | |
| elif internal_id in energy_db: | |
| card_info[card_id] = {"type": "Energy", "name": energy_db[internal_id].name} | |
| energy_count += qty | |
| else: | |
| unknown.append(card_id) | |
| debug_info = { | |
| "map_size": len(card_no_to_id), | |
| "test_key_exists": "PL!SP-bp1-004-R" in card_no_to_id, | |
| "test_key_val": card_no_to_id.get("PL!SP-bp1-004-R", "N/A"), | |
| "first_5_keys": list(card_no_to_id.keys())[:5], | |
| } | |
| return jsonify( | |
| { | |
| "known": known, | |
| "unknown": unknown, | |
| "known_count": len(known), | |
| "unknown_count": len(unknown), | |
| "card_info": card_info, | |
| "breakdown": {"member": member_count, "live": live_count, "energy": energy_count}, | |
| "_debug": debug_info, | |
| } | |
| ) | |
| def clear_performance(): | |
| room_id = get_room_id() | |
| with game_lock: | |
| room = get_room(room_id) | |
| if room: | |
| gs = room["state"] | |
| # Clear the results dictionary | |
| gs.performance_results.clear() | |
| return jsonify({"status": "ok"}) | |
| def do_action(): | |
| room_id = get_room_id() | |
| session_token = request.headers.get("X-Session-Token") | |
| with game_lock: | |
| start_time = time.time() | |
| try: | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found"}), 404 | |
| gs = room["state"] | |
| game_mode = room["mode"] | |
| ai_agent = room["ai_agent"] | |
| sessions = room.get("sessions", {}) | |
| # Session Validation (Enforce Turn) | |
| if session_token and session_token in sessions: | |
| pid = sessions[session_token] | |
| if pid != -1: | |
| # Check Pending Choice Turn | |
| p_choices = getattr(gs, "pending_choices", []) | |
| if p_choices: | |
| # Handle both Rust (str, str) and Python (str, dict) formats | |
| params = p_choices[0][1] | |
| if isinstance(params, str): | |
| # Rust format: parse JSON | |
| try: | |
| params = json.loads(params) | |
| except: | |
| params = {} | |
| choice_pid = params.get("player_id", gs.current_player) | |
| if choice_pid != pid: | |
| return jsonify( | |
| {"success": False, "error": f"Not your turn to choose (Waiting for P{choice_pid})"} | |
| ), 403 | |
| # Check Main Turn | |
| elif gs.current_player != pid: | |
| return jsonify( | |
| {"success": False, "error": f"Not your turn (Waiting for P{gs.current_player})"} | |
| ), 403 | |
| data = request.json | |
| action_id = data.get("action_id", 0) | |
| force = data.get("force", False) | |
| legal_mask = gs.get_legal_actions() | |
| # Validate Action | |
| if not (0 <= action_id < len(legal_mask)): | |
| return jsonify({"success": False, "error": "Invalid action ID"}), 400 | |
| # Enforce Perspective/Active Player consistency in PvP | |
| requester_idx = get_player_idx() | |
| if game_mode == "pvp": | |
| if requester_idx != gs.current_player: | |
| return jsonify( | |
| {"success": False, "error": f"Not your turn! It's P{gs.current_player + 1}'s turn."} | |
| ), 403 | |
| elif game_mode == "pve": | |
| # In PvE, if it's AI turn (P1), don't allow manual action from UI | |
| if gs.current_player == 1: | |
| return jsonify({"success": False, "error": "AI is playing, please wait."}), 403 | |
| is_legal = legal_mask[action_id] | |
| if force or is_legal: | |
| # Step 1: Execute User Action | |
| res = gs.step(action_id) | |
| if res is not None: | |
| room["state"] = res | |
| gs = res | |
| # Step 2: Auto-Advance & AI Handling | |
| max_safety = 50 | |
| while not gs.is_terminal() and max_safety > 0: | |
| max_safety -= 1 | |
| # A. Automatic Phases (-2=Setup, 1=Active, 2=Energy, 3=Draw, 6=Perf1, 7=Perf2, 8=LiveResult) | |
| if gs.phase in (-2, 1, 2, 3, 6, 7, 8): | |
| res = gs.step(0) | |
| if res is not None: | |
| room["state"] = res | |
| gs = res | |
| continue | |
| # B. AI Turn (P1) - ONLY if PVE | |
| if gs.current_player == 1 and game_mode == "pve": | |
| if room.get("engine") == "rust": | |
| gs.step_opponent_mcts(10) | |
| else: | |
| # Python AI | |
| aid = ai_agent.choose_action(gs, 1) | |
| res = gs.step(aid) | |
| if res is not None: | |
| room["state"] = res | |
| gs = res | |
| continue | |
| break | |
| viewer_idx = get_player_idx() | |
| duration = time.time() - start_time | |
| print(f"[PERF] /api/action took {duration:.3f}s (Action: {action_id})") | |
| return jsonify( | |
| { | |
| "success": True, | |
| "state": rust_serializer.serialize_state(gs, viewer_idx=viewer_idx, mode=game_mode), | |
| } | |
| ) | |
| else: | |
| return jsonify({"success": False, "error": f"Illegal action {action_id}"}), 400 | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |
| # Auto-report issue on crash | |
| try: | |
| report_dir = os.path.join(CURRENT_DIR, "reports") | |
| os.makedirs(report_dir, exist_ok=True) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| crash_file = os.path.join(report_dir, f"crash_{timestamp}.json") | |
| try: | |
| if room is not None and room.get("engine") == "rust": | |
| serialized_state = rust_serializer.serialize_state( | |
| gs, viewer_idx=get_player_idx(), mode=game_mode | |
| ) | |
| else: | |
| serialized_state = serialize_state( | |
| gs, viewer_idx=get_player_idx(), is_pvp=(game_mode == "pvp"), mode=game_mode | |
| ) | |
| with open(crash_file, "w", encoding="utf-8") as f: | |
| # Use app.json.dumps to handle Numpy types | |
| f.write( | |
| app.json.dumps( | |
| { | |
| "error": str(e), | |
| "trace": traceback.format_exc(), | |
| "state": serialized_state, | |
| } | |
| ) | |
| ) | |
| except Exception as inner_e: | |
| # Fallback if serialization fails | |
| with open(crash_file, "w", encoding="utf-8") as f: | |
| f.write( | |
| app.json.dumps( | |
| {"error": str(e), "trace": traceback.format_exc(), "serialization_error": str(inner_e)} | |
| ) | |
| ) | |
| except: | |
| pass | |
| return jsonify({"success": False, "error": str(e), "trace": traceback.format_exc()}), 500 | |
| def index_route(): | |
| return send_from_directory(app.static_folder, "index.html") | |
| def static_proxy(path): | |
| return send_from_directory(app.static_folder, path) | |
| def god_mode(): | |
| room_id = get_room_id() | |
| code = request.json.get("code", "") | |
| with game_lock: | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found"}) | |
| gs = room["state"] | |
| try: | |
| p = gs.active_player | |
| exec(code, {"state": gs, "p": p, "np": np}) | |
| return jsonify({"success": True, "state": serialize_state(gs, is_pvp=(room["mode"] == "pvp"))}) | |
| except Exception as e: | |
| return jsonify({"success": False, "error": str(e)}) | |
| def reset(): | |
| room_id = get_room_id() | |
| with game_lock: | |
| data = request.json or {} | |
| deck_type = data.get("deck_type", "normal") | |
| # Allow changing mode on reset | |
| new_mode = data.get("mode") # Optional | |
| # Check if room exists to preserve existing params if not specified | |
| old_room = ROOMS.get(room_id) | |
| mode = new_mode if new_mode else (old_room["mode"] if old_room else "pve") | |
| ROOMS[room_id] = create_room_internal(room_id, mode, deck_type) | |
| room = ROOMS[room_id] | |
| # Check for custom decks and apply them if they exist for this room | |
| if old_room and "custom_decks" in old_room: | |
| room["custom_decks"] = old_room["custom_decks"] | |
| # Preserve sessions | |
| if old_room and "sessions" in old_room: | |
| room["sessions"] = old_room["sessions"] | |
| if deck_type == "custom": | |
| # Apply custom decks to the fresh state | |
| gs = room["state"] | |
| for pid in [0, 1]: | |
| cdeck = room["custom_decks"].get(pid) | |
| if cdeck and cdeck["main"]: | |
| gs.players[pid].main_deck = convert_deck_strings_to_ids(cdeck["main"]) | |
| random.shuffle(gs.players[pid].main_deck) | |
| # Re-draw hand? | |
| gs.players[pid].hand = [] | |
| gs.players[pid].hand_added_turn = [] | |
| for _ in range(6): | |
| if gs.players[pid].main_deck: | |
| gs.players[pid].hand.append(gs.players[pid].main_deck.pop()) | |
| gs.players[pid].hand_added_turn.append(0) | |
| if cdeck and cdeck["energy"]: | |
| # Re-fill energy | |
| gs.players[pid].energy_deck = convert_deck_strings_to_ids(cdeck["energy"]) | |
| gs.players[pid].energy_zone = [] | |
| for _ in range(3): | |
| if gs.players[pid].energy_deck: | |
| gs.players[pid].energy_zone.append(gs.players[pid].energy_deck.pop(0)) | |
| gs = room["state"] | |
| game_mode = room["mode"] | |
| # Auto-advance (AI goes first or Init steps) | |
| max_safety = 100 | |
| while not gs.is_terminal() and max_safety > 0: | |
| max_safety -= 1 | |
| # Automatic phases | |
| if gs.phase in (-2, 1, 2, 3, 6, 7, 8): | |
| gs.step(0) | |
| continue | |
| # AI Turn (P1) | |
| if gs.current_player == 1 and game_mode == "pve": | |
| gs.step_opponent_mcts(10) | |
| continue | |
| break # P0 turn or user input needed | |
| return jsonify({"success": True, "state": rust_serializer.serialize_state(gs, mode=game_mode)}) | |
| def ai_suggest(): | |
| room_id = get_room_id() | |
| data = request.json or {} | |
| sims = data.get("sims", 10) | |
| with game_lock: | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"error": "Room not found"}), 404 | |
| gs = room["state"] | |
| # Only run if not terminal | |
| if gs.is_terminal(): | |
| return jsonify({"suggestions": []}) | |
| stats = gs.get_mcts_suggestions(sims) | |
| # Shim for get_action_desc | |
| class RustShim: | |
| def __init__(self, gs): | |
| self.phase = gs.phase | |
| self.current_player = gs.current_player | |
| self.active_player = gs.get_player(gs.current_player) | |
| self.member_db = member_db | |
| self.live_db = live_db | |
| self.pending_choices = [] # TODO: expose from rust if needed | |
| shim = RustShim(gs) | |
| # Enrich stats with descriptions | |
| enriched = [] | |
| for action, value, visits in stats: | |
| desc = get_action_desc(action, shim) | |
| enriched.append({"action_id": action, "value": float(value), "visits": int(visits), "desc": desc}) | |
| return jsonify({"success": True, "suggestions": enriched}) | |
| def list_replays(): | |
| # 1. Root replays | |
| try: | |
| if os.path.exists("replays"): | |
| for f in os.listdir("replays"): | |
| if f.endswith(".json") and os.path.isfile(os.path.join("replays", f)): | |
| replays.append({"filename": f, "folder": ""}) | |
| # 2. Tournament subfolder | |
| tourney_dir = os.path.join("replays", "tournament") | |
| if os.path.exists(tourney_dir): | |
| for f in os.listdir(tourney_dir): | |
| if f.endswith(".json"): | |
| # We need to handle pathing. The frontend might expect just filename. | |
| # But get_replay takes "filename". | |
| # We should probably update get_replay to handle subpaths or encode it. | |
| # For now let's just use the relative path as the filename | |
| replays.append({"filename": f"tournament/{f}", "folder": "tournament"}) | |
| except Exception as e: | |
| print(f"Error listing replays: {e}") | |
| return jsonify({"success": False, "error": str(e)}) | |
| # Sort by filename desc (usually timestamp) | |
| replays.sort(key=lambda x: x["filename"], reverse=True) | |
| return jsonify({"success": True, "replays": replays}) | |
| def get_replay(filename): | |
| """Serve replay JSON files""" | |
| replay_path = f"replays/{filename}" | |
| if os.path.exists(replay_path): | |
| with open(replay_path, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| # Auto-inflate if it's an optimized replay | |
| if "registry" in data and "states" in data: | |
| print(f"Inflating optimized replay: {filename}") | |
| inflated_states = inflate_history(data, member_db, live_db, energy_db) | |
| # Reconstruct standard format | |
| data["states"] = inflated_states | |
| # Remove registry to avoid confusing frontend if it doesn't expect it | |
| del data["registry"] | |
| return jsonify(data) | |
| return jsonify({"error": "Replay not found"}), 404 | |
| def advance(): | |
| room_id = get_room_id() | |
| with game_lock: | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found"}), 404 | |
| gs = room["state"] | |
| ai_agent = room["ai_agent"] | |
| # Run auto-advance loop | |
| max_safety = 50 | |
| while not gs.is_terminal() and max_safety > 0: | |
| max_safety -= 1 | |
| # Advance if in an automatic phase (AND no choices pending) | |
| if not gs.pending_choices and gs.phase in ( | |
| Phase.ACTIVE, | |
| Phase.ENERGY, | |
| Phase.DRAW, | |
| Phase.PERFORMANCE_P1, | |
| Phase.PERFORMANCE_P2, | |
| ): | |
| gs = gs.step(0) | |
| room["state"] = gs | |
| continue | |
| # Determine who should act (Check pending choices first) | |
| next_actor = gs.current_player | |
| if gs.pending_choices: | |
| # Handle both Rust (str, str) and Python (str, dict) formats | |
| params = gs.pending_choices[0][1] | |
| if isinstance(params, str): | |
| try: | |
| params = json.loads(params) | |
| except: | |
| params = {} | |
| next_actor = params.get("player_id", gs.current_player) | |
| # If it's the AI's turn (P1) or the AI has a pending choice, let it act immediately | |
| if next_actor == 1 and not gs.is_terminal(): | |
| aid = ai_agent.choose_action(gs, 1) | |
| gs = gs.step(aid) | |
| room["state"] = gs | |
| continue | |
| break | |
| return jsonify( | |
| { | |
| "success": True, | |
| "state": serialize_state(gs, is_pvp=(room["mode"] == "pvp"), mode=room["mode"]), | |
| } | |
| ) | |
| def get_full_log(): | |
| """Return the complete rule log without truncation.""" | |
| room_id = get_room_id() | |
| with game_lock: | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"log": [], "total_entries": 0}) | |
| gs = room["state"] | |
| return jsonify({"log": gs.rule_log, "total_entries": len(gs.rule_log)}) | |
| def set_ai(): | |
| room_id = get_room_id() | |
| data = request.json | |
| mode = data.get("ai_mode", "smart") | |
| with game_lock: | |
| room = get_room(room_id) | |
| if not room: | |
| return jsonify({"success": False, "error": "Room not found"}) | |
| if mode == "random": | |
| room["ai_agent"] = RandomAgent() | |
| elif mode == "smart": | |
| room["ai_agent"] = SmartAgent() | |
| else: | |
| return jsonify({"success": False, "error": f"Unknown AI mode: {mode}"}) | |
| return jsonify({"success": True, "mode": mode}) | |
| def report_issue(): | |
| """Save the current game state and user explanation to a report file.""" | |
| try: | |
| room_id = get_room_id() | |
| room = get_room(room_id) | |
| gs = room["state"] if room else None | |
| data = request.json | |
| explanation = data.get("explanation", "") | |
| # We can take the current state from the request or just use our global game_state | |
| # Providing it in the request is safer if the user is looking at a specific frame (e.g. in replay mode) | |
| # But for now, let's use the provided state if it exists, otherwise capture the current one. | |
| if room and room.get("engine") == "rust": | |
| serialized = rust_serializer.serialize_state(gs, viewer_idx=0, mode=room.get("mode", "pve")) | |
| else: | |
| serialized = serialize_state(gs, is_pvp=(room["mode"] == "pvp" if room else False)) | |
| state_to_save = data.get("state") or serialized | |
| history = data.get("history", []) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| os.makedirs("reports", exist_ok=True) | |
| filename = f"reports/report_{timestamp}.json" | |
| with open(filename, "w", encoding="utf-8") as f: | |
| json.dump( | |
| { | |
| "timestamp": timestamp, | |
| "explanation": explanation, | |
| "state": state_to_save, | |
| "history": history, | |
| "performance_history": state_to_save.get("performance_history", []), | |
| "performance_results": state_to_save.get("performance_results", {}), | |
| "action_desc": get_action_desc(state_to_save.get("last_action", 0), gs) | |
| if gs and "last_action" in state_to_save | |
| else "N/A", | |
| }, | |
| f, | |
| indent=2, | |
| ensure_ascii=False, | |
| ) | |
| return jsonify({"success": True, "filename": filename}) | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |
| return jsonify({"success": False, "error": str(e)}), 500 | |
| def generate_random_deck_list(member_db, live_db) -> list[str]: | |
| "Generate a valid random deck list (card_no strings)." | |
| deck = [] | |
| # 1. Select Members (48) | |
| available_members = [c.card_no for c in member_db.values()] | |
| if available_members: | |
| member_bucket = [] | |
| for m_no in available_members: | |
| member_bucket.extend([m_no] * 4) | |
| random.shuffle(member_bucket) | |
| while len(member_bucket) < 48: | |
| member_bucket.extend(available_members) | |
| deck.extend(member_bucket[:48]) | |
| # 2. Select Lives (12) | |
| available_lives = [c.card_no for c in live_db.values()] | |
| if available_lives: | |
| live_bucket = [] | |
| for l_no in available_lives: | |
| live_bucket.extend([l_no] * 4) | |
| random.shuffle(live_bucket) | |
| while len(live_bucket) < 12: | |
| live_bucket.extend(available_lives) | |
| deck.extend(live_bucket[:12]) | |
| return deck | |
| def get_random_deck_api(): | |
| global member_db, live_db | |
| deck_list = generate_random_deck_list(member_db, live_db) | |
| return jsonify( | |
| {"success": True, "content": deck_list, "message": f"Generated Random Deck ({len(deck_list)} cards)"} | |
| ) | |
| def get_presets(): | |
| """Return list of preset decks from tests/presets.json.""" | |
| try: | |
| preset_path = os.path.join(CURRENT_DIR, "..", "tests", "presets.json") | |
| if os.path.exists(preset_path): | |
| with open(preset_path, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| return jsonify({"success": True, "presets": data}) | |
| return jsonify({"success": False, "error": "presets.json not found", "presets": []}) | |
| except Exception as e: | |
| return jsonify({"success": False, "error": str(e)}) | |
| if __name__ == "__main__": | |
| # PyInstaller Bundle Check | |
| if getattr(sys, "frozen", False): | |
| # If frozen, we might need to adjust static folder or templates folder depending on how flask finds them. | |
| # However, we added paths with --add-data, so they should be in sys._MEIPASS. | |
| # Flask's root_path defaults to __main__ directory, which in onefile mode is temporary. | |
| # We need to explicitly point static_folder to the MEIPASS location. | |
| bundle_dir = getattr(sys, "_MEIPASS", ".") # type: ignore | |
| app.static_folder = os.path.join(bundle_dir, "web_ui") | |
| # app.template_folder = os.path.join(bundle_dir, 'templates') # if we used templates | |
| # Also need to make sure data loader finds 'data/cards.json' | |
| # CardDataLoader expects relative path. We might need to chdir or patch it. | |
| # Easiest is to chdir to the bundle dir so relative paths work? | |
| # BUT 'replays' need to be written to writable cwd, not temp dir. | |
| # So we should NOT chdir globally. | |
| # Instead, we should update filenames to be absolute paths based on bundle_dir if read-only. | |
| # Monkey patch the loader path just for this instance if needed, | |
| # but CardDataLoader takes a path arg. | |
| # We need to ensure 'init_game' calls it with the correct absolute path. | |
| pass | |
| # Patched init_game for Frozen state to find data | |
| original_init_game = init_game | |
| def frozen_init_game(deck_type="normal"): | |
| if getattr(sys, "frozen", False): | |
| bundle_dir = getattr(sys, "_MEIPASS", ".") # type: ignore | |
| os.path.join(bundle_dir, "data", "cards.json") | |
| # We need to temporarily force the loader to use this path | |
| # But init_game hardcodes "data/cards.json" in correct logic? | |
| # actually checking init_game source: | |
| # loader = CardDataLoader("data/cards.json") | |
| # We need to change that line or intercept. | |
| # Use os.chdir to temp dir for READS? No, we need writes to real dir. | |
| # Best way: Just ensure data/cards.json exists in CWD? No, user won't have it. | |
| # HACK: We can't easily change the hardcoded string inside init_game without rewriting it. | |
| # However, we can patch CardDataLoader class to fix the path! | |
| # Assuming CardDataLoader is imported from engine.game.data_loader | |
| from engine.game.data_loader import CardDataLoader | |
| ops_init = CardDataLoader.__init__ | |
| def new_init(self, filepath): | |
| if not os.path.exists(filepath) and getattr(sys, "frozen", False): | |
| # Try bundle path | |
| bundle_path = os.path.join(sys._MEIPASS, filepath) # type: ignore | |
| if os.path.exists(bundle_path): | |
| filepath = bundle_path | |
| ops_init(self, filepath) | |
| CardDataLoader.__init__ = new_init # type: ignore[method-assign] | |
| original_init_game(deck_type) | |
| init_game = frozen_init_game | |
| # Run Server | |
| # use_reloader=False is crucial for PyInstaller to implicit avoid spawning subprocesses incorrectly | |
| port = int(os.environ.get("PORT", 8000)) | |
| # Auto-open browser | |
| import webbrowser | |
| from threading import Timer | |
| def open_browser(): | |
| webbrowser.open_new(f"http://localhost:{port}/") | |
| if not getattr(sys, "frozen", False) or os.environ.get("OPEN_BROWSER", "true").lower() == "true": | |
| Timer(1.5, open_browser).start() | |
| # Start Background Game Loop | |
| if game_thread is None: | |
| game_thread = threading.Thread(target=background_game_loop, daemon=True) | |
| game_thread.start() | |
| if __name__ == "__main__": | |
| port = int(os.environ.get("PORT", 7860)) | |
| # In production/container, usually don't want debug mode | |
| debug_mode = os.environ.get("FLASK_DEBUG", "True").lower() == "true" | |
| app.run(host="0.0.0.0", port=port, debug=debug_mode) | |