Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
r"""
================================================================================
  REVIEW INTELLIGENCE AGENT v4 — LangGraph + LangChain
  BERT sentiment · merged tools · app-aware · Pinecone native embed
  Code-agent tool · scope guard · LLM fallback chain
================================================================================

CHANGES FROM v3:
  ① NLP — DistilBERT (transformers) replaces LLM batched sentiment.
       Zero token cost. Runs locally. Falls back to a rating heuristic.
  ② Tools — get_top_negative / get_top_positive merged into one tool
       get_reviews_by_rating(min_stars, max_stars, n, app_name?).
       All tools accept an optional app_name to handle multi-app CSVs.
  ③ Pinecone — uses pc.create_index_for_model() + idx.upsert_records()
       (Pinecone integrated embedding, no OpenAI dependency).
       Queries use pc.inference.embed() for the query vector.
  ④ Code-Agent — new @tool run_pandas_code(code) that POSTs to a
       deployed HuggingFace Space (Docker) for heavy pandas/stats work.
  ⑤ Scope guard — the planner tracks its iteration count and terminates
       cleanly after MAX_PLANNER_ITERATIONS to avoid infinite loops.

Install:
  pip install langgraph langchain langchain-core langchain-openai \
      langchain-groq pinecone pandas transformers torch \
      rich python-dotenv requests

Usage:
  python review_agent_v4.py --csv reviews.csv
  python review_agent_v4.py --csv reviews.csv \
      --query "Which action game has the most ad complaints?"
  python review_agent_v4.py --csv reviews.csv --use-pinecone \
      --query "Show crash issues for com.JindoBlu app"
"""
| from __future__ import annotations | |
| import argparse | |
| import hashlib | |
| import json | |
| import logging | |
| import operator | |
| import os | |
| import re | |
| import sys | |
| import time | |
| import textwrap | |
| import uuid | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Annotated, Any, Optional, TypedDict | |
| import pandas as pd | |
| import requests | |
| # ββ LangGraph / LangChain ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| from langgraph.graph import StateGraph, START, END | |
| from langchain_core.messages import HumanMessage, AIMessage, BaseMessage, SystemMessage | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.tools import tool | |
# -- Optional LLM provider packages -------------------------------------------
try:
    from langchain_groq import ChatGroq
except ImportError:
    HAS_GROQ = False
else:
    HAS_GROQ = True

try:
    from langchain_openai import ChatOpenAI
except ImportError:
    HAS_OPENAI_PKG = False
else:
    HAS_OPENAI_PKG = True

# -- Pinecone (native SDK only; langchain-pinecone is not required) -----------
try:
    from pinecone import Pinecone as PineconeClient, ServerlessSpec
    from pinecone import CloudProvider, AwsRegion, EmbedModel, IndexEmbed
except ImportError:
    HAS_PINECONE = False
else:
    HAS_PINECONE = True

# -- Local BERT sentiment (no token cost) -------------------------------------
try:
    from transformers import pipeline as hf_pipeline
except ImportError:
    HAS_TRANSFORMERS = False
else:
    HAS_TRANSFORMERS = True

# -- Rich terminal output (optional; `console` stays None without rich) -------
try:
    from rich.console import Console
    from rich.panel import Panel
    from rich.table import Table
    from rich import box
except ImportError:
    RICH = False
    console = None  # type: ignore
else:
    RICH = True
    console = Console()

logging.basicConfig(level=logging.WARNING)
# =============================================================================
# CONSTANTS
# =============================================================================

# -- LLM providers (build_llm tries them as Groq, OpenRouter, NVIDIA NIM) -----
GROQ_MODEL = "llama-3.3-70b-versatile"

OPENROUTER_URL = "https://openrouter.ai/api/v1"
OPENROUTER_MODEL = "meta-llama/llama-3.3-70b-instruct"

NVIDIA_URL = "https://integrate.api.nvidia.com/v1"
NVIDIA_MODEL = "meta/llama-3.3-70b-instruct"

# -- Local sentiment model ----------------------------------------------------
# Distilled SST-2 checkpoint (~67M parameters), run on CPU.
BERT_SENTIMENT_MODEL = "distilbert-base-uncased-finetuned-sst-2-english"
BERT_BATCH_SIZE = 64       # texts per inference batch
MAX_BERT_TEXT_LEN = 512    # characters kept per text; the tokenizer is also
                           # configured to truncate at 512 tokens

# -- Pipeline limits ----------------------------------------------------------
SAMPLE_ROWS_FOR_SCHEMA = 5
MAX_REVIEWS_NLP = 5_000
MAX_CLUSTERS = 10
MAX_PLANNER_ITERATIONS = 4     # scope guard: planner stops after N rounds
ANOMALY_SIGMA = 2.0            # anomalous day: below mean - sigma * std

# -- Pinecone integrated embedding --------------------------------------------
PINECONE_EMBED_MODEL = "multilingual-e5-large"
PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX", "review-agent-v4")
PINECONE_NAMESPACE = "reviews"
PINECONE_TOP_K = 6
PINECONE_UPSERT_BATCH = 96     # records per upsert_records() call

# -- Code-agent HuggingFace Space ---------------------------------------------
# Deploy the companion Space and point HF_CODE_AGENT_URL at it.
# NOTE(review): run_pandas_code() appends "/execute" to this value, so the
# placeholder default would resolve to ".../run/execute" — confirm the route
# the deployed Space actually serves.
HF_CODE_AGENT_URL = os.getenv(
    "HF_CODE_AGENT_URL",
    "https://YOUR-HF-USERNAME-review-code-agent.hf.space/run",
)
# =============================================================================
# LLM PROVIDER CHAIN — Groq → OpenRouter → NVIDIA NIM
# =============================================================================
def _make_groq():
    """Return a Groq chat model, or None without GROQ_API_KEY or langchain-groq."""
    key = os.getenv("GROQ_API_KEY", "")
    if not key or not HAS_GROQ:
        return None
    return ChatGroq(model=GROQ_MODEL, temperature=0, max_retries=1, api_key=key)


def _make_openrouter():
    """Return an OpenRouter model (OpenAI-compatible client), or None when unconfigured."""
    key = os.getenv("OPENROUTER_API_KEY", "")
    if not key or not HAS_OPENAI_PKG:
        return None
    return ChatOpenAI(
        model=OPENROUTER_MODEL, temperature=0, max_retries=1, api_key=key,
        base_url=OPENROUTER_URL,
        default_headers={"HTTP-Referer": "review-agent-v4"},
    )


def _make_nvidia():
    """Return an NVIDIA NIM model (OpenAI-compatible client), or None when unconfigured."""
    key = os.getenv("NVIDIA_API_KEY", "")
    if not key or not HAS_OPENAI_PKG:
        return None
    return ChatOpenAI(
        model=NVIDIA_MODEL, temperature=0, max_retries=1,
        api_key=key, base_url=NVIDIA_URL,
    )


def _configured_providers() -> list[tuple[str, Any]]:
    """Instantiate every usable provider, in fallback priority order.

    Returns:
        ``(label, chat_model)`` pairs; empty when no provider is configured.
    """
    factories = (
        ("Groq", _make_groq),
        ("OpenRouter", _make_openrouter),
        ("NVIDIA", _make_nvidia),
    )
    configured = []
    for label, factory in factories:
        model = factory()
        if model is not None:
            configured.append((label, model))
    return configured


def build_llm(providers: Optional[list[tuple[str, Any]]] = None):
    """Return the primary provider wrapped with the remaining ones as fallbacks.

    Args:
        providers: Optional pre-built ``(label, model)`` pairs; when omitted,
            providers are discovered from environment variables.

    Returns:
        ``primary.with_fallbacks([...])`` when several providers are usable,
        the lone model when only one is, or None when none are.
    """
    if providers is None:
        providers = _configured_providers()
    models = [model for _, model in providers]
    if not models:
        return None
    primary, *fallbacks = models
    return primary.with_fallbacks(fallbacks) if fallbacks else primary


# Build providers once so the banner reflects what is actually configured,
# rather than always advertising all three.
_PROVIDERS = _configured_providers()
LLM = build_llm(_PROVIDERS)
_PROVIDER_TAG = (
    "→".join(label for label, _ in _PROVIDERS)
    if _PROVIDERS else "NO LLM — heuristic only"
)
# =============================================================================
# BERT SENTIMENT ANALYSER (local, zero token cost)
# =============================================================================
_bert_pipe = None          # lazily created transformers pipeline
_bert_load_failed = False  # set after a failed load so we do not retry every call


def _load_bert():
    """Return the cached DistilBERT pipeline, creating it on first use.

    Returns None when transformers is not installed or the model could not be
    created (for example when the first-run download is impossible). Callers
    then fall back to the rating heuristic instead of crashing the graph.
    """
    global _bert_pipe, _bert_load_failed
    if _bert_pipe is not None or _bert_load_failed or not HAS_TRANSFORMERS:
        return _bert_pipe
    _log("Loading DistilBERT sentiment model (first run may download ~270 MB)…", "info")
    try:
        _bert_pipe = hf_pipeline(
            "sentiment-analysis",
            model=BERT_SENTIMENT_MODEL,
            truncation=True,
            max_length=512,
            device=-1,  # CPU; set device=0 for the first GPU
            batch_size=BERT_BATCH_SIZE,
        )
    except Exception as exc:  # download/backend errors are environment-specific
        _bert_load_failed = True
        _log(f"DistilBERT failed to load ({exc}); using rating heuristic", "warn")
        return None
    _log("DistilBERT loaded ✓", "ok")
    return _bert_pipe


def bert_sentiment(texts: list[str]) -> dict[str, Any]:
    """Classify *texts* with DistilBERT and summarise the label split.

    Blank texts are skipped; the rest are cut to MAX_BERT_TEXT_LEN characters.
    The SST-2 model is binary (POSITIVE / NEGATIVE), so ``pct_neutral`` is
    only the rounding remainder of 100 - positive - negative.

    Returns:
        A sentiment dict (percentages, ``overall_tone``, ``method`` and empty
        ``themes``/``phrases`` for the cluster node to fill), or ``{}`` when
        the model is unavailable, there is no usable text, or every batch
        failed. ``{}`` tells the NLP node to use the rating heuristic.
    """
    pipe = _load_bert()
    if pipe is None:
        return {}
    clean = [t[:MAX_BERT_TEXT_LEN] for t in texts if t.strip()]
    if not clean:
        return {}

    pos = neg = 0
    for start in range(0, len(clean), BERT_BATCH_SIZE):
        batch = clean[start:start + BERT_BATCH_SIZE]
        try:
            for result in pipe(batch):
                if result["label"] == "POSITIVE":
                    pos += 1
                else:
                    neg += 1
        except Exception as exc:  # one bad batch should not sink the whole run
            _log(f"BERT batch error: {exc}", "warn")

    scored = pos + neg
    if scored == 0:
        # Every batch failed: a 0%/0% "Polarised" verdict would be fabricated.
        return {}
    pct_pos = round(pos / scored * 100, 1)
    pct_neg = round(neg / scored * 100, 1)

    if pct_pos >= 60:
        tone = "Positive"
    elif pct_neg >= 50:
        tone = "Negative"
    elif abs(pct_pos - pct_neg) < 20:
        tone = "Polarised"
    else:
        tone = "Mixed"
    return {
        "pct_positive": pct_pos,
        "pct_neutral": round(100 - pct_pos - pct_neg, 1),
        "pct_negative": pct_neg,
        "overall_tone": tone,
        "method": "DistilBERT (local)",
        "themes": [],   # populated by the LLM cluster node
        "phrases": [],
    }
# =============================================================================
# GRAPH STATE
# =============================================================================
class ReviewState(TypedDict, total=False):
    """Shared LangGraph state; every key is optional (total=False)."""

    # -- inputs --
    filepath: str
    user_query: str
    use_pinecone: bool
    # -- loaded data --
    raw_df: Any                          # pd.DataFrame once ingested
    total_rows: int
    columns: list[str]
    schema: dict[str, Optional[str]]     # role -> column name
    schema_confidence: str
    detected_apps: list[str]             # unique app names found in the data
    # -- analysis results --
    stats: dict[str, Any]
    app_breakdown: list[dict]
    sentiment: dict[str, Any]
    clusters: list[dict]
    sample_texts: list[str]
    # -- planner --
    tool_results: list[dict]
    planner_notes: str
    planner_iter: int                    # scope-guard iteration counter
    # -- retrieval --
    rag_context: str
    # -- output --
    report: dict[str, Any]
    # -- message trace; the operator.add reducer appends instead of replacing --
    messages: Annotated[list[BaseMessage], operator.add]
    # -- bookkeeping --
    errors: list[str]
    run_id: str


def _empty_state(filepath: str, query: str, use_pinecone: bool) -> ReviewState:
    """Build the initial graph state: the three inputs set, everything else empty."""
    initial: ReviewState = {
        "filepath": filepath,
        "user_query": query,
        "use_pinecone": use_pinecone,
        "raw_df": None,
        "total_rows": 0,
        "columns": [],
        "schema": {},
        "schema_confidence": "",
        "detected_apps": [],
        "stats": {},
        "app_breakdown": [],
        "sentiment": {},
        "clusters": [],
        "sample_texts": [],
        "tool_results": [],
        "planner_notes": "",
        "planner_iter": 0,
        "rag_context": "",
        "report": {},
        "messages": [],
        "errors": [],
        "run_id": "run_" + str(int(time.time())),
    }
    return initial
# =============================================================================
# TOOL CLOSURE STATE (populated before the planner runs)
# =============================================================================
_ACTIVE: dict[str, Any] = {}


def _active(key: str, default: Any = None) -> Any:
    """Read *key* from the shared tool context, returning *default* when unset."""
    return _ACTIVE.get(key, default)


def _df() -> Optional[pd.DataFrame]:
    """The review DataFrame the tools operate on (None before ingestion)."""
    return _active("raw_df")


def _schema() -> dict:
    """The role -> column mapping ({} when not yet detected)."""
    return _active("schema", {})


def _apps() -> list[str]:
    """App names detected in the data ([] when none)."""
    return _active("detected_apps", [])
# =============================================================================
# TOOLS — merged, app-aware
# =============================================================================
def _base_df(app_name: Optional[str] = None) -> Optional[pd.DataFrame]:
    """Return a copy of the active DataFrame, optionally narrowed to one app.

    App matching is case-insensitive. If *app_name* equals an app value
    exactly, only that app is kept; otherwise every app whose name contains
    *app_name* is kept (e.g. "Challenge"). Preferring the exact match stops
    "Chess" from also selecting "Chess Pro".

    Returns:
        A copy that callers may add helper columns to, or None when no
        DataFrame is loaded. If the schema's app column is absent from the
        frame, the filter is skipped (as when no app column was detected)
        rather than raising KeyError.
    """
    df = _df()
    if df is None:
        return None
    app_col = _schema().get("app")
    if not app_name or not app_col or app_col not in df.columns:
        return df.copy()
    names = df[app_col].astype(str).str.strip().str.lower()
    wanted = app_name.strip().lower()
    exact = names == wanted
    # Literal substring match: app ids like "com.foo" must not be read as regex.
    mask = exact if exact.any() else names.str.contains(wanted, regex=False, na=False)
    return df[mask].copy()
def get_reviews_by_rating(
    min_stars: int = 1,
    max_stars: int = 5,
    n: int = 15,
    app_name: Optional[str] = None,
) -> str:
    """
    Retrieve reviews filtered by star rating range [min_stars, max_stars].
    Replaces separate positive/negative tools.
    Examples:
      get_reviews_by_rating(1, 2)                        → all 1-2 star reviews
      get_reviews_by_rating(4, 5, n=20)                  → top positive reviews
      get_reviews_by_rating(1, 1, app_name="Challenge")  → 1-star for one app
    Use this for any star-based filtering. It understands that ratings 1-2
    are negative, 3 is neutral, and 4-5 are positive.
    """
    df = _base_df(app_name)
    s = _schema()
    rc = s.get("rating")
    tc = s.get("text")
    if df is None or not rc or not tc:
        return json.dumps({"error": "No rating or text column."})
    # Planner arguments come from LLM-produced JSON and may be strings.
    try:
        lo, hi = float(min_stars), float(max_stars)
        limit = max(0, int(n))  # head(-k) would return almost everything
    except (TypeError, ValueError):
        return json.dumps({"error": "min_stars, max_stars and n must be numbers."})
    if lo > hi:
        lo, hi = hi, lo  # accept a reversed range such as (2, 1)

    ratings = pd.to_numeric(df[rc], errors="coerce")
    mask = ratings.between(lo, hi)  # inclusive; unparsable (NaN) ratings are False

    cols = {tc: "text", rc: "rating"}
    for role in ("app", "user"):
        col = s.get(role)
        if col:
            cols[col] = role
    rows = (df.loc[mask, list(cols)]
            .rename(columns=cols)
            .head(limit)
            .to_dict("records"))
    return json.dumps({
        "filter": f"{lo:g}–{hi:g} stars",
        "app_filter": app_name or "all",
        "count_matched": int(mask.sum()),
        "returned": len(rows),
        "reviews": rows,
    }, ensure_ascii=False)
def get_most_helpful_reviews(
    n: int = 10,
    app_name: Optional[str] = None,
) -> str:
    """
    Return the reviews that collected the most helpful / thumbs-up votes.
    An app_name narrows the search to one app. Good for surfacing complaints
    that many users agreed with.
    """
    frame = _base_df(app_name)
    schema = _schema()
    text_col, helpful_col, rating_col = (
        schema.get(role) for role in ("text", "helpful", "rating")
    )
    if frame is None or not (helpful_col and text_col):
        return json.dumps({"error": "No helpful column found."})

    # Non-numeric vote counts rank as zero.
    frame["__h"] = pd.to_numeric(frame[helpful_col], errors="coerce").fillna(0)

    rename_map = {text_col: "text", helpful_col: "helpful_votes"}
    if rating_col:
        rename_map[rating_col] = "rating"
    app_col = schema.get("app")
    if app_col:
        rename_map[app_col] = "app"

    top = frame.nlargest(n, "__h")
    records = top[list(rename_map)].rename(columns=rename_map).to_dict("records")
    payload = {
        "app_filter": app_name or "all",
        "returned": len(records),
        "reviews": records,
    }
    return json.dumps(payload, ensure_ascii=False)
def get_rating_timeseries(app_name: Optional[str] = None) -> str:
    """
    Return the daily average rating as [{date, avg_rating, count}].
    Pass app_name to restrict the trend to a single app.
    Handy for spotting a rating drop right after an update.
    """
    frame = _base_df(app_name)
    schema = _schema()
    date_col = schema.get("date")
    rating_col = schema.get("rating")
    if frame is None or not date_col or not rating_col:
        return json.dumps({"error": "No date or rating column."})

    frame["__d"] = pd.to_datetime(frame[date_col], errors="coerce")
    frame["__r"] = pd.to_numeric(frame[rating_col], errors="coerce")
    frame = frame.dropna(subset=["__d", "__r"])

    # One grouped pass yields both the daily mean and the daily review count.
    per_day = frame.groupby(frame["__d"].dt.date)["__r"].agg(["mean", "count"])
    daily = pd.DataFrame({
        "date": per_day.index.astype(str),
        "avg_rating": per_day["mean"].to_numpy().round(2),
        "count": per_day["count"].to_numpy(),
    })
    payload = {"app_filter": app_name or "all", "rows": daily.to_dict("records")}
    return json.dumps(payload, ensure_ascii=False)
def search_reviews_by_keyword(
    keyword: str,
    max_results: int = 20,
    app_name: Optional[str] = None,
) -> str:
    """
    Case-insensitive full-text search of reviews for a keyword or phrase.
    An app_name restricts the search to one app.
    Useful for drilling into topics such as 'crash', 'ads' or 'login'.
    """
    frame = _base_df(app_name)
    schema = _schema()
    text_col = schema.get("text")
    rating_col = schema.get("rating")
    if frame is None or not text_col:
        return json.dumps({"error": "No text column."})

    # Literal (non-regex) substring test on lower-cased text.
    haystack = frame[text_col].astype(str).str.lower()
    hits = haystack.str.contains(keyword.lower(), regex=False, na=False)

    rename_map = {text_col: "text"}
    if rating_col:
        rename_map[rating_col] = "rating"
    app_col = schema.get("app")
    if app_col:
        rename_map[app_col] = "app"

    matched = frame[hits][list(rename_map)].rename(columns=rename_map)
    records = matched.head(max_results).to_dict("records")
    payload = {
        "keyword": keyword,
        "app_filter": app_name or "all",
        "count_matched": int(hits.sum()),
        "returned": len(records),
        "reviews": records,
    }
    return json.dumps(payload, ensure_ascii=False)
def get_app_comparison(metric: str = "avg_rating") -> str:
    """
    Compare all detected apps by a metric: avg_rating, pct_negative,
    pct_positive, count, or helpful_votes.
    Use when the CSV has multiple apps and you need to rank them.
    Rank 1 is the best app: highest value first for every metric except
    pct_negative, where the lowest value ranks first. Apps with no value
    for the metric are listed last with a null value.
    """
    breakdown = _ACTIVE.get("app_breakdown", [])
    if not breakdown:
        return json.dumps({"error": "No app breakdown computed yet."})
    valid_metrics = {"avg_rating", "pct_negative", "pct_positive", "count", "helpful_votes"}
    requested = str(metric).strip().lower() if metric is not None else ""
    m = requested if requested in valid_metrics else "avg_rating"
    descending = m != "pct_negative"

    def _value(row: dict) -> Any:
        value = row.get(m)
        # NaN (e.g. an app with no numeric ratings) breaks sort ordering and
        # serialises as invalid JSON; treat it the same as a missing value.
        return None if value is None or pd.isna(value) else value

    present = [r for r in breakdown if _value(r) is not None]
    missing = [r for r in breakdown if _value(r) is None]
    present.sort(key=_value, reverse=descending)  # stable for ties
    ranked = present + missing
    return json.dumps({
        "metric": m,
        "order": "descending" if descending else "ascending",
        "ranking": [
            {"rank": i, "app": r["app"], m: _value(r)}
            for i, r in enumerate(ranked, start=1)
        ],
    }, ensure_ascii=False)
def get_rating_distribution(app_name: Optional[str] = None) -> str:
    """
    Return star count breakdown {1: N, 2: N, 3: N, 4: N, 5: N}.
    Optionally for a single app. Fractional ratings (e.g. 4.5) keep their
    own key instead of being folded into the integer below.
    """
    df = _base_df(app_name)
    s = _schema()
    rc = s.get("rating")
    if df is None or not rc:
        return json.dumps({"error": "No rating column."})
    # value_counts() drops NaN (unparsable ratings) by default.
    counts = pd.to_numeric(df[rc], errors="coerce").value_counts().sort_index()
    distribution = {}
    for value, count in counts.items():
        # str(int(4.5)) == "4" would overwrite the real 4-star count.
        as_float = float(value)
        key = str(int(as_float)) if as_float.is_integer() else str(as_float)
        distribution[key] = int(count)
    return json.dumps({
        "app_filter": app_name or "all",
        "distribution": distribution,
    }, ensure_ascii=False)
# =============================================================================
# CODE-AGENT TOOL — runs pandas/stats code on a HuggingFace Space
# =============================================================================
def run_pandas_code(code: str) -> str:
    """
    Execute arbitrary pandas / statistics Python code on the review DataFrame
    hosted in a HuggingFace Space code-execution endpoint.
    Use this when you need:
      - custom aggregations not covered by other tools
      - statistical tests (e.g. t-test between two apps' ratings)
      - pivot tables, cross-tabs, rolling averages
      - complex filtering with multiple conditions
    The remote environment has: pandas, numpy, scipy, statsmodels.
    The DataFrame is available as `df` with the current schema columns.
    The result must be assigned to a variable named `result`.
    Example code:
      import scipy.stats as stats
      g1 = df[df['app']=='AppA']['score'].dropna()
      g2 = df[df['app']=='AppB']['score'].dropna()
      t, p = stats.ttest_ind(g1, g2)
      result = {'t_stat': round(float(t),4), 'p_value': round(float(p),4)}
    """
    import base64
    import io

    # SECURITY NOTE(review): `code` is produced by the planner LLM and is
    # executed verbatim by the remote Space; sandboxing must be enforced there.
    url = HF_CODE_AGENT_URL.rstrip("/") + "/execute"
    if "YOUR-HF-USERNAME" in url:
        return json.dumps({
            "error": "HF_CODE_AGENT_URL not configured. "
                     "Set env var HF_CODE_AGENT_URL to your deployed HuggingFace Space URL.",
            "hint": "See code_agent_space/app.py in this repo for the companion Space.",
        })
    df = _df()
    if df is None:
        return json.dumps({"error": "No dataframe loaded."})

    # The whole frame travels as base64-encoded CSV next to the code and schema.
    buf = io.StringIO()
    df.to_csv(buf, index=False)
    csv_b64 = base64.b64encode(buf.getvalue().encode("utf-8")).decode("ascii")
    try:
        resp = requests.post(
            url,
            json={"csv_b64": csv_b64, "code": code, "schema": _schema()},
            timeout=30,
        )
    except requests.exceptions.ConnectionError:
        return json.dumps({"error": f"Cannot reach HF Space at {url}. Is it deployed?"})
    except requests.exceptions.Timeout:
        return json.dumps({"error": "Code agent timed out (>30s)."})
    except Exception as e:  # tool boundary: report as JSON instead of crashing the planner
        return json.dumps({"error": str(e)})
    if not resp.ok:
        # raise_for_status() would discard the response body, which presumably
        # carries the Space's error details; pass a bounded excerpt along.
        return json.dumps({
            "error": f"Code agent returned HTTP {resp.status_code}.",
            "detail": resp.text[:2000],
        }, ensure_ascii=False)
    return resp.text
# =============================================================================
# TOOL REGISTRY — used by the planner
# =============================================================================
def _as_tool(fn: Any) -> Any:
    """Return *fn* unchanged if it is already a LangChain tool, else wrap it with ``tool``.

    TOOL_MAP keys on ``.name``, which a plain Python function does not have
    (building it from undecorated functions raises AttributeError). Objects
    that already expose ``.invoke`` (e.g. functions decorated with ``@tool``)
    are kept as they are.
    """
    return fn if hasattr(fn, "invoke") else tool(fn)


TOOLS = [
    _as_tool(fn)
    for fn in (
        get_reviews_by_rating,
        get_most_helpful_reviews,
        get_rating_timeseries,
        search_reviews_by_keyword,
        get_app_comparison,
        get_rating_distribution,
        run_pandas_code,
    )
]
TOOL_MAP = {t.name: t for t in TOOLS}

# Plain-text tool catalogue for the planner prompt.
TOOL_DESCRIPTIONS = """
- get_reviews_by_rating(min_stars, max_stars, n, app_name?) — filter by star range 1-5. Use instead of separate positive/negative tools.
- get_most_helpful_reviews(n, app_name?) — most upvoted reviews
- get_rating_timeseries(app_name?) — daily avg rating trend
- search_reviews_by_keyword(keyword, max_results, app_name?) — full-text search
- get_app_comparison(metric) — rank all apps by metric (avg_rating|pct_negative|pct_positive|count|helpful_votes)
- get_rating_distribution(app_name?) — star count breakdown
- run_pandas_code(code) — custom pandas/scipy code on HuggingFace Space for complex stats
"""
# =============================================================================
# LLM HELPER
# =============================================================================
_FENCE_OPEN = re.compile(r"^```(?:json)?\s*", re.IGNORECASE)
_FENCE_CLOSE = re.compile(r"\s*```$")


def _parse_llm_json(raw: str) -> dict | list:
    """Parse JSON from an LLM reply, tolerating code fences and surrounding prose.

    Raises:
        ValueError: if no JSON object or array can be decoded
            (json.JSONDecodeError is a ValueError subclass).
    """
    text = _FENCE_CLOSE.sub("", _FENCE_OPEN.sub("", raw.strip()))
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # Models often wrap the payload in prose ("Here is the JSON: {...}").
    # Decode the first {...} (then [...]) value that parses; ignore the rest.
    decoder = json.JSONDecoder()
    for opener in "{[":
        start = text.find(opener)
        while start != -1:
            try:
                value, _ = decoder.raw_decode(text, start)
            except json.JSONDecodeError:
                start = text.find(opener, start + 1)
                continue
            return value
    raise ValueError("no JSON object or array found in LLM response")


def _llm_json(system: str, user: str, fallback_fn=None) -> dict | list:
    """Ask the LLM for a JSON answer and parse it.

    Args:
        system: System prompt, sent verbatim as a SystemMessage (it is not
            run through template substitution).
        user: Human message content.
        fallback_fn: Zero-argument callable used when no LLM is configured or
            the call / parsing fails.

    Returns:
        The parsed JSON value, otherwise ``fallback_fn()`` or ``{}``.
    """
    if LLM is None:
        return fallback_fn() if fallback_fn else {}
    prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=system),
        ("human", "{u}"),
    ])
    try:
        resp = (prompt | LLM).invoke({"u": user})
        return _parse_llm_json(getattr(resp, "content", str(resp)))
    except Exception as e:  # provider errors are SDK-specific; degrade to the fallback
        _log(f"LLM JSON parse failed: {e}", "warn")
        return fallback_fn() if fallback_fn else {}
# =============================================================================
# NODE 1 — INGESTION
# =============================================================================
# utf-8-sig first: it reads plain UTF-8 identically but strips a BOM that plain
# "utf-8" would leave glued to the first column name. cp1252 before latin-1:
# latin-1 decodes any byte sequence, so it must be the last resort.
_CSV_ENCODINGS = ("utf-8-sig", "cp1252", "latin-1")


def _read_table(fp: str) -> pd.DataFrame:
    """Read an Excel workbook or a CSV file, retrying CSV decoding per encoding.

    Raises:
        ValueError: if no candidate encoding can decode the CSV.
        Any other reader error (missing file, malformed CSV) propagates as-is.
    """
    if Path(fp).suffix.lower() in (".xls", ".xlsx", ".xlsm"):
        return pd.read_excel(fp)
    for enc in _CSV_ENCODINGS:
        try:
            return pd.read_csv(fp, encoding=enc)
        except UnicodeDecodeError:
            continue  # only a decoding failure justifies trying the next encoding
    raise ValueError(f"Cannot decode {fp}")


def node_ingest(state: ReviewState) -> dict:
    """Load the review table into ``raw_df`` (or reuse one already in state).

    Returns a partial state update with ``raw_df``, ``total_rows``, ``columns``
    and a trace message, or ``errors`` describing why loading failed.
    Fully-empty rows are dropped from files read from disk.
    """
    _log("Node [ingest]", "agent")
    if state.get("raw_df") is not None:
        df = state["raw_df"]
        _log(f"Using provided DataFrame: {len(df):,} rows", "ok")
        return {
            "total_rows": len(df),
            "columns": list(df.columns),
            "messages": [HumanMessage(content=f"Using provided data with {len(df)} reviews")],
        }
    fp = state.get("filepath")
    if not fp:
        return {"errors": ["ingest: no filepath or dataframe provided"]}
    try:
        df = _read_table(fp).dropna(how="all")
    except Exception as e:  # surfaced as a graph error instead of crashing the run
        return {"errors": [f"ingest: {e}"],
                "messages": [HumanMessage(content=f"ERROR loading: {e}")]}
    _log(f"Loaded {len(df):,} rows × {len(df.columns)} cols", "ok")
    return {
        "raw_df": df,
        "total_rows": len(df),
        "columns": list(df.columns),
        "messages": [HumanMessage(content=f"Loaded {len(df)} reviews")],
    }
# =============================================================================
# NODE 2 — LLM SCHEMA DETECTION
# =============================================================================
# node_schema passes this to _llm_json, which sends it verbatim as a
# SystemMessage (no str.format / template pass), so JSON braces are written
# single — doubled braces would reach the model literally as "{{".
_SCHEMA_SYS = """You are a CSV schema analyst.
Given column names and sample rows from a user-review dataset, map each role to the exact column name.
Return ONLY valid JSON (no markdown):
{
  "text": "<review text column or null>",
  "rating": "<numeric star/score column or null>",
  "date": "<date/timestamp column or null>",
  "app": "<app/product/game identifier column or null>",
  "user": "<reviewer name/id column or null>",
  "helpful": "<helpful/upvote count column or null>",
  "confidence": "high|medium|low",
  "reasoning": "one sentence"
}
Use EXACT column names. null if uncertain."""
_SCHEMA_ROLES = ("text", "rating", "date", "app", "user", "helpful")


def _resolve_column(candidate: Any, columns: list) -> Any:
    """Map a proposed column name onto a real column (exact, then case-insensitive).

    Returns None for non-strings or names that match no column, so a
    hallucinated LLM answer cannot cause KeyErrors downstream.
    """
    if not isinstance(candidate, str) or not candidate:
        return None
    if candidate in columns:
        return candidate
    folded = candidate.strip().lower()
    for col in columns:
        if str(col).strip().lower() == folded:
            return col
    return None


def node_schema(state: ReviewState) -> dict:
    """Detect which column plays each review role (LLM, else name heuristic).

    Returns a partial state update with ``schema`` (role -> existing column or
    None), ``schema_confidence``, ``detected_apps`` and a trace message.
    """
    _log("Node [schema]", "agent")
    df = state.get("raw_df")
    if df is None:
        return {"errors": ["schema: no dataframe"]}
    columns = list(state.get("columns") or df.columns)
    sample = df.head(SAMPLE_ROWS_FOR_SCHEMA).to_dict(orient="records")
    result = _llm_json(
        _SCHEMA_SYS,
        # default=str: Excel date cells arrive as Timestamps, which json cannot encode.
        json.dumps({"columns": columns, "sample_rows": sample}, default=str),
        fallback_fn=lambda: _heuristic_schema(columns),
    )
    if not isinstance(result, dict):
        result = _heuristic_schema(columns)
    real_columns = list(df.columns)
    schema = {role: _resolve_column(result.get(role), real_columns) for role in _SCHEMA_ROLES}
    conf = result.get("confidence", "low")
    _log(f"Schema ({conf}): {schema}", "ok")

    # Unique app names let the tools filter per app.
    apps: list[str] = []
    ac = schema.get("app")
    if ac is not None and ac in df.columns:
        apps = sorted(df[ac].dropna().astype(str).unique().tolist())
        _log(f"Detected {len(apps)} apps: {apps[:5]}", "info")
    return {
        "schema": schema,
        "schema_confidence": conf,
        "detected_apps": apps,
        "messages": [AIMessage(content=f"Schema ({conf}): {schema}. Apps: {apps[:5]}")],
    }
def _heuristic_schema(cols: list[str]) -> dict:
    """Guess the role -> column mapping from column names alone (no LLM).

    Each role has name patterns in priority order. A column whose whole name
    equals a pattern beats substring hits, and each column serves at most one
    role. So a Google-Play style "at" column wins the date role over
    "reviewCreatedVersion", and "rating" can no longer double as the date
    column through the "at" pattern.
    """
    patterns = {
        "text": ["content", "review", "text", "body", "comment"],
        "rating": ["score", "rating", "stars", "rate", "grade"],
        "date": ["date", "time", "created", "posted", "at"],
        "app": ["app", "product", "game", "title", "name"],
        "user": ["user", "reviewer", "author"],
        "helpful": ["helpful", "thumbs", "vote", "like", "useful"],
    }
    # Specific roles claim columns first; the generic "name" pattern of the
    # app role must not take a "userName" column before the user role does.
    claim_order = ("text", "rating", "helpful", "date", "user", "app")
    lowered = [(str(c).lower(), c) for c in cols]
    used: set = set()

    def pick(pats: list[str]):
        for exact in (True, False):
            for p in pats:
                for low, col in lowered:
                    if col in used:
                        continue
                    if (low == p) if exact else (p in low):
                        return col
        return None

    found = {}
    for role in claim_order:
        col = pick(patterns[role])
        if col is not None:
            used.add(col)
        found[role] = col
    result = {role: found[role] for role in patterns}  # keep the documented key order
    result.update(confidence="low", reasoning="heuristic fallback")
    return result
# =============================================================================
# NODE 3 — STATISTICAL ANALYSIS
# =============================================================================
def _round_or_none(value: Any, ndigits: int) -> Optional[float]:
    """Round a numeric aggregate; NaN (empty or all-missing input) becomes None."""
    return None if pd.isna(value) else round(float(value), ndigits)


def _rating_key(value: Any) -> str:
    """Label a rating value: "4" for 4.0 and "4.5" for 4.5 (never truncated)."""
    as_float = float(value)
    return str(int(as_float)) if as_float.is_integer() else str(as_float)


def _anomaly_days(dates: pd.Series, ratings: pd.Series) -> Optional[list[dict]]:
    """Days whose mean rating is below (mean - ANOMALY_SIGMA * std) of all daily means.

    Returns None when 7 or fewer days have both a date and a rating, so the
    caller can omit the key entirely.
    """
    frame = pd.DataFrame({"d": dates, "r": ratings}).dropna()
    daily = frame.groupby(frame["d"].dt.date)["r"].mean()
    if len(daily) <= 7:
        return None
    threshold = daily.mean() - ANOMALY_SIGMA * daily.std()
    return [
        {"date": str(day), "avg_rating": round(float(avg), 2)}
        for day, avg in daily[daily < threshold].items()
    ]


def _app_breakdown(df2: pd.DataFrame, app_col: Any, helpful_col: Any) -> list[dict]:
    """Per-app review count, average rating, % negative / % positive (and helpful votes)."""
    rows: list[dict] = []
    for name, grp in df2.groupby(app_col):
        ratings = grp["__r"]
        row: dict[str, Any] = {
            "app": str(name),
            "count": int(len(grp)),
            # None rather than NaN when the app has no numeric rating at all.
            "avg_rating": _round_or_none(ratings.mean(), 2),
            # Shares of all the app's reviews; unrated rows count as neither.
            "pct_negative": round(float((ratings <= 2).mean() * 100), 1),
            "pct_positive": round(float((ratings >= 4).mean() * 100), 1),
        }
        if helpful_col:
            row["helpful_votes"] = int(
                pd.to_numeric(grp[helpful_col], errors="coerce").fillna(0).sum())
        rows.append(row)
    # Worst-rated apps first; apps without ratings sort as if rated 5.
    rows.sort(key=lambda r: 5 if r["avg_rating"] is None else r["avg_rating"])
    return rows


def node_stats(state: ReviewState) -> dict:
    """Compute rating, text-length, helpfulness, anomaly and per-app statistics.

    Returns ``{"stats": {}}`` when there is no DataFrame or the schema's
    rating column is missing from it. Optional columns (text, helpful, date,
    app) are used only when present in the frame, so a mis-detected schema
    degrades the output instead of raising KeyError.
    """
    _log("Node [stats]", "agent")
    df = state.get("raw_df")
    s = state.get("schema") or {}
    rc = s.get("rating")
    if df is None or not rc or rc not in df.columns:
        return {"stats": {}}

    def present(role: str) -> Any:
        col = s.get(role)
        return col if col and col in df.columns else None

    df2 = df.copy()
    df2["__r"] = pd.to_numeric(df2[rc], errors="coerce")
    valid = df2["__r"].dropna()
    stats: dict[str, Any] = {
        "total_reviews": int(len(df2)),
        "rated_reviews": int(len(valid)),
        "avg_rating": _round_or_none(valid.mean(), 3),
        "median_rating": None if valid.empty else float(valid.median()),
        "std_rating": _round_or_none(valid.std(), 3),  # NaN for a single rating -> None
        "pct_positive": round(float((valid >= 4).mean() * 100), 1),
        "pct_negative": round(float((valid <= 2).mean() * 100), 1),
        "pct_neutral": round(float(((valid > 2) & (valid < 4)).mean() * 100), 1),
        # value_counts() already drops NaN; _rating_key keeps 4.5 apart from 4.
        "rating_distribution": {
            _rating_key(k): int(v)
            for k, v in valid.value_counts().sort_index().items()
        },
    }

    tc = present("text")
    if tc:
        lens = df2[tc].dropna().astype(str).str.len()
        if not lens.empty:  # all-missing text would make int(NaN) raise
            stats["avg_review_length"] = int(lens.mean())
            stats["short_reviews_pct"] = round(float((lens < 20).mean() * 100), 1)

    hc = present("helpful")
    if hc:
        stats["total_helpful_votes"] = int(
            pd.to_numeric(df2[hc], errors="coerce").fillna(0).sum())

    dc = present("date")
    if dc:
        try:
            anomalies = _anomaly_days(pd.to_datetime(df2[dc], errors="coerce"), df2["__r"])
        except Exception as exc:  # date parsing can still fail (e.g. mixed time zones)
            _log(f"Anomaly detection skipped: {exc}", "warn")
        else:
            if anomalies is not None:
                stats["anomaly_days"] = anomalies

    ac = present("app")
    app_rows = _app_breakdown(df2, ac, hc) if ac else []
    stats["apps_analyzed"] = len(app_rows)

    _log(f"Stats: avg={stats.get('avg_rating')} neg={stats.get('pct_negative')}%", "ok")
    return {
        "stats": stats,
        "app_breakdown": app_rows,
        "messages": [AIMessage(content=(
            f"Stats: {stats['total_reviews']} reviews, avg {stats.get('avg_rating')}/5"))],
    }
# =============================================================================
# NODE 4 — BERT SENTIMENT (local, zero token cost)
# =============================================================================
def node_nlp(state: ReviewState) -> dict:
    """Estimate review sentiment locally with DistilBERT.

    Samples up to MAX_REVIEWS_NLP reviews (random_state=42 for repeatability)
    and runs bert_sentiment() on them. When that yields nothing, the
    rating-based fallback is used instead. The first 500 sampled texts are
    returned as ``sample_texts`` whenever a usable text column exists.
    """
    _log("Node [nlp/BERT]", "agent")
    df = state.get("raw_df")
    tc = (state.get("schema") or {}).get("text")
    # A schema column that is missing from the frame would raise KeyError below.
    if df is None or not tc or tc not in df.columns:
        return {"sentiment": _rating_sentiment_fallback(state)}

    sample_df = df.sample(min(MAX_REVIEWS_NLP, len(df)), random_state=42)
    texts = sample_df[tc].fillna("").astype(str).tolist()

    bert_result = bert_sentiment(texts)
    if not bert_result:
        _log("BERT unavailable — using rating heuristic", "warn")
        return {
            "sentiment": _rating_sentiment_fallback(state),
            "sample_texts": texts[:500],
        }

    split = f"pos={bert_result['pct_positive']}% neg={bert_result['pct_negative']}%"
    _log(f"BERT sentiment: {split} tone={bert_result['overall_tone']}", "ok")
    return {
        "sentiment": bert_result,
        "sample_texts": texts[:500],
        "messages": [AIMessage(content=f"BERT sentiment: {bert_result['overall_tone']} ({split})")],
    }
def _rating_sentiment_fallback(state: ReviewState) -> dict:
    """Build a sentiment stand-in from the star-rating percentages in state["stats"].

    Missing percentages default to 0. The tone is reported as "Unknown"
    because no text was classified.
    """
    stats = state.get("stats", {})
    result = {
        f"pct_{bucket}": stats.get(f"pct_{bucket}", 0)
        for bucket in ("positive", "neutral", "negative")
    }
    result.update(
        overall_tone="Unknown",
        method="rating-heuristic (no BERT)",
        themes=[],
        phrases=[],
    )
    return result
# =============================================================================
# NODE 5 — LLM CLUSTERING (topic discovery, not keyword matching)
# =============================================================================
# Prompt template with a {max_clusters} slot; literal JSON braces are doubled
# so str.format() leaves single braces in the final prompt.
_CLUSTER_SYS_TEMPLATE = """You are a product issue analyst.
Given a mixed sample of user reviews, discover distinct topic clusters.
Return ONLY valid JSON:
{{
"clusters": [
{{
"label": "Short name",
"type": "issue|praise|request|general",
"description": "1-2 sentence summary",
"frequency_signal": "high|medium|low",
"severity": "critical|high|medium|low",
"example_quote": "verbatim short quote",
"keywords": ["word1","word2"]
}}
]
}}
Max {max_clusters} clusters. Merge near-duplicates. Issues first."""
_CLUSTER_SYS = _CLUSTER_SYS_TEMPLATE.format(max_clusters=MAX_CLUSTERS)
def node_cluster(state: ReviewState) -> dict:
    """Ask the LLM to discover topic clusters in a rating-stratified review sample.

    With a usable rating column the sample combines the 50 lowest-rated, the 30
    highest-rated and 50 random reviews, de-duplicated by text. Otherwise it is
    130 random reviews. It is capped at 130 items with text cut to 280
    characters. Returns ``{"clusters": []}`` without an LLM or a text column.
    Malformed LLM output yields an empty cluster list instead of an exception.
    """
    _log("Node [cluster]", "agent")
    df = state.get("raw_df")
    s = state.get("schema") or {}
    rc = s.get("rating")
    tc = s.get("text")
    if df is None or not tc or tc not in df.columns or LLM is None:
        return {"clusters": []}
    if rc not in df.columns:
        rc = None  # a hallucinated rating column would raise KeyError below

    df2 = df.copy()
    if rc:
        df2["__r"] = pd.to_numeric(df2[rc], errors="coerce")
        combined = pd.concat([
            df2.nsmallest(50, "__r"),
            df2.nlargest(30, "__r"),
            df2.sample(min(50, len(df2)), random_state=7),
        ]).drop_duplicates(subset=[tc])
    else:
        combined = df2.sample(min(130, len(df2)), random_state=7)
    sample = [
        {"text": str(row[tc])[:280], "rating": row[rc] if rc else None}
        for _, row in combined.head(130).iterrows()
    ]

    # default=str: cell values may be types json cannot encode (e.g. Timestamps).
    result = _llm_json(_CLUSTER_SYS, json.dumps(sample, default=str))
    raw_clusters = result.get("clusters", []) if isinstance(result, dict) else result
    # Keep only well-formed cluster dicts; anything else would crash the sort.
    clusters = (
        [c for c in raw_clusters if isinstance(c, dict)]
        if isinstance(raw_clusters, list) else []
    )

    _sev = {"critical": 0, "high": 1, "medium": 2, "low": 3}
    _typ = {"issue": 0, "request": 1, "general": 2, "praise": 3}
    clusters.sort(key=lambda c: (
        _typ.get(c.get("type", "general"), 9),
        _sev.get(c.get("severity", "low"), 9)))
    _log(f"Discovered {len(clusters)} clusters", "ok")
    return {
        "clusters": clusters,
        "messages": [AIMessage(content=f"Found {len(clusters)} topic clusters")],
    }
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # NODE 6 β β£ PLANNER (ReAct with scope guard) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Prompt template for node_planner, filled with
# str.format(tools=..., context=..., apps=..., query=...). Literal JSON braces
# in the response schema are doubled so that format() emits single braces.
_PLANNER_SYS = """You are an analytical planner for a review intelligence system.
A user asked a specific question. Decide which tools to call to best answer it.
Available tools:
{tools}
Already computed context:
{context}
Detected apps in dataset: {apps}
User query: {query}
Rules:
- Use get_reviews_by_rating(min_stars, max_stars) instead of separate positive/negative tools.
- If multiple apps exist and the query targets one, pass app_name to every tool call.
- Use run_pandas_code only for custom stats not covered by other tools.
- Choose 1-3 tools maximum. Do not repeat a tool with identical arguments.
Return ONLY valid JSON:
{{
"reasoning": "1-2 sentences",
"calls": [{{"tool": "name", "args": {{...}}}}],
"done": false
}}
Set "done": true if the context already fully answers the query with no more tools needed.
"""
def _planner_call_key(tool_name, args) -> tuple:
    """Return a hashable identity for a tool call, used to skip exact repeats."""
    return tool_name, json.dumps(args, sort_keys=True, default=str)


def node_planner(state: ReviewState) -> dict:
    """Plan and execute tool calls that help answer the user's query.

    Performs one planning pass. The LLM (prompted with ``_PLANNER_SYS``)
    picks tools given the context computed so far, and the picks are invoked
    through ``TOOL_MAP``. Tool failures are recorded with ``ok=False``
    instead of being raised.

    The LLM reply is treated as untrusted:
      * a non-dict plan, a non-list ``calls`` field, non-dict call entries,
        non-string tool names and unknown tools are skipped;
      * a string ``"done"`` flag is parsed (``"false"`` is not truthy);
      * a call repeating an earlier successful (tool, args) pair is not re-run.

    Returns:
        Partial state update with ``planner_notes`` and ``planner_iter``.
        When tools ran, it also carries the full accumulated
        ``tool_results`` list.
        NOTE(review): returning the full list is only correct if
        ReviewState.tool_results has no additive reducer; confirm.
    """
    _log("Node [planner]", "agent")
    query = (state.get("user_query") or "").strip()
    itr = state.get("planner_iter", 0)

    # Scope guard: stop planning once the iteration budget is spent.
    if itr >= MAX_PLANNER_ITERATIONS:
        _log(f"Planner reached max iterations ({MAX_PLANNER_ITERATIONS}) β terminating", "warn")
        return {
            "planner_notes": f"Terminated after {itr} iterations (scope guard).",
            "planner_iter": itr,
            "messages": [AIMessage(content="Planner: max iterations reached, proceeding to report.")],
        }
    if not query or LLM is None:
        return {"planner_notes": "No query or LLM unavailable.", "planner_iter": itr}

    # The tool functions read pipeline state through the shared _ACTIVE mapping.
    _ACTIVE.update(state)

    stats = state.get("stats") or {}
    schema = state.get("schema") or {}
    prior_results = [r for r in (state.get("tool_results") or []) if isinstance(r, dict)]
    context = {
        "total_reviews": stats.get("total_reviews"),
        "avg_rating": stats.get("avg_rating"),
        "clusters": [c.get("label") for c in (state.get("clusters") or [])[:5] if isinstance(c, dict)],
        "overall_tone": (state.get("sentiment") or {}).get("overall_tone"),
        "has_date": bool(schema.get("date")),
        "has_helpful": bool(schema.get("helpful")),
        "tools_called": [r.get("tool") for r in prior_results],
    }
    plan = _llm_json(
        _PLANNER_SYS.format(
            tools=TOOL_DESCRIPTIONS,
            # default=str: stats may hold values json cannot encode natively.
            context=json.dumps(context, default=str),
            apps=(state.get("detected_apps") or [])[:10],
            query=query,
        ),
        "",
    )
    if not isinstance(plan, dict):
        plan = {}
    reasoning = plan.get("reasoning", "")
    raw_done = plan.get("done", False)
    # LLMs sometimes emit the flag as a string; bool("false") would be True.
    done = raw_done.strip().lower() == "true" if isinstance(raw_done, str) else bool(raw_done)
    _log(f"Planner iter={itr+1}: {reasoning}", "info")

    # The planner says the existing context suffices: run no tools.
    if done:
        return {
            "planner_notes": reasoning,
            "planner_iter": itr + 1,
            "messages": [AIMessage(content=f"Planner done: {reasoning}")],
        }

    calls = plan.get("calls")
    if not isinstance(calls, list):
        calls = []
    tool_results = prior_results
    seen = {_planner_call_key(r.get("tool"), r.get("args")) for r in prior_results if r.get("ok")}
    for call in calls:
        if not isinstance(call, dict):
            continue
        tname = call.get("tool")
        if not isinstance(tname, str):
            continue
        args = call.get("args")
        if args is None:
            args = {}
        tool = TOOL_MAP.get(tname)
        if tool is None:
            _log(f" Unknown tool {tname!r} requested by planner - skipped", "warn")
            continue
        key = _planner_call_key(tname, args)
        if key in seen:
            _log(f" Skipping repeated call: {tname}({args})", "info")
            continue
        seen.add(key)
        _log(f" Tool: {tname}({args})", "info")
        try:
            result = tool.invoke(args)
            tool_results.append({"tool": tname, "args": args, "result": result, "ok": True})
        except Exception as e:  # tools are arbitrary; record failures and keep going
            tool_results.append({"tool": tname, "args": args, "error": str(e), "ok": False})
            _log(f" Tool {tname} failed: {e}", "warn")
    return {
        "tool_results": tool_results,
        "planner_notes": reasoning,
        "planner_iter": itr + 1,
        "messages": [AIMessage(content=f"Planner iter {itr+1}: {reasoning}")],
    }
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # NODE 7 β β’ PINECONE RAG (native integrated embedding, no OpenAI needed) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _wait_for_pinecone_index(pc, name: str, timeout_s: float = 60.0) -> bool:
    """Poll ``pc.describe_index(name)`` until it reports ready.

    Returns True once ready, or False if ``timeout_s`` elapses first.
    Describe errors are treated as "not ready yet". Presumably they are
    transient right after creation; verify if this loop ever times out.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            if pc.describe_index(name).status["ready"]:
                return True
        except Exception:  # best-effort readiness probe; keep polling
            pass
        time.sleep(1)
    return False


def node_rag(state: ReviewState) -> dict:
    """Retrieve reviews semantically similar to the query from Pinecone.

    Best-effort. Returns ``{"rag_context": ""}`` when Pinecone is disabled,
    not installed, lacks an API key, or any Pinecone call fails.

    Steps:
      1. Create the integrated-embedding index if absent, then wait until it
         reports ready (instead of a fixed sleep).
      2. Upsert up to 500 sample texts as records. Each record is tagged with
         a per-run ``run_tag``: the state's ``run_id``, or else a hash of the
         sample.
      3. Embed the query with ``pc.inference.embed`` and query the shared
         namespace *filtered to that tag*. Reviews upserted by earlier runs
         (possibly from other CSVs) therefore cannot leak into this run's
         context. The query is retried briefly because fresh upserts may not
         be visible immediately.
    """
    _log("Node [rag/pinecone]", "agent")
    if not state.get("use_pinecone") or not HAS_PINECONE:
        return {"rag_context": ""}
    api_key = os.getenv("PINECONE_API_KEY", "")
    if not api_key:
        _log("PINECONE_API_KEY missing β skipping RAG", "warn")
        return {"rag_context": ""}
    try:
        pc = PineconeClient(api_key=api_key)
        # Create the index with Pinecone's integrated embedding model (no OpenAI).
        existing = [ix.name for ix in pc.list_indexes()]
        if PINECONE_INDEX_NAME not in existing:
            _log(f"Creating Pinecone index '{PINECONE_INDEX_NAME}' with {PINECONE_EMBED_MODEL}β¦", "info")
            pc.create_index_for_model(
                name=PINECONE_INDEX_NAME,
                cloud="aws",
                region="us-east-1",
                embed={
                    "model": PINECONE_EMBED_MODEL,
                    "field_map": {"text": "chunk_text"},
                },
            )
            if not _wait_for_pinecone_index(pc, PINECONE_INDEX_NAME):
                _log("Pinecone index not ready after 60s - continuing anyway", "warn")
        idx = pc.Index(PINECONE_INDEX_NAME)

        texts = (state.get("sample_texts") or [])[:500]
        run_tag = str(state.get("run_id") or "")
        if not run_tag:
            # No run id available: derive a stable tag from the sample itself.
            joined = "\n".join(t for t in texts if isinstance(t, str))
            run_tag = hashlib.sha1(joined.encode("utf-8")).hexdigest()[:16]

        # Pinecone embeds chunk_text; the extra run_tag field becomes filterable metadata.
        records = [
            {"_id": f"rev_{i}_{run_tag}", "chunk_text": t, "run_tag": run_tag}
            for i, t in enumerate(texts)
            if isinstance(t, str) and t.strip()
        ]
        for start in range(0, len(records), PINECONE_UPSERT_BATCH):
            idx.upsert_records(
                namespace=PINECONE_NAMESPACE,
                records=records[start:start + PINECONE_UPSERT_BATCH],
            )
        if records:
            _log(f"Upserted {len(records)} reviews to Pinecone", "ok")

        # Embed the query with Pinecone-native inference.
        query = state.get("user_query") or "most common user complaints and praise"
        q_embeddings = pc.inference.embed(
            model=PINECONE_EMBED_MODEL,
            inputs=[query],
            parameters={"input_type": "query", "truncate": "END"},
        )
        q_vector = q_embeddings[0].values

        matches = []
        for attempt in range(5):
            if attempt:
                time.sleep(1)  # let freshly upserted records become queryable
            results = idx.query(
                namespace=PINECONE_NAMESPACE,
                vector=q_vector,
                top_k=PINECONE_TOP_K,
                include_metadata=True,
                filter={"run_tag": {"$eq": run_tag}},
            )
            matches = results.get("matches", []) or []
            if matches or not records:
                break

        rag_context = "\n\n".join(
            f"[Similar review {i+1} score={round(m.get('score', 0), 3)}]: "
            f"{(m.get('metadata') or {}).get('chunk_text', '')}"
            for i, m in enumerate(matches)
        )
        _log(f"RAG: retrieved {len(matches)} similar reviews", "ok")
        return {
            "rag_context": rag_context,
            "messages": [AIMessage(content=f"RAG: {len(matches)} similar reviews retrieved")],
        }
    except Exception as e:  # RAG is optional: never let Pinecone errors abort the pipeline
        _log(f"Pinecone RAG failed: {e}", "warn")
        return {"rag_context": ""}
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # NODE 8 β REPORT SYNTHESISER | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Prompt template for node_report, filled with str.format(query=..., rag=...).
# The JSON response schema below uses doubled braces so format() leaves them
# as literal single braces. The keys here are the ones the renderers read.
_REPORT_SYS = """You are a senior product intelligence analyst.
Produce a structured insight report from pipeline data, tool results, and RAG context.
User query: {query}
RAG context: {rag}
Return ONLY valid JSON:
{{
"executive_summary": "2-3 sentences. Lead with direct answer if query present.",
"direct_answer": "1-2 sentence direct answer, or null.",
"top_problems": [
{{"issue":"","description":"","severity":"critical|high|medium|low",
"frequency":"high|medium|low","evidence":""}}
],
"key_strengths": [{{"strength":"","description":"","evidence":""}}],
"trend_observations": [{{"observation":"","detail":"","data_ref":""}}],
"anomalies": [{{"anomaly":"","type":"Spike|Pattern|Outlier|Trend",
"detail":"","hypothesis":""}}],
"recommendations": [
{{"priority":"critical|high|medium|low","action":"",
"rationale":"","expected_impact":""}}
],
"confidence_note": ""
}}"""
def node_report(state: ReviewState) -> dict:
    """Synthesise the final insight report.

    Without an LLM, returns the rule-based ``_heuristic_report``. Otherwise
    sends stats, sentiment, clusters (capped at MAX_CLUSTERS), up to 10
    per-app rows and truncated successful tool results to the LLM using the
    ``_REPORT_SYS`` prompt. A non-dict or empty reply also falls back to the
    heuristic report.

    Returns:
        Partial state update with ``report`` (plus a status message on the
        LLM path).
    """
    _log("Node [report]", "agent")
    if LLM is None:
        return {"report": _heuristic_report(state)}

    payload = {
        "stats": state.get("stats") or {},
        "sentiment": state.get("sentiment") or {},
        "clusters": (state.get("clusters") or [])[:MAX_CLUSTERS],
        "app_breakdown": (state.get("app_breakdown") or [])[:10],
        "tool_results": [
            {"tool": t.get("tool"), "result": str(t.get("result", ""))[:400]}
            for t in (state.get("tool_results") or [])
            if isinstance(t, dict) and t.get("ok")
        ],
    }
    system = _REPORT_SYS.format(
        query=state.get("user_query") or "(general analysis)",
        rag=state.get("rag_context") or "None",
    )
    # default=str: stats/tool values may contain non-JSON types (e.g. numpy or
    # pandas scalars, timestamps) that json.dumps would otherwise reject.
    report = _llm_json(system, json.dumps(payload, default=str),
                       fallback_fn=lambda: _heuristic_report(state))
    if not isinstance(report, dict) or not report:
        report = _heuristic_report(state)
    _log("Report synthesised", "ok")
    return {
        "report": report,
        "messages": [AIMessage(content="Report synthesis complete.")],
    }
| def _heuristic_report(state: ReviewState) -> dict: | |
| clusters = state.get("clusters",[]) | |
| stats = state.get("stats",{}) | |
| sent = state.get("sentiment",{}) | |
| problems = [ | |
| {"issue": c.get("label","?"), "description": c.get("description",""), | |
| "severity": c.get("severity","medium"), "frequency": c.get("frequency_signal","medium"), | |
| "evidence": c.get("example_quote","")[:150]} | |
| for c in clusters if c.get("type")=="issue" | |
| ][:5] | |
| return { | |
| "executive_summary": ( | |
| f"Analysed {stats.get('total_reviews','?')} reviews. " | |
| f"Avg rating {stats.get('avg_rating','?')}/5. " | |
| f"Tone: {sent.get('overall_tone','?')} (method: {sent.get('method','?')})."), | |
| "direct_answer": None, | |
| "top_problems": problems, | |
| "key_strengths": [], | |
| "trend_observations": [], | |
| "anomalies": [ | |
| {"anomaly": f"Drop on {d['date']}", "type": "Spike", | |
| "detail": f"Avg {d['avg_rating']}", "hypothesis": "Possible bad update"} | |
| for d in stats.get("anomaly_days",[])[:3] | |
| ], | |
| "recommendations": [ | |
| {"priority": p["severity"], "action": f"Fix: {p['issue']}", | |
| "rationale": p["description"], "expected_impact": "Better ratings"} | |
| for p in problems[:4] | |
| ], | |
| "confidence_note": "Heuristic fallback β LLM unavailable.", | |
| } | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CONDITIONAL EDGES | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _route_after_stats(state: ReviewState) -> str: | |
| return "nlp" if state.get("schema",{}).get("text") else "planner" | |
| def _route_after_cluster(state: ReviewState) -> str: | |
| if state.get("user_query","").strip() and LLM is not None: | |
| return "planner" | |
| return "rag" | |
| def _route_after_planner(state: ReviewState) -> str: | |
| # β£ Also route to report if scope guard triggered | |
| itr = state.get("planner_iter", 0) | |
| done = itr >= MAX_PLANNER_ITERATIONS | |
| if state.get("use_pinecone") and HAS_PINECONE and not done: | |
| return "rag" | |
| return "report" | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # GRAPH ASSEMBLY | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def build_graph():
    """Wire the review pipeline into a compiled LangGraph ``StateGraph``.

    Linear prefix: START -> ingest -> schema -> stats.
    Branches (each decided by the named router):
      stats   -> nlp | planner        (_route_after_stats)
      nlp     -> cluster
      cluster -> planner | rag        (_route_after_cluster)
      planner -> rag | report         (_route_after_planner)
      rag     -> report -> END
    """
    workflow = StateGraph(ReviewState)
    nodes = (
        ("ingest", node_ingest),
        ("schema", node_schema),
        ("stats", node_stats),
        ("nlp", node_nlp),
        ("cluster", node_cluster),
        ("planner", node_planner),
        ("rag", node_rag),
        ("report", node_report),
    )
    for node_name, node_fn in nodes:
        workflow.add_node(node_name, node_fn)

    def identity_map(*targets):
        # Router return values are used directly as node names.
        return {target: target for target in targets}

    for source, target in ((START, "ingest"), ("ingest", "schema"), ("schema", "stats")):
        workflow.add_edge(source, target)
    workflow.add_conditional_edges("stats", _route_after_stats, identity_map("nlp", "planner"))
    workflow.add_edge("nlp", "cluster")
    workflow.add_conditional_edges("cluster", _route_after_cluster, identity_map("planner", "rag"))
    workflow.add_conditional_edges("planner", _route_after_planner, identity_map("rag", "report"))
    workflow.add_edge("rag", "report")
    workflow.add_edge("report", END)
    return workflow.compile()
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # RENDERER | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def render(state: ReviewState):
    """Print the final report, with Rich formatting when available and plain text otherwise."""
    renderer = _render_rich if RICH else _render_plain
    renderer(state)
def _render_rich(state: ReviewState):
    """Render the final pipeline state to the Rich console.

    Sections, each shown only when it has data: direct answer, executive
    summary, key metrics, rating distribution, detected apps, clusters, top
    problems, key strengths, anomalies, per-app breakdown, recommendations,
    tool-call log and caveats.

    Values that come from the data or the LLM are markup-escaped before being
    embedded in Rich markup, so stray ``[...]`` sequences in review text
    cannot break rendering. Missing or null fields render as empty or "?"
    instead of raising (e.g. a missing ``total_reviews`` no longer hits
    ``f"{'?':,}"``).
    """
    # render() only calls this when RICH is truthy, i.e. rich was importable.
    from rich.markup import escape

    def cell(value, limit=None) -> str:
        """Stringify (None -> ""), optionally truncate, then markup-escape."""
        text = "" if value is None else str(value)
        if limit is not None:
            text = text[:limit]
        return escape(text)

    def number(value) -> str:
        """Format with thousands separators, or fall back to the escaped raw value."""
        try:
            return f"{value:,}"
        except (TypeError, ValueError):
            return cell("?" if value is None else value)

    def rows(value) -> list:
        """Keep only dict entries of a list-valued field (LLM output is untrusted)."""
        if not isinstance(value, list):
            return []
        return [item for item in value if isinstance(item, dict)]

    def level_color(level) -> str:
        if level in ("critical", "high"):
            return "red"
        return "yellow" if level == "medium" else "blue"

    stats = state.get("stats") or {}
    sent = state.get("sentiment") or {}
    clusters = rows(state.get("clusters") or [])
    apps = rows(state.get("app_breakdown") or [])
    report = state.get("report") or {}
    query = state.get("user_query") or ""
    tools = rows(state.get("tool_results") or [])
    itr = state.get("planner_iter", 0)

    console.rule("[bold cyan]REVIEW INTELLIGENCE v4 Β· LangGraph[/bold cyan]")
    if query and report.get("direct_answer"):
        console.print(Panel(
            f"[bold yellow]Q:[/bold yellow] {cell(query)}\n\n"
            f"[bold green]{cell(report['direct_answer'])}[/bold green]",
            title="[bold]Direct Answer[/bold]", border_style="bright_green"))
    console.print(Panel(
        f"[italic]{cell(report.get('executive_summary', ''))}[/italic]",
        title="[bold]Executive Summary[/bold]", border_style="cyan"))

    # Metrics
    metrics = Table(box=box.SIMPLE, show_header=False)
    metrics.add_column("", style="dim")
    metrics.add_column("", style="bold")
    for label, value in [
        ("Total Reviews", number(stats.get("total_reviews", "?"))),
        ("Avg Rating", f"{cell(stats.get('avg_rating', '?'))} / 5 Ο={cell(stats.get('std_rating', '?'))}"),
        ("% Positive", f"[green]{cell(sent.get('pct_positive', '?'))}%[/green]"),
        ("% Neutral", f"[blue]{cell(sent.get('pct_neutral', '?'))}%[/blue]"),
        ("% Negative", f"[red]{cell(sent.get('pct_negative', '?'))}%[/red]"),
        ("Tone", cell(sent.get("overall_tone", "?"))),
        ("Sentiment method", f"[dim]{cell(sent.get('method', ''))}[/dim]"),
        ("LLM Provider", f"[dim]{_PROVIDER_TAG}[/dim]"),
        ("Planner iters", str(itr)),
    ]:
        metrics.add_row(label, value)
    console.print(Panel(metrics, title="[bold]Key Metrics[/bold]", border_style="green"))

    # Rating distribution bar
    try:
        total = max(1, stats.get("total_reviews") or 1)
    except TypeError:  # non-numeric total
        total = 1
    distribution = stats.get("rating_distribution") or {}
    console.print("[bold]Rating Distribution[/bold]")
    for star in [5, 4, 3, 2, 1]:
        cnt = distribution.get(str(star), 0)
        try:
            bar = "β" * int(cnt / total * 40)
        except (TypeError, ValueError):
            bar = ""
        color = "green" if star >= 4 else "yellow" if star == 3 else "red"
        console.print(f" {'β '*star:<5} [{color}]{bar:<40}[/{color}] {number(cnt)}")
    console.print()

    # Detected apps
    det_apps = state.get("detected_apps") or []
    if det_apps:
        console.print(Panel(
            " ".join(f"[cyan]{cell(a)}[/cyan]" for a in det_apps[:15]),
            title=f"[bold]Detected Apps ({len(det_apps)})[/bold]",
            border_style="dim"))
        console.print()

    # Clusters
    if clusters:
        table = Table(title="LLM-Discovered Clusters", box=box.ROUNDED, border_style="magenta")
        table.add_column("Type", width=8)
        table.add_column("Label", style="bold")
        table.add_column("Freq", width=7)
        table.add_column("Severity", width=10)
        table.add_column("Description", style="dim")
        for c in clusters[:10]:
            typ = c.get("type", "?")
            type_color = "green" if typ == "praise" else "red" if typ == "issue" else "blue"
            sev = c.get("severity", "?")
            sev_color = level_color(sev)
            table.add_row(f"[{type_color}]{cell(typ)}[/{type_color}]", cell(c.get("label", "")),
                          cell(c.get("frequency_signal", "")), f"[{sev_color}]{cell(sev)}[/{sev_color}]",
                          cell(c.get("description", ""), 80))
        console.print(table)
        console.print()

    # Top problems
    problems = rows(report.get("top_problems"))
    if problems:
        table = Table(title="Top Problems", box=box.ROUNDED, border_style="red")
        table.add_column("#", width=3, style="dim")
        table.add_column("Issue", style="bold")
        table.add_column("Severity", width=10)
        table.add_column("Evidence", style="dim")
        for i, p in enumerate(problems, 1):
            sev_color = level_color(p.get("severity"))
            table.add_row(str(i), cell(p.get("issue", "")),
                          f"[{sev_color}]{cell(p.get('severity', ''))}[/{sev_color}]",
                          cell(p.get("evidence", ""), 90))
        console.print(table)
        console.print()

    # Key strengths
    strengths = rows(report.get("key_strengths"))
    if strengths:
        table = Table(title="Key Strengths", box=box.ROUNDED, border_style="green")
        table.add_column("Strength", style="bold")
        table.add_column("Evidence", style="dim")
        for p in strengths:
            table.add_row(cell(p.get("strength", "")), cell(p.get("evidence", ""), 100))
        console.print(table)
        console.print()

    # Anomalies
    anomalies = rows(report.get("anomalies"))
    if anomalies:
        table = Table(title="Anomalies", box=box.ROUNDED, border_style="yellow")
        table.add_column("Anomaly", style="bold")
        table.add_column("Type", width=10)
        table.add_column("Detail", style="dim")
        table.add_column("Hypothesis", style="dim")
        for a in anomalies:
            table.add_row(cell(a.get("anomaly", "")), cell(a.get("type", "")),
                          cell(a.get("detail", ""), 70), cell(a.get("hypothesis", ""), 60))
        console.print(table)
        console.print()

    # Per-app
    if apps:
        table = Table(title="Per-App Breakdown", box=box.ROUNDED, border_style="cyan")
        table.add_column("App", style="bold")
        table.add_column("Reviews", justify="right")
        table.add_column("Avg", justify="right")
        table.add_column("% Neg", justify="right")
        table.add_column("% Pos", justify="right")
        for a in apps:
            avg = a.get("avg_rating")
            try:
                avg_num = float(avg)
            except (TypeError, ValueError):
                avg_num = float("nan")
            if avg_num != avg_num:  # missing, non-numeric or NaN: don't show a fake rating
                avg_cell = "[dim]?[/dim]"
            else:
                col = "red" if avg_num < 2.5 else "yellow" if avg_num < 3.5 else "green"
                avg_cell = f"[{col}]{cell(avg)}[/{col}]"
            table.add_row(cell(a.get("app", "?"), 40), cell(a.get("count", "?")), avg_cell,
                          f"{cell(a.get('pct_negative', '?'))}%",
                          f"{cell(a.get('pct_positive', '?'))}%")
        console.print(table)
        console.print()

    # Recommendations
    recommendations = rows(report.get("recommendations"))
    if recommendations:
        table = Table(title="Recommendations", box=box.ROUNDED, border_style="bright_white")
        table.add_column("#", width=3, style="dim")
        table.add_column("Priority", width=10)
        table.add_column("Action", style="bold")
        table.add_column("Impact", style="dim")
        for i, r in enumerate(recommendations, 1):
            prio_color = level_color(r.get("priority"))
            table.add_row(str(i), f"[{prio_color}]{cell(r.get('priority', ''))}[/{prio_color}]",
                          cell(r.get("action", "")), cell(r.get("expected_impact", ""), 70))
        console.print(table)
        console.print()

    # Tool call log
    if tools:
        table = Table(title="Tool Calls", box=box.SIMPLE, border_style="dim")
        table.add_column("Tool", style="dim")
        table.add_column("Args", style="dim")
        table.add_column("β", width=3)
        for call in tools:
            ok = "[green]β[/green]" if call.get("ok") else "[red]β[/red]"
            table.add_row(cell(call.get("tool", "")), cell(call.get("args", {}), 55), ok)
        console.print(table)
        console.print()

    if report.get("confidence_note"):
        console.print(Panel(f"[dim]{cell(report['confidence_note'])}[/dim]",
                            title="Caveats", border_style="dim"))
    console.rule("[dim]End of Report β v4[/dim]")
| def _render_plain(state: ReviewState): | |
| S = "="*72 | |
| def sec(t): print(f"\n{S}\n {t}\n{S}") | |
| r = state.get("report",{}); st = state.get("stats",{}) | |
| se = state.get("sentiment",{}); q = state.get("user_query","") | |
| sec("REVIEW INTELLIGENCE REPORT v4") | |
| if q: | |
| print(f"\nQuery : {q}") | |
| print(f"Answer: {r.get('direct_answer','(see summary)')}") | |
| print(f"\nSummary : {r.get('executive_summary','')}") | |
| print(f"Sentiment: {se.get('overall_tone','?')} [{se.get('method','?')}]") | |
| print(f"Provider : {_PROVIDER_TAG}") | |
| sec("METRICS") | |
| for k,v in [("Reviews",st.get("total_reviews")),("Avg",st.get("avg_rating")), | |
| ("Pos%",se.get("pct_positive")),("Neg%",se.get("pct_negative"))]: | |
| print(f" {k:<12}: {v}") | |
| sec("CLUSTERS") | |
| for c in state.get("clusters",[])[:8]: | |
| print(f" [{c.get('type','?')}] {c.get('label','?')} sev={c.get('severity','?')}") | |
| sec("PROBLEMS") | |
| for i,p in enumerate(r.get("top_problems",[]),1): | |
| print(f" #{i} [{p.get('severity','')}] {p.get('issue','')}") | |
| print(f" {p.get('evidence','')[:100]}") | |
| sec("RECOMMENDATIONS") | |
| for i,rec in enumerate(r.get("recommendations",[]),1): | |
| print(f" #{i} [{rec.get('priority','')}] {rec.get('action','')}") | |
| print(f"\n{S}\n End\n{S}\n") | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # LOGGING | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _log(msg: str, level: str = "info"): | |
| ts = datetime.now().strftime("%H:%M:%S") | |
| icons = {"info":"Β·","ok":"β","warn":"β ","err":"β","agent":"βΆ"} | |
| icon = icons.get(level,"Β·") | |
| if RICH: | |
| colors = {"info":"dim white","ok":"green","warn":"yellow","err":"red","agent":"cyan"} | |
| console.print(f"[{colors.get(level,'white')}][{ts}] {icon} {msg}[/{colors.get(level,'white')}]") | |
| else: | |
| print(f"[{ts}] {icon} {msg}") | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # ENTRY POINT | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def main():
    """Command-line entry point.

    Parses the CLI arguments, exits with status 1 if the CSV path does not
    exist, runs the compiled graph, renders the result, and with
    ``--save-json`` writes the final state (minus ``raw_df``) next to the CSV.
    """
    epilog_text = textwrap.dedent("""
Environment variables:
GROQ_API_KEY Primary LLM (free tier)
OPENROUTER_API_KEY Fallback #1
NVIDIA_API_KEY Fallback #2 (NIMs)
PINECONE_API_KEY Pinecone integrated embedding (no OpenAI needed)
PINECONE_INDEX Index name (default: review-agent-v4)
HF_CODE_AGENT_URL Deployed HuggingFace Space for pandas code execution
NLP:
DistilBERT runs locally (free, no API).
pip install transformers torch to enable it.
Falls back to rating-heuristic if not installed.
Examples:
python review_agent_v4.py --csv reviews.csv
python review_agent_v4.py --csv reviews.csv \\
--query "Which action game has the most ad complaints?"
python review_agent_v4.py --csv reviews.csv --use-pinecone \\
--query "Show 1-star reviews for com.JindoBlu app"
""")
    cli = argparse.ArgumentParser(
        description="Review Intelligence Agent v4 β LangGraph + LangChain",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog_text,
    )
    cli.add_argument("--csv", required=True)
    cli.add_argument("--query", default="",
                     help="Natural language question about the reviews")
    cli.add_argument("--use-pinecone", action="store_true",
                     help="Enable Pinecone RAG (requires PINECONE_API_KEY)")
    cli.add_argument("--save-json", action="store_true")
    opts = cli.parse_args()

    csv_path = opts.csv
    if not os.path.exists(csv_path):
        print(f"File not found: {csv_path}")
        sys.exit(1)

    bert_status = ("DistilBERT (local)" if HAS_TRANSFORMERS
                   else "unavailable (pip install transformers torch)")
    _log(f"LLM: {_PROVIDER_TAG}", "info")
    _log(f"BERT: {bert_status}", "info")
    if opts.use_pinecone and not HAS_PINECONE:
        _log("pinecone not installed β RAG disabled (pip install pinecone)", "warn")

    pipeline = build_graph()
    initial_state = _empty_state(csv_path, opts.query.strip(),
                                 opts.use_pinecone and HAS_PINECONE)
    started = time.time()
    final = pipeline.invoke(initial_state)
    elapsed = round(time.time() - started, 1)
    _log(f"Pipeline complete in {elapsed}s (planner iters: {final.get('planner_iter',0)})", "ok")
    render(final)

    if opts.save_json:
        out_path = f"{Path(csv_path).with_suffix('')}_report_v4.json"
        # raw_df is a DataFrame and is not meant for the JSON dump.
        serializable = {key: val for key, val in final.items() if key != "raw_df"}
        with open(out_path, "w", encoding="utf-8") as fh:
            json.dump(serializable, fh, indent=2, default=str)
        _log(f"JSON saved β {out_path}", "ok")

    errors = final.get("errors")
    if errors:
        _log(f"Non-fatal errors: {errors}", "warn")
def run_agent(query: str, csv_path: Optional[str] = None, df: Optional[pd.DataFrame] = None, use_pinecone: bool = False) -> dict:
    """Run the whole pipeline programmatically (web app or other modules).

    Args:
        query: Natural-language question (whitespace-stripped before use).
        csv_path: Path passed to the initial state; "" when omitted.
        df: Optional pre-loaded DataFrame placed into ``raw_df`` of the
            initial state.
        use_pinecone: Request Pinecone RAG. Honoured only when the pinecone
            package is available (HAS_PINECONE).

    Returns:
        The final graph state dict.
    """
    compiled = build_graph()
    initial = _empty_state(csv_path or "", query.strip(), use_pinecone and HAS_PINECONE)
    if df is not None:
        initial["raw_df"] = df
    return compiled.invoke(initial)
# Run the CLI only when executed as a script, not when imported (e.g. by the
# web app, which calls run_agent instead).
if __name__ == "__main__":
    main()