# sentimentstream-worker/backend/app/services/analysis_utils.py
"""Shared analysis helpers used by both live and worker paths."""
import copy
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Literal
from app.core.config import settings
from app.db.mongodb import (
build_empty_persisted_corpus_envelope,
has_persisted_corpus_envelope,
)
from app.models.schemas import (
PredictionType,
SentimentType,
TopicSentiment,
TopicEvidenceSnippet,
TopicHighlights,
UserCountPrediction,
)
from app.services.global_pros_cons import (
merge_global_pros_cons_signal_sources,
normalize_contextual_global_pros_cons_signals,
)
def calculate_prediction(topics: list[TopicSentiment]) -> UserCountPrediction:
"""Compute the player-count trend prediction from aggregated topics."""
topic_map = {t.topic: t for t in topics}
# Prefer the renamed public topic, but keep a legacy fallback while
# older cached analyses still carry the previous topic name.
stickiness = topic_map.get("Stickiness") or topic_map.get("Retention")
if stickiness and stickiness.mention_count > 5:
if stickiness.score > settings.prediction_retention_threshold_pos:
return UserCountPrediction(
trend=PredictionType.INCREASING,
confidence=min(0.95, 0.5 + (stickiness.mention_count / 100)),
reasoning="PREDICTION_REASONING_RETENTION_HIGH",
)
if stickiness.score < settings.prediction_retention_threshold_neg:
return UserCountPrediction(
trend=PredictionType.DECREASING,
confidence=min(0.95, 0.5 + (stickiness.mention_count / 100)),
reasoning="PREDICTION_REASONING_RETENTION_LOW",
)
bugs = topic_map.get("Bugs")
performance = topic_map.get("Performance")
tech_score = 0.0
tech_count = 0
if bugs:
tech_score += bugs.score
tech_count += 1
if performance:
tech_score += performance.score
tech_count += 1
if tech_count > 0 and (tech_score / tech_count) < -0.3:
return UserCountPrediction(
trend=PredictionType.DECREASING,
confidence=0.75,
reasoning="PREDICTION_REASONING_TECH_ISSUES",
)
gameplay = topic_map.get("Gameplay")
fun = topic_map.get("Fun")
gameplay_score = 0.0
gameplay_count = 0
if gameplay:
gameplay_score += gameplay.score
gameplay_count += 1
if fun:
gameplay_score += fun.score
gameplay_count += 1
if gameplay_count > 0:
average_gameplay = gameplay_score / gameplay_count
if average_gameplay > 0.4:
return UserCountPrediction(
trend=PredictionType.INCREASING,
confidence=0.8,
reasoning="PREDICTION_REASONING_GAMEPLAY_HIGH",
)
if average_gameplay < -0.2:
return UserCountPrediction(
trend=PredictionType.DECREASING,
confidence=0.6,
reasoning="PREDICTION_REASONING_GAMEPLAY_LOW",
)
return UserCountPrediction(
trend=PredictionType.STABLE,
confidence=0.5,
reasoning="PREDICTION_REASONING_STABLE",
)
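# Illustrative call (values hypothetical; the retention and sentiment thresholds
# come from `settings` and are not pinned down here). With no Stickiness topic,
# the averaged Bugs/Performance score of -0.45 trips the tech-issues branch:
#
#     topics = [
#         TopicSentiment(topic="Bugs", sentiment=SentimentType.NEGATIVE,
#                        score=-0.5, mention_count=8,
#                        example=None, example_score=None),
#         TopicSentiment(topic="Performance", sentiment=SentimentType.NEGATIVE,
#                        score=-0.4, mention_count=6,
#                        example=None, example_score=None),
#     ]
#     prediction = calculate_prediction(topics)
#     # prediction.trend == PredictionType.DECREASING, prediction.confidence == 0.75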
def aggregate_topics(
existing: list[TopicSentiment],
new_batch: list[TopicSentiment],
) -> list[TopicSentiment]:
"""Merge topic aggregates using weighted mention counts."""
topic_data: dict[str, dict[str, Any]] = {}
def better_example(
current: tuple[str, float] | None,
new: tuple[str, float] | None,
) -> tuple[str, float] | None:
if new is None:
return current
if current is None:
return new
return new if abs(new[1]) > abs(current[1]) else current
    # Existing aggregates and the new batch contribute identically, so both
    # passes fold into one loop (existing first, then the new batch).
    for topic in [*existing, *new_batch]:
        if topic.topic not in topic_data:
            topic_data[topic.topic] = {"scores": [], "count": 0, "example": None}
        topic_data[topic.topic]["scores"].append(topic.score * topic.mention_count)
        topic_data[topic.topic]["count"] += topic.mention_count
        new_example = (
            (topic.example, topic.example_score)
            if topic.example and topic.example_score is not None
            else None
        )
        topic_data[topic.topic]["example"] = better_example(
            topic_data[topic.topic]["example"],
            new_example,
        )
results: list[TopicSentiment] = []
for topic_name, data in topic_data.items():
count = data["count"]
if count == 0:
continue
average_score = sum(data["scores"]) / count
normalized_score = max(-1.0, min(1.0, average_score))
if normalized_score > settings.sentiment_positive_threshold:
sentiment = SentimentType.POSITIVE
elif normalized_score < settings.sentiment_negative_threshold:
sentiment = SentimentType.NEGATIVE
else:
sentiment = SentimentType.NEUTRAL
best_example = None
example_score = None
example_data = data["example"]
if example_data:
example_text, candidate_score = example_data
if sentiment == SentimentType.NEUTRAL or (
sentiment == SentimentType.POSITIVE and candidate_score > 0
) or (
sentiment == SentimentType.NEGATIVE and candidate_score < 0
):
best_example = example_text
example_score = candidate_score
results.append(
TopicSentiment(
topic=topic_name,
sentiment=sentiment,
score=round(normalized_score, 3),
mention_count=count,
example=best_example,
example_score=example_score,
)
)
results.sort(key=lambda item: item.mention_count, reverse=True)
return results
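# Illustrative merge (hypothetical numbers): two batches mentioning "Bugs" are
# combined with mention-weighted averaging, so the merged score is
# (-0.5 * 10 + 0.5 * 5) / 15 ≈ -0.167 over 15 mentions; the resulting sentiment
# label then depends on the thresholds configured in `settings`.
#
#     merged = aggregate_topics(
#         [TopicSentiment(topic="Bugs", sentiment=SentimentType.NEGATIVE,
#                         score=-0.5, mention_count=10,
#                         example=None, example_score=None)],
#         [TopicSentiment(topic="Bugs", sentiment=SentimentType.POSITIVE,
#                         score=0.5, mention_count=5,
#                         example=None, example_score=None)],
#     )
#     # merged[0].mention_count == 15, merged[0].score == -0.167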
def coerce_topic_sentiments(raw_topics: Any) -> list[TopicSentiment]:
"""Deserialize persisted topic aggregates into validated TopicSentiment models."""
if not isinstance(raw_topics, list):
return []
topics: list[TopicSentiment] = []
for entry in raw_topics:
if isinstance(entry, TopicSentiment):
topics.append(entry)
continue
if not isinstance(entry, dict):
continue
try:
topics.append(TopicSentiment(**entry))
except Exception:
continue
return topics
_OPTIONAL_CONTEXT_METRIC_KEYS = (
"total_sentences",
"skipped_sentences",
"topic_bearing_review_count",
)
def _coerce_optional_context_metric(value: Any) -> int | None:
if value is None:
return None
try:
return max(0, int(value))
except (TypeError, ValueError):
return None
def _merge_optional_context_metric(
existing_value: Any,
delta_value: int | None,
*,
allow_seed: bool,
) -> int | None:
normalized_delta = _coerce_optional_context_metric(delta_value)
if normalized_delta is None:
return None if not allow_seed else _coerce_optional_context_metric(existing_value)
normalized_existing = _coerce_optional_context_metric(existing_value)
if normalized_existing is None:
return normalized_delta if allow_seed else None
return normalized_existing + normalized_delta
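# Behavioural sketch of the seeding rules (values hypothetical):
#
#     _merge_optional_context_metric(10, 5, allow_seed=False)     # -> 15 (normal accumulation)
#     _merge_optional_context_metric(None, 5, allow_seed=True)    # -> 5  (seed a never-tracked metric)
#     _merge_optional_context_metric(None, 5, allow_seed=False)   # -> None (no late back-fill)
#     _merge_optional_context_metric(10, None, allow_seed=True)   # -> 10 (keep what already exists)
#     _merge_optional_context_metric(10, None, allow_seed=False)  # -> None (metric stops being reported)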
def _apply_optional_context_metrics(
target: dict[str, Any],
*,
prefix: str = "",
source: dict[str, Any] | None,
) -> None:
for key in _OPTIONAL_CONTEXT_METRIC_KEYS:
payload_key = f"{prefix}{key}"
value = (
_coerce_optional_context_metric(source.get(key))
if isinstance(source, dict)
else None
)
if value is None:
target.pop(payload_key, None)
else:
target[payload_key] = value
def build_general_aggregate(
*,
topics: list[TopicSentiment] | None = None,
review_count: int = 0,
skipped_count: int = 0,
total_sentences: int | None = None,
skipped_sentences: int | None = None,
topic_bearing_review_count: int | None = None,
) -> dict[str, Any]:
"""Build the canonical mergeable persisted aggregate for general."""
aggregate = {
"review_count": max(0, int(review_count or 0)),
"skipped_count": max(0, int(skipped_count or 0)),
"topics": [
topic.model_dump(mode="json")
for topic in (topics or [])
],
}
optional_metrics = {
"total_sentences": total_sentences,
"skipped_sentences": skipped_sentences,
"topic_bearing_review_count": topic_bearing_review_count,
}
for key, value in optional_metrics.items():
normalized = _coerce_optional_context_metric(value)
if normalized is not None:
aggregate[key] = normalized
return aggregate
def normalize_general_aggregate(
aggregate: dict[str, Any] | None,
*,
fallback_topics: list[TopicSentiment] | None = None,
fallback_review_count: int = 0,
fallback_skipped_count: int = 0,
) -> dict[str, Any]:
"""Normalize persisted general aggregate shape with optional legacy fallbacks."""
raw_topics = aggregate.get("topics") if isinstance(aggregate, dict) else None
topics = (
coerce_topic_sentiments(raw_topics)
if isinstance(raw_topics, list)
else list(fallback_topics or [])
)
review_count = (
aggregate.get("review_count", fallback_review_count)
if isinstance(aggregate, dict)
else fallback_review_count
)
skipped_count = (
aggregate.get("skipped_count", fallback_skipped_count)
if isinstance(aggregate, dict)
else fallback_skipped_count
)
return build_general_aggregate(
topics=topics,
review_count=review_count,
skipped_count=skipped_count,
total_sentences=(
aggregate.get("total_sentences")
if isinstance(aggregate, dict)
else None
),
skipped_sentences=(
aggregate.get("skipped_sentences")
if isinstance(aggregate, dict)
else None
),
topic_bearing_review_count=(
aggregate.get("topic_bearing_review_count")
if isinstance(aggregate, dict)
else None
),
)
def get_persisted_general_aggregate(
analysis: "AnalysisDocumentReadModel | None",
) -> dict[str, Any]:
"""Return canonical general aggregate, seeding from legacy results when needed."""
if analysis is None:
return build_general_aggregate()
fallback_topics = coerce_topic_sentiments(analysis.results.get("general_topics"))
fallback_review_count = analysis.results.get("analyzed_reviews", 0)
fallback_skipped_count = analysis.results.get("skipped_count", 0)
raw_aggregate = None
if analysis.persisted_corpus is not None:
raw_general = analysis.persisted_corpus.get("general")
if isinstance(raw_general, dict):
candidate = raw_general.get("aggregate")
if isinstance(candidate, dict):
raw_aggregate = candidate
return normalize_general_aggregate(
raw_aggregate,
fallback_topics=fallback_topics,
fallback_review_count=fallback_review_count,
fallback_skipped_count=fallback_skipped_count,
)
def merge_general_aggregate(
existing_aggregate: dict[str, Any] | None,
*,
delta_topics: list[TopicSentiment],
delta_review_count: int,
delta_skipped_count: int,
delta_total_sentences: int | None = None,
delta_skipped_sentences: int | None = None,
delta_topic_bearing_review_count: int | None = None,
) -> dict[str, Any]:
"""Append new general-review contributions onto the canonical persisted aggregate."""
normalized = normalize_general_aggregate(existing_aggregate)
existing_topics = coerce_topic_sentiments(normalized.get("topics"))
merged_topics = aggregate_topics(existing_topics, delta_topics)
allow_seed_metrics = not existing_aggregate or normalized.get("review_count", 0) == 0
return build_general_aggregate(
topics=merged_topics,
review_count=normalized.get("review_count", 0) + max(0, delta_review_count),
skipped_count=normalized.get("skipped_count", 0) + max(0, delta_skipped_count),
total_sentences=_merge_optional_context_metric(
normalized.get("total_sentences"),
delta_total_sentences,
allow_seed=allow_seed_metrics,
),
skipped_sentences=_merge_optional_context_metric(
normalized.get("skipped_sentences"),
delta_skipped_sentences,
allow_seed=allow_seed_metrics,
),
topic_bearing_review_count=_merge_optional_context_metric(
normalized.get("topic_bearing_review_count"),
delta_topic_bearing_review_count,
allow_seed=allow_seed_metrics,
),
)
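# Sketch of an incremental write (values hypothetical; `batch_topics` stands in
# for the TopicSentiment list from the new batch). Counters are summed, topics
# are re-aggregated, and the optional context metrics are only seeded onto an
# aggregate that previously tracked nothing, so a metric introduced mid-stream
# is dropped rather than under-counted:
#
#     merged = merge_general_aggregate(
#         {"review_count": 40, "skipped_count": 2, "topics": []},
#         delta_topics=batch_topics,
#         delta_review_count=10,
#         delta_skipped_count=1,
#         delta_total_sentences=120,
#     )
#     # merged["review_count"] == 50, merged["skipped_count"] == 3, and
#     # "total_sentences" is absent because the existing aggregate never had it.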
def scale_topics(topics: list[TopicSentiment], factor: float) -> list[TopicSentiment]:
"""Scale mention counts for the approximate recent sliding window."""
return [
topic.model_copy(update={"mention_count": max(1, int(topic.mention_count * factor))})
for topic in topics
]
def filter_topics_by_min_mentions(
topics: list[TopicSentiment],
min_mentions: int | None = None,
) -> list[TopicSentiment]:
"""Filter topics below the minimum mention threshold.
Preserves existing sort order. Only filters — does not modify score or sentiment.
Applied at the final aggregate level, never at the per-review level.
"""
threshold = min_mentions if min_mentions is not None else settings.topic_min_mentions
return [t for t in topics if t.mention_count >= threshold]
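# Usage sketch (`merged_topics` being any aggregated TopicSentiment list):
# thresholding happens once, on the final merged aggregate.
#
#     visible_topics = filter_topics_by_min_mentions(merged_topics)  # settings.topic_min_mentions
#     all_topics = filter_topics_by_min_mentions(merged_topics, min_mentions=1)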
def compute_preferred_context(patch_timestamp: int | None) -> str:
"""Choose the default user-facing context tab.
Returns 'current_patch' whenever a reliable major patch exists; otherwise
returns 'recent' as the safe current-state fallback.
"""
return "current_patch" if patch_timestamp is not None else "recent"
_TEXT_NORMALIZE_RE = re.compile(r"[^\w\u4e00-\u9fff]+")
_TOPIC_HIGHLIGHT_SOURCE_KEYS = (
"source_steamid",
"source_recommendation_id",
"source_timestamp_created",
"source_playtime_at_review",
"source_voted_up",
"source_language",
"source_steam_purchase",
"source_received_for_free",
"source_written_during_early_access",
)
def _highlight_display_limit() -> int:
return max(1, int(settings.highlights_top_n_general))
def _highlight_working_limit() -> int:
display_limit = _highlight_display_limit()
return max(display_limit + 1, display_limit * 2)
def _topic_highlight_display_limit() -> int:
return max(1, int(settings.highlights_top_n_per_topic))
def _topic_highlight_working_limit() -> int:
display_limit = _topic_highlight_display_limit()
return max(display_limit + 1, display_limit * 2)
def _global_signal_working_limit() -> int:
return 24
def build_empty_evidence_cache() -> dict[str, Any]:
return {
"highlights": [],
"topic_highlights": {},
"global_pros_cons_signals": [],
}
def _normalize_text_identity(value: str) -> str:
normalized = _TEXT_NORMALIZE_RE.sub(" ", str(value).lower()).strip()
return re.sub(r"\s+", " ", normalized)
def _classify_sentiment_value(score: float) -> str:
if score > settings.sentiment_positive_threshold:
return SentimentType.POSITIVE.value
if score < settings.sentiment_negative_threshold:
return SentimentType.NEGATIVE.value
return SentimentType.NEUTRAL.value
def _normalize_highlight_entries(entries: Any) -> list[dict[str, Any]]:
if not isinstance(entries, list):
return []
normalized: list[dict[str, Any]] = []
for entry in entries:
if not isinstance(entry, dict):
model_dump = getattr(entry, "model_dump", None)
if not callable(model_dump):
continue
entry = model_dump(mode="json")
phrase = entry.get("phrase") or entry.get("text")
mention_count = int(entry.get("mention_count") or 0)
if not phrase or mention_count <= 0:
continue
normalized.append(
{
"phrase": str(phrase),
"mention_count": mention_count,
"sentiment": str(entry.get("sentiment") or _classify_sentiment_value(float(entry.get("score") or 0.0))),
"score": float(entry.get("score") or 0.0),
"ngram_size": int(entry.get("ngram_size") or max(1, len(str(phrase).split()))),
}
)
return normalized
def _normalize_topic_highlight_entries(entries: Any) -> list[dict[str, Any]]:
if not isinstance(entries, list):
return []
normalized: list[dict[str, Any]] = []
for entry in entries:
if not isinstance(entry, dict):
model_dump = getattr(entry, "model_dump", None)
if not callable(model_dump):
continue
entry = model_dump(mode="json")
text = entry.get("text") or entry.get("phrase")
mention_count = int(entry.get("mention_count") or 0)
if not text or mention_count <= 0:
continue
normalized_entry = {
"text": str(text),
"mention_count": mention_count,
"sentiment": str(entry.get("sentiment") or _classify_sentiment_value(float(entry.get("score") or 0.0))),
"score": float(entry.get("score") or 0.0),
}
for key in _TOPIC_HIGHLIGHT_SOURCE_KEYS:
if key in entry and entry.get(key) is not None:
normalized_entry[key] = copy.deepcopy(entry.get(key))
normalized.append(normalized_entry)
return normalized
def _extract_topic_highlight_source(entry: dict[str, Any]) -> dict[str, Any]:
return {
key: copy.deepcopy(entry.get(key))
for key in _TOPIC_HIGHLIGHT_SOURCE_KEYS
if key in entry and entry.get(key) is not None
}
def _has_topic_highlight_source(entry: dict[str, Any]) -> bool:
return any(entry.get(key) is not None for key in _TOPIC_HIGHLIGHT_SOURCE_KEYS)
def _replace_topic_highlight_source(target: dict[str, Any], source: dict[str, Any]) -> None:
for key in _TOPIC_HIGHLIGHT_SOURCE_KEYS:
target.pop(key, None)
target.update(source)
def _normalize_topic_highlight_map(raw_topic_highlights: Any) -> dict[str, list[dict[str, Any]]]:
if isinstance(raw_topic_highlights, dict):
normalized: dict[str, list[dict[str, Any]]] = {}
for topic, entries in raw_topic_highlights.items():
if not isinstance(topic, str):
continue
topic_entries = _normalize_topic_highlight_entries(entries)
if topic_entries:
normalized[topic] = topic_entries
return normalized
if not isinstance(raw_topic_highlights, list):
return {}
normalized_from_list: dict[str, list[dict[str, Any]]] = {}
for entry in raw_topic_highlights:
if not isinstance(entry, dict):
model_dump = getattr(entry, "model_dump", None)
if not callable(model_dump):
continue
entry = model_dump(mode="json")
topic = entry.get("topic")
if not isinstance(topic, str) or not topic:
continue
topic_entries = _normalize_topic_highlight_entries(entry.get("highlights"))
if topic_entries:
normalized_from_list[topic] = topic_entries
return normalized_from_list
def normalize_evidence_cache(cache: Any) -> dict[str, Any]:
if not isinstance(cache, dict):
return build_empty_evidence_cache()
raw_global_signals = cache.get("global_pros_cons_signals")
global_signal_input: dict[str, list[dict]] | None = (
{"general": raw_global_signals}
if isinstance(raw_global_signals, list)
else None
)
normalized_cache: dict[str, Any] = {
"highlights": _normalize_highlight_entries(cache.get("highlights")),
"topic_highlights": _normalize_topic_highlight_map(cache.get("topic_highlights")),
"global_pros_cons_signals": normalize_contextual_global_pros_cons_signals(
global_signal_input
)["general"],
}
return normalized_cache
def _extract_surviving_topic_names(raw_topics: Any) -> set[str]:
surviving_topics: set[str] = set()
for topic in raw_topics or []:
if not isinstance(topic, dict):
continue
topic_name = topic.get("topic")
if isinstance(topic_name, str):
surviving_topics.add(topic_name)
return surviving_topics
def _merge_highlight_evidence(
existing_entries: Any,
new_entries: Any,
*,
working_limit: int | None = None,
) -> list[dict[str, Any]]:
merged: dict[str, dict[str, Any]] = {}
def merge_entry(entry: dict[str, Any]) -> None:
key = _normalize_text_identity(entry["phrase"])
if not key:
return
current = merged.get(key)
if current is None:
merged[key] = copy.deepcopy(entry)
return
previous_mentions = current["mention_count"]
current["mention_count"] += entry["mention_count"]
current["score"] = round(
(
current["score"] * previous_mentions
+ entry["score"] * entry["mention_count"]
) / max(current["mention_count"], 1),
3,
)
current["sentiment"] = _classify_sentiment_value(current["score"])
current["ngram_size"] = min(current["ngram_size"], entry["ngram_size"])
if (
entry["mention_count"],
abs(entry["score"]),
-len(entry["phrase"]),
entry["phrase"].lower(),
) > (
previous_mentions,
abs(current["score"]),
-len(current["phrase"]),
current["phrase"].lower(),
):
current["phrase"] = entry["phrase"]
for entry in _normalize_highlight_entries(existing_entries):
merge_entry(entry)
for entry in _normalize_highlight_entries(new_entries):
merge_entry(entry)
limit = working_limit if working_limit is not None else _highlight_working_limit()
return sorted(
merged.values(),
key=lambda item: (
-item["mention_count"],
-abs(item["score"]),
item["ngram_size"],
len(item["phrase"]),
item["phrase"].lower(),
),
)[:limit]
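# Illustrative merge (hypothetical entries): phrases that normalize to the same
# identity ("Great gunplay" vs "great gunplay!") collapse into one entry with
# summed mentions and a mention-weighted score of round((0.8*4 + 0.5*2) / 6, 3) == 0.7:
#
#     _merge_highlight_evidence(
#         [{"phrase": "Great gunplay", "mention_count": 4, "score": 0.8,
#           "sentiment": "positive", "ngram_size": 2}],
#         [{"phrase": "great gunplay!", "mention_count": 2, "score": 0.5,
#           "sentiment": "positive", "ngram_size": 2}],
#     )
#     # -> [{"phrase": "Great gunplay", "mention_count": 6, "score": 0.7, ...}]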
def _merge_topic_highlight_entries(
existing_entries: Any,
new_entries: Any,
*,
working_limit: int | None = None,
) -> list[dict[str, Any]]:
merged: dict[str, dict[str, Any]] = {}
def merge_entry(entry: dict[str, Any]) -> None:
key = _normalize_text_identity(entry["text"])
if not key:
return
current = merged.get(key)
if current is None:
merged[key] = copy.deepcopy(entry)
return
previous_mentions = current["mention_count"]
current["mention_count"] += entry["mention_count"]
current["score"] = round(
(
current["score"] * previous_mentions
+ entry["score"] * entry["mention_count"]
) / max(current["mention_count"], 1),
3,
)
current["sentiment"] = _classify_sentiment_value(current["score"])
incoming_source = _extract_topic_highlight_source(entry)
should_replace_text = (
entry["mention_count"],
abs(entry["score"]),
-len(entry["text"]),
entry["text"].lower(),
) > (
previous_mentions,
abs(current["score"]),
-len(current["text"]),
current["text"].lower(),
)
if should_replace_text:
current["text"] = entry["text"]
_replace_topic_highlight_source(current, incoming_source)
elif not _has_topic_highlight_source(current) and any(
value is not None for value in incoming_source.values()
):
_replace_topic_highlight_source(current, incoming_source)
for entry in _normalize_topic_highlight_entries(existing_entries):
merge_entry(entry)
for entry in _normalize_topic_highlight_entries(new_entries):
merge_entry(entry)
limit = working_limit if working_limit is not None else _topic_highlight_working_limit()
return sorted(
merged.values(),
key=lambda item: (
-item["mention_count"],
-abs(item["score"]),
len(item["text"]),
item["text"].lower(),
),
)[:limit]
def _merge_topic_highlight_maps(
existing_map: Any,
new_map: Any,
) -> dict[str, list[dict[str, Any]]]:
normalized_existing = _normalize_topic_highlight_map(existing_map)
normalized_new = _normalize_topic_highlight_map(new_map)
merged: dict[str, list[dict[str, Any]]] = {}
for topic in sorted(set(normalized_existing) | set(normalized_new)):
merged_entries = _merge_topic_highlight_entries(
normalized_existing.get(topic),
normalized_new.get(topic),
)
if merged_entries:
merged[topic] = merged_entries
return merged
def merge_evidence_cache(
existing_cache: Any,
new_cache: Any,
) -> dict[str, Any]:
normalized_existing = normalize_evidence_cache(existing_cache)
normalized_new = normalize_evidence_cache(new_cache)
return {
"highlights": _merge_highlight_evidence(
normalized_existing.get("highlights"),
normalized_new.get("highlights"),
),
"topic_highlights": _merge_topic_highlight_maps(
normalized_existing.get("topic_highlights"),
normalized_new.get("topic_highlights"),
),
"global_pros_cons_signals": sorted(
merge_global_pros_cons_signal_sources(
normalized_existing.get("global_pros_cons_signals"),
normalized_new.get("global_pros_cons_signals"),
),
key=lambda item: (
-int(item.get("mention_count") or 0),
-float(item.get("quality") or 0.0),
-abs(float(item.get("score") or 0.0)),
int(item.get("first_position") or 0),
len(str(item.get("phrase") or "")),
str(item.get("phrase") or "").lower(),
),
)[:_global_signal_working_limit()],
}
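# Usage sketch (shapes hypothetical; `stored_cache`, `new_highlights`,
# `new_topic_highlights`, and `new_signals` are placeholders): each evidence
# section merges independently, and the global pros/cons signal pool is
# re-ranked and truncated so the persisted cache stays bounded.
#
#     cache = merge_evidence_cache(
#         stored_cache,
#         {"highlights": new_highlights,
#          "topic_highlights": new_topic_highlights,
#          "global_pros_cons_signals": new_signals},
#     )
#     # cache keeps at most _highlight_working_limit() highlights,
#     # _topic_highlight_working_limit() entries per topic, and 24 global signals.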
def build_evidence_cache_payload(
*,
highlights: Any = None,
topic_highlights: Any = None,
global_pros_cons_signals: Any = None,
) -> dict[str, Any]:
return normalize_evidence_cache(
{
"highlights": highlights,
"topic_highlights": topic_highlights,
"global_pros_cons_signals": global_pros_cons_signals,
}
)
def build_topic_highlights_payload_from_cache(
topic_highlights: Any,
surviving_topics: set[str],
) -> list[dict[str, Any]]:
normalized = _normalize_topic_highlight_map(topic_highlights)
return [
{
"topic": topic,
"highlights": copy.deepcopy(highlights),
}
for topic, highlights in normalized.items()
if topic in surviving_topics
]
def build_patch_epoch_id(patch_timestamp: int) -> str:
"""Return a deterministic patch-epoch identity for one major-update boundary."""
return f"patch-{int(patch_timestamp)}"
def _serialize_model_list(items: Any) -> list[dict[str, Any]]:
"""Serialize a list of Pydantic models or dict payloads into JSON-safe dicts."""
if not isinstance(items, list):
return []
serialized: list[dict[str, Any]] = []
for item in items:
if isinstance(item, dict):
serialized.append(copy.deepcopy(item))
continue
model_dump = getattr(item, "model_dump", None)
if callable(model_dump):
serialized.append(model_dump(mode="json"))
return serialized
def _normalize_signal_sources(signals: Any) -> list[dict[str, Any]]:
"""Keep only persisted pros/cons signal-source dict entries."""
if not isinstance(signals, list):
return []
return [copy.deepcopy(entry) for entry in signals if isinstance(entry, dict)]
def build_patch_epoch_aggregate(
*,
topics: list[TopicSentiment] | None = None,
review_count: int = 0,
total_sentences: int | None = None,
skipped_sentences: int | None = None,
topic_bearing_review_count: int | None = None,
highlights: Any = None,
topic_highlights: Any = None,
global_pros_cons_signals: Any = None,
) -> dict[str, Any]:
"""Build the canonical persisted aggregate for one patch epoch."""
aggregate = {
"topics": [topic.model_dump(mode="json") for topic in (topics or [])],
"review_count": max(0, int(review_count or 0)),
"highlights": _serialize_model_list(highlights),
"topic_highlights": _serialize_model_list(topic_highlights),
"global_pros_cons_signals": _normalize_signal_sources(global_pros_cons_signals),
}
optional_metrics = {
"total_sentences": total_sentences,
"skipped_sentences": skipped_sentences,
"topic_bearing_review_count": topic_bearing_review_count,
}
for key, value in optional_metrics.items():
normalized = _coerce_optional_context_metric(value)
if normalized is not None:
aggregate[key] = normalized
return aggregate
def normalize_patch_epoch_aggregate(aggregate: Any) -> dict[str, Any]:
"""Normalize one persisted patch-epoch aggregate into the canonical shape."""
raw_topics = aggregate.get("topics") if isinstance(aggregate, dict) else None
return build_patch_epoch_aggregate(
topics=coerce_topic_sentiments(raw_topics),
review_count=aggregate.get("review_count", 0) if isinstance(aggregate, dict) else 0,
total_sentences=aggregate.get("total_sentences") if isinstance(aggregate, dict) else None,
skipped_sentences=aggregate.get("skipped_sentences") if isinstance(aggregate, dict) else None,
topic_bearing_review_count=(
aggregate.get("topic_bearing_review_count")
if isinstance(aggregate, dict)
else None
),
highlights=aggregate.get("highlights") if isinstance(aggregate, dict) else None,
topic_highlights=aggregate.get("topic_highlights") if isinstance(aggregate, dict) else None,
global_pros_cons_signals=(
aggregate.get("global_pros_cons_signals")
if isinstance(aggregate, dict)
else None
),
)
def normalize_patch_epochs(persisted_corpus: dict[str, Any] | None) -> list[dict[str, Any]]:
"""Normalize persisted patch epochs while preserving append-only ordering semantics."""
raw_epochs = persisted_corpus.get("patch_epochs") if isinstance(persisted_corpus, dict) else None
if not isinstance(raw_epochs, list):
return []
epochs: list[dict[str, Any]] = []
for entry in raw_epochs:
if not isinstance(entry, dict):
continue
patch_timestamp = entry.get("patch_timestamp")
if not isinstance(patch_timestamp, int) or patch_timestamp <= 0:
continue
raw_id = entry.get("id")
epoch_id = raw_id if isinstance(raw_id, str) and raw_id else build_patch_epoch_id(patch_timestamp)
epochs.append(
{
"id": epoch_id,
"patch_timestamp": patch_timestamp,
"aggregate": normalize_patch_epoch_aggregate(entry.get("aggregate")),
}
)
epochs.sort(key=lambda epoch: epoch["patch_timestamp"])
return epochs
def _get_general_evidence_cache(
persisted_corpus: dict[str, Any] | None,
results: dict[str, Any],
) -> dict[str, Any]:
raw_cache = None
if isinstance(persisted_corpus, dict):
raw_evidence_caches = persisted_corpus.get("evidence_caches")
if isinstance(raw_evidence_caches, dict):
raw_cache = raw_evidence_caches.get("general")
if isinstance(raw_cache, dict):
return normalize_evidence_cache(raw_cache)
return build_evidence_cache_payload(
highlights=results.get("general_highlights"),
topic_highlights=results.get("topic_highlights"),
global_pros_cons_signals=(
results.get("global_pros_cons_signals", {}).get("general")
if isinstance(results.get("global_pros_cons_signals"), dict)
else None
),
)
def _get_patch_epoch_evidence_cache(
persisted_corpus: dict[str, Any] | None,
epoch: dict[str, Any],
) -> dict[str, Any]:
epoch_id = epoch.get("id")
raw_cache = None
if isinstance(persisted_corpus, dict):
raw_evidence_caches = persisted_corpus.get("evidence_caches")
if isinstance(raw_evidence_caches, dict):
raw_patch_caches = raw_evidence_caches.get("patch_epochs")
if isinstance(raw_patch_caches, dict) and isinstance(epoch_id, str):
raw_cache = raw_patch_caches.get(epoch_id)
if isinstance(raw_cache, dict):
return normalize_evidence_cache(raw_cache)
aggregate = epoch.get("aggregate") if isinstance(epoch, dict) else None
return build_evidence_cache_payload(
highlights=aggregate.get("highlights") if isinstance(aggregate, dict) else None,
topic_highlights=aggregate.get("topic_highlights") if isinstance(aggregate, dict) else None,
global_pros_cons_signals=(
aggregate.get("global_pros_cons_signals")
if isinstance(aggregate, dict)
else None
),
)
def reconstruct_recent_evidence_cache(
persisted_corpus: dict[str, Any] | None,
*,
reference_at: datetime | None = None,
) -> dict[str, Any]:
raw_recent = persisted_corpus.get("recent") if isinstance(persisted_corpus, dict) else None
raw_daily_buckets = raw_recent.get("daily_utc_buckets") if isinstance(raw_recent, dict) else None
if not isinstance(raw_daily_buckets, list):
return build_empty_evidence_cache()
raw_evidence_caches = persisted_corpus.get("evidence_caches") if isinstance(persisted_corpus, dict) else None
raw_recent_caches = raw_evidence_caches.get("recent") if isinstance(raw_evidence_caches, dict) else None
raw_bucket_caches = (
raw_recent_caches.get("daily_utc_buckets")
if isinstance(raw_recent_caches, dict)
else None
)
recent_bucket_caches = raw_bucket_caches if isinstance(raw_bucket_caches, dict) else {}
from app.services.recent_window import _get_recent_bucket_bounds
earliest_bucket_start, reference_day_start = _get_recent_bucket_bounds(reference_at)
merged_cache = build_empty_evidence_cache()
for bucket in raw_daily_buckets:
if not isinstance(bucket, dict):
continue
bucket_start = bucket.get("bucket_start")
if (
not isinstance(bucket_start, int)
or bucket_start < earliest_bucket_start
or bucket_start > reference_day_start
):
continue
bucket_cache = recent_bucket_caches.get(str(bucket_start))
if not isinstance(bucket_cache, dict):
continue
merged_cache = merge_evidence_cache(merged_cache, bucket_cache)
return merged_cache
def apply_evidence_cache_results(
results: dict[str, Any],
persisted_corpus: dict[str, Any] | None,
*,
reference_at: datetime | None = None,
) -> dict[str, Any]:
projected = dict(results)
surviving_general_topics = _extract_surviving_topic_names(
projected.get("general_topics")
)
general_cache = _get_general_evidence_cache(persisted_corpus, projected)
projected["general_highlights"] = copy.deepcopy(general_cache["highlights"][:_highlight_display_limit()])
projected["topic_highlights"] = build_topic_highlights_payload_from_cache(
general_cache["topic_highlights"],
surviving_general_topics,
)
recent_reviews_count = projected.get("recent_reviews_count", 0) or 0
recent_topics = projected.get("recent_topics") or []
surviving_recent_topics = _extract_surviving_topic_names(recent_topics)
recent_cache = reconstruct_recent_evidence_cache(
persisted_corpus,
reference_at=reference_at,
)
projected["recent_highlights"] = (
copy.deepcopy(recent_cache["highlights"][:_highlight_display_limit()])
if recent_reviews_count > 0 and recent_cache["highlights"]
else None
)
projected["recent_topic_highlights"] = (
build_topic_highlights_payload_from_cache(
recent_cache["topic_highlights"],
surviving_recent_topics,
)
if recent_reviews_count > 0
else []
)
global_signals = normalize_contextual_global_pros_cons_signals(
projected.get("global_pros_cons_signals")
)
global_signals["general"] = copy.deepcopy(general_cache["global_pros_cons_signals"])
global_signals["recent"] = (
copy.deepcopy(recent_cache["global_pros_cons_signals"])
if recent_reviews_count > 0
else []
)
patch_epochs = normalize_patch_epochs(persisted_corpus)
if patch_epochs:
current_epoch = patch_epochs[-1]
current_patch_reviews_count = projected.get("current_patch_reviews_count", 0) or 0
surviving_current_patch_topics = _extract_surviving_topic_names(
projected.get("current_patch_topics")
)
patch_cache = _get_patch_epoch_evidence_cache(persisted_corpus, current_epoch)
projected["current_patch_highlights"] = (
copy.deepcopy(patch_cache["highlights"][:_highlight_display_limit()])
if current_patch_reviews_count > 0 and patch_cache["highlights"]
else None
)
projected["current_patch_topic_highlights"] = (
build_topic_highlights_payload_from_cache(
patch_cache["topic_highlights"],
surviving_current_patch_topics,
)
if current_patch_reviews_count > 0
else []
)
global_signals["current_patch"] = (
copy.deepcopy(patch_cache["global_pros_cons_signals"])
if current_patch_reviews_count > 0
else []
)
projected["global_pros_cons_signals"] = global_signals
return projected
def merge_patch_epochs(
persisted_corpus: dict[str, Any] | None,
*,
patch_timestamp: int | None,
current_patch_topics: list[TopicSentiment] | None,
current_patch_reviews_count: int,
current_patch_total_sentences: int | None = None,
current_patch_skipped_sentences: int | None = None,
current_patch_topic_bearing_review_count: int | None = None,
current_patch_highlights: Any = None,
current_patch_topic_highlights: Any = None,
current_patch_global_pros_cons_signals: Any = None,
) -> list[dict[str, Any]]:
"""Update the active patch epoch or append a new one on patch-boundary rollover."""
epochs = normalize_patch_epochs(persisted_corpus)
if patch_timestamp is None:
return epochs
epoch_id = build_patch_epoch_id(patch_timestamp)
new_epoch = {
"id": epoch_id,
"patch_timestamp": patch_timestamp,
"aggregate": build_patch_epoch_aggregate(
topics=current_patch_topics,
review_count=current_patch_reviews_count,
total_sentences=current_patch_total_sentences,
skipped_sentences=current_patch_skipped_sentences,
topic_bearing_review_count=current_patch_topic_bearing_review_count,
highlights=current_patch_highlights,
topic_highlights=current_patch_topic_highlights,
global_pros_cons_signals=current_patch_global_pros_cons_signals,
),
}
for index, epoch in enumerate(epochs):
if epoch["id"] == epoch_id or epoch["patch_timestamp"] == patch_timestamp:
epochs[index] = new_epoch
return epochs
epochs.append(new_epoch)
epochs.sort(key=lambda epoch: epoch["patch_timestamp"])
return epochs
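# Rollover sketch (timestamps hypothetical; `batch_topics` is a placeholder):
# an existing epoch for patch 1700000000 is left untouched when a newer major
# update appears, while re-running with an already-known timestamp replaces
# that epoch's aggregate instead of appending a duplicate.
#
#     epochs = merge_patch_epochs(
#         persisted_corpus,
#         patch_timestamp=1710000000,
#         current_patch_topics=batch_topics,
#         current_patch_reviews_count=25,
#     )
#     # epochs[-1]["id"] == "patch-1710000000"; earlier epochs keep their order.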
def apply_patch_epoch_results(
results: dict[str, Any],
persisted_corpus: dict[str, Any] | None,
) -> dict[str, Any]:
"""Project canonical patch epochs into the v1 current/last patch payload fields."""
patch_epochs = normalize_patch_epochs(persisted_corpus)
if not patch_epochs:
projected = dict(results)
_apply_optional_context_metrics(
projected,
prefix="current_patch_",
source=None,
)
return projected
projected = dict(results)
current_epoch = patch_epochs[-1]
previous_epoch = patch_epochs[-2] if len(patch_epochs) > 1 else None
current_aggregate = current_epoch["aggregate"]
current_topics = current_aggregate.get("topics", [])
current_review_count = current_aggregate.get("review_count", 0)
projected["current_patch_topics"] = current_topics or None
projected["current_patch_reviews_count"] = current_review_count
projected["current_patch_timestamp"] = current_epoch["patch_timestamp"]
projected["current_patch_highlights"] = current_aggregate.get("highlights") or None
projected["current_patch_topic_highlights"] = current_aggregate.get("topic_highlights", [])
projected["current_patch_scope"] = (
"sentimentstream_analyzed_reviews_since_major_update"
if current_review_count > 0 or bool(current_topics)
else None
)
_apply_optional_context_metrics(
projected,
prefix="current_patch_",
source=current_aggregate,
)
raw_signals = projected.get("global_pros_cons_signals")
if isinstance(raw_signals, dict):
projected["global_pros_cons_signals"] = {
"general": _normalize_signal_sources(raw_signals.get("general")),
"recent": _normalize_signal_sources(raw_signals.get("recent")),
"current_patch": current_aggregate.get("global_pros_cons_signals", []),
}
if previous_epoch is None:
projected["last_patch_topics"] = None
projected["last_patch_reviews_count"] = 0
else:
previous_aggregate = previous_epoch["aggregate"]
projected["last_patch_topics"] = previous_aggregate.get("topics") or None
projected["last_patch_reviews_count"] = previous_aggregate.get("review_count", 0)
return projected
def apply_general_aggregate_results(
results: dict[str, Any],
persisted_corpus: dict[str, Any] | None,
) -> dict[str, Any]:
"""Project canonical general aggregate fields into the read payload."""
projected = dict(results)
raw_general = persisted_corpus.get("general") if isinstance(persisted_corpus, dict) else None
raw_aggregate = raw_general.get("aggregate") if isinstance(raw_general, dict) else None
has_canonical_general = isinstance(raw_aggregate, dict) and (
max(0, int(raw_aggregate.get("review_count", 0) or 0)) > 0
or max(0, int(raw_aggregate.get("skipped_count", 0) or 0)) > 0
or bool(raw_aggregate.get("topics"))
or any(raw_aggregate.get(key) is not None for key in _OPTIONAL_CONTEXT_METRIC_KEYS)
)
if not has_canonical_general:
_apply_optional_context_metrics(projected, source=None)
return projected
general_aggregate = normalize_general_aggregate(
raw_aggregate,
fallback_topics=coerce_topic_sentiments(results.get("general_topics")),
fallback_review_count=results.get("analyzed_reviews", 0),
fallback_skipped_count=results.get("skipped_count", 0),
)
projected["general_topics"] = general_aggregate.get("topics", [])
projected["analyzed_reviews"] = general_aggregate.get("review_count", 0)
projected["skipped_count"] = general_aggregate.get("skipped_count", 0)
_apply_optional_context_metrics(projected, source=general_aggregate)
return projected
def apply_recent_bucket_results(
results: dict[str, Any],
persisted_corpus: dict[str, Any] | None,
*,
reference_at: datetime | None = None,
) -> dict[str, Any]:
"""Project canonical daily recent buckets into the v1 recent payload fields."""
projected = dict(results)
raw_recent = persisted_corpus.get("recent") if isinstance(persisted_corpus, dict) else None
raw_daily_buckets = raw_recent.get("daily_utc_buckets") if isinstance(raw_recent, dict) else None
from app.services.recent_window import reconstruct_recent_from_daily_utc_buckets
(
recent_topics,
recent_reviews_count,
recent_total_sentences,
recent_skipped_sentences,
recent_topic_bearing_review_count,
) = reconstruct_recent_from_daily_utc_buckets(
raw_daily_buckets,
reference_at=reference_at,
)
projected["recent_topics"] = (
[topic.model_dump(mode="json") for topic in recent_topics]
if recent_reviews_count > 0
else None
)
projected["recent_reviews_count"] = recent_reviews_count
projected["recent_scope"] = (
"sentimentstream_analyzed_reviews_last_30_days"
if recent_reviews_count > 0
else None
)
_apply_optional_context_metrics(
projected,
prefix="recent_",
source={
"total_sentences": recent_total_sentences,
"skipped_sentences": recent_skipped_sentences,
"topic_bearing_review_count": recent_topic_bearing_review_count,
},
)
return projected
def build_persisted_corpus_snapshot(
stale_analysis: "AnalysisDocumentReadModel | None",
*,
general_aggregate: dict[str, Any],
general_highlights: Any = None,
general_topic_highlights: Any = None,
general_global_pros_cons_signals: Any = None,
recent_daily_utc_buckets: list[dict[str, Any]] | None = None,
recent_bucket_evidence_caches: dict[int | str, dict[str, Any]] | None = None,
recent_evidence_mode: Literal["merge", "replace"] = "merge",
ingested_items: list[Any],
patch_timestamp: int | None,
current_patch_topics: list[TopicSentiment] | None,
current_patch_reviews_count: int,
current_patch_total_sentences: int | None = None,
current_patch_skipped_sentences: int | None = None,
current_patch_topic_bearing_review_count: int | None = None,
current_patch_highlights: Any = None,
current_patch_topic_highlights: Any = None,
current_patch_global_pros_cons_signals: Any = None,
current_patch_evidence_mode: Literal["merge", "replace"] = "replace",
) -> dict[str, Any]:
"""Update the canonical persisted-corpus envelope for one analysis write."""
persisted_corpus = (
copy.deepcopy(stale_analysis.persisted_corpus)
if stale_analysis and stale_analysis.persisted_corpus is not None
else build_empty_persisted_corpus_envelope()
)
stale_persisted_corpus = (
stale_analysis.persisted_corpus
if stale_analysis and stale_analysis.persisted_corpus is not None
else None
)
persisted_corpus["general"] = {"aggregate": general_aggregate}
if recent_daily_utc_buckets is not None:
persisted_corpus["recent"] = {
"daily_utc_buckets": copy.deepcopy(recent_daily_utc_buckets)
}
evidence_caches = persisted_corpus.get("evidence_caches")
if not isinstance(evidence_caches, dict):
evidence_caches = {
"general": build_empty_evidence_cache(),
"recent": {"daily_utc_buckets": {}},
"patch_epochs": {},
}
persisted_corpus["evidence_caches"] = evidence_caches
general_cache = _get_general_evidence_cache(
persisted_corpus,
stale_analysis.results if stale_analysis is not None else {},
)
incoming_general_cache = build_evidence_cache_payload(
highlights=general_highlights,
topic_highlights=general_topic_highlights,
global_pros_cons_signals=general_global_pros_cons_signals,
)
evidence_caches["general"] = merge_evidence_cache(general_cache, incoming_general_cache)
recent_cache_root = evidence_caches.get("recent")
if not isinstance(recent_cache_root, dict):
recent_cache_root = {"daily_utc_buckets": {}}
existing_recent_bucket_caches = recent_cache_root.get("daily_utc_buckets")
if not isinstance(existing_recent_bucket_caches, dict):
existing_recent_bucket_caches = {}
allowed_recent_bucket_keys = {
str(entry.get("bucket_start"))
for entry in recent_daily_utc_buckets or []
if isinstance(entry, dict) and isinstance(entry.get("bucket_start"), int)
}
merged_recent_bucket_caches: dict[str, dict[str, Any]] = (
{
key: normalize_evidence_cache(value)
for key, value in existing_recent_bucket_caches.items()
if key in allowed_recent_bucket_keys and isinstance(value, dict)
}
if recent_evidence_mode == "merge"
else {}
)
for bucket_key, bucket_cache in (recent_bucket_evidence_caches or {}).items():
normalized_key = str(bucket_key)
if normalized_key not in allowed_recent_bucket_keys:
continue
normalized_bucket_cache = normalize_evidence_cache(bucket_cache)
if recent_evidence_mode == "merge":
merged_recent_bucket_caches[normalized_key] = merge_evidence_cache(
merged_recent_bucket_caches.get(normalized_key),
normalized_bucket_cache,
)
else:
merged_recent_bucket_caches[normalized_key] = normalized_bucket_cache
evidence_caches["recent"] = {
"daily_utc_buckets": merged_recent_bucket_caches,
}
persisted_corpus["patch_epochs"] = merge_patch_epochs(
persisted_corpus,
patch_timestamp=patch_timestamp,
current_patch_topics=current_patch_topics,
current_patch_reviews_count=current_patch_reviews_count,
current_patch_total_sentences=current_patch_total_sentences,
current_patch_skipped_sentences=current_patch_skipped_sentences,
current_patch_topic_bearing_review_count=current_patch_topic_bearing_review_count,
current_patch_highlights=current_patch_highlights,
current_patch_topic_highlights=current_patch_topic_highlights,
current_patch_global_pros_cons_signals=current_patch_global_pros_cons_signals,
)
raw_patch_epoch_caches = evidence_caches.get("patch_epochs")
if not isinstance(raw_patch_epoch_caches, dict):
raw_patch_epoch_caches = {}
existing_patch_epoch_ids = {
epoch.get("id")
for epoch in normalize_patch_epochs(persisted_corpus)
if isinstance(epoch.get("id"), str)
}
patch_epoch_caches = {
epoch_id: normalize_evidence_cache(cache)
for epoch_id, cache in raw_patch_epoch_caches.items()
if epoch_id in existing_patch_epoch_ids and isinstance(cache, dict)
}
if patch_timestamp is not None:
patch_epoch_id = build_patch_epoch_id(patch_timestamp)
incoming_patch_cache = build_evidence_cache_payload(
highlights=current_patch_highlights,
topic_highlights=current_patch_topic_highlights,
global_pros_cons_signals=current_patch_global_pros_cons_signals,
)
existing_patch_cache = None
if current_patch_evidence_mode == "merge":
matching_epoch = next(
(
epoch
for epoch in normalize_patch_epochs(stale_persisted_corpus)
if epoch.get("id") == patch_epoch_id
),
None,
) if stale_persisted_corpus is not None else None
existing_patch_cache = (
patch_epoch_caches.get(patch_epoch_id)
or (_get_patch_epoch_evidence_cache(stale_persisted_corpus, matching_epoch) if matching_epoch else None)
)
patch_epoch_caches[patch_epoch_id] = (
merge_evidence_cache(existing_patch_cache, incoming_patch_cache)
if current_patch_evidence_mode == "merge"
else normalize_evidence_cache(incoming_patch_cache)
)
evidence_caches["patch_epochs"] = patch_epoch_caches
persisted_corpus["evidence_caches"] = evidence_caches
ingest_state = persisted_corpus.get("ingest_state")
if not isinstance(ingest_state, dict):
ingest_state = {
"cursor": None,
"latest_ingested_review_ts": None,
"latest_ingested_review_ids_at_ts": [],
}
persisted_corpus["ingest_state"] = ingest_state
if ingested_items:
existing_latest_ts = ingest_state.get("latest_ingested_review_ts")
existing_boundary_ids = ingest_state.get("latest_ingested_review_ids_at_ts")
existing_boundary_ids = (
[review_id for review_id in existing_boundary_ids if isinstance(review_id, str)]
if isinstance(existing_boundary_ids, list)
else []
)
latest_ts = max(item.timestamp_created for item in ingested_items)
latest_ids: list[str] = []
for item in ingested_items:
if item.timestamp_created != latest_ts:
continue
review_id = item.recommendation_id
if review_id in latest_ids:
continue
latest_ids.append(review_id)
if existing_latest_ts == latest_ts:
merged_boundary_ids = list(existing_boundary_ids)
for review_id in latest_ids:
if review_id not in merged_boundary_ids:
merged_boundary_ids.append(review_id)
latest_ids = merged_boundary_ids
ingest_state["latest_ingested_review_ts"] = latest_ts
ingest_state["latest_ingested_review_ids_at_ts"] = latest_ids
return persisted_corpus
def build_topic_highlights_payload(
topic_highlights: dict[str, list[dict[str, Any]]],
surviving_topics: set[str],
) -> list[TopicHighlights]:
"""Serialize grouped topic evidence for topics that survived aggregate filtering."""
return [
TopicHighlights(
topic=topic,
highlights=[TopicEvidenceSnippet(**highlight) for highlight in highlights],
)
for topic, highlights in topic_highlights.items()
if topic in surviving_topics
]
_LEGACY_FIELD_MAP = {
"topics": "general_topics",
"historical_topics": "general_topics",
"post_update_topics": "current_patch_topics",
"post_update_reviews_count": "current_patch_reviews_count",
"post_update_highlights": "current_patch_highlights",
"previous_update_topics": "last_patch_topics",
"previous_update_reviews_count": "last_patch_reviews_count",
"last_update_timestamp": "current_patch_timestamp",
}
@dataclass(frozen=True)
class AnalysisDocumentReadModel:
"""Normalized read model for persisted analysis documents."""
source_format: Literal["legacy", "persisted_corpus"]
results: dict[str, Any]
persisted_corpus: dict[str, Any] | None
analyzed_review_ids: list[str]
latest_review_timestamp: int
cached_at: Any = None
analyzed_at: Any = None
def normalize_legacy_results(results: dict[str, Any]) -> dict[str, Any]:
"""Map legacy persisted result fields to the current schema."""
normalized: dict[str, Any] = {}
for key, value in results.items():
new_key = _LEGACY_FIELD_MAP.get(key, key)
if key == "is_incremental":
continue
if new_key in {
"topic_highlights",
"recent_topic_highlights",
"current_patch_topic_highlights",
} and isinstance(value, list):
value = [TopicHighlights(**entry).model_dump() for entry in value]
if new_key not in normalized:
normalized[new_key] = value
raw_purchase_risk = normalized.get("purchase_risk")
if isinstance(raw_purchase_risk, dict):
localization = raw_purchase_risk.get("localization")
if isinstance(localization, dict) and localization.get("state") == "minor_issue":
normalized["purchase_risk"] = {
**raw_purchase_risk,
"localization": {
**localization,
"state": "minor_issues",
},
}
if not normalized.get("general_scope"):
normalized["general_scope"] = "sentimentstream_analyzed_reviews"
if (
not normalized.get("recent_scope")
and (
normalized.get("recent_topics") is not None
or normalized.get("recent_reviews_count", 0) > 0
)
):
normalized["recent_scope"] = "sentimentstream_analyzed_reviews_last_30_days"
return normalized
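# Mapping sketch (payload hypothetical; `legacy_topics` is a placeholder list):
# legacy keys are renamed, dropped, or backfilled with default scopes.
#
#     normalize_legacy_results(
#         {"topics": legacy_topics, "post_update_reviews_count": 12, "is_incremental": True}
#     )
#     # -> {"general_topics": legacy_topics,
#     #     "current_patch_reviews_count": 12,
#     #     "general_scope": "sentimentstream_analyzed_reviews"}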
def read_analysis_document(
document: dict[str, Any] | None,
*,
recent_reference_at: datetime | None = None,
) -> AnalysisDocumentReadModel:
"""Return one normalized application contract for legacy and envelope docs.
Legacy documents stay readable through normalized `results`, but the adapter
must not synthesize persisted-corpus sections that were never stored.
"""
raw_document = document if isinstance(document, dict) else {}
raw_results = raw_document.get("results")
results = normalize_legacy_results(raw_results if isinstance(raw_results, dict) else {})
persisted_corpus = (
raw_document.get("persisted_corpus")
if has_persisted_corpus_envelope(raw_document)
else None
)
if persisted_corpus is not None:
results = apply_general_aggregate_results(results, persisted_corpus)
results = apply_recent_bucket_results(
results,
persisted_corpus,
reference_at=recent_reference_at,
)
results = apply_patch_epoch_results(results, persisted_corpus)
results = apply_evidence_cache_results(
results,
persisted_corpus,
reference_at=recent_reference_at,
)
return AnalysisDocumentReadModel(
source_format="persisted_corpus" if persisted_corpus is not None else "legacy",
results=results,
persisted_corpus=persisted_corpus,
analyzed_review_ids=list(raw_document.get("analyzed_review_ids", [])),
latest_review_timestamp=raw_document.get("latest_review_timestamp", 0),
cached_at=raw_document.get("cached_at"),
analyzed_at=raw_document.get("analyzed_at"),
)
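# Usage sketch (`raw_document` being the dict loaded from storage): callers get
# one read model regardless of storage generation and can branch on its origin.
#
#     analysis = read_analysis_document(raw_document)
#     if analysis.source_format == "persisted_corpus":
#         topics = coerce_topic_sentiments(analysis.results.get("general_topics"))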
def serialize_datetime(value: Any) -> str | Any:
"""Serialize datetimes in SSE payloads and persisted compatibility helpers."""
if isinstance(value, datetime):
return value.isoformat()
return value
def coerce_utc_datetime(value: Any) -> datetime | None:
    """Coerce persisted datetime values into timezone-aware UTC datetimes."""
    if isinstance(value, datetime):
        return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc)
    if isinstance(value, str):
        try:
            parsed = datetime.fromisoformat(value)
        except ValueError:
            # Malformed persisted strings are treated as missing rather than raising.
            return None
        return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
    return None
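# Behavioural sketch:
#
#     coerce_utc_datetime("2024-05-01T12:00:00")           # naive ISO string -> tagged UTC
#     coerce_utc_datetime(datetime(2024, 5, 1, 12, 0))     # naive datetime -> tagged UTC
#     coerce_utc_datetime(1714557600)                      # non-datetime, non-string -> None
#     coerce_utc_datetime("not-a-date")                    # malformed string -> None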
def datetime_from_timestamp(timestamp: int | None) -> datetime | None:
"""Convert a unix timestamp into UTC datetime."""
if timestamp is None:
return None
return datetime.fromtimestamp(timestamp, tz=timezone.utc)