Spaces:
Running
Running
| import threading | |
| import sys | |
| from pathlib import Path | |
| import datetime | |
| # --- Global State --- | |
| USER_LOGS = {} | |
| USER_PROGRESS = {} | |
| LOCK = threading.Lock() | |
| # --- Configuration Constants --- | |
| TEMP_DIR = Path("/tmp") | |
| TEMP_DIR.mkdir(parents=True, exist_ok=True) | |
| # --- Helper Functions for State --- | |
| def get_progress(uid): | |
| with LOCK: | |
| return USER_PROGRESS.get(uid, {"percent": 0, "text": "System Idle", "status": "idle"}) | |
| def update_progress(uid, percent, text, status): | |
| with LOCK: | |
| USER_PROGRESS[uid] = {"percent": percent, "text": text, "status": status} | |
| def get_user_temp_dir(uid) -> Path: | |
| """Creates and returns a specific directory for the logged-in user.""" | |
| user_dir = TEMP_DIR / uid | |
| user_dir.mkdir(parents=True, exist_ok=True) | |
| return user_dir | |
| # --- Log Capture System --- | |
| class LogCatcher: | |
| """ | |
| Redirects stdout. Detects which user triggered the log based on | |
| the current thread name (which we will set to the User ID). | |
| """ | |
| def __init__(self, original_stream): | |
| self.terminal = original_stream | |
| def write(self, msg): | |
| self.terminal.write(msg) # Keep server logs visible | |
| if msg and msg.strip(): | |
| # Identify user by thread name (set in run_background_task) | |
| thread_name = threading.current_thread().name | |
| # Only capture logs for worker threads named "user_..." | |
| if thread_name.startswith("user_"): | |
| uid = thread_name.replace("user_", "") | |
| with LOCK: | |
| if uid not in USER_LOGS: | |
| USER_LOGS[uid] = [] | |
| USER_LOGS[uid].append(msg) | |
| if len(USER_LOGS[uid]) > 500: | |
| USER_LOGS[uid].pop(0) | |
| # Update progress bars based on keywords | |
| text = msg.lower() | |
| if "scanning coingecko" in text: | |
| update_progress(uid, 10, "Fetching CoinGecko Data...", "active") | |
| elif "scanning livecoinwatch" in text: | |
| update_progress(uid, 30, "Fetching LiveCoinWatch...", "active") | |
| elif "parsing spot file" in text: | |
| update_progress(uid, 50, "Analyzing Spot Volumes...", "active") | |
| elif "parsing futures pdf" in text: | |
| update_progress(uid, 70, "Parsing Futures PDF...", "active") | |
| elif "converting to pdf" in text: | |
| update_progress(uid, 90, "Compiling Report...", "active") | |
| elif "completed" in text or "pdf saved" in text: | |
| update_progress(uid, 100, "Task Completed Successfully", "success") | |
| elif "error" in text: | |
| update_progress(uid, 0, "Error Occurred", "error") | |
| def flush(self): | |
| self.terminal.flush() | |
| # Apply the LogCatcher immediately when this module is imported | |
| sys.stdout = LogCatcher(sys.stdout) |