Spaces:
Sleeping
Sleeping
| from flask import Flask, request, jsonify | |
| import os | |
| import pickle | |
| import uuid | |
| import json | |
| from datetime import datetime | |
| import threading | |
| # 🛑 NO HEAVY IMPORTS AT TOP LEVEL | |
| # We import them inside functions to prevent "Memory Limit" crashes on startup. | |
# Flask application object used by the entry point at the bottom of the file.
app = Flask(__name__)
# Global Cache
# Process-wide state filled in by load_heavy_brains() and read by detect().
model_cache = {
    "lucid": None,    # Keras model loaded from lucid_cnn.h5 (None until loaded / if missing)
    "mouse": None,    # LSTM model with weights from delbot_rnn.h5 (None until loaded / if missing)
    "fusion": None,   # object unpickled from fusion_xgboost.pkl (None until loaded / if missing)
    "loaded": False,  # set to True once a load attempt completes without raising
    "error": None,    # message of the last critical load failure, if any
    "logs": []        # human-readable progress messages from the successful load
}
# --- CONFIGURATION ---
# We write to /tmp because the root folder is Read-Only on HF Spaces
# Predictions and feedback are both appended to this one file, one JSON object per line.
LOG_FILE_PATH = "/tmp/predictions.log"
| # ------------------ LOGGING HELPERS ------------------ | |
def log_prediction(req_id, payload, output):
    """Append one prediction record to the log file as a single JSON line.

    The record holds the request id, a naive-UTC ISO timestamp, the request
    payload and the response that was produced. Logging is best-effort: any
    failure (unwritable path, non-serializable payload, ...) is printed and
    swallowed so it can never break the request that triggered it.

    Args:
        req_id: Identifier of the request being logged.
        payload: The request input; must be JSON-serializable to be recorded.
        output: The response dict; must be JSON-serializable to be recorded.
    """
    try:
        timestamp = datetime.utcnow().isoformat()
        entry = dict(
            request_id=req_id,
            time=timestamp,
            input=payload,
            output=output,
        )
        # Serialize inside the `with` so the file is opened (and created if
        # absent) before a serialization error can occur.
        with open(LOG_FILE_PATH, "a") as log_file:
            log_file.write(json.dumps(entry) + "\n")
    except Exception as e:
        print(f"⚠️ LOGGING FAILED (Non-Fatal): {e}")
def log_feedback(feedback):
    """Append a user-feedback record to the log file as a single JSON line.

    A naive-UTC ISO timestamp is stored under the ``"time"`` key of the
    logged record. The caller's mapping is NOT modified: the timestamp is
    added to a copy, so the object handed in (e.g. the parsed request body)
    stays exactly as received.

    Logging is best-effort: any failure (``feedback`` not being a mapping,
    non-serializable values, unwritable path, ...) is printed and swallowed.

    Args:
        feedback: Mapping of feedback fields; must be JSON-serializable.
    """
    try:
        # Build a new dict instead of writing into the caller's object.
        # `{**x}` raises TypeError for None / non-mapping input, which is
        # reported by the handler below.
        record = {**feedback, "time": datetime.utcnow().isoformat()}
        with open(LOG_FILE_PATH, 'a') as f:
            f.write(json.dumps(record) + "\n")
    except Exception as e:
        print(f"⚠️ FEEDBACK LOGGING FAILED: {e}")
| # ------------------ MODEL LOADING ------------------ | |
def load_heavy_brains():
    """Lazily import TensorFlow/XGBoost and load the models into ``model_cache``.

    The heavy imports happen here rather than at module level so that the
    server can start without paying their memory cost up front.

    On the first successful run the available models are stored under
    ``model_cache["lucid"]``, ``["mouse"]`` and ``["fusion"]`` (a missing
    model file is only noted in the log; its slot stays ``None``),
    ``model_cache["loaded"]`` is set and the progress log is cached. Later
    calls return the cached log immediately.

    If anything raises, the message is stored in ``model_cache["error"]``,
    ``"loaded"`` stays ``False`` (so the next call retries) and the partial
    log plus the error message is returned. A later successful retry clears
    the stored error so callers do not keep reporting a stale failure.

    Returns:
        list[str]: Human-readable progress messages for this load.
    """
    if model_cache["loaded"]:
        return model_cache["logs"]

    log = []
    try:
        log.append("⏳ Importing TensorFlow...")
        import tensorflow as tf
        log.append("✅ TensorFlow Imported")

        log.append("⏳ Importing XGBoost...")
        # Imported for its side effect: fail early (and log it) if the
        # package needed by the pickled fusion model is unavailable.
        import xgboost as xgb  # noqa: F401
        log.append("✅ XGBoost Imported")

        # Define Architecture locally
        Sequential = tf.keras.models.Sequential
        Input = tf.keras.layers.Input
        LSTM = tf.keras.layers.LSTM
        Dense = tf.keras.layers.Dense
        Dropout = tf.keras.layers.Dropout
        BatchNormalization = tf.keras.layers.BatchNormalization
        LeakyReLU = tf.keras.layers.LeakyReLU

        # Load LUCID
        if os.path.exists("lucid_cnn.h5"):
            model_cache["lucid"] = tf.keras.models.load_model("lucid_cnn.h5")
            log.append("✅ LUCID Model Loaded")
        else:
            log.append("⚠️ lucid_cnn.h5 missing")

        # Load MOUSE
        # The file is loaded with load_weights(), so the architecture has to
        # be rebuilt here before the weights can be applied.
        if os.path.exists("delbot_rnn.h5"):
            mouse_model = Sequential([
                Input(shape=(None, 10)),
                LSTM(128, return_sequences=True),
                BatchNormalization(),
                LeakyReLU(alpha=0.1),
                Dropout(0.3),
                LSTM(64),
                LeakyReLU(alpha=0.1),
                Dropout(0.1),
                Dense(2, activation='softmax'),
            ])
            mouse_model.load_weights("delbot_rnn.h5")
            model_cache["mouse"] = mouse_model
            log.append("✅ Mouse Model Loaded")
        else:
            log.append("⚠️ delbot_rnn.h5 missing")

        # Load FUSION
        if os.path.exists("fusion_xgboost.pkl"):
            # SECURITY NOTE: pickle.load executes arbitrary code embedded in
            # the file. This is only safe while fusion_xgboost.pkl is a
            # trusted artifact shipped with the app.
            with open("fusion_xgboost.pkl", "rb") as f:
                model_cache["fusion"] = pickle.load(f)
            log.append("✅ Fusion Model Loaded")
        else:
            log.append("⚠️ fusion_xgboost.pkl missing")

        model_cache["loaded"] = True
        model_cache["logs"] = log
        # A previous failed attempt may have left an error behind; clear it
        # now that loading succeeded, otherwise callers that check
        # model_cache["error"] would keep failing forever.
        model_cache["error"] = None
        return log

    except Exception as e:
        err = f"❌ CRITICAL LOAD ERROR: {str(e)}"
        print(err)
        model_cache["error"] = err
        return log + [err]
| # ------------------ DATA PROCESSING ------------------ | |
def process_mouse_data(trace):
    """Convert a raw mouse trace into a fixed-size feature tensor.

    Each consecutive pair of points yields one 10-value row:
    ``[dx, dy, dt, dx/dt, dy/dt, angle, 0, 0, 0, 0]`` where ``angle`` is
    ``arctan2(dy, dx)`` and the last four columns are zero placeholders.
    A zero time delta is replaced by 1 to avoid division by zero. The rows
    are truncated or zero-padded to exactly 60 steps.

    Args:
        trace: Sequence of points, each a mapping with numeric ``'t'``,
            ``'x'`` and ``'y'`` entries.

    Returns:
        A numpy array of shape ``(1, 60, 10)``, or ``None`` when the trace
        has fewer than two points or is malformed (missing keys,
        non-numeric values, wrong container type, ...).
    """
    try:
        import numpy as np

        MAX_STEPS = 60
        if not trace or len(trace) < 2:
            return None

        vectors = []
        for prev, cur in zip(trace, trace[1:]):
            dt = (cur['t'] - prev['t']) or 1  # guard against identical timestamps
            dx = cur['x'] - prev['x']
            dy = cur['y'] - prev['y']
            angle = np.arctan2(dy, dx)
            vectors.append([dx, dy, dt, dx / dt, dy / dt, angle, 0.0, 0.0, 0.0, 0.0])

        data = np.array(vectors)
        if len(data) > MAX_STEPS:
            data = data[:MAX_STEPS]
        else:
            data = np.vstack([data, np.zeros((MAX_STEPS - len(data), 10))])
        # Add the leading batch dimension.
        return np.expand_dims(data, axis=0)
    except Exception:
        # Malformed input is treated as "no mouse signal". Unlike a bare
        # `except:`, this lets KeyboardInterrupt / SystemExit propagate.
        return None
| # ------------------ ROUTES ------------------ | |
def home():
    """Return the static HTML status banner for the server.

    NOTE(review): no route decorator is visible on this handler in this
    view of the file - confirm how it is registered with the Flask app.
    """
    banner = "<h3>Bot Detection Server</h3>Status: 🟢 Running"
    return banner
def detect():
    """Score the current request as bot / suspicious / human.

    Reads the JSON body of the active Flask request. The keys used are
    ``botd_score`` (default 0.0), ``mouse_trace`` (default ``[]``) and
    ``request_timestamps`` (default ``[]``).

    Steps:
      1. Lazily load the models; if a load error is recorded, return it.
      2. Mouse score: class-1 output of the mouse model on the processed
         trace, when the model and a usable trace are available.
      3. Network score: output of the LUCID model on up to ten sorted
         inter-arrival gaps (divided by 1000 - presumably ms to seconds,
         confirm with the client), when the model is loaded and more than
         two timestamps are given.
      4. Fusion: class-1 probability of the fusion model over
         ``[botd, mouse (0.5 if unavailable), net]``; falls back to the
         maximum of the three when the fusion model is absent or fails.
      5. Decision: > 85 % is BOT/BLOCK, > 50 % is SUSPICIOUS/CAPTCHA,
         otherwise HUMAN/ALLOW.

    The request and response are appended to the prediction log.

    Returns:
        A ``jsonify`` response. On any runtime failure the body is
        ``{"success": False, "error": ...}``.

    NOTE(review): no route decorator is visible on this handler in this
    view of the file - confirm how it is registered with the Flask app.
    """
    req_id = str(uuid.uuid4())

    # 1. Bring the models in on first use.
    load_logs = load_heavy_brains()
    load_error = model_cache["error"]
    if load_error:
        return jsonify({"success": False, "error": load_error})

    try:
        # numpy is imported up front so it is available to every step below,
        # including the final clipping, whichever models are loaded.
        import numpy as np

        data = request.json or {}
        botd = float(data.get("botd_score", 0.0))
        mouse_trace = data.get("mouse_trace", [])
        ts = data.get("request_timestamps", [])

        # A. Mouse prediction (stays None when no usable signal exists).
        mouse_score = None
        mouse_model = model_cache["mouse"]
        if mouse_model:
            inp = process_mouse_data(mouse_trace)
            if inp is not None:
                mouse_score = float(mouse_model.predict(inp, verbose=0)[0][1])

        # B. Network prediction from request inter-arrival times.
        net_score = 0.0
        lucid_model = model_cache["lucid"]
        if lucid_model and len(ts) > 2:
            iat = np.diff(sorted(ts))[:10] / 1000.0
            mat = np.zeros((1, 10, 11, 1))
            filled = min(len(iat), 10)
            mat[0, :filled, 0, 0] = iat[:filled]
            net_score = float(lucid_model.predict(mat, verbose=0)[0][0])

        # C. Fusion prediction; the max of the raw signals is the fallback.
        safe_mouse = 0.5 if mouse_score is None else mouse_score
        features = [botd, safe_mouse, net_score]
        final_prob = max(features)
        fusion_model = model_cache["fusion"]
        if fusion_model:
            try:
                # XGBoost might warn about feature names, but it won't crash
                final_prob = float(fusion_model.predict_proba([features])[0][1])
            except Exception as e:
                print(f"Fusion Pred Error: {e}")

        # D. Decision logic on the clipped percentage.
        pct = float(np.clip(final_prob, 0.0, 1.0) * 100)
        if pct > 85:
            verdict = ("BOT", "BLOCK", True)
        elif pct > 50:
            verdict = ("SUSPICIOUS", "CAPTCHA", True)
        else:
            verdict = ("HUMAN", "ALLOW", False)
        decision, action, is_bot = verdict

        forensics = {
            "botd": round(botd, 2),
            "mouse": round(safe_mouse, 2),
            "net": round(net_score, 2),
        }
        signals = {
            "mouse_available": mouse_score is not None,
            "net_available": net_score > 0,
        }
        response = {
            "success": True,
            "request_id": req_id,
            "is_bot": is_bot,
            "action": action,
            "decision": decision,
            "confidence": round(pct, 2),
            "forensics": forensics,
            "signals": signals,
            "internal_logs": load_logs,
        }

        # Best-effort file logging; log_prediction never raises.
        log_prediction(req_id, data, response)
        return jsonify(response)

    except Exception as e:
        # Print actual error to server logs for debugging
        print(f"RUNTIME ERROR: {e}")
        return jsonify({"success": False, "error": f"Runtime Error: {str(e)}"})
def feedback():
    """Record the JSON body of the current request as user feedback.

    The parsed body is handed to ``log_feedback`` (which is best-effort and
    does not raise), and a success acknowledgement is always returned.

    NOTE(review): no route decorator is visible on this handler in this
    view of the file - confirm how it is registered with the Flask app.
    """
    payload = request.json
    log_feedback(payload)
    acknowledgement = {"success": True}
    return jsonify(acknowledgement)
| # ------------------ BACKGROUND TASKS ------------------ | |
def start_auto_retrain():
    """Start the background retraining loop in a daemon thread, if available.

    Looks for ``auto_retrain.py`` in the current working directory. When it
    is present, ``retrain_loop`` is imported from it and run in a daemon
    thread; when it is absent, a notice is printed and nothing is started.
    Any failure while importing or starting the thread is printed and
    swallowed so that server start-up is never blocked.
    """
    try:
        if not os.path.exists("auto_retrain.py"):
            print("⚠️ auto_retrain.py not found. Skipping background training.")
            return

        from auto_retrain import retrain_loop

        worker = threading.Thread(target=retrain_loop, daemon=True)
        worker.start()
        print("🔄 Auto-retrain thread started.")
    except Exception as e:
        print(f"⚠️ Failed to start retrain thread: {e}")
| # ------------------ ENTRY ------------------ | |
def view_logs():
    """Render the prediction/feedback log file as a small HTML page.

    The log content includes request payloads supplied by clients, so it is
    HTML-escaped before being embedded in the page; otherwise a payload
    containing markup (e.g. a ``<script>`` tag) would execute in the browser
    of whoever opens this page.

    Returns:
        str: The log wrapped in ``<pre>``, a placeholder message when the
        log file does not exist yet, or an error message if it cannot be
        read.

    NOTE(review): no route decorator is visible on this handler in this
    view of the file - confirm how it is registered with the Flask app.
    """
    # ⚠️ SECURITY WARNING: In a real app, protect this with a password!
    import html  # local import, matching this file's function-scope imports

    try:
        if os.path.exists(LOG_FILE_PATH):
            with open(LOG_FILE_PATH, "r") as f:
                content = f.read()
            # Wrap in <pre> so it looks like code in the browser; escape so
            # logged input is displayed as text rather than interpreted.
            return f"<h3>Prediction Logs</h3><pre>{html.escape(content)}</pre>"
        else:
            return "<h3>Log file is empty (No requests yet).</h3>"
    except Exception as e:
        return f"Error reading logs: {e}"
# Script entry point: only runs when the file is executed directly.
if __name__ == "__main__":
    # Start the optional background retraining thread before serving
    # (a no-op with a printed notice when auto_retrain.py is absent).
    start_auto_retrain()
    # Serve on all network interfaces, port 7860.
    app.run(host="0.0.0.0", port=7860)