Spaces:
Running
Running
| """Shared analysis helpers used by both live and worker paths.""" | |
| import copy | |
| import re | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| from typing import Any, Literal | |
| from app.core.config import settings | |
| from app.db.mongodb import ( | |
| build_empty_persisted_corpus_envelope, | |
| has_persisted_corpus_envelope, | |
| ) | |
| from app.models.schemas import ( | |
| PredictionType, | |
| SentimentType, | |
| TopicSentiment, | |
| TopicEvidenceSnippet, | |
| TopicHighlights, | |
| UserCountPrediction, | |
| ) | |
| from app.services.global_pros_cons import ( | |
| merge_global_pros_cons_signal_sources, | |
| normalize_contextual_global_pros_cons_signals, | |
| ) | |
def calculate_prediction(topics: list[TopicSentiment]) -> UserCountPrediction:
    """Compute the player-count trend prediction from aggregated topics.

    Signals are checked in priority order: retention/stickiness first,
    then technical health (Bugs/Performance), then enjoyment
    (Gameplay/Fun), falling back to a stable prediction.
    """
    by_name = {entry.topic: entry for entry in topics}

    # Prefer the renamed public topic, but keep a legacy fallback while
    # older cached analyses still carry the previous topic name.
    retention = by_name.get("Stickiness") or by_name.get("Retention")
    if retention and retention.mention_count > 5:
        # Confidence scales with evidence volume, capped at 0.95.
        retention_confidence = min(0.95, 0.5 + (retention.mention_count / 100))
        if retention.score > settings.prediction_retention_threshold_pos:
            return UserCountPrediction(
                trend=PredictionType.INCREASING,
                confidence=retention_confidence,
                reasoning="PREDICTION_REASONING_RETENTION_HIGH",
            )
        if retention.score < settings.prediction_retention_threshold_neg:
            return UserCountPrediction(
                trend=PredictionType.DECREASING,
                confidence=retention_confidence,
                reasoning="PREDICTION_REASONING_RETENTION_LOW",
            )

    # Average only the technical-health topics that are present.
    tech_scores = [
        entry.score
        for entry in (by_name.get("Bugs"), by_name.get("Performance"))
        if entry is not None
    ]
    if tech_scores and (sum(tech_scores) / len(tech_scores)) < -0.3:
        return UserCountPrediction(
            trend=PredictionType.DECREASING,
            confidence=0.75,
            reasoning="PREDICTION_REASONING_TECH_ISSUES",
        )

    # Average only the enjoyment topics that are present.
    enjoyment_scores = [
        entry.score
        for entry in (by_name.get("Gameplay"), by_name.get("Fun"))
        if entry is not None
    ]
    if enjoyment_scores:
        enjoyment_average = sum(enjoyment_scores) / len(enjoyment_scores)
        if enjoyment_average > 0.4:
            return UserCountPrediction(
                trend=PredictionType.INCREASING,
                confidence=0.8,
                reasoning="PREDICTION_REASONING_GAMEPLAY_HIGH",
            )
        if enjoyment_average < -0.2:
            return UserCountPrediction(
                trend=PredictionType.DECREASING,
                confidence=0.6,
                reasoning="PREDICTION_REASONING_GAMEPLAY_LOW",
            )

    # No strong signal in any direction.
    return UserCountPrediction(
        trend=PredictionType.STABLE,
        confidence=0.5,
        reasoning="PREDICTION_REASONING_STABLE",
    )
def aggregate_topics(
    existing: list[TopicSentiment],
    new_batch: list[TopicSentiment],
) -> list[TopicSentiment]:
    """Merge topic aggregates using weighted mention counts.

    Each topic's merged score is the mention-weighted mean of all
    contributions, clamped to [-1, 1] and rounded to 3 places.  The kept
    example snippet is the one with the largest absolute score whose
    polarity agrees with the merged sentiment (neutral accepts either).

    Args:
        existing: Previously aggregated topic sentiments.
        new_batch: Newly analyzed topic sentiments to fold in.

    Returns:
        Merged aggregates sorted by mention count, descending.
    """

    def better_example(
        current: tuple[str, float] | None,
        new: tuple[str, float] | None,
    ) -> tuple[str, float] | None:
        # Keep whichever example carries the stronger (absolute) score.
        if new is None:
            return current
        if current is None:
            return new
        return new if abs(new[1]) > abs(current[1]) else current

    topic_data: dict[str, dict[str, Any]] = {}
    # Single pass over both lists: the accumulator logic was previously
    # duplicated verbatim for `existing` and `new_batch`.
    for topic in existing + new_batch:
        data = topic_data.setdefault(
            topic.topic, {"scores": [], "count": 0, "example": None}
        )
        data["scores"].append(topic.score * topic.mention_count)
        data["count"] += topic.mention_count
        candidate = (
            (topic.example, topic.example_score)
            if topic.example and topic.example_score is not None
            else None
        )
        data["example"] = better_example(data["example"], candidate)

    results: list[TopicSentiment] = []
    for topic_name, data in topic_data.items():
        count = data["count"]
        if count == 0:
            # No mentions contributed; averaging would divide by zero.
            continue
        average_score = sum(data["scores"]) / count
        normalized_score = max(-1.0, min(1.0, average_score))
        if normalized_score > settings.sentiment_positive_threshold:
            sentiment = SentimentType.POSITIVE
        elif normalized_score < settings.sentiment_negative_threshold:
            sentiment = SentimentType.NEGATIVE
        else:
            sentiment = SentimentType.NEUTRAL
        best_example = None
        example_score = None
        example_data = data["example"]
        if example_data:
            example_text, candidate_score = example_data
            # Only surface an example whose polarity matches the merged
            # sentiment; neutral topics accept either polarity.
            if sentiment == SentimentType.NEUTRAL or (
                sentiment == SentimentType.POSITIVE and candidate_score > 0
            ) or (
                sentiment == SentimentType.NEGATIVE and candidate_score < 0
            ):
                best_example = example_text
                example_score = candidate_score
        results.append(
            TopicSentiment(
                topic=topic_name,
                sentiment=sentiment,
                score=round(normalized_score, 3),
                mention_count=count,
                example=best_example,
                example_score=example_score,
            )
        )
    results.sort(key=lambda item: item.mention_count, reverse=True)
    return results
def coerce_topic_sentiments(raw_topics: Any) -> list[TopicSentiment]:
    """Deserialize persisted topic aggregates into validated TopicSentiment models."""
    if not isinstance(raw_topics, list):
        return []
    coerced: list[TopicSentiment] = []
    for item in raw_topics:
        if isinstance(item, TopicSentiment):
            coerced.append(item)
        elif isinstance(item, dict):
            # Best-effort deserialization: drop entries that fail validation.
            try:
                coerced.append(TopicSentiment(**item))
            except Exception:
                pass
    return coerced
# Context metrics that are persisted only when a value is known; an absent
# key means "not tracked for this aggregate" rather than zero.
_OPTIONAL_CONTEXT_METRIC_KEYS = (
    "total_sentences",
    "skipped_sentences",
    "topic_bearing_review_count",
)
| def _coerce_optional_context_metric(value: Any) -> int | None: | |
| if value is None: | |
| return None | |
| try: | |
| return max(0, int(value)) | |
| except (TypeError, ValueError): | |
| return None | |
def _merge_optional_context_metric(
    existing_value: Any,
    delta_value: int | None,
    *,
    allow_seed: bool,
) -> int | None:
    """Add a delta onto an optional persisted metric.

    When either side is unknown, the metric survives only if ``allow_seed``
    is true (the aggregate may still be seeded); otherwise mixing a known
    value with an unknown one resolves to None so the metric is not
    under-counted on an established aggregate.
    """
    delta = _coerce_optional_context_metric(delta_value)
    existing = _coerce_optional_context_metric(existing_value)
    if delta is None:
        return existing if allow_seed else None
    if existing is None:
        return delta if allow_seed else None
    return existing + delta
def _apply_optional_context_metrics(
    target: dict[str, Any],
    *,
    prefix: str = "",
    source: dict[str, Any] | None,
) -> None:
    """Copy the known optional context metrics from *source* onto *target*.

    Keys without a usable value are removed from *target* so stale
    metrics never linger after a refresh.
    """
    metrics = source if isinstance(source, dict) else {}
    for key in _OPTIONAL_CONTEXT_METRIC_KEYS:
        normalized = _coerce_optional_context_metric(metrics.get(key))
        target_key = f"{prefix}{key}"
        if normalized is None:
            target.pop(target_key, None)
        else:
            target[target_key] = normalized
def build_general_aggregate(
    *,
    topics: list[TopicSentiment] | None = None,
    review_count: int = 0,
    skipped_count: int = 0,
    total_sentences: int | None = None,
    skipped_sentences: int | None = None,
    topic_bearing_review_count: int | None = None,
) -> dict[str, Any]:
    """Build the canonical mergeable persisted aggregate for general."""
    payload: dict[str, Any] = {
        "review_count": max(0, int(review_count or 0)),
        "skipped_count": max(0, int(skipped_count or 0)),
        "topics": [entry.model_dump(mode="json") for entry in (topics or [])],
    }
    # Optional context metrics are written only when a value is known;
    # absent keys mean "not tracked" (see _OPTIONAL_CONTEXT_METRIC_KEYS).
    for key, raw in (
        ("total_sentences", total_sentences),
        ("skipped_sentences", skipped_sentences),
        ("topic_bearing_review_count", topic_bearing_review_count),
    ):
        normalized = _coerce_optional_context_metric(raw)
        if normalized is not None:
            payload[key] = normalized
    return payload
def normalize_general_aggregate(
    aggregate: dict[str, Any] | None,
    *,
    fallback_topics: list[TopicSentiment] | None = None,
    fallback_review_count: int = 0,
    fallback_skipped_count: int = 0,
) -> dict[str, Any]:
    """Normalize persisted general aggregate shape with optional legacy fallbacks."""
    source = aggregate if isinstance(aggregate, dict) else {}
    raw_topics = source.get("topics")
    if isinstance(raw_topics, list):
        topics = coerce_topic_sentiments(raw_topics)
    else:
        # No persisted topic list: fall back to the caller-supplied legacy data.
        topics = list(fallback_topics or [])
    return build_general_aggregate(
        topics=topics,
        review_count=source.get("review_count", fallback_review_count),
        skipped_count=source.get("skipped_count", fallback_skipped_count),
        total_sentences=source.get("total_sentences"),
        skipped_sentences=source.get("skipped_sentences"),
        topic_bearing_review_count=source.get("topic_bearing_review_count"),
    )
def get_persisted_general_aggregate(
    analysis: "AnalysisDocumentReadModel | None",
) -> dict[str, Any]:
    """Return canonical general aggregate, seeding from legacy results when needed."""
    if analysis is None:
        return build_general_aggregate()
    raw_aggregate = None
    corpus = analysis.persisted_corpus
    if corpus is not None:
        general_section = corpus.get("general")
        if isinstance(general_section, dict):
            candidate = general_section.get("aggregate")
            if isinstance(candidate, dict):
                raw_aggregate = candidate
    # Older documents keep these values directly on the results payload;
    # they are used only when the persisted aggregate is absent/invalid.
    return normalize_general_aggregate(
        raw_aggregate,
        fallback_topics=coerce_topic_sentiments(analysis.results.get("general_topics")),
        fallback_review_count=analysis.results.get("analyzed_reviews", 0),
        fallback_skipped_count=analysis.results.get("skipped_count", 0),
    )
def merge_general_aggregate(
    existing_aggregate: dict[str, Any] | None,
    *,
    delta_topics: list[TopicSentiment],
    delta_review_count: int,
    delta_skipped_count: int,
    delta_total_sentences: int | None = None,
    delta_skipped_sentences: int | None = None,
    delta_topic_bearing_review_count: int | None = None,
) -> dict[str, Any]:
    """Append new general-review contributions onto the canonical persisted aggregate."""
    base = normalize_general_aggregate(existing_aggregate)
    combined_topics = aggregate_topics(
        coerce_topic_sentiments(base.get("topics")),
        delta_topics,
    )
    # Optional metrics may only be seeded onto an empty aggregate; seeding
    # later would make partially-tracked metrics under-count the corpus.
    seed_allowed = not existing_aggregate or base.get("review_count", 0) == 0

    def merged_metric(key: str, delta: int | None) -> int | None:
        return _merge_optional_context_metric(
            base.get(key), delta, allow_seed=seed_allowed
        )

    return build_general_aggregate(
        topics=combined_topics,
        review_count=base.get("review_count", 0) + max(0, delta_review_count),
        skipped_count=base.get("skipped_count", 0) + max(0, delta_skipped_count),
        total_sentences=merged_metric("total_sentences", delta_total_sentences),
        skipped_sentences=merged_metric("skipped_sentences", delta_skipped_sentences),
        topic_bearing_review_count=merged_metric(
            "topic_bearing_review_count", delta_topic_bearing_review_count
        ),
    )
def scale_topics(topics: list[TopicSentiment], factor: float) -> list[TopicSentiment]:
    """Scale mention counts for the approximate recent sliding window."""
    scaled: list[TopicSentiment] = []
    for topic in topics:
        # Truncate toward zero but never drop a surviving topic below 1 mention.
        scaled_count = max(1, int(topic.mention_count * factor))
        scaled.append(topic.model_copy(update={"mention_count": scaled_count}))
    return scaled
def filter_topics_by_min_mentions(
    topics: list[TopicSentiment],
    min_mentions: int | None = None,
) -> list[TopicSentiment]:
    """Filter topics below the minimum mention threshold.

    Preserves existing sort order. Only filters — does not modify score or sentiment.
    Applied at the final aggregate level, never at the per-review level.
    """
    if min_mentions is None:
        min_mentions = settings.topic_min_mentions
    return [topic for topic in topics if topic.mention_count >= min_mentions]
| def compute_preferred_context(patch_timestamp: int | None) -> str: | |
| """Choose the default user-facing context tab. | |
| Returns 'current_patch' whenever a reliable major patch exists; otherwise | |
| returns 'recent' as the safe current-state fallback. | |
| """ | |
| return "current_patch" if patch_timestamp is not None else "recent" | |
# Collapses runs of non-word characters (while keeping CJK ideographs in the
# \u4e00-\u9fff range) so near-identical phrases normalize to one identity key.
_TEXT_NORMALIZE_RE = re.compile(r"[^\w\u4e00-\u9fff]+")
# Review-provenance fields carried alongside a topic-highlight entry so the
# snippet can be attributed back to its source review.
_TOPIC_HIGHLIGHT_SOURCE_KEYS = (
    "source_steamid",
    "source_recommendation_id",
    "source_timestamp_created",
    "source_playtime_at_review",
    "source_voted_up",
    "source_language",
    "source_steam_purchase",
    "source_received_for_free",
    "source_written_during_early_access",
)
def _highlight_display_limit() -> int:
    """Number of general highlights exposed to users (at least one)."""
    configured = int(settings.highlights_top_n_general)
    return configured if configured >= 1 else 1
def _highlight_working_limit() -> int:
    """Working-set size kept while merging general highlights."""
    shown = _highlight_display_limit()
    # Keep headroom beyond the display limit so later merges can reorder
    # the top set rather than being locked to an early truncation.
    return max(shown * 2, shown + 1)
def _topic_highlight_display_limit() -> int:
    """Number of per-topic highlights exposed to users (at least one)."""
    configured = int(settings.highlights_top_n_per_topic)
    return configured if configured >= 1 else 1
def _topic_highlight_working_limit() -> int:
    """Working-set size kept while merging per-topic highlights."""
    shown = _topic_highlight_display_limit()
    # Headroom beyond the display limit lets later merges reorder the top set.
    return max(shown * 2, shown + 1)
| def _global_signal_working_limit() -> int: | |
| return 24 | |
def build_empty_evidence_cache() -> dict[str, Any]:
    """Return a fresh, empty evidence-cache payload in the canonical shape."""
    empty: dict[str, Any] = {}
    empty["highlights"] = []
    empty["topic_highlights"] = {}
    empty["global_pros_cons_signals"] = []
    return empty
def _normalize_text_identity(value: str) -> str:
    """Reduce text to a lowercase, whitespace-collapsed identity key."""
    stripped = _TEXT_NORMALIZE_RE.sub(" ", str(value).lower())
    # split()/join collapses interior whitespace runs and trims both ends.
    return " ".join(stripped.split())
def _classify_sentiment_value(score: float) -> str:
    """Map a score onto the configured positive/neutral/negative bands."""
    if score > settings.sentiment_positive_threshold:
        label = SentimentType.POSITIVE
    elif score < settings.sentiment_negative_threshold:
        label = SentimentType.NEGATIVE
    else:
        label = SentimentType.NEUTRAL
    return label.value
def _normalize_highlight_entries(entries: Any) -> list[dict[str, Any]]:
    """Coerce raw highlight payloads (dicts or models) into canonical dicts.

    Entries without a phrase or a positive mention count are dropped;
    missing sentiment is derived from the score.
    """
    if not isinstance(entries, list):
        return []
    cleaned: list[dict[str, Any]] = []
    for raw in entries:
        payload = raw
        if not isinstance(payload, dict):
            dump = getattr(payload, "model_dump", None)
            if not callable(dump):
                continue
            payload = dump(mode="json")
        phrase = payload.get("phrase") or payload.get("text")
        mentions = int(payload.get("mention_count") or 0)
        if not phrase or mentions <= 0:
            continue
        score = float(payload.get("score") or 0.0)
        sentiment = payload.get("sentiment") or _classify_sentiment_value(score)
        # Fall back to the whitespace word count when the n-gram size is absent.
        ngram = payload.get("ngram_size") or max(1, len(str(phrase).split()))
        cleaned.append(
            {
                "phrase": str(phrase),
                "mention_count": mentions,
                "sentiment": str(sentiment),
                "score": score,
                "ngram_size": int(ngram),
            }
        )
    return cleaned
def _normalize_topic_highlight_entries(entries: Any) -> list[dict[str, Any]]:
    """Coerce raw per-topic highlight payloads into canonical dicts.

    Entries without text or a positive mention count are dropped; review
    provenance fields are carried through by deep copy when present.
    """
    if not isinstance(entries, list):
        return []
    cleaned: list[dict[str, Any]] = []
    for raw in entries:
        payload = raw
        if not isinstance(payload, dict):
            dump = getattr(payload, "model_dump", None)
            if not callable(dump):
                continue
            payload = dump(mode="json")
        text = payload.get("text") or payload.get("phrase")
        mentions = int(payload.get("mention_count") or 0)
        if not text or mentions <= 0:
            continue
        score = float(payload.get("score") or 0.0)
        entry: dict[str, Any] = {
            "text": str(text),
            "mention_count": mentions,
            "sentiment": str(payload.get("sentiment") or _classify_sentiment_value(score)),
            "score": score,
        }
        for source_key in _TOPIC_HIGHLIGHT_SOURCE_KEYS:
            value = payload.get(source_key)
            if source_key in payload and value is not None:
                entry[source_key] = copy.deepcopy(value)
        cleaned.append(entry)
    return cleaned
def _extract_topic_highlight_source(entry: dict[str, Any]) -> dict[str, Any]:
    """Deep-copy only the provenance fields present (and non-None) on *entry*."""
    source: dict[str, Any] = {}
    for key in _TOPIC_HIGHLIGHT_SOURCE_KEYS:
        if key in entry and entry.get(key) is not None:
            source[key] = copy.deepcopy(entry[key])
    return source
def _has_topic_highlight_source(entry: dict[str, Any]) -> bool:
    """Return True when *entry* carries at least one non-None provenance field."""
    for key in _TOPIC_HIGHLIGHT_SOURCE_KEYS:
        if entry.get(key) is not None:
            return True
    return False
def _replace_topic_highlight_source(target: dict[str, Any], source: dict[str, Any]) -> None:
    """Drop every provenance field from *target*, then install *source*'s fields."""
    for key in _TOPIC_HIGHLIGHT_SOURCE_KEYS:
        if key in target:
            del target[key]
    target.update(source)
def _normalize_topic_highlight_map(raw_topic_highlights: Any) -> dict[str, list[dict[str, Any]]]:
    """Normalize topic highlights into a topic -> entries map.

    Accepts either the map form (topic name -> entry list) or the list
    form ([{"topic": ..., "highlights": [...]}, ...]); anything else
    normalizes to an empty map.
    """
    normalized: dict[str, list[dict[str, Any]]] = {}
    if isinstance(raw_topic_highlights, dict):
        for topic, entries in raw_topic_highlights.items():
            if isinstance(topic, str):
                cleaned = _normalize_topic_highlight_entries(entries)
                if cleaned:
                    normalized[topic] = cleaned
        return normalized
    if isinstance(raw_topic_highlights, list):
        for raw in raw_topic_highlights:
            payload = raw
            if not isinstance(payload, dict):
                dump = getattr(payload, "model_dump", None)
                if not callable(dump):
                    continue
                payload = dump(mode="json")
            topic = payload.get("topic")
            if not isinstance(topic, str) or not topic:
                continue
            cleaned = _normalize_topic_highlight_entries(payload.get("highlights"))
            if cleaned:
                normalized[topic] = cleaned
    return normalized
def normalize_evidence_cache(cache: Any) -> dict[str, Any]:
    """Normalize a persisted evidence cache into the canonical payload shape."""
    if not isinstance(cache, dict):
        return build_empty_evidence_cache()
    raw_signals = cache.get("global_pros_cons_signals")
    # The shared normalizer operates on context->signals maps; wrap the flat
    # list under "general" and unwrap the same key afterwards.
    signal_map = {"general": raw_signals} if isinstance(raw_signals, list) else None
    normalized_signals = normalize_contextual_global_pros_cons_signals(signal_map)["general"]
    return {
        "highlights": _normalize_highlight_entries(cache.get("highlights")),
        "topic_highlights": _normalize_topic_highlight_map(cache.get("topic_highlights")),
        "global_pros_cons_signals": normalized_signals,
    }
| def _extract_surviving_topic_names(raw_topics: Any) -> set[str]: | |
| surviving_topics: set[str] = set() | |
| for topic in raw_topics or []: | |
| if not isinstance(topic, dict): | |
| continue | |
| topic_name = topic.get("topic") | |
| if isinstance(topic_name, str): | |
| surviving_topics.add(topic_name) | |
| return surviving_topics | |
def _merge_highlight_evidence(
    existing_entries: Any,
    new_entries: Any,
    *,
    working_limit: int | None = None,
) -> list[dict[str, Any]]:
    """Merge two highlight lists, combining duplicates by normalized phrase.

    Duplicate phrases accumulate mention counts, keep a mention-weighted
    average score, and re-derive their sentiment from the merged score.
    Returns at most *working_limit* entries (defaults to the configured
    working limit) ranked by mentions, then |score|, then smallest n-gram,
    shortest phrase, and lexicographic phrase as tie-breakers.
    """
    merged: dict[str, dict[str, Any]] = {}

    def merge_entry(entry: dict[str, Any]) -> None:
        key = _normalize_text_identity(entry["phrase"])
        if not key:
            # Phrase normalized to nothing (punctuation-only) — unusable.
            return
        current = merged.get(key)
        if current is None:
            merged[key] = copy.deepcopy(entry)
            return
        previous_mentions = current["mention_count"]
        current["mention_count"] += entry["mention_count"]
        # Mention-weighted running average, rounded for stable persistence.
        current["score"] = round(
            (
                current["score"] * previous_mentions
                + entry["score"] * entry["mention_count"]
            ) / max(current["mention_count"], 1),
            3,
        )
        current["sentiment"] = _classify_sentiment_value(current["score"])
        current["ngram_size"] = min(current["ngram_size"], entry["ngram_size"])
        # Keep the display phrase from the stronger contributor: more
        # mentions, then larger |score|, then shorter, then lexicographic.
        if (
            entry["mention_count"],
            abs(entry["score"]),
            -len(entry["phrase"]),
            entry["phrase"].lower(),
        ) > (
            previous_mentions,
            abs(current["score"]),
            -len(current["phrase"]),
            current["phrase"].lower(),
        ):
            current["phrase"] = entry["phrase"]

    for entry in _normalize_highlight_entries(existing_entries):
        merge_entry(entry)
    for entry in _normalize_highlight_entries(new_entries):
        merge_entry(entry)
    limit = working_limit if working_limit is not None else _highlight_working_limit()
    return sorted(
        merged.values(),
        key=lambda item: (
            -item["mention_count"],
            -abs(item["score"]),
            item["ngram_size"],
            len(item["phrase"]),
            item["phrase"].lower(),
        ),
    )[:limit]
def _merge_topic_highlight_entries(
    existing_entries: Any,
    new_entries: Any,
    *,
    working_limit: int | None = None,
) -> list[dict[str, Any]]:
    """Merge two per-topic highlight lists, combining duplicates by text identity.

    Duplicates accumulate mention counts, keep a mention-weighted average
    score, and re-derive their sentiment.  The displayed text (and the
    review provenance attached to it) follows the stronger contributor;
    an entry that has no provenance adopts it from the other side.
    """
    merged: dict[str, dict[str, Any]] = {}

    def merge_entry(entry: dict[str, Any]) -> None:
        key = _normalize_text_identity(entry["text"])
        if not key:
            # Text normalized to nothing (punctuation-only) — unusable.
            return
        current = merged.get(key)
        if current is None:
            merged[key] = copy.deepcopy(entry)
            return
        previous_mentions = current["mention_count"]
        current["mention_count"] += entry["mention_count"]
        # Mention-weighted running average, rounded for stable persistence.
        current["score"] = round(
            (
                current["score"] * previous_mentions
                + entry["score"] * entry["mention_count"]
            ) / max(current["mention_count"], 1),
            3,
        )
        current["sentiment"] = _classify_sentiment_value(current["score"])
        incoming_source = _extract_topic_highlight_source(entry)
        # Stronger contributor wins the display text: more mentions, then
        # larger |score|, then shorter text, then lexicographic order.
        should_replace_text = (
            entry["mention_count"],
            abs(entry["score"]),
            -len(entry["text"]),
            entry["text"].lower(),
        ) > (
            previous_mentions,
            abs(current["score"]),
            -len(current["text"]),
            current["text"].lower(),
        )
        if should_replace_text:
            current["text"] = entry["text"]
            # Provenance must track the text it belongs to.
            _replace_topic_highlight_source(current, incoming_source)
        elif not _has_topic_highlight_source(current) and any(
            value is not None for value in incoming_source.values()
        ):
            # Keep the current text but backfill its missing provenance.
            _replace_topic_highlight_source(current, incoming_source)

    for entry in _normalize_topic_highlight_entries(existing_entries):
        merge_entry(entry)
    for entry in _normalize_topic_highlight_entries(new_entries):
        merge_entry(entry)
    limit = working_limit if working_limit is not None else _topic_highlight_working_limit()
    return sorted(
        merged.values(),
        key=lambda item: (
            -item["mention_count"],
            -abs(item["score"]),
            len(item["text"]),
            item["text"].lower(),
        ),
    )[:limit]
def _merge_topic_highlight_maps(
    existing_map: Any,
    new_map: Any,
) -> dict[str, list[dict[str, Any]]]:
    """Merge two topic->highlights maps topic by topic."""
    left = _normalize_topic_highlight_map(existing_map)
    right = _normalize_topic_highlight_map(new_map)
    combined: dict[str, list[dict[str, Any]]] = {}
    # Deterministic topic order keeps the persisted payload stable.
    for topic in sorted(left.keys() | right.keys()):
        entries = _merge_topic_highlight_entries(left.get(topic), right.get(topic))
        if entries:
            combined[topic] = entries
    return combined
def merge_evidence_cache(
    existing_cache: Any,
    new_cache: Any,
) -> dict[str, Any]:
    """Merge two evidence caches into one normalized payload.

    Highlights and per-topic highlights are deduplicated and merged;
    global pros/cons signal sources are combined by the shared service
    helper, re-ranked, and truncated to the fixed working limit.
    """
    normalized_existing = normalize_evidence_cache(existing_cache)
    normalized_new = normalize_evidence_cache(new_cache)
    return {
        "highlights": _merge_highlight_evidence(
            normalized_existing.get("highlights"),
            normalized_new.get("highlights"),
        ),
        "topic_highlights": _merge_topic_highlight_maps(
            normalized_existing.get("topic_highlights"),
            normalized_new.get("topic_highlights"),
        ),
        # Rank merged signals: mentions desc, quality desc, |score| desc,
        # earliest first_position, then shortest / lexicographic phrase.
        "global_pros_cons_signals": sorted(
            merge_global_pros_cons_signal_sources(
                normalized_existing.get("global_pros_cons_signals"),
                normalized_new.get("global_pros_cons_signals"),
            ),
            key=lambda item: (
                -int(item.get("mention_count") or 0),
                -float(item.get("quality") or 0.0),
                -abs(float(item.get("score") or 0.0)),
                int(item.get("first_position") or 0),
                len(str(item.get("phrase") or "")),
                str(item.get("phrase") or "").lower(),
            ),
        )[:_global_signal_working_limit()],
    }
def build_evidence_cache_payload(
    *,
    highlights: Any = None,
    topic_highlights: Any = None,
    global_pros_cons_signals: Any = None,
) -> dict[str, Any]:
    """Assemble and normalize an evidence-cache payload from its parts."""
    raw_payload = {
        "highlights": highlights,
        "topic_highlights": topic_highlights,
        "global_pros_cons_signals": global_pros_cons_signals,
    }
    return normalize_evidence_cache(raw_payload)
def build_topic_highlights_payload_from_cache(
    topic_highlights: Any,
    surviving_topics: set[str],
) -> list[dict[str, Any]]:
    """Project cached topic highlights into the list payload shape.

    Topics filtered out of the final aggregate are dropped here as well,
    so the payload never references a topic the response no longer has.
    """
    payload: list[dict[str, Any]] = []
    for topic, highlights in _normalize_topic_highlight_map(topic_highlights).items():
        if topic not in surviving_topics:
            continue
        payload.append({"topic": topic, "highlights": copy.deepcopy(highlights)})
    return payload
def build_patch_epoch_id(patch_timestamp: int) -> str:
    """Return a deterministic patch-epoch identity for one major-update boundary."""
    return "patch-{}".format(int(patch_timestamp))
| def _serialize_model_list(items: Any) -> list[dict[str, Any]]: | |
| """Serialize a list of Pydantic models or dict payloads into JSON-safe dicts.""" | |
| if not isinstance(items, list): | |
| return [] | |
| serialized: list[dict[str, Any]] = [] | |
| for item in items: | |
| if isinstance(item, dict): | |
| serialized.append(copy.deepcopy(item)) | |
| continue | |
| model_dump = getattr(item, "model_dump", None) | |
| if callable(model_dump): | |
| serialized.append(model_dump(mode="json")) | |
| return serialized | |
| def _normalize_signal_sources(signals: Any) -> list[dict[str, Any]]: | |
| """Keep only persisted pros/cons signal-source dict entries.""" | |
| if not isinstance(signals, list): | |
| return [] | |
| return [copy.deepcopy(entry) for entry in signals if isinstance(entry, dict)] | |
def build_patch_epoch_aggregate(
    *,
    topics: list[TopicSentiment] | None = None,
    review_count: int = 0,
    total_sentences: int | None = None,
    skipped_sentences: int | None = None,
    topic_bearing_review_count: int | None = None,
    highlights: Any = None,
    topic_highlights: Any = None,
    global_pros_cons_signals: Any = None,
) -> dict[str, Any]:
    """Build the canonical persisted aggregate for one patch epoch."""
    payload: dict[str, Any] = {
        "topics": [entry.model_dump(mode="json") for entry in (topics or [])],
        "review_count": max(0, int(review_count or 0)),
        "highlights": _serialize_model_list(highlights),
        "topic_highlights": _serialize_model_list(topic_highlights),
        "global_pros_cons_signals": _normalize_signal_sources(global_pros_cons_signals),
    }
    # Optional context metrics are persisted only when a value is known.
    for key, raw in (
        ("total_sentences", total_sentences),
        ("skipped_sentences", skipped_sentences),
        ("topic_bearing_review_count", topic_bearing_review_count),
    ):
        normalized = _coerce_optional_context_metric(raw)
        if normalized is not None:
            payload[key] = normalized
    return payload
def normalize_patch_epoch_aggregate(aggregate: Any) -> dict[str, Any]:
    """Normalize one persisted patch-epoch aggregate into the canonical shape."""
    source = aggregate if isinstance(aggregate, dict) else {}
    return build_patch_epoch_aggregate(
        topics=coerce_topic_sentiments(source.get("topics")),
        review_count=source.get("review_count", 0),
        total_sentences=source.get("total_sentences"),
        skipped_sentences=source.get("skipped_sentences"),
        topic_bearing_review_count=source.get("topic_bearing_review_count"),
        highlights=source.get("highlights"),
        topic_highlights=source.get("topic_highlights"),
        global_pros_cons_signals=source.get("global_pros_cons_signals"),
    )
def normalize_patch_epochs(persisted_corpus: dict[str, Any] | None) -> list[dict[str, Any]]:
    """Normalize persisted patch epochs while preserving append-only ordering semantics."""
    raw_epochs = None
    if isinstance(persisted_corpus, dict):
        raw_epochs = persisted_corpus.get("patch_epochs")
    if not isinstance(raw_epochs, list):
        return []
    normalized: list[dict[str, Any]] = []
    for raw_epoch in raw_epochs:
        if not isinstance(raw_epoch, dict):
            continue
        patch_timestamp = raw_epoch.get("patch_timestamp")
        # A positive integer timestamp is the minimum viable epoch identity.
        if not isinstance(patch_timestamp, int) or patch_timestamp <= 0:
            continue
        raw_id = raw_epoch.get("id")
        if isinstance(raw_id, str) and raw_id:
            epoch_id = raw_id
        else:
            epoch_id = build_patch_epoch_id(patch_timestamp)
        normalized.append(
            {
                "id": epoch_id,
                "patch_timestamp": patch_timestamp,
                "aggregate": normalize_patch_epoch_aggregate(raw_epoch.get("aggregate")),
            }
        )
    # Stable chronological sort; equal timestamps keep persisted order.
    normalized.sort(key=lambda epoch: epoch["patch_timestamp"])
    return normalized
def _get_general_evidence_cache(
    persisted_corpus: dict[str, Any] | None,
    results: dict[str, Any],
) -> dict[str, Any]:
    """Load the general evidence cache, falling back to legacy results fields."""
    if isinstance(persisted_corpus, dict):
        caches = persisted_corpus.get("evidence_caches")
        if isinstance(caches, dict):
            general_cache = caches.get("general")
            if isinstance(general_cache, dict):
                return normalize_evidence_cache(general_cache)
    # No persisted cache: rebuild from the legacy fields on results.
    raw_signal_map = results.get("global_pros_cons_signals")
    general_signals = (
        raw_signal_map.get("general") if isinstance(raw_signal_map, dict) else None
    )
    return build_evidence_cache_payload(
        highlights=results.get("general_highlights"),
        topic_highlights=results.get("topic_highlights"),
        global_pros_cons_signals=general_signals,
    )
def _get_patch_epoch_evidence_cache(
    persisted_corpus: dict[str, Any] | None,
    epoch: dict[str, Any],
) -> dict[str, Any]:
    """Return the persisted cache for one patch epoch, or rebuild it from the epoch aggregate."""
    stored = None
    epoch_identifier = epoch.get("id")
    if isinstance(persisted_corpus, dict) and isinstance(epoch_identifier, str):
        evidence_caches = persisted_corpus.get("evidence_caches")
        patch_caches = (
            evidence_caches.get("patch_epochs") if isinstance(evidence_caches, dict) else None
        )
        if isinstance(patch_caches, dict):
            stored = patch_caches.get(epoch_identifier)
    if isinstance(stored, dict):
        return normalize_evidence_cache(stored)
    # No persisted cache for this epoch: derive one from its aggregate.
    aggregate = epoch.get("aggregate") if isinstance(epoch, dict) else None
    if isinstance(aggregate, dict):
        return build_evidence_cache_payload(
            highlights=aggregate.get("highlights"),
            topic_highlights=aggregate.get("topic_highlights"),
            global_pros_cons_signals=aggregate.get("global_pros_cons_signals"),
        )
    return build_evidence_cache_payload(
        highlights=None,
        topic_highlights=None,
        global_pros_cons_signals=None,
    )
def reconstruct_recent_evidence_cache(
    persisted_corpus: dict[str, Any] | None,
    *,
    reference_at: datetime | None = None,
) -> dict[str, Any]:
    """Merge per-day evidence caches for every daily bucket inside the recent window."""
    recent_section = persisted_corpus.get("recent") if isinstance(persisted_corpus, dict) else None
    daily_buckets = recent_section.get("daily_utc_buckets") if isinstance(recent_section, dict) else None
    if not isinstance(daily_buckets, list):
        return build_empty_evidence_cache()
    evidence_caches = persisted_corpus.get("evidence_caches") if isinstance(persisted_corpus, dict) else None
    recent_caches = evidence_caches.get("recent") if isinstance(evidence_caches, dict) else None
    bucket_caches = recent_caches.get("daily_utc_buckets") if isinstance(recent_caches, dict) else None
    if not isinstance(bucket_caches, dict):
        bucket_caches = {}
    # Function-scope import — presumably avoids an import cycle with the
    # recent-window module; confirm before hoisting to module level.
    from app.services.recent_window import _get_recent_bucket_bounds
    window_start, window_end = _get_recent_bucket_bounds(reference_at)
    merged = build_empty_evidence_cache()
    for entry in daily_buckets:
        if not isinstance(entry, dict):
            continue
        bucket_start = entry.get("bucket_start")
        # Skip buckets outside [window_start, window_end], inclusive.
        if not isinstance(bucket_start, int):
            continue
        if bucket_start < window_start or bucket_start > window_end:
            continue
        cached = bucket_caches.get(str(bucket_start))
        if isinstance(cached, dict):
            merged = merge_evidence_cache(merged, cached)
    return merged
def apply_evidence_cache_results(
    results: dict[str, Any],
    persisted_corpus: dict[str, Any] | None,
    *,
    reference_at: datetime | None = None,
) -> dict[str, Any]:
    """Project persisted evidence caches onto the read payload's highlight fields.

    Returns a shallow copy of ``results`` whose general, recent, and (when any
    patch epoch exists) current-patch highlight / topic-highlight / global-signal
    fields are rebuilt from the canonical evidence caches. ``reference_at``
    anchors the rolling recent window during cache reconstruction.
    """
    projected = dict(results)
    # Topic highlights are only emitted for topics that survived aggregation.
    surviving_general_topics = _extract_surviving_topic_names(
        projected.get("general_topics")
    )
    general_cache = _get_general_evidence_cache(persisted_corpus, projected)
    # Deep-copy so downstream consumers cannot mutate the cached payloads in place.
    projected["general_highlights"] = copy.deepcopy(general_cache["highlights"][:_highlight_display_limit()])
    projected["topic_highlights"] = build_topic_highlights_payload_from_cache(
        general_cache["topic_highlights"],
        surviving_general_topics,
    )
    # `or 0` guards documents that persisted an explicit None count.
    recent_reviews_count = projected.get("recent_reviews_count", 0) or 0
    recent_topics = projected.get("recent_topics") or []
    surviving_recent_topics = _extract_surviving_topic_names(recent_topics)
    recent_cache = reconstruct_recent_evidence_cache(
        persisted_corpus,
        reference_at=reference_at,
    )
    # Recent fields collapse to None / [] when the window holds no reviews.
    projected["recent_highlights"] = (
        copy.deepcopy(recent_cache["highlights"][:_highlight_display_limit()])
        if recent_reviews_count > 0 and recent_cache["highlights"]
        else None
    )
    projected["recent_topic_highlights"] = (
        build_topic_highlights_payload_from_cache(
            recent_cache["topic_highlights"],
            surviving_recent_topics,
        )
        if recent_reviews_count > 0
        else []
    )
    global_signals = normalize_contextual_global_pros_cons_signals(
        projected.get("global_pros_cons_signals")
    )
    global_signals["general"] = copy.deepcopy(general_cache["global_pros_cons_signals"])
    global_signals["recent"] = (
        copy.deepcopy(recent_cache["global_pros_cons_signals"])
        if recent_reviews_count > 0
        else []
    )
    patch_epochs = normalize_patch_epochs(persisted_corpus)
    # Without any patch epoch, the current-patch fields carried in `results`
    # are left untouched (only the normalized global signals are reassigned).
    if patch_epochs:
        # Epochs are sorted by patch_timestamp, so the last one is current.
        current_epoch = patch_epochs[-1]
        current_patch_reviews_count = projected.get("current_patch_reviews_count", 0) or 0
        surviving_current_patch_topics = _extract_surviving_topic_names(
            projected.get("current_patch_topics")
        )
        patch_cache = _get_patch_epoch_evidence_cache(persisted_corpus, current_epoch)
        projected["current_patch_highlights"] = (
            copy.deepcopy(patch_cache["highlights"][:_highlight_display_limit()])
            if current_patch_reviews_count > 0 and patch_cache["highlights"]
            else None
        )
        projected["current_patch_topic_highlights"] = (
            build_topic_highlights_payload_from_cache(
                patch_cache["topic_highlights"],
                surviving_current_patch_topics,
            )
            if current_patch_reviews_count > 0
            else []
        )
        global_signals["current_patch"] = (
            copy.deepcopy(patch_cache["global_pros_cons_signals"])
            if current_patch_reviews_count > 0
            else []
        )
    projected["global_pros_cons_signals"] = global_signals
    return projected
def merge_patch_epochs(
    persisted_corpus: dict[str, Any] | None,
    *,
    patch_timestamp: int | None,
    current_patch_topics: list[TopicSentiment] | None,
    current_patch_reviews_count: int,
    current_patch_total_sentences: int | None = None,
    current_patch_skipped_sentences: int | None = None,
    current_patch_topic_bearing_review_count: int | None = None,
    current_patch_highlights: Any = None,
    current_patch_topic_highlights: Any = None,
    current_patch_global_pros_cons_signals: Any = None,
) -> list[dict[str, Any]]:
    """Update the active patch epoch or append a new one on patch-boundary rollover."""
    epochs = normalize_patch_epochs(persisted_corpus)
    if patch_timestamp is None:
        # No known patch boundary: keep the persisted epoch list as-is.
        return epochs
    replacement = {
        "id": build_patch_epoch_id(patch_timestamp),
        "patch_timestamp": patch_timestamp,
        "aggregate": build_patch_epoch_aggregate(
            topics=current_patch_topics,
            review_count=current_patch_reviews_count,
            total_sentences=current_patch_total_sentences,
            skipped_sentences=current_patch_skipped_sentences,
            topic_bearing_review_count=current_patch_topic_bearing_review_count,
            highlights=current_patch_highlights,
            topic_highlights=current_patch_topic_highlights,
            global_pros_cons_signals=current_patch_global_pros_cons_signals,
        ),
    }
    # Replace the first epoch matching by id or timestamp; otherwise append
    # and re-sort to keep epochs ordered by patch timestamp.
    replaced = False
    for position, existing in enumerate(epochs):
        if existing["id"] == replacement["id"] or existing["patch_timestamp"] == patch_timestamp:
            epochs[position] = replacement
            replaced = True
            break
    if not replaced:
        epochs.append(replacement)
        epochs.sort(key=lambda existing: existing["patch_timestamp"])
    return epochs
def apply_patch_epoch_results(
    results: dict[str, Any],
    persisted_corpus: dict[str, Any] | None,
) -> dict[str, Any]:
    """Project canonical patch epochs into the v1 current/last patch payload fields."""
    projected = dict(results)
    patch_epochs = normalize_patch_epochs(persisted_corpus)
    if not patch_epochs:
        # No epochs stored: clear the optional current-patch context metrics only.
        _apply_optional_context_metrics(projected, prefix="current_patch_", source=None)
        return projected
    # Epochs are sorted by timestamp: last is current, second-to-last is previous.
    current_epoch = patch_epochs[-1]
    previous_epoch = patch_epochs[-2] if len(patch_epochs) > 1 else None
    aggregate = current_epoch["aggregate"]
    topics = aggregate.get("topics", [])
    review_count = aggregate.get("review_count", 0)
    projected["current_patch_topics"] = topics or None
    projected["current_patch_reviews_count"] = review_count
    projected["current_patch_timestamp"] = current_epoch["patch_timestamp"]
    projected["current_patch_highlights"] = aggregate.get("highlights") or None
    projected["current_patch_topic_highlights"] = aggregate.get("topic_highlights", [])
    has_current_data = review_count > 0 or bool(topics)
    projected["current_patch_scope"] = (
        "sentimentstream_analyzed_reviews_since_major_update" if has_current_data else None
    )
    _apply_optional_context_metrics(projected, prefix="current_patch_", source=aggregate)
    raw_signals = projected.get("global_pros_cons_signals")
    if isinstance(raw_signals, dict):
        # Rebuild the contextual signal map with the epoch's current-patch slice.
        projected["global_pros_cons_signals"] = {
            "general": _normalize_signal_sources(raw_signals.get("general")),
            "recent": _normalize_signal_sources(raw_signals.get("recent")),
            "current_patch": aggregate.get("global_pros_cons_signals", []),
        }
    if previous_epoch is None:
        projected["last_patch_topics"] = None
        projected["last_patch_reviews_count"] = 0
    else:
        previous_aggregate = previous_epoch["aggregate"]
        projected["last_patch_topics"] = previous_aggregate.get("topics") or None
        projected["last_patch_reviews_count"] = previous_aggregate.get("review_count", 0)
    return projected
def apply_general_aggregate_results(
    results: dict[str, Any],
    persisted_corpus: dict[str, Any] | None,
) -> dict[str, Any]:
    """Project canonical general aggregate fields into the read payload."""
    projected = dict(results)
    general_section = persisted_corpus.get("general") if isinstance(persisted_corpus, dict) else None
    raw_aggregate = general_section.get("aggregate") if isinstance(general_section, dict) else None
    # An aggregate counts as canonical when it carries any counted reviews,
    # skipped reviews, topics, or optional context metrics.
    has_canonical_general = False
    if isinstance(raw_aggregate, dict):
        has_canonical_general = (
            max(0, int(raw_aggregate.get("review_count", 0) or 0)) > 0
            or max(0, int(raw_aggregate.get("skipped_count", 0) or 0)) > 0
            or bool(raw_aggregate.get("topics"))
            or any(
                raw_aggregate.get(metric) is not None
                for metric in _OPTIONAL_CONTEXT_METRIC_KEYS
            )
        )
    if not has_canonical_general:
        _apply_optional_context_metrics(projected, source=None)
        return projected
    aggregate = normalize_general_aggregate(
        raw_aggregate,
        fallback_topics=coerce_topic_sentiments(results.get("general_topics")),
        fallback_review_count=results.get("analyzed_reviews", 0),
        fallback_skipped_count=results.get("skipped_count", 0),
    )
    projected["general_topics"] = aggregate.get("topics", [])
    projected["analyzed_reviews"] = aggregate.get("review_count", 0)
    projected["skipped_count"] = aggregate.get("skipped_count", 0)
    _apply_optional_context_metrics(projected, source=aggregate)
    return projected
def apply_recent_bucket_results(
    results: dict[str, Any],
    persisted_corpus: dict[str, Any] | None,
    *,
    reference_at: datetime | None = None,
) -> dict[str, Any]:
    """Project canonical daily recent buckets into the v1 recent payload fields."""
    projected = dict(results)
    recent_section = persisted_corpus.get("recent") if isinstance(persisted_corpus, dict) else None
    daily_buckets = recent_section.get("daily_utc_buckets") if isinstance(recent_section, dict) else None
    # Function-scope import — presumably avoids an import cycle with the
    # recent-window module; confirm before hoisting to module level.
    from app.services.recent_window import reconstruct_recent_from_daily_utc_buckets
    (
        topics,
        reviews_count,
        total_sentences,
        skipped_sentences,
        topic_bearing_review_count,
    ) = reconstruct_recent_from_daily_utc_buckets(daily_buckets, reference_at=reference_at)
    has_recent_reviews = reviews_count > 0
    # Recent fields collapse to None when the window has no reviews.
    projected["recent_topics"] = (
        [topic.model_dump(mode="json") for topic in topics] if has_recent_reviews else None
    )
    projected["recent_reviews_count"] = reviews_count
    projected["recent_scope"] = (
        "sentimentstream_analyzed_reviews_last_30_days" if has_recent_reviews else None
    )
    _apply_optional_context_metrics(
        projected,
        prefix="recent_",
        source={
            "total_sentences": total_sentences,
            "skipped_sentences": skipped_sentences,
            "topic_bearing_review_count": topic_bearing_review_count,
        },
    )
    return projected
def build_persisted_corpus_snapshot(
    stale_analysis: "AnalysisDocumentReadModel | None",
    *,
    general_aggregate: dict[str, Any],
    general_highlights: Any = None,
    general_topic_highlights: Any = None,
    general_global_pros_cons_signals: Any = None,
    recent_daily_utc_buckets: list[dict[str, Any]] | None = None,
    recent_bucket_evidence_caches: dict[int | str, dict[str, Any]] | None = None,
    recent_evidence_mode: Literal["merge", "replace"] = "merge",
    ingested_items: list[Any],
    patch_timestamp: int | None,
    current_patch_topics: list[TopicSentiment] | None,
    current_patch_reviews_count: int,
    current_patch_total_sentences: int | None = None,
    current_patch_skipped_sentences: int | None = None,
    current_patch_topic_bearing_review_count: int | None = None,
    current_patch_highlights: Any = None,
    current_patch_topic_highlights: Any = None,
    current_patch_global_pros_cons_signals: Any = None,
    current_patch_evidence_mode: Literal["merge", "replace"] = "replace",
) -> dict[str, Any]:
    """Update the canonical persisted-corpus envelope for one analysis write.

    Starts from a deep copy of the stale envelope (or an empty one) and layers
    in, section by section: the general aggregate and its evidence cache, the
    recent daily buckets and their per-bucket caches, the patch epochs and
    their caches, and finally the ingest cursor state derived from
    ``ingested_items``. The ``*_evidence_mode`` arguments choose whether
    incoming evidence caches merge into or replace the persisted ones.
    Entries in ``ingested_items`` are expected to expose ``timestamp_created``
    and ``recommendation_id`` attributes (used for cursor tracking below).
    """
    # Work on a deep copy so the caller's stale document is never mutated.
    persisted_corpus = (
        copy.deepcopy(stale_analysis.persisted_corpus)
        if stale_analysis and stale_analysis.persisted_corpus is not None
        else build_empty_persisted_corpus_envelope()
    )
    # Keep a reference to the ORIGINAL (un-copied) envelope; used further down
    # to look up the pre-write patch-epoch evidence cache in "merge" mode.
    stale_persisted_corpus = (
        stale_analysis.persisted_corpus
        if stale_analysis and stale_analysis.persisted_corpus is not None
        else None
    )
    # The general aggregate is always fully replaced by the caller's value.
    persisted_corpus["general"] = {"aggregate": general_aggregate}
    if recent_daily_utc_buckets is not None:
        persisted_corpus["recent"] = {
            "daily_utc_buckets": copy.deepcopy(recent_daily_utc_buckets)
        }
    # Ensure the evidence-cache scaffold exists before filling its sections.
    evidence_caches = persisted_corpus.get("evidence_caches")
    if not isinstance(evidence_caches, dict):
        evidence_caches = {
            "general": build_empty_evidence_cache(),
            "recent": {"daily_utc_buckets": {}},
            "patch_epochs": {},
        }
        persisted_corpus["evidence_caches"] = evidence_caches
    # General evidence is always merged (append-only accumulation).
    general_cache = _get_general_evidence_cache(
        persisted_corpus,
        stale_analysis.results if stale_analysis is not None else {},
    )
    incoming_general_cache = build_evidence_cache_payload(
        highlights=general_highlights,
        topic_highlights=general_topic_highlights,
        global_pros_cons_signals=general_global_pros_cons_signals,
    )
    evidence_caches["general"] = merge_evidence_cache(general_cache, incoming_general_cache)
    recent_cache_root = evidence_caches.get("recent")
    if not isinstance(recent_cache_root, dict):
        recent_cache_root = {"daily_utc_buckets": {}}
    existing_recent_bucket_caches = recent_cache_root.get("daily_utc_buckets")
    if not isinstance(existing_recent_bucket_caches, dict):
        existing_recent_bucket_caches = {}
    # Per-bucket caches are keyed by str(bucket_start); only buckets that still
    # exist in the incoming daily-bucket list may keep a cache entry.
    allowed_recent_bucket_keys = {
        str(entry.get("bucket_start"))
        for entry in recent_daily_utc_buckets or []
        if isinstance(entry, dict) and isinstance(entry.get("bucket_start"), int)
    }
    # In "merge" mode retain (pruned) existing bucket caches; in "replace"
    # mode start from scratch.
    merged_recent_bucket_caches: dict[str, dict[str, Any]] = (
        {
            key: normalize_evidence_cache(value)
            for key, value in existing_recent_bucket_caches.items()
            if key in allowed_recent_bucket_keys and isinstance(value, dict)
        }
        if recent_evidence_mode == "merge"
        else {}
    )
    for bucket_key, bucket_cache in (recent_bucket_evidence_caches or {}).items():
        normalized_key = str(bucket_key)
        if normalized_key not in allowed_recent_bucket_keys:
            # Incoming cache for a bucket that no longer exists: drop it.
            continue
        normalized_bucket_cache = normalize_evidence_cache(bucket_cache)
        if recent_evidence_mode == "merge":
            merged_recent_bucket_caches[normalized_key] = merge_evidence_cache(
                merged_recent_bucket_caches.get(normalized_key),
                normalized_bucket_cache,
            )
        else:
            merged_recent_bucket_caches[normalized_key] = normalized_bucket_cache
    evidence_caches["recent"] = {
        "daily_utc_buckets": merged_recent_bucket_caches,
    }
    # Update or append the active patch epoch for this write.
    persisted_corpus["patch_epochs"] = merge_patch_epochs(
        persisted_corpus,
        patch_timestamp=patch_timestamp,
        current_patch_topics=current_patch_topics,
        current_patch_reviews_count=current_patch_reviews_count,
        current_patch_total_sentences=current_patch_total_sentences,
        current_patch_skipped_sentences=current_patch_skipped_sentences,
        current_patch_topic_bearing_review_count=current_patch_topic_bearing_review_count,
        current_patch_highlights=current_patch_highlights,
        current_patch_topic_highlights=current_patch_topic_highlights,
        current_patch_global_pros_cons_signals=current_patch_global_pros_cons_signals,
    )
    raw_patch_epoch_caches = evidence_caches.get("patch_epochs")
    if not isinstance(raw_patch_epoch_caches, dict):
        raw_patch_epoch_caches = {}
    # Prune caches for epochs that no longer exist after the merge above.
    existing_patch_epoch_ids = {
        epoch.get("id")
        for epoch in normalize_patch_epochs(persisted_corpus)
        if isinstance(epoch.get("id"), str)
    }
    patch_epoch_caches = {
        epoch_id: normalize_evidence_cache(cache)
        for epoch_id, cache in raw_patch_epoch_caches.items()
        if epoch_id in existing_patch_epoch_ids and isinstance(cache, dict)
    }
    if patch_timestamp is not None:
        patch_epoch_id = build_patch_epoch_id(patch_timestamp)
        incoming_patch_cache = build_evidence_cache_payload(
            highlights=current_patch_highlights,
            topic_highlights=current_patch_topic_highlights,
            global_pros_cons_signals=current_patch_global_pros_cons_signals,
        )
        existing_patch_cache = None
        if current_patch_evidence_mode == "merge":
            # Look for this epoch in the PRE-WRITE envelope so a merge can
            # fold new evidence into what was stored before this write.
            matching_epoch = next(
                (
                    epoch
                    for epoch in normalize_patch_epochs(stale_persisted_corpus)
                    if epoch.get("id") == patch_epoch_id
                ),
                None,
            ) if stale_persisted_corpus is not None else None
            existing_patch_cache = (
                patch_epoch_caches.get(patch_epoch_id)
                or (_get_patch_epoch_evidence_cache(stale_persisted_corpus, matching_epoch) if matching_epoch else None)
            )
        patch_epoch_caches[patch_epoch_id] = (
            merge_evidence_cache(existing_patch_cache, incoming_patch_cache)
            if current_patch_evidence_mode == "merge"
            else normalize_evidence_cache(incoming_patch_cache)
        )
    evidence_caches["patch_epochs"] = patch_epoch_caches
    persisted_corpus["evidence_caches"] = evidence_caches
    # Ensure the ingest-cursor scaffold exists before updating it.
    ingest_state = persisted_corpus.get("ingest_state")
    if not isinstance(ingest_state, dict):
        ingest_state = {
            "cursor": None,
            "latest_ingested_review_ts": None,
            "latest_ingested_review_ids_at_ts": [],
        }
        persisted_corpus["ingest_state"] = ingest_state
    if ingested_items:
        existing_latest_ts = ingest_state.get("latest_ingested_review_ts")
        existing_boundary_ids = ingest_state.get("latest_ingested_review_ids_at_ts")
        existing_boundary_ids = (
            [review_id for review_id in existing_boundary_ids if isinstance(review_id, str)]
            if isinstance(existing_boundary_ids, list)
            else []
        )
        # Track the newest ingested timestamp and every distinct review id seen
        # exactly at that boundary timestamp (ordered, de-duplicated).
        latest_ts = max(item.timestamp_created for item in ingested_items)
        latest_ids: list[str] = []
        for item in ingested_items:
            if item.timestamp_created != latest_ts:
                continue
            review_id = item.recommendation_id
            if review_id in latest_ids:
                continue
            latest_ids.append(review_id)
        if existing_latest_ts == latest_ts:
            # Same boundary timestamp as before: union the boundary id sets,
            # preserving the previously persisted order first.
            merged_boundary_ids = list(existing_boundary_ids)
            for review_id in latest_ids:
                if review_id not in merged_boundary_ids:
                    merged_boundary_ids.append(review_id)
            latest_ids = merged_boundary_ids
        ingest_state["latest_ingested_review_ts"] = latest_ts
        ingest_state["latest_ingested_review_ids_at_ts"] = latest_ids
    return persisted_corpus
def build_topic_highlights_payload(
    topic_highlights: dict[str, list[dict[str, Any]]],
    surviving_topics: set[str],
) -> list[TopicHighlights]:
    """Serialize grouped topic evidence for topics that survived aggregate filtering."""
    payload: list[TopicHighlights] = []
    for topic_name, snippets in topic_highlights.items():
        if topic_name not in surviving_topics:
            continue
        payload.append(
            TopicHighlights(
                topic=topic_name,
                highlights=[TopicEvidenceSnippet(**snippet) for snippet in snippets],
            )
        )
    return payload
# Renames legacy persisted result keys to their current schema names; keys not
# listed here pass through unchanged. Both "topics" and "historical_topics"
# map to "general_topics" — during normalization the first occurrence wins.
_LEGACY_FIELD_MAP = {
    "topics": "general_topics",
    "historical_topics": "general_topics",
    "post_update_topics": "current_patch_topics",
    "post_update_reviews_count": "current_patch_reviews_count",
    "post_update_highlights": "current_patch_highlights",
    "previous_update_topics": "last_patch_topics",
    "previous_update_reviews_count": "last_patch_reviews_count",
    "last_update_timestamp": "current_patch_timestamp",
}
| class AnalysisDocumentReadModel: | |
| """Normalized read model for persisted analysis documents.""" | |
| source_format: Literal["legacy", "persisted_corpus"] | |
| results: dict[str, Any] | |
| persisted_corpus: dict[str, Any] | None | |
| analyzed_review_ids: list[str] | |
| latest_review_timestamp: int | |
| cached_at: Any = None | |
| analyzed_at: Any = None | |
def normalize_legacy_results(results: dict[str, Any]) -> dict[str, Any]:
    """Map legacy persisted result fields to the current schema.

    Renames legacy keys via ``_LEGACY_FIELD_MAP`` (first occurrence wins),
    drops the obsolete ``is_incremental`` flag, re-validates topic-highlight
    payloads, repairs a historical purchase-risk state spelling, and backfills
    the scope markers expected by current readers.
    """
    normalized: dict[str, Any] = {}
    for key, value in results.items():
        new_key = _LEGACY_FIELD_MAP.get(key, key)
        if key == "is_incremental":
            # Legacy bookkeeping flag with no equivalent in the current schema.
            continue
        if new_key in {
            "topic_highlights",
            "recent_topic_highlights",
            "current_patch_topic_highlights",
        } and isinstance(value, list):
            # Round-trip through the model so stale shapes are re-validated.
            value = [TopicHighlights(**entry).model_dump() for entry in value]
        if new_key not in normalized:
            normalized[new_key] = value
    raw_purchase_risk = normalized.get("purchase_risk")
    if isinstance(raw_purchase_risk, dict):
        localization = raw_purchase_risk.get("localization")
        if isinstance(localization, dict) and localization.get("state") == "minor_issue":
            # Older documents persisted the singular spelling of this state.
            normalized["purchase_risk"] = {
                **raw_purchase_risk,
                "localization": {
                    **localization,
                    "state": "minor_issues",
                },
            }
    if not normalized.get("general_scope"):
        normalized["general_scope"] = "sentimentstream_analyzed_reviews"
    if (
        not normalized.get("recent_scope")
        and (
            normalized.get("recent_topics") is not None
            # `or 0` guards legacy documents that persisted an explicit None
            # count (matching the null-count handling used elsewhere in this
            # module), which would otherwise make `None > 0` raise TypeError.
            or (normalized.get("recent_reviews_count") or 0) > 0
        )
    ):
        normalized["recent_scope"] = "sentimentstream_analyzed_reviews_last_30_days"
    return normalized
def read_analysis_document(
    document: dict[str, Any] | None,
    *,
    recent_reference_at: datetime | None = None,
) -> AnalysisDocumentReadModel:
    """Return one normalized application contract for legacy and envelope docs.

    Legacy documents stay readable through normalized `results`, but the adapter
    must not synthesize persisted-corpus sections that were never stored.
    """
    source = document if isinstance(document, dict) else {}
    raw_results = source.get("results")
    results = normalize_legacy_results(raw_results if isinstance(raw_results, dict) else {})
    corpus: dict[str, Any] | None = None
    if has_persisted_corpus_envelope(source):
        corpus = source.get("persisted_corpus")
    if corpus is not None:
        # Layer the canonical projections over the legacy-normalized results:
        # general aggregate, recent buckets, patch epochs, evidence caches.
        results = apply_general_aggregate_results(results, corpus)
        results = apply_recent_bucket_results(
            results,
            corpus,
            reference_at=recent_reference_at,
        )
        results = apply_patch_epoch_results(results, corpus)
        results = apply_evidence_cache_results(
            results,
            corpus,
            reference_at=recent_reference_at,
        )
    return AnalysisDocumentReadModel(
        source_format="legacy" if corpus is None else "persisted_corpus",
        results=results,
        persisted_corpus=corpus,
        analyzed_review_ids=list(source.get("analyzed_review_ids", [])),
        latest_review_timestamp=source.get("latest_review_timestamp", 0),
        cached_at=source.get("cached_at"),
        analyzed_at=source.get("analyzed_at"),
    )
| def serialize_datetime(value: Any) -> str | Any: | |
| """Serialize datetimes in SSE payloads and persisted compatibility helpers.""" | |
| if isinstance(value, datetime): | |
| return value.isoformat() | |
| return value | |
| def coerce_utc_datetime(value: Any) -> datetime | None: | |
| """Coerce persisted datetime values into timezone-aware UTC datetimes.""" | |
| if isinstance(value, datetime): | |
| return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc) | |
| if isinstance(value, str): | |
| parsed = datetime.fromisoformat(value) | |
| return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc) | |
| return None | |
| def datetime_from_timestamp(timestamp: int | None) -> datetime | None: | |
| """Convert a unix timestamp into UTC datetime.""" | |
| if timestamp is None: | |
| return None | |
| return datetime.fromtimestamp(timestamp, tz=timezone.utc) | |