| """ | |
| Async worker for processing analysis jobs via Redis queue. | |
| Handles heavy inference tasks that are offloaded from the main API. | |
| Run: python -m backend.app.workers.analysis_worker | |
| """ | |
| import asyncio | |
| import json | |
| import redis | |
| from backend.app.core.config import settings | |
| from backend.app.core.logging import setup_logging, get_logger | |
| from backend.app.services.hf_service import detect_ai_text, get_embeddings, detect_harm | |
| from backend.app.services.groq_service import compute_perplexity | |
| from backend.app.services.stylometry import compute_stylometry_score | |
| from backend.app.services.ensemble import compute_ensemble | |
| from backend.app.services.vector_db import compute_cluster_score, upsert_embedding | |
# Configure structured logging before any record is emitted; get_logger
# appears to return a structlog-style logger (calls below pass kwargs).
setup_logging(settings.LOG_LEVEL)
logger = get_logger(__name__)
# Redis list that the main API pushes analysis jobs onto (see run_worker).
QUEUE_NAME = "analysis_jobs"
def run_worker():
    """Blocking worker loop that processes analysis jobs from a Redis list.

    Pops JSON-encoded jobs from ``QUEUE_NAME`` via BRPOP, runs the async
    analysis pipeline for each, and stores the JSON result under
    ``result:<job_id>`` with a one-hour TTL so the API can poll for it.
    Runs until the process is killed; never raises.
    """
    r = redis.from_url(settings.REDIS_URL, decode_responses=True)
    logger.info("Worker started, listening on queue", queue=QUEUE_NAME)
    while True:
        try:
            # BRPOP returns None on timeout (not a (queue, payload) tuple);
            # unpacking it directly would raise TypeError every idle 30s,
            # so check before unpacking.
            item = r.brpop(QUEUE_NAME, timeout=30)
            if item is None:
                continue
            _, raw = item
            job = json.loads(raw)
            text = job.get("text", "")
            job_id = job.get("id", "unknown")
            logger.info("Processing job", job_id=job_id)
            # The pipeline is async; drive it to completion per job.
            result = asyncio.run(_process_job(text))
            # 3600s TTL: results are transient and polled by the API.
            r.setex(f"result:{job_id}", 3600, json.dumps(result))
            logger.info("Job completed", job_id=job_id)
        except Exception as e:
            # Broad catch at the loop boundary is deliberate: one bad job
            # (malformed JSON, transient Redis/model error) must not kill
            # the worker. Log and keep consuming.
            logger.error("Worker error", error=str(e))
async def _process_job(text: str) -> dict:
    """Run the full analysis pipeline for one job's text.

    Each inference stage is best-effort: a failing stage logs a warning and
    contributes ``None`` for its signal instead of failing the whole job
    (the original code already tolerated a ``None`` p_ai; this applies the
    same policy to the other remote stages for consistency).

    Args:
        text: Raw text payload of the job.

    Returns:
        Dict with the individual signals (``p_ai``, ``s_perp``,
        ``s_embed_cluster``, ``p_ext``, ``s_styl``) plus the ensemble's
        ``threat_score`` and ``explainability``.
    """
    try:
        p_ai = await detect_ai_text(text)
    except Exception as e:
        logger.warning("AI-text detection failed", error=str(e))
        p_ai = None

    # Perplexity is only computed when the classifier score clears the
    # configured threshold — presumably because it is the expensive stage.
    s_perp = None
    if p_ai is not None and p_ai > settings.PERPLEXITY_THRESHOLD:
        try:
            s_perp = await compute_perplexity(text)
        except Exception as e:
            logger.warning("Perplexity scoring failed", error=str(e))

    s_embed_cluster = None
    try:
        embeddings = await get_embeddings(text)
        s_embed_cluster = await compute_cluster_score(embeddings)
        # Use a stable content hash as the vector ID: builtin hash() is
        # randomized per process (PYTHONHASHSEED), so restarted workers
        # would never map the same text to the same ID.
        doc_id = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
        await upsert_embedding(f"worker_{doc_id}", embeddings)
    except Exception as e:
        # Embedding/clustering is optional enrichment; log rather than
        # silently swallowing (was `pass`) so failures stay visible.
        logger.warning("Embedding pipeline failed", error=str(e))

    try:
        p_ext = await detect_harm(text)
    except Exception as e:
        # NOTE(review): assumes compute_ensemble tolerates p_ext=None like
        # it does p_ai=None — confirm; worst case the job fails exactly as
        # it did before this change.
        logger.warning("Harm detection failed", error=str(e))
        p_ext = None

    s_styl = compute_stylometry_score(text)
    ensemble_result = compute_ensemble(
        p_ai=p_ai, s_perp=s_perp, s_embed_cluster=s_embed_cluster,
        p_ext=p_ext, s_styl=s_styl,
    )
    return {
        "p_ai": p_ai,
        "s_perp": s_perp,
        "s_embed_cluster": s_embed_cluster,
        "p_ext": p_ext,
        "s_styl": s_styl,
        "threat_score": ensemble_result["threat_score"],
        "explainability": ensemble_result["explainability"],
    }
# Script entry point: python -m backend.app.workers.analysis_worker
if __name__ == "__main__":
    run_worker()