# Anime Character Sentiment — Unified Realtime (MAL via Jikan)
# Streamlit app with: live Jikan fetch, CSV upload+bootstrap, SQLite persistence,
# fuzzy/strict character tagging, strict "Best" safeguard, diagnostics, and performance tuning:
#   - sentiment engine switch (Transformers or VADER)
#   - adjustable sentiment batch size
#   - cap rows to tag per refresh
#   - pause live fetching toggle
#   - longer caches for characters/titles
# Run: streamlit run app.py
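# Assumed dependencies (not listed in the original; inferred from the imports below):
#   pip install streamlit requests pandas plotly torch transformers rapidfuzz vaderSentiment
#   pip install peft   # optional, only needed when a LoRA adapter repo id is given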
import os, re, time, random, sqlite3, hashlib, contextlib
from typing import List, Dict, Tuple
from collections import Counter

import requests
import pandas as pd
import streamlit as st
import plotly.express as px

import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from rapidfuzz import fuzz, process as rf_process

# Speed up PyTorch on CPU
torch.set_num_threads(min(os.cpu_count() or 2, 8))

# Optional LoRA adapters
try:
    from peft import PeftModel
    PEFT_OK = True
except Exception:
    PEFT_OK = False

# ===========================
# App config
# ===========================
st.set_page_config(page_title="Anime Character Sentiment — Realtime", layout="wide")
st.title("🎌 Anime Character Sentiment — Unified Realtime (MAL via Jikan)")

# ===========================
# Sidebar controls
# ===========================
st.sidebar.header("Run Settings")
auto_refresh_sec = st.sidebar.slider("Auto-refresh (sec)", 5, 120, 30)
days_window = st.sidebar.slider("Show reviews from last N days (0 = all)", 0, 3650, 120)

show_raw_only = st.sidebar.checkbox(
    "Raw reviews only (no model)", value=False,
    help="Turn off to add sentiment + character tagging."
)
tag_mode_label = st.sidebar.selectbox(
    "Character tagging mode",
    ["Fuzzy (max hits)", "Loose (more hits)", "Medium", "Strict"],
    index=1,
    help="Fuzzy = fuzzy partial match; Loose = substring aliases; Medium = whole-word aliases; Strict = whole-word full names only."
)
tag_mode = tag_mode_label.split(" ")[0].lower()
strict_best = st.sidebar.checkbox(
    "Strict ‘Best’ (Pos > Neg)", value=True,
    help="If on, ‘Best character’ must have higher positive than negative rate."
)

# Performance knobs
sent_batch = st.sidebar.slider("Sentiment batch size", 16, 256, 96)
max_tag_rows = st.sidebar.slider("Max rows to tag per refresh", 50, 2000, 400)
pause_fetch = st.sidebar.checkbox("Pause live fetching (Jikan)", value=False,
                                  help="Stop network fetch & plan rotation; show current data only.")

st.sidebar.header("Model")
repo_base = st.sidebar.text_input("HF model repo id", value="cardiffnlp/twitter-roberta-base-sentiment-latest")
repo_adapter = st.sidebar.text_input("Optional LoRA adapter repo id (Hub)", value="")
engine = st.sidebar.selectbox("Sentiment engine", ["Transformers (accurate)", "VADER (fast)"], index=0)

with st.sidebar.expander("Advanced (fetch & rate limits)"):
    TASKS_PER_RUN = st.slider("API calls per refresh", 2, 12, 6)
    topN = st.slider("Top-N anime size (per list)", 10, 60, 15)
    review_pages = st.slider("Review pages per anime", 1, 8, 2)
    global_pages = st.slider("Global pages per cycle", 2, 10, 3)
    extra_global_pages = st.slider("Extra backfill pages (one-off)", 0, 20, 4)
    rate_limit_rps = st.slider("Jikan polite RPS", 1, 4, 3)

only_rows_with_character = st.sidebar.checkbox("Show only rows with character", value=False)

# CSV uploader (append-on-click)
uploads = st.sidebar.file_uploader(
    "Add CSV reviews (appends to data)", type=["csv"], accept_multiple_files=True,
    help=("Flexible cols: time/date/created_at/timestamp, anime/anime_title, "
          "anime_id (optional), score (optional), preview/review/text/comment/body, "
          "sentiment (optional), character (optional)")
)

# Small visibility counters (helps diagnose “blank” screens)
st.sidebar.markdown("---")
if "store" in st.session_state:
    st.sidebar.metric("Rows in memory", len(st.session_state.store))
# ===========================
# Caching: Model & VADER
# ===========================
@st.cache_resource
def load_model(repo_base: str, repo_adapter: str):
    device = "cuda" if torch.cuda.is_available() else "cpu"
    tok = AutoTokenizer.from_pretrained(repo_base, use_fast=True)
    base = AutoModelForSequenceClassification.from_pretrained(repo_base)
    base.eval().to(device)
    if repo_adapter.strip() and PEFT_OK:
        try:
            model = PeftModel.from_pretrained(base, repo_adapter.strip())
            return tok, model.eval().to(device), device
        except Exception as e:
            st.warning(f"⚠️ Adapter {repo_adapter} failed: {e}. Using base only.")
    return tok, base, device

TOK, MODEL, DEVICE = load_model(repo_base, repo_adapter)

@st.cache_resource
def load_vader():
    # Lazy import: vaderSentiment is only needed when the VADER engine is selected.
    from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
    return SentimentIntensityAnalyzer()

def vader_predict(texts: List[str]) -> List[str]:
    an = load_vader()
    outs = []
    for t in texts:
        c = an.polarity_scores(str(t))["compound"]
        outs.append("positive" if c > 0.05 else "negative" if c < -0.05 else "neutral")
    return outs

def predict_fn():
    return (lambda xs: vader_predict(xs)) if engine.startswith("VADER") \
        else (lambda xs: model_predict(xs, batch=sent_batch))
# ===========================
# Jikan helpers
# ===========================
JIKAN = "https://api.jikan.moe/v4"

def _sleep_by_rps(rps: int):
    time.sleep(max(0.25, 1.0 / max(1, rps)))

def _get(url: str, params: dict | None = None, rps: int = 2, timeout: int = 15) -> Dict:
    _sleep_by_rps(rps)
    try:
        r = requests.get(url, params=params, timeout=timeout)
        if r.status_code == 200:
            return r.json()
        if r.status_code in (429, 500, 502, 503):
            time.sleep(1.2)
            r = requests.get(url, params=params, timeout=timeout)
            if r.status_code == 200:
                return r.json()
    except Exception:
        return {}
    return {}
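# Note (illustrative, not part of the original): _sleep_by_rps throttles before every call,
# so at the default "Jikan polite RPS" of 3 each request waits max(0.25, 1/3) ~= 0.33 s,
# i.e. at most about three requests per second, plus one 1.2 s backoff retry on 429/5xx.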
def get_global_reviews(page: int = 1) -> pd.DataFrame:
    js = _get(f"{JIKAN}/reviews/anime", params={"page": page}, rps=rate_limit_rps)
    rows = []
    if js and "data" in js:
        for it in js["data"]:
            entry = it.get("entry", {})
            rows.append({
                "rid": it.get("mal_id"),
                "time": it.get("date"),
                "anime_id": entry.get("mal_id"),
                "anime": entry.get("title"),
                "score": it.get("score"),
                "preview": it.get("review"),
            })
    return pd.DataFrame(rows)

def get_top_anime(n: int = 10) -> List[Dict]:
    out, page = [], 1
    while len(out) < n and page <= 8:
        js = _get(f"{JIKAN}/top/anime", params={"page": page}, rps=rate_limit_rps)
        if not js or "data" not in js:
            break
        for a in js["data"]:
            out.append({"mal_id": a["mal_id"], "title": a["title"]})
            if len(out) >= n: break
        page += 1
    return out

def get_top_airing(n: int = 10) -> List[Dict]:
    out, page = [], 1
    while len(out) < n and page <= 5:
        js = _get(f"{JIKAN}/seasons/now", params={"page": page}, rps=rate_limit_rps)
        if not js or "data" not in js:
            break
        for a in js["data"]:
            out.append({"mal_id": a["mal_id"], "title": a["title"]})
            if len(out) >= n: break
        page += 1
    return out

def get_anime_reviews(anime_id: int, page: int = 1) -> pd.DataFrame:
    js = _get(f"{JIKAN}/anime/{anime_id}/reviews", params={"page": page}, rps=rate_limit_rps)
    rows = []
    if js and "data" in js:
        for it in js["data"]:
            rows.append({
                "rid": it.get("mal_id"),
                "time": it.get("date"),
                "anime_id": anime_id,
                "anime": None,
                "score": it.get("score"),
                "preview": it.get("review"),
            })
    return pd.DataFrame(rows)
@st.cache_data(ttl=24 * 3600, show_spinner=False)  # cache 1 day
def get_anime_characters(anime_id: int) -> List[str]:
    js = _get(f"{JIKAN}/anime/{anime_id}/characters", rps=rate_limit_rps)
    names = []
    if js and "data" in js:
        for ch in js["data"]:
            nm = ch.get("character", {}).get("name")
            if nm:
                names.append(nm)
    return sorted(set(names), key=lambda s: (-len(s), s))

@st.cache_data(ttl=24 * 3600, show_spinner=False)  # cache 1 day
def get_anime_title_by_id(anime_id: int) -> str | None:
    js = _get(f"{JIKAN}/anime/{anime_id}", rps=rate_limit_rps)
    try:
        return js["data"]["title"]
    except Exception:
        return None

def find_anime_id_by_title(title: str) -> int | None:
    js = _get(f"{JIKAN}/anime", params={"q": title, "limit": 1}, rps=rate_limit_rps)
    try:
        return int(js["data"][0]["mal_id"])
    except Exception:
        return None
# ===========================
# Inference & tagging
# ===========================
def model_predict(texts: List[str], batch: int = 32, max_len: int = 256) -> List[str]:
    id2label = getattr(MODEL.config, "id2label", {0: "negative", 1: "neutral", 2: "positive"})
    outs = []
    MODEL.eval()
    with torch.no_grad():
        for i in range(0, len(texts), batch):
            enc = TOK(texts[i:i+batch], return_tensors="pt", truncation=True, padding=True, max_length=max_len)
            enc = {k: v.to(DEVICE) for k, v in enc.items()}
            logits = MODEL(**enc).logits.detach().cpu().numpy()
            outs.extend([str(id2label[int(x)]).lower() for x in logits.argmax(-1)])
    return outs

_word_re_cache: Dict[str, re.Pattern] = {}

def _name_pattern(name: str, strict: bool) -> re.Pattern:
    key = f"{name}|{'strict' if strict else 'loose'}"
    if key in _word_re_cache:
        return _word_re_cache[key]
    pat = re.compile(rf"(?i)(?<![\w]){re.escape(name)}(?![\w])") if strict else re.compile(rf"(?i){re.escape(name)}")
    _word_re_cache[key] = pat
    return pat
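# Illustrative behaviour of the two pattern flavours (the name is made up for the example):
#   _name_pattern("Levi", strict=True)  matches "Levi carried the season" but not "Leviathan"
#   _name_pattern("Levi", strict=False) also matches "Leviathan", since substrings count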
COMMON_BLACKLIST = {"ken", "ai", "jin", "ran", "light", "near", "go"}

def expand_aliases(names: List[str], mode: str) -> List[str]:
    aliases = set()
    for full in names:
        aliases.add(full)
        parts = [p for p in re.split(r"\s+|,", full) if p]
        if mode in ("medium", "loose", "fuzzy"):
            minlen = 4 if mode == "medium" else 3
            for p in parts:
                pl = p.strip()
                if len(pl) >= minlen and pl.lower() not in COMMON_BLACKLIST:
                    aliases.add(pl)
    return sorted(aliases, key=lambda s: (-len(s), s))
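# Example (illustrative): in "loose" mode, expand_aliases(["Monkey D. Luffy"], "loose")
# returns ["Monkey D. Luffy", "Monkey", "Luffy"]; the "D." token is dropped by the
# min-length filter, and longer aliases sort first so full names win ties when tagging.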
def tag_character(text: str, candidates: List[str], mode: str) -> str:
    if not text:
        return ""
    t = str(text)
    if mode == "strict":
        for name in candidates:
            if _name_pattern(name, True).search(t):
                return name
        return ""
    if mode in ("loose", "medium"):
        strict_flag = (mode == "medium")
        for name in candidates:
            if _name_pattern(name, strict_flag).search(t):
                return name
        return ""
    best = rf_process.extractOne(query=t, choices=candidates, scorer=fuzz.partial_ratio, score_cutoff=65)
    return best[0] if best else ""
# ===========================
# Phrase mining
# ===========================
STOPWORDS = set("""a an the of and to for with in on at by from this that these those is are was were be been but or if as it's its i'm you're they've we've don't can't won't just very really more most less many much even about into than too so such over under again new old same other another not no yes good bad best worst great love hate like dislike awesome terrible awful boring mid slow fast fun cool""".split())

def _clean_tokens(text: str) -> List[str]:
    toks = re.findall(r"[a-zA-Z][a-zA-Z']+", str(text).lower())
    return [t for t in toks if t not in STOPWORDS and len(t) > 2]

def _top_phrases(texts: List[str], n: int = 8) -> List[str]:
    bigram_counter = Counter()
    for t in texts:
        toks = _clean_tokens(t)
        bigram_counter.update([f"{toks[i]} {toks[i+1]}" for i in range(len(toks)-1)])
    return [p for p, _ in bigram_counter.most_common(n)]

# ===========================
# Session store
# ===========================
if "store" not in st.session_state:
    st.session_state.store = pd.DataFrame(columns=["rid","time","anime_id","anime","score","preview","sentiment","character"])  # type: ignore
if "plan" not in st.session_state:
    st.session_state.plan = []
    st.session_state.plan_i = 0
# ===========================
# SQLite persistence
# ===========================
DB_PATH = os.environ.get("ANIREV_DB", "/tmp/anirev.sqlite")

SQL_CREATE = """
CREATE TABLE IF NOT EXISTS reviews (
    rid TEXT,
    time TEXT,
    anime_id INTEGER,
    anime TEXT,
    score REAL,
    preview TEXT,
    sentiment TEXT,
    character TEXT,
    ukey TEXT NOT NULL,
    PRIMARY KEY (ukey)
);
CREATE INDEX IF NOT EXISTS idx_reviews_time ON reviews(time);
CREATE INDEX IF NOT EXISTS idx_reviews_anime ON reviews(anime);
"""

def _mk_ukey(row: dict) -> str:
    rid = str(row.get("rid") or "").strip()
    if rid:
        base = f"rid::{rid}"
    else:
        aid = str(row.get("anime_id") or "").strip()
        pv = str(row.get("preview") or "").strip()
        base = f"aid::{aid}|pv::{pv}"
    return hashlib.sha1(base.encode("utf-8", "ignore")).hexdigest()
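# Dedup sketch (illustrative): a review with rid 987 always hashes to sha1("rid::987"),
# so refetching the same Jikan review is a no-op thanks to INSERT OR IGNORE on the ukey
# primary key; rows without a rid fall back to a hash of anime_id plus preview text.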
@contextlib.contextmanager
def _conn(path: str):
    con = sqlite3.connect(path, timeout=30, isolation_level=None)  # autocommit
    try:
        yield con
    finally:
        con.close()

def db_init():
    with _conn(DB_PATH) as con:
        cur = con.cursor()
        for stmt in SQL_CREATE.strip().split(";"):
            s = stmt.strip()
            if s:
                cur.execute(s)
def db_upsert_df(df: pd.DataFrame) -> int:
    if df is None or df.empty:
        return 0
    d = df.copy()
    required_defaults = {
        "rid": "",
        "time": pd.Timestamp.utcnow(),
        "anime_id": pd.NA,
        "anime": "",
        "score": None,
        "preview": "",
        "sentiment": "",
        "character": "",
    }
    for col, default in required_defaults.items():
        if col not in d.columns:
            d[col] = default
    d["time"] = pd.to_datetime(d["time"], errors="coerce", utc=True).fillna(pd.Timestamp.utcnow()).dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    for col in ["rid","anime","preview","sentiment","character"]:
        d[col] = d[col].fillna("").astype(str)
    d["sentiment"] = d["sentiment"].str.strip().str.lower()
    d["score"] = pd.to_numeric(d["score"], errors="coerce")
    # Convert anime_id to plain Python ints / None so sqlite3 can bind the values.
    aid_num = pd.to_numeric(d["anime_id"], errors="coerce")
    d["anime_id"] = [int(x) if pd.notna(x) else None for x in aid_num]
    d["ukey"] = d.apply(lambda r: _mk_ukey(r.to_dict()), axis=1)
    cols = ["rid","time","anime_id","anime","score","preview","sentiment","character","ukey"]
    rows = d.loc[:, cols].itertuples(index=False, name=None)
    with _conn(DB_PATH) as con:
        cur = con.cursor()
        cur.execute("PRAGMA journal_mode=WAL;")
        before = con.total_changes
        cur.executemany("""
            INSERT OR IGNORE INTO reviews
            (rid,time,anime_id,anime,score,preview,sentiment,character,ukey)
            VALUES (?,?,?,?,?,?,?,?,?)
        """, list(rows))
        # SELECT changes() only counts the last statement, so use the total_changes delta.
        inserted = con.total_changes - before
    return int(inserted)
def db_load_recent(days: int | None = None) -> pd.DataFrame:
    with _conn(DB_PATH) as con:
        cur = con.cursor()
        if days and days > 0:
            cur.execute(f"""
                SELECT rid,time,anime_id,anime,score,preview,sentiment,character
                FROM reviews
                WHERE time >= datetime('now','-{int(days)} days')
                ORDER BY time DESC
            """)
        else:
            cur.execute("""
                SELECT rid,time,anime_id,anime,score,preview,sentiment,character
                FROM reviews
                ORDER BY time DESC
            """)
        rows = cur.fetchall()
    cols = ["rid","time","anime_id","anime","score","preview","sentiment","character"]
    df = pd.DataFrame(rows, columns=cols)
    df["time"] = pd.to_datetime(df["time"], errors="coerce", utc=True)
    if "sentiment" in df.columns:
        df["sentiment"] = df["sentiment"].astype(str).str.strip().str.lower()
    return df

db_init()

if "db_loaded_once" not in st.session_state:
    df_warm = db_load_recent(days_window)
    if not df_warm.empty:
        st.session_state.store = pd.concat([st.session_state.store, df_warm], ignore_index=True)
        st.session_state.store = st.session_state.store.drop_duplicates(
            subset=["rid","anime_id","preview"]
        ).sort_values("time", ascending=False)
    st.session_state["db_loaded_once"] = True

st.sidebar.metric("Rows in DB (within window)", len(db_load_recent(days_window)))
# ===========================
# CSV bootstrap (auto-append once)
# ===========================
CSV_RENAME_MAP = {
    # text columns
    "text": "preview", "review": "preview", "content": "preview",
    "comment": "preview", "body": "preview", "review_text": "preview", "text_en": "preview",
    # anime title
    "title": "anime", "anime_title": "anime",
    # timestamps
    "date": "time", "created_at": "time", "timestamp": "time", "datetime": "time",
}
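# A minimal CSV the importer accepts could look like this (illustrative, not one of the
# bundled files); headers are renamed via CSV_RENAME_MAP and any other columns are dropped
# before the rows are appended:
#   date,anime_title,score,review
#   2024-05-01,Alien Stage,8,"Jaehee's arc was touching."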
BOOTSTRAP_FILES = [
    "anime_reviews_part_001.csv",
    "anime_reviews_part_002.csv",
    "anime_reviews_part_003.csv",
    "anime_reviews_part_004.csv",
]
def _standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    df = df.rename(columns={k: v for k, v in CSV_RENAME_MAP.items() if k in df.columns})
    for col, default in [("anime", ""), ("preview", ""), ("score", None)]:
        if col not in df.columns: df[col] = default
    if "time" in df.columns:
        df["time"] = pd.to_datetime(df["time"], errors="coerce", utc=True)
    else:
        df["time"] = pd.Timestamp.utcnow()
    if "sentiment" in df.columns:
        df["sentiment"] = df["sentiment"].astype(str).str.strip().str.lower()
    return df
def _build_rid(df: pd.DataFrame) -> pd.Series:
    # Use a stable hash: Python's built-in hash() is salted per process, which would
    # break deduplication against rows already persisted in SQLite between restarts.
    base = (df["anime"].fillna("") + "|" + df["preview"].fillna(""))
    return base.map(lambda s: int(hashlib.sha1(str(s).encode("utf-8", "ignore")).hexdigest()[:12], 16))
def _maybe_backfill_titles_by_id(df: pd.DataFrame) -> pd.DataFrame:
    if "anime_id" not in df.columns:
        return df
    mask = df["anime_id"].notna() & (df["anime"].astype(str).str.strip() == "")
    if not mask.any():
        return df
    for aid in pd.to_numeric(df.loc[mask, "anime_id"], errors="coerce").dropna().unique():
        t = get_anime_title_by_id(int(aid))
        if t:
            df.loc[(df["anime_id"] == aid) & mask, "anime"] = t
    return df

def _maybe_fill_ids_by_title(df: pd.DataFrame) -> pd.DataFrame:
    if "anime_id" in df.columns and df["anime_id"].notna().any():
        return df
    if "anime" not in df.columns:
        return df
    df["anime_id"] = pd.NA
    titles_to_resolve = (
        df["anime"].dropna().astype(str).str.strip().replace("", pd.NA).dropna().unique().tolist()
    )[:50]  # cap per import to be polite
    title_to_id = {}
    for t in titles_to_resolve:
        mid = find_anime_id_by_title(t)
        if mid:
            title_to_id[t] = mid
    if title_to_id:
        df["anime_id"] = df.apply(
            lambda r: title_to_id.get(str(r.get("anime","")).strip(), pd.NA),
            axis=1
        )
    return df
def _load_one_csv(path: str) -> pd.DataFrame:
    try:
        d = pd.read_csv(path)
    except Exception:
        try:
            d = pd.read_csv(path, encoding="utf-8-sig")
        except Exception:
            return pd.DataFrame()
    d = _standardize_columns(d)
    if "rid" not in d.columns:
        d["rid"] = _build_rid(d)
    # Try to fill ids from titles (capped), then backfill titles from ids
    d = _maybe_fill_ids_by_title(d)
    d = _maybe_backfill_titles_by_id(d)
    if "sentiment" not in d.columns and not show_raw_only and len(d):
        d["sentiment"] = predict_fn()(d["preview"].astype(str).tolist())
    if "character" not in d.columns:
        d["character"] = ""
    if "anime_id" in d.columns and not show_raw_only:
        for aid in pd.to_numeric(d["anime_id"], errors="coerce").dropna().unique():
            try:
                base_names = get_anime_characters(int(aid))
                names = expand_aliases(base_names, tag_mode)
                m = d["anime_id"] == aid
                idxs = d.loc[m].index[:max_tag_rows]  # cap work per upload
                d.loc[idxs, "character"] = d.loc[idxs, "preview"].apply(
                    lambda s: tag_character(str(s), names, mode=tag_mode)
                )
            except Exception:
                continue
    keep = ["rid","time","anime_id","anime","score","preview","sentiment","character"]
    for c in keep:
        if c not in d.columns: d[c] = None
    return d[keep]
def bootstrap_append_csvs(filenames: list[str]) -> int:
    roots = ["", "./data", "/data"]
    frames = []
    for fn in filenames:
        found = None
        for root in roots:
            candidate = os.path.join(root, fn) if root else fn
            if os.path.exists(candidate):
                found = candidate; break
        if not found:
            continue
        df = _load_one_csv(found)
        if not df.empty:
            frames.append(df)
    if not frames:
        return 0
    dfc = pd.concat(frames, ignore_index=True)
    dfc = dfc.drop_duplicates(subset=["rid","anime_id","preview"])
    st.session_state.store = pd.concat([st.session_state.store, dfc], ignore_index=True)
    st.session_state.store = st.session_state.store.drop_duplicates(
        subset=["rid","anime_id","preview"]
    ).sort_values("time", ascending=False)
    db_upsert_df(dfc)  # persist bootstrap
    return len(dfc)

if "bootstrapped_csv" not in st.session_state:
    appended_n = bootstrap_append_csvs(BOOTSTRAP_FILES)
    st.session_state["bootstrapped_csv"] = True
    if appended_n:
        st.sidebar.success(f"Bootstrapped {appended_n} rows from bundled CSVs")
# ===========================
# Prepare & plan
# ===========================
def _normalize_common(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty:
        return df
    if "sentiment" in df.columns:
        df["sentiment"] = df["sentiment"].astype(str).str.strip().str.lower()
    df = _maybe_backfill_titles_by_id(df)
    return df

def _prepare(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty: return df
    df["time"] = pd.to_datetime(df["time"], errors="coerce", utc=True)
    if days_window and days_window > 0:
        cutoff = pd.Timestamp.utcnow() - pd.Timedelta(days=days_window)
        df = df[df["time"] >= cutoff]
    df = df.drop_duplicates(subset=[c for c in ["rid","anime_id","preview"] if c in df.columns])
    df["preview"] = df["preview"].fillna("").astype(str)
    df = _normalize_common(df)
    return df

def build_unified_plan() -> List[Tuple[str, Dict]]:
    plan: List[Tuple[str, Dict]] = []
    for p in range(1, global_pages + 1):
        plan.append(("global", {"page": p}))
    for a in get_top_airing(topN):
        for p in range(1, review_pages + 1):
            plan.append(("anime", {"anime_id": a["mal_id"], "title": a["title"], "page": p}))
    for a in get_top_anime(topN):
        for p in range(1, review_pages + 1):
            plan.append(("anime", {"anime_id": a["mal_id"], "title": a["title"], "page": p}))
    return plan
# Rebuild the fetch plan only while live fetching is enabled (building it hits the
# Jikan top/airing endpoints, which the pause toggle is meant to avoid).
if not pause_fetch and (not st.session_state.plan or st.session_state.plan_i >= len(st.session_state.plan)):
    st.session_state.plan = build_unified_plan()
    st.session_state.plan_i = 0
# Execute small batch per refresh
new_rows = []
if not pause_fetch:
    for _ in range(TASKS_PER_RUN):
        if st.session_state.plan_i >= len(st.session_state.plan):
            break
        kind, args = st.session_state.plan[st.session_state.plan_i]
        st.session_state.plan_i += 1
        if kind == "global":
            dfp = get_global_reviews(page=args["page"])
        else:
            dfp = get_anime_reviews(args["anime_id"], page=args["page"])
            if not dfp.empty:
                # Per-anime review pages don't carry the title, so attach it from the plan;
                # global pages already include it (and their plan entries have no "title").
                dfp["anime"] = args["title"]
        if not dfp.empty:
            new_rows.append(dfp)
if new_rows:
    df_new = _prepare(pd.concat(new_rows, ignore_index=True))
    if not show_raw_only and not df_new.empty:
        df_new["sentiment"] = predict_fn()(df_new["preview"].tolist())
        df_new["character"] = ""
        if "anime_id" in df_new.columns:
            for aid in pd.to_numeric(df_new["anime_id"], errors="coerce").dropna().unique():
                try:
                    base_names = get_anime_characters(int(aid))
                    names = expand_aliases(base_names, tag_mode)
                    mask = df_new["anime_id"] == aid
                    idxs = df_new.loc[mask].index[:max_tag_rows]  # cap work per refresh
                    df_new.loc[idxs, "character"] = df_new.loc[idxs, "preview"].apply(
                        lambda s: tag_character(str(s), names, mode=tag_mode)
                    )
                except Exception:
                    continue
    st.session_state.store = pd.concat([st.session_state.store, df_new], ignore_index=True)
    st.session_state.store = st.session_state.store.drop_duplicates(subset=["rid","anime_id","preview"]).sort_values("time", ascending=False)
    db_upsert_df(df_new)

# Backfill on cold start (only when fetching is allowed)
df_all = st.session_state.store.copy()
if (df_all is None or len(df_all) < 300) and not pause_fetch:
    extra = [get_global_reviews(page=p) for p in range(global_pages + 1, global_pages + 1 + extra_global_pages)]
    df_extra = pd.concat(extra, ignore_index=True) if extra else pd.DataFrame()
    if not df_extra.empty:
        df_extra = _prepare(df_extra)
        if not show_raw_only and not df_extra.empty:
            df_extra["sentiment"] = predict_fn()(df_extra["preview"].tolist())
        if "character" not in df_extra.columns:
            df_extra["character"] = ""
        df_all = pd.concat([df_all, df_extra], ignore_index=True).drop_duplicates(subset=["rid","anime_id","preview"])
        db_upsert_df(df_extra)
# ===========================
# CSV uploader (diagnostics)
# ===========================
def df_from_csv_files(files) -> pd.DataFrame:
    raw_rows = 0
    after_std = 0
    frames = []
    for f in files:
        try:
            d = pd.read_csv(f)
        except Exception:
            try:
                f.seek(0)  # rewind the upload before retrying with a BOM-tolerant encoding
                d = pd.read_csv(f, encoding="utf-8-sig")
            except Exception:
                continue
        raw_rows += len(d)
        d = _standardize_columns(d)
        after_std += len(d)
        if "rid" not in d.columns:
            d["rid"] = _build_rid(d)
        # Try to fill ids from titles (capped), then backfill titles from ids
        d = _maybe_fill_ids_by_title(d)
        d = _maybe_backfill_titles_by_id(d)
        if "sentiment" not in d.columns and not show_raw_only and len(d):
            d["sentiment"] = predict_fn()(d["preview"].astype(str).tolist())
        if "character" not in d.columns:
            d["character"] = ""
        if "anime_id" in d.columns and not show_raw_only:
            for aid in pd.to_numeric(d["anime_id"], errors="coerce").dropna().unique():
                try:
                    names = expand_aliases(get_anime_characters(int(aid)), tag_mode)
                    m = d["anime_id"] == aid
                    idxs = d.loc[m].index[:max_tag_rows]  # cap work per upload
                    d.loc[idxs, "character"] = d.loc[idxs, "preview"].apply(
                        lambda s: tag_character(str(s), names, mode=tag_mode)
                    )
                except Exception:
                    pass
        keep = ["rid","time","anime_id","anime","score","preview","sentiment","character"]
        for c in keep:
            if c not in d.columns: d[c] = None
        frames.append(d[keep])
    df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
    if df.empty:
        return df
    # window filter (only if >0)
    if days_window and days_window > 0:
        cutoff = pd.Timestamp.utcnow() - pd.Timedelta(days=days_window)
        df_window = df[pd.to_datetime(df["time"], errors="coerce", utc=True) >= cutoff].copy()
    else:
        df_window = df.copy()
    df_final = df_window.drop_duplicates(subset=["rid","anime_id","preview"])
    dropped_window = after_std - len(df_window)
    dropped_dupes = len(df_window) - len(df_final)
    st.sidebar.info(
        f"CSV import diagnostics:\n"
        f"- Raw rows read: {raw_rows}\n"
        f"- After standardize: {after_std}\n"
        f"- Removed by date window: {dropped_window}\n"
        f"- Removed as duplicates: {dropped_dupes}\n"
        f"- Final to append: {len(df_final)}"
    )
    return df_final
if uploads:
    df_csv = df_from_csv_files(uploads)
    if not df_csv.empty:
        st.session_state.store = pd.concat([st.session_state.store, df_csv], ignore_index=True)
        st.session_state.store = st.session_state.store.drop_duplicates(
            subset=["rid","anime_id","preview"]
        ).sort_values("time", ascending=False)
        df_all = st.session_state.store.copy()
        db_upsert_df(df_csv)
        st.sidebar.success(f"Appended {len(df_csv)} rows from CSV")

# Final fallback (never empty UI)
if df_all is None or df_all.empty:
    df_all = pd.DataFrame([{
        "rid": 0, "time": pd.Timestamp.utcnow(), "anime_id": 1, "anime": "Alien Stage",
        "score": 8, "preview": "Jaehee’s arc was touching and Min did great.",
        "sentiment": "positive", "character": "Jaehee"
    }])
# ===========================
# UI helpers & views
# ===========================
def kpi_row(df: pd.DataFrame):
    c1, c2, c3, c4 = st.columns(4)
    with c1: st.metric("Total reviews", int(len(df)))
    with c2: st.metric("Unique anime", int(pd.to_numeric(df["anime_id"], errors="coerce").dropna().nunique()))
    with c3: st.metric("Have sentiment?", "Yes" if "sentiment" in df.columns else "Raw only")
    with c4: st.metric("Tagged rows", int(df["character"].fillna("").ne("").sum()) if "character" in df.columns else 0)

def plot_overview(df: pd.DataFrame):
    if "sentiment" in df.columns and not df["sentiment"].isna().all():
        order = ["negative", "neutral", "positive"]
        counts = df["sentiment"].astype(str).str.lower().value_counts().reindex(order).fillna(0).astype(int).reset_index()
        counts.columns = ["sentiment", "count"]
        st.plotly_chart(px.bar(counts, x="sentiment", y="count", title="Sentiment distribution"), use_container_width=True)
    ts = (df.set_index(pd.to_datetime(df["time"], utc=True, errors="coerce"))
            .assign(n=1).resample("1H")["n"].sum().reset_index())
    if not ts.empty:
        st.plotly_chart(px.line(ts, x="time", y="n", title="Review volume (hourly)"), use_container_width=True)

def character_view(df: pd.DataFrame):
    if "character" not in df.columns:
        st.info("No characters tagged yet. Turn off 'Raw reviews only'.")
        return
    tagged = df[df["character"].fillna("") != ""].copy()
    if tagged.empty:
        st.info("No characters tagged yet. Try Fuzzy or Loose mode.")
        return
    animes = (
        tagged["anime"].astype(str).str.strip().replace("", pd.NA).dropna().unique().tolist()
    )
    animes = sorted(animes)
    if not animes:
        st.info("No anime titles yet (titles were blank).")
        return
    sel = st.selectbox("Select an anime", animes, key="char_view_sel")
    sub = tagged[tagged["anime"].astype(str).str.strip() == sel].copy()
    g = sub.groupby("character").agg(
        mentions=("preview", "count"),
        pos_rate=("sentiment", lambda s: float((s.astype(str).str.lower() == "positive").mean()) if not s.empty else 0.0),
        neg_rate=("sentiment", lambda s: float((s.astype(str).str.lower() == "negative").mean()) if not s.empty else 0.0),
        avg_user_score=("score", "mean"),
    ).reset_index()
    g["pos_rate"] = pd.to_numeric(g["pos_rate"], errors="coerce").fillna(0.0)
    g["neg_rate"] = pd.to_numeric(g["neg_rate"], errors="coerce").fillna(0.0)
    g["avg_user_score"] = pd.to_numeric(g["avg_user_score"], errors="coerce")
    g["sentiment_index"] = g["pos_rate"] - g["neg_rate"]
    min_mentions = st.slider("Min mentions", 1, 30, 1)
    g2 = g[g["mentions"] >= min_mentions].sort_values(["sentiment_index", "mentions", "pos_rate"], ascending=[False, False, False])
    c1, c2 = st.columns(2)
    with c1:
        st.subheader("Best character ⭐")
        if not g2.empty:
            br = g2.iloc[0]
            ok_best = (br["pos_rate"] > br["neg_rate"]) if strict_best else (br["pos_rate"] > 0)
            if ok_best:
                st.metric(br["character"], f"Pos {br['pos_rate']*100:.0f}% / Neg {br['neg_rate']*100:.0f}%")
            else:
                st.write("No clear best character (not enough positive signal).")
        else:
            st.write("Not enough mentions yet.")
    with c2:
        st.subheader("Worst character 👎")
        g3 = g[g["mentions"] >= min_mentions].copy().sort_values(["neg_rate", "mentions"], ascending=[False, False])
        if not g3.empty:
            wr = g3.iloc[0]
            st.metric(wr["character"], f"Neg {wr['neg_rate']*100:.0f}% / Pos {wr['pos_rate']*100:.0f}%")
    g_pos = g2[g2["sentiment_index"] > 0].copy()
    if not g_pos.empty:
        st.plotly_chart(
            px.bar(g_pos.head(12), x="sentiment_index", y="character", orientation="h",
                   title="Top characters by sentiment index (pos - neg, positive only)"),
            use_container_width=True,
        )
    else:
        st.info("No positive-leaning characters yet for the current filters.")
    table = (
        g2.assign(
            pos_pct=(g2["pos_rate"] * 100).round(1),
            neg_pct=(g2["neg_rate"] * 100).round(1),
            avg_user_score=g2["avg_user_score"].round(2),
        )[["character","mentions","pos_pct","neg_pct","avg_user_score","sentiment_index"]]
    )
    st.dataframe(table, use_container_width=True, hide_index=True)

def live_table(df: pd.DataFrame):
    dfv = df.copy()
    if only_rows_with_character and "character" in dfv.columns:
        dfv = dfv[dfv["character"].fillna("") != ""]
    cols = [c for c in ["time","anime","character","score","sentiment","preview"] if c in dfv.columns]
    st.dataframe(dfv[cols].head(2000), use_container_width=True, hide_index=True)
# ===========================
# Render + Persistence controls
# ===========================
caption = f"Unified plan • Window: {days_window if days_window>0 else 'ALL'}d • Batch: {TASKS_PER_RUN} • TopN: {topN} • Pages: global {global_pages}+{extra_global_pages} backfill, per-anime {review_pages}"
st.caption(caption)

st.sidebar.markdown("---")
st.sidebar.subheader("Persistence")
st.sidebar.caption(f"DB path: `{DB_PATH}`")
if st.sidebar.button("Save now"):
    n = db_upsert_df(st.session_state.store)
    st.sidebar.success(f"Saved {n} new rows to DB")
if st.sidebar.button("Reload from DB (within window)"):
    df_warm = db_load_recent(days_window)
    if not df_warm.empty:
        st.session_state.store = pd.concat([st.session_state.store, df_warm], ignore_index=True)
        st.session_state.store = st.session_state.store.drop_duplicates(
            subset=["rid","anime_id","preview"]
        ).sort_values("time", ascending=False)
        st.sidebar.success(f"Reloaded {len(df_warm)} rows from DB")
if st.sidebar.button("Clear DB (danger)"):
    with _conn(DB_PATH) as con:
        con.execute("DELETE FROM reviews;")
    st.sidebar.warning("Database cleared.")

if df_all is None or df_all.empty:
    st.info("No data yet. Keep running; widened plan & fuzzy tagging enabled.")
    live_table(df_all)
else:
    tabs = st.tabs(["Overview", "Character view", "Live feed"]) if not show_raw_only else st.tabs(["Live feed"])
    if not show_raw_only:
        with tabs[0]:
            kpi_row(df_all)
            plot_overview(df_all)
            st.caption("🔎 Tip: turn OFF Auto-refresh while exploring to avoid flicker.")
        with tabs[1]:
            character_view(df_all)
        with tabs[2]:
            live_table(df_all)
    else:
        with tabs[0]:
            live_table(df_all)

# ===========================
# Auto-refresh
# ===========================
auto_refresh = st.sidebar.checkbox("Auto-refresh", value=True, help="Turns the live fetch loop on/off (causes page redraw).")
if auto_refresh:
    time.sleep(auto_refresh_sec + random.uniform(0.0, 1.0))  # jitter reduces concurrent spikes
    st.rerun()