| """ |
| backend/main.py |
| =============== |
| FastAPI server exposing live-chat insights from Redis. |
| |
| Start with: |
| uvicorn backend.main:app --reload --port 8000 |
| |
| Endpoints |
| --------- |
| GET /get_messages Last 50 raw messages (full payload) |
| GET /sentiment_trend Last 200 messages β time + sentiment + confidence |
| GET /sentiment_summary Aggregate counts + avg confidence (last 200) |
| GET /topic_stats Per-topic counts (last 100 messages) |
| GET /live_stats Combined snapshot for dashboard widgets |
| GET /health Redis connectivity check |
| """ |
|
|
| import json |
|
|
| import redis |
| from fastapi import FastAPI, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
|
|
| from backend.config import REDIS_HOST, REDIS_PORT, REDIS_DB |
|
|
| |
| app = FastAPI( |
| title="YouTube Live Chat Insights API", |
| description="Real-time Hinglish sentiment + topic analysis for live streams", |
| version="2.0.0", |
| ) |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
| r = redis.Redis( |
| host=REDIS_HOST, |
| port=REDIS_PORT, |
| db=REDIS_DB, |
| decode_responses=True, |
| socket_connect_timeout=5, |
| ) |
|
|
| VALID_TOPICS = {"Appreciation", "Question", "Promo", "Spam", "General", "MCQ Answer"} |
| VALID_SENTIMENT = {"Positive", "Neutral", "Negative"} |
|
|
|
|
| def _get_messages(n: int) -> list[dict]: |
| """Fetch last n messages from Redis, safely parse each.""" |
| raws = r.lrange("chat_messages", -n, -1) |
| out = [] |
| for raw in raws: |
| try: |
| out.append(json.loads(raw)) |
| except (json.JSONDecodeError, TypeError): |
| pass |
| return out |
|
|
|
|
| def _redis_check(): |
| try: |
| r.ping() |
| except redis.RedisError as exc: |
| raise HTTPException(status_code=503, detail=f"Redis unavailable: {exc}") |
|
|
|
|
| |
|
|
| @app.get("/health") |
| def health(): |
| """Check Redis connectivity.""" |
| _redis_check() |
| return {"status": "ok", "redis": f"{REDIS_HOST}:{REDIS_PORT}"} |
|
|
|
|
| @app.get("/get_messages") |
| def get_messages(limit: int = 50): |
| """ |
| Return the last `limit` chat messages with full payload. |
| Each item: author, text, sentiment, confidence, topic, topic_conf, time. |
| """ |
| _redis_check() |
| return _get_messages(min(limit, 200)) |
|
|
|
|
| @app.get("/sentiment_trend") |
| def sentiment_trend(): |
| """ |
| Time-series data for the last 200 messages. |
| Useful for plotting sentiment over time in a frontend chart. |
| Returns: [{time, sentiment, confidence}, ...] |
| """ |
| _redis_check() |
| msgs = _get_messages(200) |
| return [ |
| { |
| "time": m.get("time"), |
| "sentiment": m.get("sentiment"), |
| "confidence": m.get("confidence"), |
| } |
| for m in msgs |
| ] |
|
|
|
|
| @app.get("/sentiment_summary") |
| def sentiment_summary(): |
| """ |
| Aggregate sentiment counts and average confidence for the last 200 messages. |
| Returns: {counts: {Positive, Neutral, Negative}, avg_confidence, total_messages} |
| """ |
| _redis_check() |
| msgs = _get_messages(200) |
| counts = {s: 0 for s in VALID_SENTIMENT} |
| total_conf = 0.0 |
|
|
| for m in msgs: |
| s = m.get("sentiment", "Neutral") |
| if s in counts: |
| counts[s] += 1 |
| try: |
| total_conf += float(m.get("confidence", 0)) |
| except (TypeError, ValueError): |
| pass |
|
|
| n = len(msgs) |
| return { |
| "counts": counts, |
| "avg_confidence": round(total_conf / n, 3) if n else 0.0, |
| "total_messages": n, |
| } |
|
|
|
|
| @app.get("/topic_stats") |
| def topic_stats(): |
| """ |
| Per-topic message counts for the last 100 messages. |
| Returns: {Appreciation: int, Question: int, Promo: int, Spam: int, General: int} |
| """ |
| _redis_check() |
| msgs = _get_messages(100) |
| counts = {t: 0 for t in VALID_TOPICS} |
|
|
| for m in msgs: |
| label = m.get("topic", "General") |
| if label not in VALID_TOPICS: |
| label = "General" |
| counts[label] += 1 |
|
|
| return counts |
|
|
|
|
| @app.get("/live_stats") |
| def live_stats(): |
| """ |
| Combined snapshot β sentiment summary + topic stats in one request. |
| Use this for dashboard widgets to avoid two round-trips. |
| """ |
| _redis_check() |
| msgs = _get_messages(200) |
|
|
| |
| s_counts = {s: 0 for s in VALID_SENTIMENT} |
| total_conf = 0.0 |
| for m in msgs: |
| s = m.get("sentiment", "Neutral") |
| if s in s_counts: |
| s_counts[s] += 1 |
| try: |
| total_conf += float(m.get("confidence", 0)) |
| except (TypeError, ValueError): |
| pass |
| n = len(msgs) |
|
|
| |
| t_counts = {t: 0 for t in VALID_TOPICS} |
| for m in msgs[-100:]: |
| label = m.get("topic", "General") |
| if label not in VALID_TOPICS: |
| label = "General" |
| t_counts[label] += 1 |
|
|
| |
| recent = msgs[-10:] |
|
|
| return { |
| "sentiment": { |
| "counts": s_counts, |
| "avg_confidence": round(total_conf / n, 3) if n else 0.0, |
| "total_messages": n, |
| }, |
| "topics": t_counts, |
| "recent": recent, |
| } |