Spaces:
Sleeping
Sleeping
"""
app.py — Children's Learning Router Service
Runs on Hugging Face Spaces (CPU-only Docker) with uvicorn + FastAPI (ASGI native).
Validates serv_code header, uses Qwen2.5-3B-Instruct to decide routing,
then asynchronously forwards the full payload to the appropriate downstream URL.
Requests with a missing or wrong serv_code get 401, and the caller's IP is
submitted to Cloudflare for blocking (when the Cloudflare secrets are set).
"""
import asyncio
import hmac
import ipaddress
import json
import logging
import os
import threading
from contextlib import asynccontextmanager

import httpx
import torch
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from transformers import AutoTokenizer, AutoModelForCausalLM
# ---------------------------------------------------------------------------
# Logging: a single root configuration plus one module-level logger
# ---------------------------------------------------------------------------
_LOG_FORMAT = "%(asctime)s %(levelname)s %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Environment / secrets
# (configured in HF Spaces under Settings -> Repository secrets)
# ---------------------------------------------------------------------------
SERV_CODE = os.getenv("SERV_CODE", "")
CF_API_TOKEN = os.getenv("CF_API_TOKEN", "")
CF_ACCOUNT_ID = os.getenv("CF_ACCOUNT_ID", "")

# Downstream services; each URL can be overridden through the environment.
BLOCK_URL = os.getenv("BLOCK_URL", "https://blockchakalaka.onrender.com")
CHITCHAT_URL = os.getenv("CHITCHAT_URL", "https://chitchatchakalaka.onrender.com")
QUESTION_URL = os.getenv("QUESTION_URL", "https://questionchakalaka.onrender.com")
CURRICULUM_URL = os.getenv("CURRICULUM_URL", "https://currichakalaka.onrender.com")

# Routing label (as returned by get_decision) -> downstream URL.
DECISION_URL_MAP = dict(
    Block=BLOCK_URL,
    Chitchat=CHITCHAT_URL,
    Question=QUESTION_URL,
    Curriculum=CURRICULUM_URL,
)
# ---------------------------------------------------------------------------
# Model globals (filled in by load_model() during app startup)
# ---------------------------------------------------------------------------
MODEL_NAME = "Qwen/Qwen2.5-3B-Instruct"
tokenizer = None
model = None


def load_model() -> None:
    """Populate the module-level ``tokenizer`` and ``model`` for MODEL_NAME.

    The weights are loaded in float32 onto the CPU and the model is put into
    eval mode. Meant to run once, at startup.
    """
    global tokenizer, model
    logger.info("Loading %s on CPU β¦", MODEL_NAME)
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    load_kwargs = {
        "torch_dtype": torch.float32,  # CPU-safe
        "device_map": "cpu",
        "low_cpu_mem_usage": True,
    }
    model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, **load_kwargs)
    model.eval()
    logger.info("Model ready.")
# ---------------------------------------------------------------------------
# FastAPI lifespan — loads the model before the first request is served
# ---------------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: load the routing model at startup.

    FastAPI/Starlette expect ``lifespan`` to be an async context manager
    factory; decorating with ``asynccontextmanager`` provides exactly that
    (a bare async-generator function is only accepted as a deprecated form).
    """
    load_model()  # synchronous; runs at startup, before any request is served
    yield
    # Nothing to clean up on shutdown.


app = FastAPI(lifespan=lifespan)
# ---------------------------------------------------------------------------
# System prompt for the routing model (sent verbatim as the system message)
# ---------------------------------------------------------------------------
SYSTEM_PROMPT = """You are a routing agent for a children's educational app (ages 5-12).
Read the student context and output EXACTLY ONE word:
Block | Curriculum | Question | Chitchat
RULES — evaluate in this strict order:
1. BLOCK
- request_message contains abusive, sexual, violent, hateful, or adult content.
- OR the child has been persistently abusive across multiple turns in chat_history.
→ Output: Block
2. CURRICULUM
- request_message is clearly related to current_learning goals.
→ Output: Curriculum
3. QUESTION
- request_message is educational / knowledge-based but NOT related to current_learning.
(Could be another subject, a past/future lesson, or general knowledge.)
→ Output: Question
4. CHITCHAT
- Everything else: greetings, jokes, feelings, random comments, playful chat.
→ Output: Chitchat
CRITICAL:
- Output the single decision word ONLY. No punctuation, no explanation.
- Follow the numbered order strictly.
- Prefer Curriculum over Question when current_learning is involved.
- Prefer Question over Chitchat when the message is educational.
"""
def build_user_content(payload: dict) -> str:
    """Serialise the full learning context into the user prompt for the router model.

    Args:
        payload: Request body. Reads ``learning_path`` (board, class, subject,
            student_name, teacher_persona, curriculum_objectives,
            assessment_stages.current_learning, chat_history, scratchpad) and
            ``query.request_message``. Missing or null sections fall back to
            ``"N/A"`` or empty lists instead of raising.

    Returns:
        The prompt text, ending with the instruction to emit one decision word.
    """
    lp = payload.get("learning_path") or {}
    query = payload.get("query") or {}
    stages = lp.get("assessment_stages") or {}
    current_learning = stages.get("current_learning", [])

    def as_json(value) -> str:
        # ensure_ascii=False keeps non-ASCII text readable to the model
        # instead of turning it into \uXXXX escape sequences.
        return json.dumps(value, indent=2, ensure_ascii=False)

    # JSON-quoting the child's message escapes embedded quotes and newlines, so
    # the message cannot break out of its section and imitate prompt structure.
    # Plain messages render exactly as before: "like this".
    request_message = json.dumps(str(query.get("request_message", "")), ensure_ascii=False)

    return f"""=== STUDENT CONTEXT ===
Board: {lp.get('board', 'N/A')}
Class: {lp.get('class', 'N/A')}
Subject: {lp.get('subject', 'N/A')}
Student Name: {lp.get('student_name', 'N/A')}
Teacher Persona: {lp.get('teacher_persona', 'N/A')}
=== CURRICULUM OBJECTIVES ===
{as_json(lp.get('curriculum_objectives', []))}
=== CURRENT LEARNING (active topic) ===
{as_json(current_learning)}
=== CHAT HISTORY ===
{as_json(lp.get('chat_history', []))}
=== SCRATCHPAD ===
{as_json(lp.get('scratchpad', []))}
=== STUDENT'S CURRENT REQUEST ===
{request_message}
Output your single decision word:"""
def get_decision(payload: dict) -> str:
    """Classify a request with the Qwen router model.

    Builds a system + user chat prompt, generates at most five new tokens
    without sampling, and maps the lower-cased text onto a routing label.
    Keywords are checked in priority order (block, curriculum, question);
    anything unrecognised falls back to ``"Chitchat"``.
    """
    prompt = tokenizer.apply_chat_template(
        [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": build_user_content(payload)},
        ],
        tokenize=False,
        add_generation_prompt=True,
    )
    encoded = tokenizer([prompt], return_tensors="pt")
    prompt_len = encoded["input_ids"].shape[1]

    with torch.no_grad():
        generated = model.generate(
            **encoded,
            max_new_tokens=5,
            do_sample=False,
            pad_token_id=tokenizer.eos_token_id,
        )

    # generate() echoes the prompt; only the tokens after it are the answer.
    answer = tokenizer.decode(generated[0][prompt_len:], skip_special_tokens=True)
    raw = answer.strip().lower()
    logger.info("Raw model output: %r", raw)

    for keyword, label in (
        ("block", "Block"),
        ("curriculum", "Curriculum"),
        ("question", "Question"),
    ):
        if keyword in raw:
            return label
    return "Chitchat"
# ---------------------------------------------------------------------------
# Cloudflare IP blocking helper
# ---------------------------------------------------------------------------
async def block_ip_cloudflare(ip: str) -> None:
    """Best-effort: create a Cloudflare account-level IP access rule blocking ``ip``.

    Skips with a warning when the Cloudflare secrets are unset, or when ``ip``
    is not a literal IPv4/IPv6 address (for example the ``"unknown"``
    placeholder that get_caller_ip returns). Never raises: API rejections and
    network failures are logged.
    """
    if not CF_API_TOKEN or not CF_ACCOUNT_ID:
        logger.warning("Cloudflare secrets not configured β skipping IP block for %s", ip)
        return

    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        logger.warning("Refusing to send non-IP value %r to Cloudflare", ip)
        return

    url = f"https://api.cloudflare.com/client/v4/accounts/{CF_ACCOUNT_ID}/firewall/access_rules/rules"
    cf_headers = {
        "Authorization": f"Bearer {CF_API_TOKEN}",
        "Content-Type": "application/json",
    }
    body = {
        "mode": "block",
        # Cloudflare uses a separate target type for IPv6 addresses.
        "configuration": {"target": "ip6" if addr.version == 6 else "ip", "value": str(addr)},
        "notes": "Auto-blocked: invalid serv_code",
    }
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.post(url, headers=cf_headers, json=body)
        logger.info("Cloudflare block %s β HTTP %s", ip, resp.status_code)
        if resp.is_error:
            # The rule was not created; surface Cloudflare's explanation.
            logger.warning("Cloudflare rejected IP block for %s: %s", ip, resp.text)
    except Exception as exc:
        logger.error("Cloudflare block failed for %s: %s", ip, exc)
# ---------------------------------------------------------------------------
# Downstream forwarding helper
# ---------------------------------------------------------------------------
async def forward_request(target_url: str, payload: dict, serv_code: str) -> tuple[dict, int]:
    """POST the full payload to the chosen downstream service and relay its reply.

    Transient transport failures are retried, for at most 3 attempts in total,
    with exponential backoff (2 s, then 4 s):

    * ``httpx.TimeoutException`` — slow connect/read, e.g. a Render cold-start;
    * ``httpx.ConnectError`` — a sleeping service refuses/resets the connection;
    * ``httpx.RemoteProtocolError`` — the peer drops the connection before
      sending any response.

    Any HTTP response that does arrive, including 4xx/5xx, is returned at once
    without retrying: its JSON body when it parses, otherwise
    ``{"raw_response": <text>}``.

    Args:
        target_url: Downstream endpoint to POST to.
        payload: JSON-serialisable request body, forwarded unchanged.
        serv_code: Value sent in the ``serv_code`` header.

    Returns:
        ``(body, status_code)``. Unexpected errors and exhausted retries yield
        an ``{"error": ...}`` body with status 502.
    """
    fwd_headers = {
        "Content-Type": "application/json",
        "serv_code": serv_code,
    }
    # Split timeouts: generous connect window for Render cold-starts,
    # generous read window for slow inference on downstream services.
    timeout = httpx.Timeout(
        connect=60.0,  # Render free-tier cold-start can take 30–50 s
        read=120.0,    # downstream inference may be slow
        write=30.0,
        pool=10.0,
    )
    last_exc: Exception | None = None
    # One client for every attempt: its connection pool is shared across
    # retries and it is closed exactly once.
    async with httpx.AsyncClient(timeout=timeout) as client:
        for attempt in range(1, 4):  # attempts 1, 2, 3
            logger.info("Forward attempt %d/3 β %s", attempt, target_url)
            try:
                resp = await client.post(target_url, json=payload, headers=fwd_headers)
            except httpx.TimeoutException as exc:
                last_exc = exc
                logger.warning(
                    "Attempt %d/3 TIMEOUT for %s | type=%s | detail=%s",
                    attempt, target_url, type(exc).__name__, exc,
                )
            except httpx.ConnectError as exc:
                # Primary cause of 502s with Render cold-starts: the service is
                # sleeping and refuses/resets the connection.
                last_exc = exc
                logger.warning(
                    "Attempt %d/3 CONNECT ERROR for %s | type=%s | detail=%s",
                    attempt, target_url, type(exc).__name__, exc,
                )
            except httpx.RemoteProtocolError as exc:
                # The peer closed the connection without sending a response,
                # e.g. while the downstream host is still booting.
                last_exc = exc
                logger.warning(
                    "Attempt %d/3 PROTOCOL ERROR for %s | type=%s | detail=%s",
                    attempt, target_url, type(exc).__name__, exc,
                )
            except Exception as exc:
                # Unexpected / non-retryable error: log full traceback and bail.
                logger.exception(
                    "Attempt %d/3 UNEXPECTED ERROR for %s | type=%s | detail=%s",
                    attempt, target_url, type(exc).__name__, exc,
                )
                return {"error": f"Unexpected forwarding error: {exc}"}, 502
            else:
                # A response arrived (any status code): relay it without retrying.
                logger.info(
                    "Downstream %s β HTTP %s (attempt %d)", target_url, resp.status_code, attempt
                )
                try:
                    return resp.json(), resp.status_code
                except Exception as parse_exc:
                    logger.warning(
                        "Could not parse JSON from %s (HTTP %s): %s β returning raw text",
                        target_url, resp.status_code, parse_exc,
                    )
                    return {"raw_response": resp.text}, resp.status_code

            # Exponential backoff before the next attempt (2 s, 4 s).
            if attempt < 3:
                backoff = 2 ** attempt
                logger.info("Backing off %ds before retry β¦", backoff)
                await asyncio.sleep(backoff)

    # All 3 attempts exhausted.
    logger.error(
        "All 3 forward attempts failed for %s | last_error_type=%s | last_error=%s",
        target_url, type(last_exc).__name__, last_exc,
    )
    return {
        "error": f"Downstream unreachable after 3 attempts: {target_url}",
        "last_error_type": type(last_exc).__name__,
        "last_error": str(last_exc),
    }, 502
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def get_caller_ip(req: Request) -> str:
    """Return a best-guess client IP for logging and Cloudflare blocking.

    Uses the first entry of ``X-Forwarded-For`` when that header is non-empty,
    otherwise the socket peer's host, otherwise ``"unknown"``.

    NOTE(review): the left-most X-Forwarded-For entry may be client-supplied,
    depending on the proxies in front of this Space. Because this value is
    submitted for Cloudflare blocking, a caller could then get an arbitrary IP
    blocked. Confirm the proxy chain and consider using the entry appended by
    the trusted proxy instead.
    """
    forwarded_for = req.headers.get("x-forwarded-for", "")
    if not forwarded_for:
        peer = req.client
        return peer.host if peer else "unknown"
    first_hop, _, _ = forwarded_for.partition(",")
    return first_hop.strip()
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
# NOTE(review): no @app route decorators appear on the handlers in this view,
# so FastAPI would not register them as written; confirm paths and methods.
async def health():
    """Unauthenticated liveness probe; always answers ``{"status": "ok"}``."""
    return dict(status="ok")
async def ping(request: Request):
    """Authenticated liveness probe.

    Requires the ``serv_code`` header to match SERV_CODE. On a missing or
    wrong code, the caller's IP is submitted for Cloudflare blocking and 401
    is returned. Otherwise the service name and model are reported.
    """
    incoming_code = request.headers.get("serv_code", "")
    # Constant-time comparison on UTF-8 bytes: it does not reveal through
    # timing how much of the secret matched, and non-ASCII header values
    # cannot raise TypeError.
    authorized = bool(incoming_code) and hmac.compare_digest(
        incoming_code.encode("utf-8"), SERV_CODE.encode("utf-8")
    )
    if not authorized:
        caller_ip = get_caller_ip(request)
        logger.warning("Ping rejected β invalid serv_code from IP %s", caller_ip)
        await block_ip_cloudflare(caller_ip)
        return JSONResponse(status_code=401, content={"error": "Unauthorized"})
    return JSONResponse(content={
        "status": "alive",
        "service": "children-learning-router",
        "model": MODEL_NAME,
    })
# Hugging Face fast tokenizers can fail when a single instance is used from
# several threads at once ("Already borrowed"), and parallel CPU generations
# only compete for the same cores, so model inference is serialized here.
_INFERENCE_LOCK = threading.Lock()


def _locked_decision(payload: dict) -> str:
    """Call get_decision() while holding _INFERENCE_LOCK (runs in a worker thread)."""
    with _INFERENCE_LOCK:
        return get_decision(payload)


async def chat(request: Request):
    """Main routing endpoint.

    1. Validate the ``serv_code`` header (constant-time). On failure, submit
       the caller's IP to Cloudflare and return 401.
    2. Parse the JSON body and check required fields and their types; return
       400 on any problem.
    3. Get a routing decision from Qwen in a worker thread, so the event loop
       stays responsive during CPU inference; return 500 if inference fails.
    4. Forward the payload to the chosen downstream service.
    5. Return ``{"decision", "forwarded", "response"}`` with the downstream
       status code.
    """
    # -- 1. Auth ---------------------------------------------------------------
    incoming_code = request.headers.get("serv_code", "")
    # Constant-time comparison on UTF-8 bytes: it does not reveal through
    # timing how much of the secret matched, and non-ASCII header values
    # cannot raise TypeError.
    authorized = bool(incoming_code) and hmac.compare_digest(
        incoming_code.encode("utf-8"), SERV_CODE.encode("utf-8")
    )
    if not authorized:
        caller_ip = get_caller_ip(request)
        logger.warning("Invalid serv_code from IP %s", caller_ip)
        await block_ip_cloudflare(caller_ip)
        return JSONResponse(status_code=401, content={"error": "Unauthorized"})

    # -- 2. Parse and validate body ---------------------------------------------
    try:
        payload = await request.json()
    except Exception:
        return JSONResponse(status_code=400, content={"error": "Request body must be valid JSON"})
    if not isinstance(payload, dict):
        return JSONResponse(status_code=400, content={"error": "Request body must be a JSON object"})
    if "learning_path" not in payload:
        return JSONResponse(status_code=400, content={"error": "Missing required field: learning_path"})
    if "query" not in payload:
        return JSONResponse(status_code=400, content={"error": "Missing required field: query"})
    learning_path = payload["learning_path"]
    query = payload["query"]
    # Without these type checks a string value would turn the `in` tests below
    # into substring searches and only fail later, inside prompt building.
    if not isinstance(learning_path, dict):
        return JSONResponse(status_code=400, content={"error": "Field learning_path must be a JSON object"})
    if not isinstance(query, dict):
        return JSONResponse(status_code=400, content={"error": "Field query must be a JSON object"})
    if "request_message" not in query:
        return JSONResponse(status_code=400, content={"error": "Missing required field: query.request_message"})
    for field in ("board", "class", "subject", "student_name", "teacher_persona"):
        if field not in learning_path:
            return JSONResponse(status_code=400, content={"error": f"Missing required field: learning_path.{field}"})

    # -- 3. Decision -------------------------------------------------------------
    # Inference is synchronous and CPU-bound; running it in a worker thread
    # keeps the event loop free to serve other requests in the meantime.
    try:
        decision = await asyncio.to_thread(_locked_decision, payload)
    except Exception as exc:
        logger.exception("Model inference error: %s", exc)
        return JSONResponse(status_code=500, content={"error": "Model inference failed"})
    logger.info("Routing decision: %s", decision)
    target_url = DECISION_URL_MAP.get(decision, CHITCHAT_URL)

    # -- 4. Forward --------------------------------------------------------------
    response_body, status_code = await forward_request(target_url, payload, incoming_code)

    # -- 5. Return ---------------------------------------------------------------
    return JSONResponse(status_code=status_code, content={
        "decision": decision,
        "forwarded": target_url,
        "response": response_body,
    })