# FIN_ASSISTANT/presentation/components/analysis_formatter.py
"""Utilities to render portfolio analysis output with styled HTML."""
from __future__ import annotations
import re
from html import escape
from html.parser import HTMLParser
from typing import Iterable, List, Optional, Tuple
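# _SPAN_TAG matches opening/closing <span> tags; _SPAN_ATTR pulls out their
# attribute="value" pairs so only whitelisted classes survive escaping.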
_SPAN_TAG = re.compile(r"</?span(?:\s+[^>]*?)?>", re.IGNORECASE)
_SPAN_ATTR = re.compile(r"([a-zA-Z_:][-a-zA-Z0-9_:.]*)\s*=\s*\"(.*?)\"")
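# CSS classes allowed to appear in rendered output; anything else is dropped.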
ALLOWED_CLASSES = {
"analysis-container",
"analysis-output",
"analysis-status",
"analysis-line",
"analysis-keyword",
"analysis-caret",
"bullet",
"metric",
"metric-name",
"metric-number",
"metric-separator",
"metric-value",
"negative",
"neutral",
"positive",
"section",
"section-divider",
}
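# HTML tags preserved when the model responds with ready-made HTML.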
ALLOWED_TAGS = {"div", "p", "span", "h2", "h3", "ul", "ol", "li", "hr"}
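# Canonical section headers recognized in plain-text analysis replies.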
SECTION_TITLES: Tuple[str, ...] = (
"Objective Evaluation",
"Risk Assessment",
"Interpretation",
"Recommendation",
)
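# Phrases that get wrapped in an 'analysis-keyword' span for emphasis.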
KEYWORD_HIGHLIGHTS: Tuple[str, ...] = (
"poor performance",
"high risk",
"underperformed",
"volatility",
"recommendation",
"drawdown",
"exposure",
"opportunity",
)
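# Hover tooltips attached to recognized metric names.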
METRIC_TOOLTIPS = {
"Sharpe Ratio": "Sharpe Ratio: excess return per unit of total risk.",
"Sortino Ratio": "Sortino Ratio: downside-risk-adjusted performance.",
"Calmar Ratio": "Calmar Ratio: annual return divided by max drawdown.",
"Max Drawdown": "Max Drawdown: largest observed portfolio loss from peak.",
"Beta": "Beta: sensitivity to benchmark movements.",
"Volatility": "Volatility: standard deviation of returns.",
}
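# Case-insensitive alternation over every highlight phrase above.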
_KEYWORD_REGEX = re.compile(
"|".join(re.escape(word) for word in KEYWORD_HIGHLIGHTS), re.IGNORECASE
)
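# "Name: value" metric lines (optionally bulleted) and "**Header**" section markers.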
_METRIC_LINE = re.compile(r"^[-•]?\s*([^:]+?):\s*(.+)$")
_SECTION_HEADER = re.compile(r"^\*\*(.+?)\*\*")
def render_status_html(message: str) -> str:
"""Render interim status or error messages."""
safe = escape(message)
body = f"<div class='analysis-output'><p class='analysis-status'>{safe}</p></div>"
return _wrap_with_container(body)
def render_analysis_html(text: str, show_caret: bool = False) -> str:
"""Convert LLM response into themed HTML without inline styles."""
stripped = text.strip()
if not stripped:
html = _wrap_with_container("<div class='analysis-output'></div>")
return _append_caret(html) if show_caret else html
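    # The model may reply with ready-made HTML; sanitize it and reuse the markup.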
if _looks_like_html(stripped):
sanitized = _sanitize_analysis_html(stripped)
if sanitized.strip():
cleaned = _trim_trailing_breaks(sanitized).strip()
html = _wrap_with_container(cleaned)
return _append_caret(html) if show_caret else html
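    # Otherwise treat the reply as plain text and split it into recognized sections.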
sections = _split_sections(stripped)
if not sections:
formatted_lines = _format_lines(stripped.splitlines())
body = "".join(formatted_lines)
html = _wrap_with_container(f"<div class='analysis-output'>{body}</div>")
return _append_caret(html) if show_caret else html
parts: List[str] = ["<div class='analysis-output'>"]
for idx, (title, content) in enumerate(sections):
parts.append("<div class='section'>")
parts.append(f"<h2>{escape(title)}</h2>")
formatted_lines = _format_lines(content.splitlines())
parts.extend(formatted_lines)
parts.append("</div>")
if idx < len(sections) - 1:
parts.append("<div class='section-divider'></div>")
parts.append("</div>")
html = "".join(parts)
html = _wrap_with_container(_trim_trailing_breaks(html).strip())
return _append_caret(html) if show_caret else html
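# Split the reply into (title, body) pairs keyed on the recognized section headers.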
def _split_sections(text: str) -> List[Tuple[str, str]]:
sections: List[Tuple[str, str]] = []
current_title = None
buffer: List[str] = []
allowed_headers = {title.lower(): title for title in SECTION_TITLES}
for line in text.splitlines():
stripped = line.strip()
header_match = _SECTION_HEADER.match(stripped)
if header_match:
# flush previous section
if current_title and buffer:
sections.append((current_title, "\n".join(buffer).strip()))
buffer.clear()
matched_title = header_match.group(1).strip()
normalized = allowed_headers.get(matched_title.lower(), matched_title)
current_title = normalized
continue
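        # Headers may also appear without the surrounding ** markers.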
if stripped in allowed_headers:
if current_title and buffer:
sections.append((current_title, "\n".join(buffer).strip()))
buffer.clear()
current_title = allowed_headers[stripped]
continue
buffer.append(line)
if current_title and buffer:
sections.append((current_title, "\n".join(buffer).strip()))
return sections
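# Convert raw reply lines into paragraph, bullet, and metric HTML fragments.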
def _format_lines(lines: Iterable[str]) -> List[str]:
formatted: List[str] = []
paragraph_buffer: List[str] = []
seen_metrics: set[str] = set()
seen_paragraphs: set[str] = set()
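    # Consecutive prose lines merge into one paragraph; duplicates are skipped.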
def flush_paragraph() -> None:
if not paragraph_buffer:
return
paragraph_text = " ".join(paragraph_buffer)
normalized = re.sub(r"\s+", " ", paragraph_text).strip()
lower_key = normalized.lower()
if lower_key and lower_key not in seen_paragraphs:
seen_paragraphs.add(lower_key)
formatted.append(
f"<p class='analysis-line'>{_decorate_text(paragraph_text)}</p>"
)
paragraph_buffer.clear()
for raw_line in lines:
line = raw_line.strip()
if not line:
flush_paragraph()
continue
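        # "Name: value" lines render as styled metric rows (first occurrence only).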
metric_match = _METRIC_LINE.match(line)
if metric_match:
flush_paragraph()
metric_name = metric_match.group(1).strip().lower()
if metric_name and metric_name not in seen_metrics:
seen_metrics.add(metric_name)
formatted.append(
_format_metric_line(
metric_match.group(1), metric_match.group(2)
)
)
continue
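        # Bulleted lines become individual highlighted entries.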
bullet = raw_line.lstrip().startswith(('-', '•'))
if bullet:
flush_paragraph()
content = re.sub(r"^[-•]\s*", "", line)
normalized = re.sub(r"\s+", " ", content).strip().lower()
if normalized and normalized not in seen_paragraphs:
seen_paragraphs.add(normalized)
formatted.append(
f"<p class='analysis-line bullet'>{_decorate_text(content)}</p>"
)
continue
paragraph_buffer.append(line)
flush_paragraph()
return formatted
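# Render a single metric as name/separator/value spans; known metrics gain a tooltip.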
def _format_metric_line(name: str, value: str) -> str:
tooltip = METRIC_TOOLTIPS.get(name.strip())
name_text = escape(name.strip())
name_span = (
f"<span class='metric-name' data-tooltip='{escape(tooltip)}'>{name_text}</span>"
if tooltip
else f"<span class='metric-name'>{name_text}</span>"
)
value_span = f"<span class='metric-value'>{_decorate_metric_value(value)}</span>"
return (
"<p class='analysis-line metric'>"
f"{name_span} <span class='metric-separator'>:</span> {value_span}"
"</p>"
)
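# Escape free text, keep whitelisted inline spans, then wrap keywords for emphasis.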
def _decorate_text(text: str) -> str:
preserved = _preserve_spans(text)
if not preserved:
return ""
highlighted = _KEYWORD_REGEX.sub(
lambda match: f"<span class='analysis-keyword'>{match.group(0)}</span>", preserved
)
return highlighted
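# Signed numbers with an optional decimal part and %, bps, or multiplier suffix.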
_NUMERIC_TOKEN = re.compile(r"[-+]?\d+(?:[\.,]\d+)?(?:\s?(?:%|bps|bp|x|X))?")
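# Wrap each numeric token in a color-coded span and decorate the surrounding text.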
def _decorate_metric_value(value: str) -> str:
parts: List[str] = []
last_index = 0
for match in _NUMERIC_TOKEN.finditer(value):
start, end = match.span()
if start > last_index:
parts.append(_decorate_text(value[last_index:start]))
token = match.group(0)
number_class = _numeric_class(token)
parts.append(
f"<span class='metric-number {number_class}'>{escape(token.strip())}</span>"
)
last_index = end
if last_index < len(value):
parts.append(_decorate_text(value[last_index:]))
if not parts:
return _decorate_text(value)
return "".join(parts)
def _preserve_spans(text: str) -> str:
"""Escape text while allowing limited span tags for inline emphasis."""
result: List[str] = []
last_index = 0
for match in _SPAN_TAG.finditer(text):
start, end = match.span()
if start > last_index:
result.append(escape(text[last_index:start]))
result.append(_sanitize_span(match.group(0)))
last_index = end
if last_index < len(text):
result.append(escape(text[last_index:]))
return "".join(result)
def _sanitize_span(tag: str) -> str:
if tag.startswith("</"):
return "</span>"
attributes = {}
for attr, value in _SPAN_ATTR.findall(tag):
if attr.lower() != "class":
continue
filtered = _filter_allowed_classes(value)
if filtered:
attributes["class"] = filtered
attr_string = "".join(
f" {name}=\"{escape(val)}\"" for name, val in attributes.items()
)
return f"<span{attr_string}>"
def _filter_allowed_classes(raw_value: str) -> str:
classes = [cls for cls in raw_value.split() if cls in ALLOWED_CLASSES]
return " ".join(dict.fromkeys(classes))
def _looks_like_html(text: str) -> bool:
return bool(re.search(r"<\s*(div|p|span|h2|h3|ul|ol|li)\b", text, re.IGNORECASE))
class _AnalyzerHTMLSanitizer(HTMLParser):
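    """Rebuild incoming HTML, keeping only whitelisted tags and class attributes."""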
def __init__(self) -> None:
super().__init__()
self.parts: List[str] = []
self._open_tags: List[str] = []
    def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
tag_lower = tag.lower()
if tag_lower not in ALLOWED_TAGS:
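            # Disallowed tags are dropped; push a placeholder so end tags stay balanced.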
self._open_tags.append("")
return
attr_string = ""
if attrs:
allowed_attrs = []
for name, value in attrs:
name_lower = name.lower()
if name_lower == "class":
                    filtered = _filter_allowed_classes(value or "")
if filtered:
allowed_attrs.append(("class", filtered))
if allowed_attrs:
attr_string = "".join(
f" {escape(attr)}=\"{escape(val)}\"" for attr, val in allowed_attrs
)
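        # <hr> is a void element: emit it without tracking a closing tag.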
if tag_lower == "hr":
self.parts.append(f"<{tag_lower}{attr_string}>")
self._open_tags.append("")
return
self.parts.append(f"<{tag_lower}{attr_string}>")
self._open_tags.append(tag_lower)
def handle_endtag(self, tag: str) -> None:
if not self._open_tags:
return
open_tag = self._open_tags.pop()
if open_tag:
self.parts.append(f"</{open_tag}>")
def handle_data(self, data: str) -> None:
if data:
self.parts.append(escape(data))
def handle_entityref(self, name: str) -> None:
self.parts.append(f"&{name};")
def handle_charref(self, name: str) -> None:
self.parts.append(f"&#{name};")
def _sanitize_analysis_html(text: str) -> str:
    # Remove <style> blocks before parsing: the sanitizer drops the tags themselves
    # but would otherwise emit their CSS content as escaped text.
    text = re.sub(r"<style.*?>.*?</style>", "", text, flags=re.IGNORECASE | re.DOTALL)
    sanitizer = _AnalyzerHTMLSanitizer()
    sanitizer.feed(text)
    sanitizer.close()
    return "".join(sanitizer.parts)
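# Classify a numeric token for color styling: negative, positive, or neutral.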
def _numeric_class(token: str) -> str:
cleaned = token.strip()
if not cleaned:
return "neutral"
normalized = cleaned.replace(",", ".")
stripped = re.sub(r"[^0-9+\-\.]+", "", normalized)
try:
number = float(stripped)
except ValueError:
return "neutral"
if number < 0:
return "negative"
if number > 0:
return "positive"
return "neutral"
def _trim_trailing_breaks(html: str) -> str:
return re.sub(r"(?:<br\s*/?>\s*)+$", "", html)
def _wrap_with_container(body: str) -> str:
"""Ensure the analysis output is wrapped in the themed container."""
if re.search(r"class\s*=\s*['\"]analysis-container['\"]", body):
return body
return f"<div class='analysis-container'>{body}</div>"
def _append_caret(html: str) -> str:
"""Append a blinking caret to indicate streaming output."""
caret = "<span class='analysis-caret'>|</span>"
if caret in html:
return html
updated = re.sub(r"(</div>\s*</div>\s*)$", caret + r"\1", html, count=1)
if updated == html:
return html + caret
return updated
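# Minimal usage sketch (illustrative only): the sample text below is invented for
# demonstration and is not part of the application; it simply exercises the
# plain-text rendering path.
if __name__ == "__main__":
    sample = (
        "**Objective Evaluation**\n"
        "Sharpe Ratio: 1.25\n"
        "Max Drawdown: -12.4%\n"
        "- The portfolio underperformed its benchmark amid elevated volatility.\n"
        "**Recommendation**\n"
        "Reduce exposure to the most volatile positions."
    )
    print(render_analysis_html(sample, show_caret=True))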