# shared.py
# Pure infrastructure, helpers, and analytics functions.
# No Streamlit UI is rendered at import time, so this module is safe to import
# from any page without triggering widget re-execution.
from __future__ import annotations
import json
import logging
import os
import re
import sqlite3
import threading
from datetime import datetime
import pandas as pd
import streamlit as st
# ── ML imports ────────────────────────────────────────────────────────────────
from ml.sentiment_model import predict_sentiment
from ml.topic_model import predict_topic, VALID_TOPICS
from ml.action_type_model import predict_action_type, VALID_ACTION_TYPES
# ── SQLite store ──────────────────────────────────────────────────────────────
DB_PATH = "/tmp/livepulse.db"
MAX_STORE_MESSAGES = 100_000
_DB_LOCK = threading.Lock()
_META: dict[str, str] = {}
_SCRAPER_THREADS: dict[str, threading.Thread] = {}
_SCRAPER_STOP: dict[str, threading.Event] = {}
def _get_db() -> sqlite3.Connection:
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.execute("""
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT NOT NULL,
value TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_key ON messages(key)")
conn.commit()
return conn
_db_conn = _get_db()
def store_lrange(key: str, start: int, end: int) -> list[str]:
with _DB_LOCK:
rows = _db_conn.execute(
"SELECT value FROM messages WHERE key=? ORDER BY id ASC", (key,)
).fetchall()
values = [r[0] for r in rows]
n = len(values)
if n == 0:
return []
if start < 0:
start = max(n + start, 0)
if end < 0:
end = n + end
end = min(end, n - 1)
if start > end:
return []
return values[start: end + 1]
def store_llen(key: str) -> int:
with _DB_LOCK:
row = _db_conn.execute(
"SELECT COUNT(*) FROM messages WHERE key=?", (key,)
).fetchone()
return row[0] if row else 0
def store_delete(key: str) -> None:
with _DB_LOCK:
_db_conn.execute("DELETE FROM messages WHERE key=?", (key,))
_db_conn.commit()
def store_rpush(key: str, value: str) -> None:
with _DB_LOCK:
_db_conn.execute(
"INSERT INTO messages (key, value) VALUES (?, ?)", (key, value)
)
_db_conn.execute("""
DELETE FROM messages WHERE key=? AND id NOT IN (
SELECT id FROM messages WHERE key=? ORDER BY id DESC LIMIT ?
)
""", (key, key, MAX_STORE_MESSAGES))
_db_conn.commit()
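# Usage sketch (illustrative only; "stream:0" is a hypothetical key chosen by the caller):
#     store_rpush("stream:0", json.dumps({"text": "hello"}))
#     store_llen("stream:0")              # -> number of rows stored under the key
#     store_lrange("stream:0", -50, -1)   # last 50 entries, Redis LRANGE-style indexing
#     store_delete("stream:0")            # drop the whole list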
# ── Config ────────────────────────────────────────────────────────────────────
VIDEO_ID = os.getenv("VIDEO_ID", "")
# ── Logging ───────────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
force=True,
)
logger = logging.getLogger("app.scraper")
# ── Constants ─────────────────────────────────────────────────────────────────
MAX_STREAMS = 5
STREAM_COLORS = ["#7c3aed", "#10b981", "#f59e0b", "#3b82f6", "#ec4899"]
STREAM_NAMES = ["A", "B", "C", "D", "E"]
TOPIC_LABELS = ["Appreciation", "Question", "Request/Feedback", "Promo", "Spam", "General", "MCQ Answer"]
TOPIC_COLOR = {
"Appreciation": "#f59e0b", "Question": "#3b82f6",
"Request/Feedback": "#8b5cf6",
"Promo": "#ec4899", "Spam": "#ef4444", "General": "#6b7280",
"MCQ Answer": "#10b981",
}
SENT_COLORS = {"Positive": "#22c55e", "Neutral": "#eab308", "Negative": "#ef4444"}
# ── Scraper helpers ───────────────────────────────────────────────────────────
def _safe_sentiment(text: str):
try:
return predict_sentiment(text)
except Exception as exc:
logger.error("predict_sentiment failed: %s", exc)
return "Neutral", 0.50
def _safe_topic(text: str):
try:
topic, conf = predict_topic(text)
if topic not in VALID_TOPICS:
return "General", 0.50
return topic, conf
except Exception as exc:
logger.error("predict_topic failed: %s", exc)
return "General", 0.50
def _safe_action_type(text: str):
try:
action_type, conf = predict_action_type(text)
if action_type not in VALID_ACTION_TYPES:
return "N/A", 0.50
return action_type, conf
except Exception as exc:
logger.error("predict_action_type failed: %s", exc)
return "N/A", 0.50
def _get_live_chat_id(video_id: str, api_key: str) -> str | None:
import urllib.request
import urllib.parse
import urllib.error
url = (
"https://www.googleapis.com/youtube/v3/videos"
f"?part=liveStreamingDetails&id={urllib.parse.quote(video_id)}&key={api_key}"
)
try:
with urllib.request.urlopen(url, timeout=10) as resp:
data = json.loads(resp.read())
items = data.get("items", [])
if not items:
logger.error("No video found for id=%s", video_id)
return None
live_details = items[0].get("liveStreamingDetails", {})
chat_id = live_details.get("activeLiveChatId")
            if not chat_id:
                logger.error("No activeLiveChatId for video=%s", video_id)
                return None
            return chat_id
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")[:500]
logger.error("HTTP %d from YouTube API for video=%s: %s", exc.code, video_id, body)
return None
except Exception as exc:
logger.error("Failed to get liveChatId: %s", exc)
return None
def _fetch_chat_messages(live_chat_id: str, api_key: str, page_token: str | None = None):
import urllib.request
import urllib.parse
params = {
"part": "snippet,authorDetails",
"liveChatId": live_chat_id,
"key": api_key,
"maxResults": "200",
}
if page_token:
params["pageToken"] = page_token
url = "https://www.googleapis.com/youtube/v3/liveChat/messages?" + urllib.parse.urlencode(params)
try:
with urllib.request.urlopen(url, timeout=10) as resp:
data = json.loads(resp.read())
messages = data.get("items", [])
next_token = data.get("nextPageToken")
poll_interval = data.get("pollingIntervalMillis", 5000)
return messages, next_token, poll_interval
except Exception as exc:
logger.error("Failed to fetch chat messages: %s", exc)
return [], None, 5000
def _scraper_thread_fn(video_id: str, redis_key: str, stop_event: threading.Event, min_poll_s: float = 10.0, api_key: str = "") -> None:
if not api_key:
api_key = os.getenv("YOUTUBE_API_KEY", "")
if not api_key:
msg = "No API key provided. Enter your YouTube Data API v3 key in the sidebar."
logger.error(msg)
_META["scraper_error"] = msg
return
_META.pop("scraper_error", None)
live_chat_id = _get_live_chat_id(video_id, api_key)
if not live_chat_id:
msg = f"No active live chat found for video '{video_id}'. Make sure the stream is currently LIVE."
logger.error(msg)
_META["scraper_error"] = msg
return
page_token = None
    seen_ids: dict[str, None] = {}  # dict keeps insertion order, so trimming below keeps the newest ids
is_first_page = True
while not stop_event.is_set():
messages, page_token, poll_ms = _fetch_chat_messages(live_chat_id, api_key, page_token)
new_msgs = []
for item in messages:
if stop_event.is_set():
break
msg_id = item.get("id", "")
if msg_id in seen_ids:
continue
            seen_ids[msg_id] = None
snippet = item.get("snippet", {})
if snippet.get("type") != "textMessageEvent":
continue
text = snippet.get("displayMessage", "").strip()
import emoji as _emoji
text = _emoji.emojize(text, language="alias")
author = item.get("authorDetails", {}).get("displayName", "Unknown")
if not text:
continue
new_msgs.append((msg_id, text, author))
        # The first fetched page is existing backlog: store it with placeholder labels
        # rather than running ML inference on every historical message.
        if is_first_page and new_msgs:
for _, text, author in new_msgs:
message_data = {
"author": author, "text": text,
"sentiment": "Neutral", "confidence": 0.5,
"topic": "General", "topic_conf": 0.5,
"action_type": "N/A", "action_type_conf": 0.5,
"time": datetime.now().isoformat(),
}
store_rpush(redis_key, json.dumps(message_data))
is_first_page = False
else:
for _, text, author in new_msgs:
if stop_event.is_set():
break
try:
sentiment, s_conf = _safe_sentiment(text)
topic, t_conf = _safe_topic(text)
# Only classify action type for Question/Request topics
if topic in ("Question", "Request/Feedback"):
action_type, at_conf = _safe_action_type(text)
else:
action_type, at_conf = "N/A", 0.50
except Exception as exc:
logger.error("ML inference failed: %s", exc)
sentiment, s_conf = "Neutral", 0.5
topic, t_conf = "General", 0.5
action_type, at_conf = "N/A", 0.5
message_data = {
"author": author, "text": text,
"sentiment": sentiment, "confidence": round(s_conf, 3),
"topic": topic, "topic_conf": round(t_conf, 3),
"action_type": action_type, "action_type_conf": round(at_conf, 3),
"time": datetime.now().isoformat(),
}
store_rpush(redis_key, json.dumps(message_data))
        if len(seen_ids) > 5000:
            # Trim the de-dup cache, keeping the 2000 most recently seen ids.
            seen_ids = dict(list(seen_ids.items())[-2000:])
# Respect YouTube's requested polling interval, but never faster than min_poll_s
wait_s = max(poll_ms / 1000, min_poll_s)
stop_event.wait(timeout=wait_s)
def start_scraper(slot_idx: int, video_id: str, redis_key: str, min_poll_s: float = 10.0, api_key: str = "") -> None:
key = str(slot_idx)
stop_scraper(slot_idx)
stop_event = threading.Event()
t = threading.Thread(
target=_scraper_thread_fn,
args=(video_id, redis_key, stop_event, min_poll_s, api_key),
daemon=True,
name=f"scraper-{slot_idx}",
)
_SCRAPER_STOP[key] = stop_event
_SCRAPER_THREADS[key] = t
t.start()
def stop_scraper(slot_idx: int) -> None:
key = str(slot_idx)
ev = _SCRAPER_STOP.get(key)
if ev:
ev.set()
def is_scraper_running(slot_idx: int) -> bool:
key = str(slot_idx)
t = _SCRAPER_THREADS.get(key)
return t is not None and t.is_alive()
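# Lifecycle sketch (illustrative; slot index 0 and the key "stream:0" are hypothetical):
#     start_scraper(0, "dQw4w9WgXcQ", "stream:0", api_key="YOUR_API_KEY")
#     is_scraper_running(0)   # True while the daemon thread is alive
#     stop_scraper(0)         # sets the stop event; the thread exits after its current poll wait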
# ── UI helpers ────────────────────────────────────────────────────────────────
def extract_video_id(url_or_id: str) -> str:
url_or_id = url_or_id.strip()
match = re.search(r"(?:v=|/live/|youtu\.be/)([A-Za-z0-9_-]{11})", url_or_id)
if match:
return match.group(1)
if re.match(r"^[A-Za-z0-9_-]{11}$", url_or_id):
return url_or_id
return url_or_id
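# Examples (illustrative) of inputs extract_video_id accepts:
#     extract_video_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ")  # -> "dQw4w9WgXcQ"
#     extract_video_id("https://youtu.be/dQw4w9WgXcQ")                 # -> "dQw4w9WgXcQ"
#     extract_video_id("dQw4w9WgXcQ")                                  # bare 11-char ids pass through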
def fetch_video_title(video_id: str) -> str | None:
"""Try oembed first (works for non-live), then YouTube Data API v3 (works for live)."""
import urllib.request
import urllib.parse
try:
url = f"https://www.youtube.com/oembed?url=https://www.youtube.com/watch?v={video_id}&format=json"
with urllib.request.urlopen(url, timeout=5) as resp:
title = json.loads(resp.read()).get("title")
if title:
return title
except Exception:
pass
try:
api_key = os.getenv("YOUTUBE_API_KEY", "")
if api_key:
url = (
"https://www.googleapis.com/youtube/v3/videos"
f"?part=snippet&id={urllib.parse.quote(video_id)}&key={api_key}"
)
with urllib.request.urlopen(url, timeout=5) as resp:
data = json.loads(resp.read())
items = data.get("items", [])
if items:
return items[0]["snippet"]["title"]
except Exception:
pass
return None
def clean_topic(val) -> str:
if pd.isna(val) or str(val).strip() == "" or str(val).strip().lower() == "nan":
return "General"
return str(val).strip()
def clean_sentiment(val) -> str:
if str(val).strip() in ("Positive", "Negative", "Neutral"):
return str(val).strip()
return "Neutral"
def plotly_layout(height: int = 280) -> dict:
return dict(
paper_bgcolor="rgba(0,0,0,0)",
plot_bgcolor="rgba(0,0,0,0)",
height=height,
margin=dict(l=10, r=10, t=10, b=10),
font=dict(family="Space Grotesk"),
xaxis=dict(showgrid=False, zeroline=False, showline=False,
tickfont=dict(size=11), title=None),
yaxis=dict(showgrid=True, gridcolor="rgba(128,128,128,0.12)",
zeroline=False, showline=False, tickfont=dict(size=11), title=None),
showlegend=False,
hoverlabel=dict(font_family="Space Grotesk", font_size=12),
)
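# Usage sketch (illustrative; assumes plotly is available, as it is for the pages that call this helper):
#     import plotly.graph_objects as go
#     fig = go.Figure(go.Bar(x=["Positive", "Negative"], y=[12, 3]))
#     fig.update_layout(**plotly_layout(height=240))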
def csv_download(df_export, label: str, filename: str) -> None:
csv = df_export.to_csv(index=False).encode("utf-8")
st.download_button(label=f"\u2b07 {label}", data=csv,
file_name=filename, mime="text/csv", key=filename)
def load_stream_data(redis_key: str, limit: int | None = None) -> list[dict]:
if limit:
raws = store_lrange(redis_key, -limit, -1)
else:
raws = store_lrange(redis_key, 0, -1)
data = []
for raw in raws:
try:
data.append(json.loads(raw))
except Exception:
pass
return data
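# Usage sketch (illustrative; "stream:0" is a hypothetical key):
#     recent = load_stream_data("stream:0", limit=500)   # list of message dicts
#     df_all = pd.DataFrame(recent)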
# ── Analytics (cached) ────────────────────────────────────────────────────────
@st.cache_data(ttl=10, show_spinner=False)
def compute_velocity(df_all_json: str, window: int = 20) -> dict:
import json as _json
sentiments = [m.get("sentiment", "Neutral") for m in _json.loads(df_all_json)]
n = len(sentiments)
if n < window * 2:
return {"direction": "\u2192", "delta": 0.0, "label": "Stable", "color": "#eab308"}
recent = sentiments[-window:]
prev = sentiments[-window*2:-window]
r_pos = sum(1 for s in recent if s == "Positive") / window
p_pos = sum(1 for s in prev if s == "Positive") / window
delta = r_pos - p_pos
if delta > 0.08:
return {"direction": "\u2191", "delta": delta, "label": "Rising", "color": "#22c55e"}
elif delta < -0.08:
return {"direction": "\u2193", "delta": delta, "label": "Falling", "color": "#ef4444"}
return {"direction": "\u2192", "delta": delta, "label": "Stable", "color": "#eab308"}
@st.cache_data(ttl=10, show_spinner=False)
def build_heatmap_data(df_all_json: str, bucket_minutes: int = 1) -> pd.DataFrame:
import json as _json
records = _json.loads(df_all_json)
if not records:
return pd.DataFrame()
df_t = pd.DataFrame(records)
if "time" not in df_t.columns:
return pd.DataFrame()
df_t["time"] = pd.to_datetime(df_t["time"], errors="coerce")
df_t = df_t.dropna(subset=["time"])
if df_t.empty:
return pd.DataFrame()
df_t["bucket"] = df_t["time"].dt.floor(f"{bucket_minutes}min")
grouped = df_t.groupby(["bucket", "sentiment"]).size().unstack(fill_value=0)
for col in ["Positive", "Neutral", "Negative"]:
if col not in grouped.columns:
grouped[col] = 0
grouped = grouped.reset_index()
grouped.columns.name = None
return grouped[["bucket", "Positive", "Neutral", "Negative"]]
def check_alert(df_all: pd.DataFrame, threshold: float = 0.4, window: int = 15) -> dict | None:
if len(df_all) < window:
return None
recent = df_all.iloc[-window:]
neg_ratio = (recent["sentiment"] == "Negative").mean()
if neg_ratio >= threshold:
return {
"neg_ratio": neg_ratio,
"count": int((recent["sentiment"] == "Negative").sum()),
"window": window,
}
return None
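# Usage sketch (illustrative; df_all is a DataFrame of messages, e.g. from load_stream_data):
# the alert fires when at least 40% of the last 15 messages are Negative.
#     alert = check_alert(df_all)
#     if alert:
#         st.warning(f"{alert['count']} of the last {alert['window']} messages are negative")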
@st.cache_data(ttl=10, show_spinner=False)
def compute_engagement(all_data_json: str, window: int = 50) -> dict:
import json as _j
msgs = _j.loads(all_data_json)
if not msgs:
return {"score": 0, "rate": 0.0, "pos_ratio": 0.0, "q_density": 0.0, "grade": "\u2014"}
recent = msgs[-window:]
n = len(recent)
rate = 0.0
try:
t0 = datetime.fromisoformat(recent[0]["time"])
t1 = datetime.fromisoformat(recent[-1]["time"])
elapsed = max((t1 - t0).total_seconds() / 60, 0.1)
rate = round(n / elapsed, 1)
except Exception:
rate = float(n)
pos_ratio = sum(1 for m in recent if m.get("sentiment") == "Positive") / max(n, 1)
q_density = sum(1 for m in recent if m.get("topic") == "Question") / max(n, 1)
rate_norm = min(rate / 60, 1.0)
score = round((rate_norm * 0.4 + pos_ratio * 0.4 + q_density * 0.2) * 100)
    if score >= 70:
        grade = "\U0001f525 High"
    elif score >= 40:
        grade = "\u26a1 Medium"
    else:
        grade = "\U0001f4a4 Low"
return {"score": score, "rate": rate, "pos_ratio": pos_ratio, "q_density": q_density, "grade": grade}
@st.cache_data(ttl=10, show_spinner=False)
def compute_top_contributors(all_data_json: str, top_n: int = 10) -> list[dict]:
import json as _j
msgs = _j.loads(all_data_json)
if not msgs:
return []
    TOPICS = TOPIC_LABELS  # same label set as the module-level constant
author_data: dict[str, dict] = {}
for m in msgs:
a = m.get("author", "Unknown")
if a not in author_data:
author_data[a] = {"count": 0, "Positive": 0, "Neutral": 0, "Negative": 0,
**{t: 0 for t in TOPICS}}
author_data[a]["count"] += 1
s = m.get("sentiment", "Neutral")
if s in ("Positive", "Neutral", "Negative"):
author_data[a][s] += 1
t = m.get("topic", "General")
if t not in TOPICS:
t = "General"
author_data[a][t] += 1
sorted_authors = sorted(author_data.items(), key=lambda x: x[1]["count"], reverse=True)[:top_n]
result = []
for author, d in sorted_authors:
total = max(d["count"], 1)
result.append({
"author": author, "count": d["count"],
"pos_pct": round(d["Positive"] / total * 100),
"neu_pct": round(d["Neutral"] / total * 100),
"neg_pct": round(d["Negative"] / total * 100),
"t_appr": round(d["Appreciation"] / total * 100),
"t_ques": round(d["Question"] / total * 100),
"t_rf": round(d["Request/Feedback"] / total * 100),
"t_promo": round(d["Promo"] / total * 100),
"t_spam": round(d["Spam"] / total * 100),
"t_gen": round(d["General"] / total * 100),
"t_mcq": round(d["MCQ Answer"] / total * 100),
})
return result
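# Usage sketch (illustrative; all_data is a hypothetical list of message dicts):
#     leaderboard = compute_top_contributors(json.dumps(all_data), top_n=5)
#     # each entry has author, count, and sentiment/topic percentages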
@st.cache_data(ttl=10, show_spinner=False)
def compute_word_freq(all_data_json: str, sentiment_filter: str = "All",
topic_filter: str = "All", top_n: int = 60) -> list[tuple[str, int]]:
import json as _j
from collections import Counter
STOPWORDS = {
"the","a","an","is","it","in","on","at","to","of","and","or","but","for",
"with","this","that","are","was","be","as","by","from","have","has","had",
"not","no","so","if","do","did","will","can","just","i","you","he","she",
"we","they","my","your","his","her","our","their","me","him","us","them",
"what","how","why","when","where","who","which","there","here","been",
"would","could","should","may","might","shall","than","then","now","also",
"more","very","too","up","out","about","into","over","after","before",
"yaar","bhi","hai","hain","ho","kar","ke","ki","ka","ko","se","ne","ye",
"vo","woh","aur","nahi","nhi","toh","koi","kuch","ab","ek","hi",
}
msgs = _j.loads(all_data_json)
words: list[str] = []
for m in msgs:
if sentiment_filter != "All" and m.get("sentiment") != sentiment_filter:
continue
if topic_filter != "All" and m.get("topic") != topic_filter:
continue
text = re.sub(r"[^\w\s]", " ", m.get("text", "").lower())
for w in text.split():
if len(w) > 2 and w not in STOPWORDS and not w.isdigit():
words.append(w)
return Counter(words).most_common(top_n)
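# Usage sketch (illustrative; all_data is a hypothetical list of message dicts):
#     top_words = compute_word_freq(json.dumps(all_data), sentiment_filter="Negative")
#     # -> list of (word, count) tuples, most frequent first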
def check_spam_alert(df_all: pd.DataFrame, threshold: float = 0.3, window: int = 20) -> dict | None:
if "topic" not in df_all.columns or len(df_all) < window:
return None
recent = df_all.iloc[-window:]
spam_ratio = (recent["topic"] == "Spam").mean()
if spam_ratio >= threshold:
return {
"spam_ratio": spam_ratio,
"count": int((recent["topic"] == "Spam").sum()),
"window": window,
}
return None
@st.cache_data(ttl=10, show_spinner=False)
def detect_repeat_spammers(all_data_json: str, window_sec: int = 15, min_repeats: int = 2) -> list[dict]:
import json as _j
msgs = _j.loads(all_data_json)
if not msgs:
return []
def _normalize(t: str) -> str:
return re.sub(r"[^\w]", "", t.lower().strip())
bursts: dict[tuple, dict] = {}
for m in msgs:
author = m.get("author", "Unknown")
text = m.get("text", "").strip()
if not text:
continue
norm = _normalize(text)
if len(norm) < 4:
continue
ts_str = m.get("time", "")
try:
ts = datetime.fromisoformat(ts_str)
except Exception:
continue
key = (author, norm)
if key not in bursts:
bursts[key] = {
"author": author, "text": text,
"topic": m.get("topic", "General"),
"sentiment": m.get("sentiment", "Neutral"),
"timestamps": [],
}
bursts[key]["timestamps"].append(ts)
results = []
for key, burst in bursts.items():
times = sorted(burst["timestamps"])
max_in_window = 1
for i in range(len(times)):
count_in_window = sum(
1 for t in times[i:]
if (t - times[i]).total_seconds() <= window_sec
)
max_in_window = max(max_in_window, count_in_window)
if max_in_window >= min_repeats:
results.append({
"author": burst["author"],
"text": burst["text"],
"topic": burst["topic"],
"sentiment": burst["sentiment"],
"count": len(times),
"max_burst": max_in_window,
"first_seen": times[0].strftime("%H:%M:%S"),
"last_seen": times[-1].strftime("%H:%M:%S"),
})
return sorted(results, key=lambda x: x["max_burst"], reverse=True)
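# Usage sketch (illustrative; all_data is a hypothetical list of message dicts): flags
# authors who posted the same normalized text at least `min_repeats` times within any
# `window_sec` window.
#     spammers = detect_repeat_spammers(json.dumps(all_data))
#     worst = spammers[0] if spammers else None   # sorted by max_burst, descending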