"""Utilities to render portfolio analysis output with styled HTML.""" from __future__ import annotations import re from html import escape from html.parser import HTMLParser from typing import Iterable, List, Tuple _SPAN_TAG = re.compile(r"]*?)?>", re.IGNORECASE) _SPAN_ATTR = re.compile(r"([a-zA-Z_:][-a-zA-Z0-9_:.]*)\s*=\s*\"(.*?)\"") ALLOWED_CLASSES = { "analysis-container", "analysis-output", "analysis-status", "analysis-line", "analysis-keyword", "analysis-caret", "bullet", "metric", "metric-name", "metric-number", "metric-separator", "metric-value", "negative", "neutral", "positive", "section", "section-divider", } ALLOWED_TAGS = {"div", "p", "span", "h2", "h3", "ul", "ol", "li", "hr"} SECTION_TITLES: Tuple[str, ...] = ( "Objective Evaluation", "Risk Assessment", "Interpretation", "Recommendation", ) KEYWORD_HIGHLIGHTS: Tuple[str, ...] = ( "poor performance", "high risk", "underperformed", "volatility", "recommendation", "drawdown", "exposure", "opportunity", ) METRIC_TOOLTIPS = { "Sharpe Ratio": "Sharpe Ratio: excess return per unit of total risk.", "Sortino Ratio": "Sortino Ratio: downside-risk-adjusted performance.", "Calmar Ratio": "Calmar Ratio: annual return divided by max drawdown.", "Max Drawdown": "Max Drawdown: largest observed portfolio loss from peak.", "Beta": "Beta: sensitivity to benchmark movements.", "Volatility": "Volatility: standard deviation of returns.", } _KEYWORD_REGEX = re.compile( "|".join(re.escape(word) for word in KEYWORD_HIGHLIGHTS), re.IGNORECASE ) _METRIC_LINE = re.compile(r"^[-•]?\s*([^:]+?):\s*(.+)$") _SECTION_HEADER = re.compile(r"^\*\*(.+?)\*\*") def render_status_html(message: str) -> str: """Render interim status or error messages.""" safe = escape(message) body = f"

{safe}

" return _wrap_with_container(body) def render_analysis_html(text: str, show_caret: bool = False) -> str: """Convert LLM response into themed HTML without inline styles.""" stripped = text.strip() if not stripped: html = _wrap_with_container("
") return _append_caret(html) if show_caret else html if _looks_like_html(stripped): sanitized = _sanitize_analysis_html(stripped) if sanitized.strip(): cleaned = _trim_trailing_breaks(sanitized).strip() html = _wrap_with_container(cleaned) return _append_caret(html) if show_caret else html sections = _split_sections(stripped) if not sections: formatted_lines = _format_lines(stripped.splitlines()) body = "".join(formatted_lines) html = _wrap_with_container(f"
{body}
") return _append_caret(html) if show_caret else html parts: List[str] = ["
"] for idx, (title, content) in enumerate(sections): parts.append("
") parts.append(f"

{escape(title)}

") formatted_lines = _format_lines(content.splitlines()) parts.extend(formatted_lines) parts.append("
") if idx < len(sections) - 1: parts.append("
") parts.append("
") html = "".join(parts) html = _wrap_with_container(_trim_trailing_breaks(html).strip()) return _append_caret(html) if show_caret else html def _split_sections(text: str) -> List[Tuple[str, str]]: sections: List[Tuple[str, str]] = [] current_title = None buffer: List[str] = [] allowed_headers = {title.lower(): title for title in SECTION_TITLES} for line in text.splitlines(): stripped = line.strip() header_match = _SECTION_HEADER.match(stripped) if header_match: # flush previous section if current_title and buffer: sections.append((current_title, "\n".join(buffer).strip())) buffer.clear() matched_title = header_match.group(1).strip() normalized = allowed_headers.get(matched_title.lower(), matched_title) current_title = normalized continue if stripped in allowed_headers: if current_title and buffer: sections.append((current_title, "\n".join(buffer).strip())) buffer.clear() current_title = allowed_headers[stripped] continue buffer.append(line) if current_title and buffer: sections.append((current_title, "\n".join(buffer).strip())) return sections def _format_lines(lines: Iterable[str]) -> List[str]: formatted: List[str] = [] paragraph_buffer: List[str] = [] seen_metrics: set[str] = set() seen_paragraphs: set[str] = set() def flush_paragraph() -> None: if not paragraph_buffer: return paragraph_text = " ".join(paragraph_buffer) normalized = re.sub(r"\s+", " ", paragraph_text).strip() lower_key = normalized.lower() if lower_key and lower_key not in seen_paragraphs: seen_paragraphs.add(lower_key) formatted.append( f"

{_decorate_text(paragraph_text)}

" ) paragraph_buffer.clear() for raw_line in lines: line = raw_line.strip() if not line: flush_paragraph() continue metric_match = _METRIC_LINE.match(line) if metric_match: flush_paragraph() metric_name = metric_match.group(1).strip().lower() if metric_name and metric_name not in seen_metrics: seen_metrics.add(metric_name) formatted.append( _format_metric_line( metric_match.group(1), metric_match.group(2) ) ) continue bullet = raw_line.lstrip().startswith(('-', '•')) if bullet: flush_paragraph() content = re.sub(r"^[-•]\s*", "", line) normalized = re.sub(r"\s+", " ", content).strip().lower() if normalized and normalized not in seen_paragraphs: seen_paragraphs.add(normalized) formatted.append( f"

{_decorate_text(content)}

" ) continue paragraph_buffer.append(line) flush_paragraph() return formatted def _format_metric_line(name: str, value: str) -> str: tooltip = METRIC_TOOLTIPS.get(name.strip()) name_text = escape(name.strip()) name_span = ( f"{name_text}" if tooltip else f"{name_text}" ) value_span = f"{_decorate_metric_value(value)}" return ( "

" f"{name_span} : {value_span}" "

" ) def _decorate_text(text: str) -> str: preserved = _preserve_spans(text) if not preserved: return "" highlighted = _KEYWORD_REGEX.sub( lambda match: f"{match.group(0)}", preserved ) return highlighted _NUMERIC_TOKEN = re.compile(r"[-+]?\d+(?:[\.,]\d+)?(?:\s?(?:%|bps|bp|x|X))?") def _decorate_metric_value(value: str) -> str: parts: List[str] = [] last_index = 0 for match in _NUMERIC_TOKEN.finditer(value): start, end = match.span() if start > last_index: parts.append(_decorate_text(value[last_index:start])) token = match.group(0) number_class = _numeric_class(token) parts.append( f"{escape(token.strip())}" ) last_index = end if last_index < len(value): parts.append(_decorate_text(value[last_index:])) if not parts: return _decorate_text(value) return "".join(parts) def _preserve_spans(text: str) -> str: """Escape text while allowing limited span tags for inline emphasis.""" result: List[str] = [] last_index = 0 for match in _SPAN_TAG.finditer(text): start, end = match.span() if start > last_index: result.append(escape(text[last_index:start])) result.append(_sanitize_span(match.group(0))) last_index = end if last_index < len(text): result.append(escape(text[last_index:])) return "".join(result) def _sanitize_span(tag: str) -> str: if tag.startswith("" attributes = {} for attr, value in _SPAN_ATTR.findall(tag): if attr.lower() != "class": continue filtered = _filter_allowed_classes(value) if filtered: attributes["class"] = filtered attr_string = "".join( f" {name}=\"{escape(val)}\"" for name, val in attributes.items() ) return f"" def _filter_allowed_classes(raw_value: str) -> str: classes = [cls for cls in raw_value.split() if cls in ALLOWED_CLASSES] return " ".join(dict.fromkeys(classes)) def _looks_like_html(text: str) -> bool: return bool(re.search(r"<\s*(div|p|span|h2|h3|ul|ol|li)\b", text, re.IGNORECASE)) class _AnalyzerHTMLSanitizer(HTMLParser): def __init__(self) -> None: super().__init__() self.parts: List[str] = [] self._open_tags: List[str] = [] def handle_starttag(self, tag: str, attrs: List[Tuple[str, str]]) -> None: tag_lower = tag.lower() if tag_lower not in ALLOWED_TAGS: self._open_tags.append("") return attr_string = "" if attrs: allowed_attrs = [] for name, value in attrs: name_lower = name.lower() if name_lower == "class": filtered = _filter_allowed_classes(value) if filtered: allowed_attrs.append(("class", filtered)) if allowed_attrs: attr_string = "".join( f" {escape(attr)}=\"{escape(val)}\"" for attr, val in allowed_attrs ) if tag_lower == "hr": self.parts.append(f"<{tag_lower}{attr_string}>") self._open_tags.append("") return self.parts.append(f"<{tag_lower}{attr_string}>") self._open_tags.append(tag_lower) def handle_endtag(self, tag: str) -> None: if not self._open_tags: return open_tag = self._open_tags.pop() if open_tag: self.parts.append(f"") def handle_data(self, data: str) -> None: if data: self.parts.append(escape(data)) def handle_entityref(self, name: str) -> None: self.parts.append(f"&{name};") def handle_charref(self, name: str) -> None: self.parts.append(f"&#{name};") def _sanitize_analysis_html(text: str) -> str: sanitizer = _AnalyzerHTMLSanitizer() sanitizer.feed(text) sanitizer.close() sanitized = "".join(sanitizer.parts) return re.sub(r".*?", "", sanitized, flags=re.IGNORECASE | re.DOTALL) def _numeric_class(token: str) -> str: cleaned = token.strip() if not cleaned: return "neutral" normalized = cleaned.replace(",", ".") stripped = re.sub(r"[^0-9+\-\.]+", "", normalized) try: number = float(stripped) except ValueError: return "neutral" if number < 0: return "negative" if number > 0: return "positive" return "neutral" def _trim_trailing_breaks(html: str) -> str: return re.sub(r"(?:\s*)+$", "", html) def _wrap_with_container(body: str) -> str: """Ensure the analysis output is wrapped in the themed container.""" if re.search(r"class\s*=\s*['\"]analysis-container['\"]", body): return body return f"
{body}
" def _append_caret(html: str) -> str: """Append a blinking caret to indicate streaming output.""" caret = "|" if caret in html: return html updated = re.sub(r"(\s*\s*)$", caret + r"\1", html, count=1) if updated == html: return html + caret return updated