Spaces:
Sleeping
Sleeping
| """Utilities to render portfolio analysis output with styled HTML.""" | |
| from __future__ import annotations | |
| import re | |
| from html import escape | |
| from html.parser import HTMLParser | |
| from typing import Iterable, List, Tuple | |
| _SPAN_TAG = re.compile(r"</?span(?:\s+[^>]*?)?>", re.IGNORECASE) | |
| _SPAN_ATTR = re.compile(r"([a-zA-Z_:][-a-zA-Z0-9_:.]*)\s*=\s*\"(.*?)\"") | |
# CSS class names that may survive sanitisation; any other class is dropped.
ALLOWED_CLASSES = {
    # layout / structure
    "analysis-container",
    "analysis-output",
    "section",
    "section-divider",
    # text lines
    "analysis-status",
    "analysis-line",
    "analysis-keyword",
    "analysis-caret",
    "bullet",
    # metric rows
    "metric",
    "metric-name",
    "metric-separator",
    "metric-value",
    "metric-number",
    # numeric tone
    "positive",
    "negative",
    "neutral",
}
# HTML elements that may survive sanitisation.
ALLOWED_TAGS = {
    "div", "p", "span",
    "h2", "h3",
    "ul", "ol", "li",
    "hr",
}
# Canonical spellings of the known section headings. _split_sections builds a
# lowercase lookup from these and uses it to normalise recognised header text.
SECTION_TITLES: Tuple[str, ...] = (
    "Objective Evaluation",
    "Risk Assessment",
    "Interpretation",
    "Recommendation",
)
# Phrases that _decorate_text wraps in an 'analysis-keyword' span. They are
# compiled into _KEYWORD_REGEX (escaped, case-insensitive) in this order.
KEYWORD_HIGHLIGHTS: Tuple[str, ...] = (
    "poor performance",
    "high risk",
    "underperformed",
    "volatility",
    "recommendation",
    "drawdown",
    "exposure",
    "opportunity",
)
# Tooltip text keyed by exact metric name. _format_metric_line looks up the
# stripped metric name here and, when found, emits the text as a data-tooltip
# attribute on the metric-name span.
METRIC_TOOLTIPS: dict[str, str] = {
    "Sharpe Ratio": "Sharpe Ratio: excess return per unit of total risk.",
    "Sortino Ratio": "Sortino Ratio: downside-risk-adjusted performance.",
    "Calmar Ratio": "Calmar Ratio: annual return divided by max drawdown.",
    "Max Drawdown": "Max Drawdown: largest observed portfolio loss from peak.",
    "Beta": "Beta: sensitivity to benchmark movements.",
    "Volatility": "Volatility: standard deviation of returns.",
}
# Alternation of every highlight phrase (regex-escaped), matched case-insensitively.
_KEYWORD_REGEX = re.compile("|".join(map(re.escape, KEYWORD_HIGHLIGHTS)), re.IGNORECASE)
# "<name>: <value>" line with an optional leading '-' or '•'; group 1 is the text
# before the first colon, group 2 the non-empty remainder.
_METRIC_LINE = re.compile(r"^[-•]?\s*([^:]+?):\s*(.+)$")
# Line that starts with a **bold** run; group 1 is the text between the markers.
_SECTION_HEADER = re.compile(r"^\*\*(.+?)\*\*")
def render_status_html(message: str) -> str:
    """Return container-wrapped HTML for an interim status or error message.

    The message is HTML-escaped and placed in a single ``analysis-status``
    paragraph inside an ``analysis-output`` block.
    """
    paragraph = f"<p class='analysis-status'>{escape(message)}</p>"
    return _wrap_with_container(f"<div class='analysis-output'>{paragraph}</div>")
def render_analysis_html(text: str, show_caret: bool = False) -> str:
    """Turn an LLM response into themed HTML that carries no inline styles.

    Args:
        text: Raw response; either plain text (optionally with section
            headers) or HTML-looking markup.
        show_caret: When true, a caret marker is added to the result.

    Returns:
        Markup wrapped in the ``analysis-container`` block.
    """

    def finish(body: str) -> str:
        # Shared tail of every exit path: wrap, then optionally add the caret.
        wrapped = _wrap_with_container(body)
        return _append_caret(wrapped) if show_caret else wrapped

    content = text.strip()
    if not content:
        return finish("<div class='analysis-output'></div>")

    if _looks_like_html(content):
        sanitized = _sanitize_analysis_html(content)
        # An empty sanitised result falls through to the plain-text path.
        if sanitized.strip():
            return finish(_trim_trailing_breaks(sanitized).strip())

    sections = _split_sections(content)
    if not sections:
        body = "".join(_format_lines(content.splitlines()))
        return finish(f"<div class='analysis-output'>{body}</div>")

    rendered_sections = [
        "<div class='section'>"
        + f"<h2>{escape(title)}</h2>"
        + "".join(_format_lines(section_text.splitlines()))
        + "</div>"
        for title, section_text in sections
    ]
    # A divider sits between consecutive sections, never after the last one.
    joined = "<div class='section-divider'></div>".join(rendered_sections)
    html = f"<div class='analysis-output'>{joined}</div>"
    return finish(_trim_trailing_breaks(html).strip())
def _split_sections(text: str) -> List[Tuple[str, str]]:
    """Split ``text`` into ``(title, content)`` pairs at section headers.

    A header is either a line that starts with ``**bold**`` text or a line
    consisting solely of one of ``SECTION_TITLES`` (compared
    case-insensitively, an optional trailing colon ignored). Recognised titles
    are returned in their canonical spelling; other bold headers keep their
    own text.

    Text that follows a bold header on the same line is kept as the first
    content line of that section instead of being discarded. Lines seen before
    the first header stay in the buffer and end up in the first section's
    content. A header with no buffered lines before the next header produces
    no section.

    Returns:
        The sections in document order; an empty list when no header was
        followed by content.
    """
    sections: List[Tuple[str, str]] = []
    current_title = None
    buffer: List[str] = []
    allowed_headers = {title.lower(): title for title in SECTION_TITLES}

    def canonical(candidate: str):
        # Lookup key: case-folded, without a trailing colon.
        return allowed_headers.get(candidate.rstrip(":").strip().lower())

    def flush() -> None:
        # Emit the section collected so far, if it has both a title and lines.
        if current_title and buffer:
            sections.append((current_title, "\n".join(buffer).strip()))
            buffer.clear()

    for line in text.splitlines():
        stripped = line.strip()
        header_match = _SECTION_HEADER.match(stripped)
        if header_match:
            flush()
            matched_title = header_match.group(1).strip()
            known = canonical(matched_title)
            current_title = matched_title if known is None else known
            # Keep anything written after the bold header on the same line.
            trailing = stripped[header_match.end():].lstrip(": \t")
            if trailing:
                buffer.append(trailing)
            continue
        plain_title = canonical(stripped) if stripped else None
        if plain_title is not None:
            flush()
            current_title = plain_title
            continue
        buffer.append(line)
    flush()
    return sections
def _format_lines(lines: Iterable[str]) -> List[str]:
    """Convert raw text lines into a list of HTML paragraph fragments.

    Blank lines end the current paragraph. A line matching ``_METRIC_LINE``
    becomes a metric row, a line starting with ``-`` or ``•`` becomes a bullet
    paragraph, and any other line is collected into a running paragraph whose
    lines are joined with single spaces. Metric rows are de-duplicated by
    lowercased name; paragraphs and bullets share one de-duplication set keyed
    by whitespace-normalised, lowercased text.
    """
    output: List[str] = []
    pending: List[str] = []
    metric_names_seen: set[str] = set()
    text_seen: set[str] = set()

    def emit_pending() -> None:
        # Flush the accumulated paragraph lines as one <p>, unless already seen.
        if pending:
            joined = " ".join(pending)
            dedupe_key = re.sub(r"\s+", " ", joined).strip().lower()
            if dedupe_key and dedupe_key not in text_seen:
                text_seen.add(dedupe_key)
                output.append(
                    f"<p class='analysis-line'>{_decorate_text(joined)}</p>"
                )
            pending.clear()

    for raw_line in lines:
        text = raw_line.strip()
        if not text:
            emit_pending()
            continue

        as_metric = _METRIC_LINE.match(text)
        if as_metric is not None:
            # The metric check runs before the bullet check, so "- Name: value"
            # is rendered as a metric row.
            emit_pending()
            label, value = as_metric.group(1), as_metric.group(2)
            metric_key = label.strip().lower()
            if metric_key and metric_key not in metric_names_seen:
                metric_names_seen.add(metric_key)
                output.append(_format_metric_line(label, value))
        elif text.startswith(("-", "•")):
            emit_pending()
            item = re.sub(r"^[-•]\s*", "", text)
            item_key = re.sub(r"\s+", " ", item).strip().lower()
            if item_key and item_key not in text_seen:
                text_seen.add(item_key)
                output.append(
                    f"<p class='analysis-line bullet'>{_decorate_text(item)}</p>"
                )
        else:
            pending.append(text)

    emit_pending()
    return output
def _format_metric_line(name: str, value: str) -> str:
    """Render one ``name: value`` metric as a styled paragraph.

    The escaped name goes in a ``metric-name`` span, which carries a
    ``data-tooltip`` attribute when the stripped name has an entry in
    ``METRIC_TOOLTIPS``. The value is passed through
    ``_decorate_metric_value``.
    """
    label = name.strip()
    label_html = escape(label)
    tooltip = METRIC_TOOLTIPS.get(label)
    if tooltip:
        name_span = (
            f"<span class='metric-name' data-tooltip='{escape(tooltip)}'>{label_html}</span>"
        )
    else:
        name_span = f"<span class='metric-name'>{label_html}</span>"
    value_span = f"<span class='metric-value'>{_decorate_metric_value(value)}</span>"
    row = f"{name_span} <span class='metric-separator'>:</span> {value_span}"
    return "<p class='analysis-line metric'>" + row + "</p>"
def _decorate_text(text: str) -> str:
    """Escape ``text`` (keeping sanitised spans) and highlight known keywords.

    Every ``_KEYWORD_REGEX`` match in the escaped text is wrapped in an
    ``analysis-keyword`` span. Returns an empty string when nothing remains
    after escaping.
    """
    escaped = _preserve_spans(text)
    if not escaped:
        return ""

    def wrap_keyword(match) -> str:
        # The matched text comes from the already-escaped string.
        return f"<span class='analysis-keyword'>{match.group(0)}</span>"

    return _KEYWORD_REGEX.sub(wrap_keyword, escaped)
| _NUMERIC_TOKEN = re.compile(r"[-+]?\d+(?:[\.,]\d+)?(?:\s?(?:%|bps|bp|x|X))?") | |
def _decorate_metric_value(value: str) -> str:
    """Render a metric value, wrapping each numeric token in a styled span.

    Every ``_NUMERIC_TOKEN`` match becomes a ``metric-number`` span whose
    second class comes from ``_numeric_class``; the text between and around
    the matches goes through ``_decorate_text``.
    """
    pieces: List[str] = []
    cursor = 0
    for token_match in _NUMERIC_TOKEN.finditer(value):
        token_start, token_end = token_match.span()
        if cursor < token_start:
            pieces.append(_decorate_text(value[cursor:token_start]))
        number = token_match.group(0)
        tone = _numeric_class(number)
        pieces.append(
            f"<span class='metric-number {tone}'>{escape(number.strip())}</span>"
        )
        cursor = token_end

    tail = value[cursor:]
    if tail:
        pieces.append(_decorate_text(tail))

    # Nothing was collected only when ``value`` is empty.
    return "".join(pieces) if pieces else _decorate_text(value)
def _preserve_spans(text: str) -> str:
    """HTML-escape ``text`` but keep ``<span>`` tags, in sanitised form.

    Each ``_SPAN_TAG`` match is replaced by the output of ``_sanitize_span``;
    everything between the tags is escaped.
    """
    pieces: List[str] = []
    cursor = 0
    for tag_match in _SPAN_TAG.finditer(text):
        literal = text[cursor:tag_match.start()]
        if literal:
            pieces.append(escape(literal))
        pieces.append(_sanitize_span(tag_match.group(0)))
        cursor = tag_match.end()

    remainder = text[cursor:]
    if remainder:
        pieces.append(escape(remainder))
    return "".join(pieces)
def _sanitize_span(tag: str) -> str:
    """Rebuild a single span tag, keeping only an allowed ``class`` attribute.

    Closing tags become ``</span>``. For opening tags every attribute other
    than ``class`` is dropped and the class list is filtered through
    ``_filter_allowed_classes``; when several ``class`` attributes yield
    allowed names, the last one wins.
    """
    if tag.startswith("</"):
        return "</span>"

    class_value = ""
    for attr_name, attr_value in _SPAN_ATTR.findall(tag):
        if attr_name.lower() == "class":
            allowed = _filter_allowed_classes(attr_value)
            if allowed:
                class_value = allowed

    if not class_value:
        return "<span>"
    return f"<span class=\"{escape(class_value)}\">"
def _filter_allowed_classes(raw_value: str) -> str:
    """Return the whitelisted class names from a ``class`` attribute value.

    Names are split on whitespace, those not in ``ALLOWED_CLASSES`` are
    dropped, duplicates are removed keeping the first occurrence, and the
    result is joined with single spaces.
    """
    kept: List[str] = []
    for class_name in raw_value.split():
        if class_name in ALLOWED_CLASSES and class_name not in kept:
            kept.append(class_name)
    return " ".join(kept)
| def _looks_like_html(text: str) -> bool: | |
| return bool(re.search(r"<\s*(div|p|span|h2|h3|ul|ol|li)\b", text, re.IGNORECASE)) | |
class _AnalyzerHTMLSanitizer(HTMLParser):
    """Rebuild supplied HTML keeping only whitelisted tags and classes.

    Tags outside ``ALLOWED_TAGS`` are dropped while their text content is kept
    (escaped). Every attribute except a filtered ``class`` is discarded. The
    emitted markup is kept balanced:

    * void elements (``<br>``, ``<hr>``, ``<img>`` ...) never wait for an end
      tag, so they cannot swallow the end tag of an enclosing element;
    * an end tag closes the nearest open element of the same name, implicitly
      closing anything opened inside it; end tags with no open counterpart
      are ignored;
    * ``close()`` closes whatever is still open, e.g. for truncated input.

    Text inside ``<script>`` and ``<style>`` is code rather than prose and is
    not emitted. Sanitised fragments accumulate in ``parts``.
    """

    # Elements that never have an end tag; nothing is tracked for them.
    _VOID_TAGS = frozenset(
        {
            "area", "base", "br", "col", "embed", "hr", "img",
            "input", "link", "meta", "source", "track", "wbr",
        }
    )
    # Elements whose text content must not be shown to the reader.
    _RAW_TEXT_TAGS = frozenset({"script", "style"})

    def __init__(self) -> None:
        super().__init__()
        self.parts: List[str] = []
        # Open elements, innermost last: (source tag name, whether an opening
        # tag was emitted for it).
        self._open_tags: List[Tuple[str, bool]] = []
        # Greater than zero while inside a <script>/<style> element.
        self._raw_text_depth = 0

    @staticmethod
    def _class_attribute(attrs: List[Tuple[str, str | None]]) -> str:
        """Return `` class="..."`` built from allowed class names, or ``""``."""
        # A valueless attribute (``<div class>``) is reported with value None.
        raw = " ".join(
            value or "" for name, value in attrs if name.lower() == "class"
        )
        filtered = _filter_allowed_classes(raw)
        return f" class=\"{escape(filtered)}\"" if filtered else ""

    def _close_down_to(self, depth: int) -> None:
        """Pop open elements until ``depth`` remain, emitting their end tags."""
        while len(self._open_tags) > depth:
            name, emitted = self._open_tags.pop()
            if emitted:
                self.parts.append(f"</{name}>")

    def handle_starttag(self, tag: str, attrs: List[Tuple[str, str | None]]) -> None:
        tag_lower = tag.lower()
        if tag_lower in self._RAW_TEXT_TAGS:
            self._raw_text_depth += 1
        emitted = tag_lower in ALLOWED_TAGS
        if emitted:
            self.parts.append(f"<{tag_lower}{self._class_attribute(attrs)}>")
        if tag_lower not in self._VOID_TAGS:
            self._open_tags.append((tag_lower, emitted))

    def handle_endtag(self, tag: str) -> None:
        tag_lower = tag.lower()
        if tag_lower in self._RAW_TEXT_TAGS and self._raw_text_depth:
            self._raw_text_depth -= 1
        # Find the nearest open element with this name.
        for index in range(len(self._open_tags) - 1, -1, -1):
            if self._open_tags[index][0] == tag_lower:
                self._close_down_to(index)
                return
        # No open counterpart (stray end tag, or end tag of a void element).

    def handle_data(self, data: str) -> None:
        if data and not self._raw_text_depth:
            self.parts.append(escape(data))

    # The two handlers below are only invoked by HTMLParser when character
    # reference conversion is disabled; with the default settings references
    # arrive already decoded through handle_data.
    def handle_entityref(self, name: str) -> None:
        if not self._raw_text_depth:
            self.parts.append(f"&{name};")

    def handle_charref(self, name: str) -> None:
        if not self._raw_text_depth:
            self.parts.append(f"&#{name};")

    def close(self) -> None:
        """Finish parsing and close every element that is still open."""
        super().close()
        self._close_down_to(0)
def _sanitize_analysis_html(text: str) -> str:
    """Return ``text`` reduced to the whitelisted tags and classes.

    Complete ``<style>`` and ``<script>`` elements are removed together with
    their content *before* parsing. This must happen first: the sanitizer does
    not emit those tags, so a pattern applied to its output could never match
    them and the CSS/JS source would be left behind as visible text.
    """
    without_raw_blocks = re.sub(
        r"<(style|script)\b[^>]*>.*?</\1\s*>",
        "",
        text,
        flags=re.IGNORECASE | re.DOTALL,
    )
    sanitizer = _AnalyzerHTMLSanitizer()
    sanitizer.feed(without_raw_blocks)
    sanitizer.close()
    return "".join(sanitizer.parts)
| def _numeric_class(token: str) -> str: | |
| cleaned = token.strip() | |
| if not cleaned: | |
| return "neutral" | |
| normalized = cleaned.replace(",", ".") | |
| stripped = re.sub(r"[^0-9+\-\.]+", "", normalized) | |
| try: | |
| number = float(stripped) | |
| except ValueError: | |
| return "neutral" | |
| if number < 0: | |
| return "negative" | |
| if number > 0: | |
| return "positive" | |
| return "neutral" | |
| def _trim_trailing_breaks(html: str) -> str: | |
| return re.sub(r"(?:<br\s*/?>\s*)+$", "", html) | |
| def _wrap_with_container(body: str) -> str: | |
| """Ensure the analysis output is wrapped in the themed container.""" | |
| if re.search(r"class\s*=\s*['\"]analysis-container['\"]", body): | |
| return body | |
| return f"<div class='analysis-container'>{body}</div>" | |
| def _append_caret(html: str) -> str: | |
| """Append a blinking caret to indicate streaming output.""" | |
| caret = "<span class='analysis-caret'>|</span>" | |
| if caret in html: | |
| return html | |
| updated = re.sub(r"(</div>\s*</div>\s*)$", caret + r"\1", html, count=1) | |
| if updated == html: | |
| return html + caret | |
| return updated | |