# Bot-Detection / app.py
# (Hugging Face file-viewer header captured with the source:
#  author Aadhya-R, commit fd00127 "Update app.py" (verified), 9.08 kB)
from flask import Flask, request, jsonify
import os
import pickle
import uuid
import json
from datetime import datetime
import threading
# 🛑 NO HEAVY IMPORTS AT TOP LEVEL
# Heavy libraries (TensorFlow, XGBoost, NumPy) are imported inside the
# functions that use them, to prevent "Memory Limit" crashes on startup.

# The Flask application object; the @app.route decorators in this file
# register their handlers on it.
app = Flask(__name__)
# Global cache shared by every request handled in this process.
#   lucid / mouse / fusion -> the loaded model objects (None until loaded)
#   loaded                 -> set to True once load_heavy_brains() succeeds
#   error                  -> message of a failed load attempt, else None
#   logs                   -> progress messages recorded while loading
model_cache = dict(
    lucid=None,
    mouse=None,
    fusion=None,
    loaded=False,
    error=None,
    logs=[],
)

# --- CONFIGURATION ---
# Logs are written under /tmp because the root folder is read-only on
# HF Spaces.
LOG_FILE_PATH = "/tmp/predictions.log"
# ------------------ LOGGING HELPERS ------------------
def log_prediction(req_id, payload, output):
    """Append one prediction record to the log file as a single JSON line.

    The record holds the request id, a naive-UTC ISO timestamp, the request
    payload and the response that was produced for it.

    Logging is best-effort: any exception (unserialisable data, unwritable
    file, ...) is printed and swallowed so it can never fail the request.
    """
    try:
        entry = dict(
            request_id=req_id,
            time=datetime.utcnow().isoformat(),
            input=payload,
            output=output,
        )
        with open(LOG_FILE_PATH, "a") as sink:
            serialized = json.dumps(entry)
            sink.write(f"{serialized}\n")
    except Exception as exc:
        print(f"⚠️ LOGGING FAILED (Non-Fatal): {exc}")
def log_feedback(feedback):
    """Append a user-feedback record to the log file as a single JSON line.

    The written record is a shallow copy of ``feedback`` with a ``"time"``
    key (naive-UTC ISO timestamp) set on it; the caller's dict is never
    modified.  An existing ``"time"`` key in ``feedback`` is overwritten in
    the written record.

    Args:
        feedback: The feedback payload; must be a dict (a JSON object).
            Anything else is reported and skipped.

    Logging is best-effort: failures are printed and swallowed, nothing is
    raised to the caller.
    """
    if not isinstance(feedback, dict):
        # e.g. a JSON body of `null` or a list: there is nothing sensible to
        # timestamp, so report it instead of relying on a TypeError below.
        print(
            "⚠️ FEEDBACK LOGGING FAILED: expected a JSON object, "
            f"got {type(feedback).__name__}"
        )
        return

    try:
        # Copy before stamping so the caller's dict is not mutated.
        record = {**feedback, "time": datetime.utcnow().isoformat()}
        with open(LOG_FILE_PATH, "a") as f:
            f.write(json.dumps(record) + "\n")
    except Exception as e:
        print(f"⚠️ FEEDBACK LOGGING FAILED: {e}")
# ------------------ MODEL LOADING ------------------
def load_heavy_brains():
    """Lazily import TensorFlow/XGBoost and load the models into ``model_cache``.

    The first successful call populates ``model_cache["lucid"]``,
    ``model_cache["mouse"]`` and ``model_cache["fusion"]`` from
    ``lucid_cnn.h5``, ``delbot_rnn.h5`` and ``fusion_xgboost.pkl`` (each one
    is skipped with a warning entry when its file is missing) and marks the
    cache as loaded.  Later calls return the cached log immediately.

    If anything raises, the error message is stored in
    ``model_cache["error"]``, the cache stays "not loaded", and the next call
    retries from scratch.  A successful retry clears the stored error so
    callers that check ``model_cache["error"]`` stop reporting the old
    failure.

    Returns:
        list[str]: Human-readable progress messages.  On failure the last
        entry is the error message.
    """
    if model_cache["loaded"]:
        return model_cache["logs"]

    log = []
    try:
        log.append("⏳ Importing TensorFlow...")
        import tensorflow as tf
        log.append("✅ TensorFlow Imported")

        log.append("⏳ Importing XGBoost...")
        # The name is not used directly below; presumably the import makes
        # sure the package is installed before the pickled fusion model is
        # loaded.  NOTE(review): confirm before removing.
        import xgboost as xgb  # noqa: F401
        log.append("✅ XGBoost Imported")

        layers = tf.keras.layers

        # Load LUCID (a complete saved model: architecture + weights).
        if os.path.exists("lucid_cnn.h5"):
            model_cache["lucid"] = tf.keras.models.load_model("lucid_cnn.h5")
            log.append("✅ LUCID Model Loaded")
        else:
            log.append("⚠️ lucid_cnn.h5 missing")

        # Load MOUSE: the architecture is rebuilt here and only the weights
        # are read from the file.
        if os.path.exists("delbot_rnn.h5"):
            mouse_model = tf.keras.models.Sequential([
                layers.Input(shape=(None, 10)),
                layers.LSTM(128, return_sequences=True),
                layers.BatchNormalization(),
                layers.LeakyReLU(alpha=0.1),
                layers.Dropout(0.3),
                layers.LSTM(64),
                layers.LeakyReLU(alpha=0.1),
                layers.Dropout(0.1),
                layers.Dense(2, activation='softmax'),
            ])
            mouse_model.load_weights("delbot_rnn.h5")
            model_cache["mouse"] = mouse_model
            log.append("✅ Mouse Model Loaded")
        else:
            log.append("⚠️ delbot_rnn.h5 missing")

        # Load FUSION.
        # SECURITY NOTE: pickle.load executes arbitrary code contained in the
        # file.  This is only acceptable while fusion_xgboost.pkl is a
        # trusted artifact shipped with the app.
        if os.path.exists("fusion_xgboost.pkl"):
            with open("fusion_xgboost.pkl", "rb") as f:
                model_cache["fusion"] = pickle.load(f)
            log.append("✅ Fusion Model Loaded")
        else:
            log.append("⚠️ fusion_xgboost.pkl missing")

        # Clear any error left by an earlier failed attempt *before* marking
        # the cache as loaded; otherwise a stale error would be reported
        # forever even though the models are now available.
        model_cache["error"] = None
        model_cache["logs"] = log
        model_cache["loaded"] = True
        return log

    except Exception as e:
        err = f"❌ CRITICAL LOAD ERROR: {str(e)}"
        print(err)
        model_cache["error"] = err
        return log + [err]
# ------------------ DATA PROCESSING ------------------
def process_mouse_data(trace, max_steps=60):
    """Turn a mouse trace into a fixed-length feature tensor.

    Each consecutive pair of points yields one 10-value row:
    ``[dx, dy, dt, dx/dt, dy/dt, angle, 0, 0, 0, 0]`` where ``angle`` is
    ``arctan2(dy, dx)`` and the last four columns are always zero.  A zero
    time delta is replaced by 1 to avoid dividing by zero.  The rows are
    truncated or zero-padded to exactly ``max_steps``.

    Args:
        trace: Sequence of points, each a mapping with numeric ``'t'``,
            ``'x'`` and ``'y'`` entries.
        max_steps: Number of rows in the result (default 60).

    Returns:
        A NumPy array of shape ``(1, max_steps, 10)``, or ``None`` when the
        trace has fewer than two points or cannot be processed (missing
        keys, non-numeric values, wrong container type, ...).
    """
    try:
        # Function-scope import: this file keeps heavy imports out of the
        # module top level.
        import numpy as np

        if not trace or len(trace) < 2:
            return None

        vectors = []
        for prev, curr in zip(trace, trace[1:]):
            dt = (curr['t'] - prev['t']) or 1  # guard against division by zero
            dx = curr['x'] - prev['x']
            dy = curr['y'] - prev['y']
            angle = np.arctan2(dy, dx)
            vectors.append(
                [dx, dy, dt, dx / dt, dy / dt, angle, 0.0, 0.0, 0.0, 0.0]
            )

        data = np.array(vectors)
        if len(data) > max_steps:
            data = data[:max_steps]
        else:
            padding = np.zeros((max_steps - len(data), 10))
            data = np.vstack([data, padding])

        # Add the leading batch dimension.
        return np.expand_dims(data, axis=0)
    except Exception:
        # Malformed client input is expected here; report "no usable trace"
        # rather than failing the request.  Unlike a bare `except:`, this
        # lets KeyboardInterrupt / SystemExit propagate.
        return None
# ------------------ ROUTES ------------------
@app.route("/")
def home():
    """Serve a minimal HTML status page showing that the server is up."""
    status_page = "<h3>Bot Detection Server</h3>Status: 🟢 Running"
    return status_page
@app.route("/detect", methods=["POST"])
def detect():
    """Score one request and return a bot/human verdict as JSON.

    Request body (JSON object, every key optional):
        botd_score: number, used as-is as the first fusion feature.
        mouse_trace: list of points passed to ``process_mouse_data``.
        request_timestamps: list of numbers; their sorted differences are
            divided by 1000 before being fed to the LUCID model
            (presumably milliseconds - confirm with the client).

    Scoring:
        * mouse score: class-1 output of the mouse model, when the model is
          loaded and the trace is usable; otherwise 0.5 is used as a neutral
          stand-in.
        * net score: first output of the LUCID model, when the model is
          loaded and more than two timestamps were sent; otherwise 0.0.
        * final probability: class-1 probability of the fusion model over
          ``[botd, mouse, net]``; falls back to the maximum of the three when
          the fusion model is missing or its prediction fails.

    Decision (confidence is the probability clipped to [0, 1], in percent):
        > 85 -> BOT / BLOCK, > 50 -> SUSPICIOUS / CAPTCHA, else HUMAN / ALLOW.

    Returns:
        A JSON response.  Failures are reported as
        ``{"success": False, "error": ...}``; no explicit HTTP status code
        is set for them.
    """
    req_id = str(uuid.uuid4())

    # 1. Load Brains (Lazy)
    load_logs = load_heavy_brains()
    if model_cache["error"]:
        return jsonify({"success": False, "error": model_cache["error"]})

    try:
        # Imported up front so `np` exists in every branch below.
        import numpy as np

        data = request.json or {}
        botd = float(data.get("botd_score", 0.0))
        mouse_trace = data.get("mouse_trace", [])
        ts = data.get("request_timestamps", [])

        # None means "signal not available"; the numeric stand-ins are only
        # applied when building the feature vector.
        mouse_score = None
        net_score = None

        # A. Mouse Prediction
        mouse_model = model_cache["mouse"]
        if mouse_model is not None:
            inp = process_mouse_data(mouse_trace)
            if inp is not None:
                raw_mouse = mouse_model.predict(inp, verbose=0)[0][1]
                mouse_score = float(raw_mouse)

        # B. Net Prediction
        lucid_model = model_cache["lucid"]
        if lucid_model is not None and len(ts) > 2:
            # At most 10 inter-arrival times go into column 0 of a
            # (1, 10, 11, 1) input; everything else stays zero.
            iat = np.diff(sorted(ts))[:10] / 1000.0
            mat = np.zeros((1, 10, 11, 1))
            mat[0, :len(iat), 0, 0] = iat
            raw_net = lucid_model.predict(mat, verbose=0)[0][0]
            net_score = float(raw_net)

        # C. Fusion Prediction
        safe_mouse = mouse_score if mouse_score is not None else 0.5
        safe_net = net_score if net_score is not None else 0.0
        features = [botd, safe_mouse, safe_net]
        final_prob = max(features)  # Fallback

        fusion_model = model_cache["fusion"]
        if fusion_model is not None:
            try:
                # XGBoost might warn about feature names, but it won't crash
                raw_fusion = fusion_model.predict_proba([features])[0][1]
                final_prob = float(raw_fusion)
            except Exception as e:
                # Keep the max() fallback computed above.
                print(f"Fusion Pred Error: {e}")

        # D. Decision Logic
        pct = float(np.clip(final_prob, 0.0, 1.0) * 100)

        if pct > 85:
            decision, action, is_bot = "BOT", "BLOCK", True
        elif pct > 50:
            decision, action, is_bot = "SUSPICIOUS", "CAPTCHA", True
        else:
            decision, action, is_bot = "HUMAN", "ALLOW", False

        response = {
            "success": True,
            "request_id": req_id,
            "is_bot": is_bot,
            "action": action,
            "decision": decision,
            "confidence": round(pct, 2),
            "forensics": {
                "botd": round(botd, 2),
                "mouse": round(safe_mouse, 2),
                "net": round(safe_net, 2)
            },
            "signals": {
                "mouse_available": mouse_score is not None,
                # True whenever the LUCID model actually produced a score,
                # including a legitimate score of exactly 0.0.
                "net_available": net_score is not None
            },
            "internal_logs": load_logs
        }

        # Log to file.  The write is synchronous but best-effort:
        # log_prediction swallows its own failures.
        log_prediction(req_id, data, response)

        return jsonify(response)

    except Exception as e:
        # Print actual error to server logs for debugging
        print(f"RUNTIME ERROR: {e}")
        return jsonify({"success": False, "error": f"Runtime Error: {str(e)}"})
@app.route("/feedback", methods=["POST"])
def feedback():
    """Record the posted JSON feedback and acknowledge it.

    The acknowledgement is always ``{"success": true}``: log_feedback is
    best-effort and reports its own failures on the server side only.
    """
    log_feedback(request.json)
    acknowledgement = {"success": True}
    return jsonify(acknowledgement)
# ------------------ BACKGROUND TASKS ------------------
def start_auto_retrain():
    """Start the background retraining loop when ``auto_retrain.py`` exists.

    The loop (``auto_retrain.retrain_loop``) runs in a daemon thread.  When
    the file is absent a notice is printed and nothing is started.  Any
    failure while importing or starting the thread is printed, not raised.
    """
    try:
        if not os.path.exists("auto_retrain.py"):
            print("⚠️ auto_retrain.py not found. Skipping background training.")
            return

        # Imported lazily so the module is only required when it is present.
        from auto_retrain import retrain_loop

        worker = threading.Thread(target=retrain_loop, daemon=True)
        worker.start()
        print("🔄 Auto-retrain thread started.")
    except Exception as exc:
        print(f"⚠️ Failed to start retrain thread: {exc}")
# ------------------ ADMIN / ENTRY ------------------
@app.route("/admin/logs")
def view_logs():
    """Render the prediction/feedback log file as an HTML page.

    The log content contains client-supplied request payloads, so it is
    HTML-escaped before being embedded in the page; otherwise a payload such
    as ``<script>...`` would execute in the browser of whoever views the
    logs.

    Returns:
        An HTML string: the escaped log inside ``<pre>``, a notice when the
        log file does not exist yet, or an error message if reading fails.
    """
    # ⚠️ SECURITY WARNING: In a real app, protect this with a password!
    # The route is currently reachable by anyone who can reach the server.
    import html  # function-scope import, in line with the rest of this file

    try:
        if os.path.exists(LOG_FILE_PATH):
            with open(LOG_FILE_PATH, "r") as f:
                content = f.read()
            # Wrap in <pre> so it looks like code in the browser
            return f"<h3>Prediction Logs</h3><pre>{html.escape(content)}</pre>"
        else:
            return "<h3>Log file is empty (No requests yet).</h3>"
    except Exception as e:
        return f"Error reading logs: {html.escape(str(e))}"
if __name__ == "__main__":
    # Script entry point: kick off the optional background retraining
    # thread first, then serve on all interfaces on port 7860.
    start_auto_retrain()
    app.run(port=7860, host="0.0.0.0")