| from dataclasses import dataclass, field |
| from typing import Any, Dict, List, Tuple, Optional |
| import logging |
| from datetime import datetime |
|
|
| from open_storyline.utils.logging import get_logger |
|
|
|
|
| @dataclass |
| class LogEntry: |
| """Single log entry""" |
| level: str |
| message: str |
| timestamp: str |
| artifact_id: Optional[str] = None |
| extra_data: Dict[str, Any] = field(default_factory=dict) |
|
|
| @dataclass |
| class NodeSummary: |
| """ |
| Node Execution Status Summary - Reuses existing logger module |
| |
| Features: |
| 1. ERROR - Error messages for LLM |
| 2. WARNING - Warning messages for LLM |
| 3. DEBUG - Debug information for developers |
| 4. INFO_LLM - Detailed information for LLM |
| 5. INFO_USER - Brief information for users |
| |
| Capabilities: |
| - Reuses get_logger() configuration |
| - Hierarchical log storage and extraction |
| - Colored console output |
| - Log compression functionality |
| - Supports artifact tracking |
| """ |
| ERROR: str = "ERROR" |
| DEBUG: str = "DEBUG" |
| WARNING: str = "WARNING" |
| INFO_LLM: str = "INFO_LLM" |
| INFO_USER: str = "INFO_USER" |
| LOGGER_LEVELS: Tuple[str, ...] = (ERROR, DEBUG, WARNING, INFO_LLM, INFO_USER) |
|
|
|
|
| |
| log_error: List[LogEntry] = field(default_factory=list) |
| log_warn: List[LogEntry] = field(default_factory=list) |
| log_info_llm: List[LogEntry] = field(default_factory=list) |
| log_info_user: List[LogEntry] = field(default_factory=list) |
| log_debug: List[LogEntry] = field(default_factory=list) |
| |
| |
| artifact_warnings: Dict[str, List[str]] = field(default_factory=dict) |
| artifact_errors: Dict[str, List[str]] = field(default_factory=dict) |
| |
| |
| logger_name: Optional[str] = field(default=None) |
| auto_console: bool = field(default=True) |
| summary_levels: Optional[List[str]] = field(default=None) |
|
|
| |
| _logger: Optional[logging.Logger] = field(default=None, init=False, repr=False) |
|
|
| def __post_init__(self): |
| """Initialize logger - reuses get_logger""" |
| if self.logger_name is None: |
| self.logger_name = "NodeSummary" |
| self._logger = get_logger(self.logger_name) |
| if self.summary_levels is None: |
| self.summary_levels = [self.ERROR, self.WARNING, self.INFO_LLM, self.INFO_USER] |
|
|
| def _log_to_console(self, level: int, message: str, artifact_id: Optional[str] = None): |
| """Output to console (using configured logger)""" |
| if not self.auto_console: |
| return |
| |
| prefix = f"[ARTIFACT:{artifact_id}] " if artifact_id else "" |
| self._logger.log(level, f"{prefix}{message}") |
| |
| def add_error(self, message: str, artifact_id: Optional[str] = None, **kwargs: Any): |
| """Log error messages - for LLM""" |
| entry = LogEntry( |
| level=self.ERROR, |
| message=message, |
| timestamp=datetime.now().isoformat(), |
| artifact_id=artifact_id, |
| extra_data=kwargs |
| ) |
| self.log_error.append(entry) |
| |
| if artifact_id: |
| self.artifact_errors.setdefault(artifact_id, []).append(message) |
| |
| self._log_to_console(logging.ERROR, message, artifact_id) |
| |
| def add_warning(self, message: str, artifact_id: Optional[str] = None, **kwargs: Any): |
| """Log warning messages - for LLM""" |
| entry = LogEntry( |
| level="WARNING", |
| message=message, |
| timestamp=datetime.now().isoformat(), |
| artifact_id=artifact_id, |
| extra_data=kwargs |
| ) |
| self.log_warn.append(entry) |
| |
| if artifact_id: |
| self.artifact_warnings.setdefault(artifact_id, []).append(message) |
| |
| self._log_to_console(logging.WARNING, message, artifact_id) |
| |
| def info_for_llm(self, message: str, artifact_id: Optional[str] = None, **kwargs: Any): |
| """Log detailed information - for LLM""" |
| entry = LogEntry( |
| level="INFO_LLM", |
| message=message, |
| timestamp=datetime.now().isoformat(), |
| artifact_id=artifact_id, |
| extra_data=kwargs |
| ) |
| self.log_info_llm.append(entry) |
| self._log_to_console(logging.INFO, f"[{self.INFO_LLM}] {message}", artifact_id) |
| |
| def info_for_user(self, message: str, artifact_id: Optional[str] = None, **kwargs: Any): |
| """Log general information - for users""" |
| entry = LogEntry( |
| level="INFO_USER", |
| message=message, |
| timestamp=datetime.now().isoformat(), |
| artifact_id=artifact_id, |
| extra_data=kwargs |
| ) |
| self.log_info_user.append(entry) |
| self._log_to_console(logging.INFO, f"[{self.INFO_USER}] {message}", artifact_id) |
| |
| def debug_for_dev(self, message: str, artifact_id: Optional[str] = None, **kwargs: Any): |
| """Log debug information - for developers""" |
| entry = LogEntry( |
| level=self.DEBUG, |
| message=message, |
| timestamp=datetime.now().isoformat(), |
| artifact_id=artifact_id, |
| extra_data=kwargs |
| ) |
| self.log_debug.append(entry) |
| self._log_to_console(logging.DEBUG, f"[{self.DEBUG}] {message}", artifact_id) |
| |
| def get_logs_by_level( |
| self, |
| level:str , |
| compress_log: bool=False, |
| ) -> Dict[str,Any]: |
| self.all_logs = { |
| self.ERROR: self.log_error, |
| self.DEBUG: self.log_debug, |
| self.WARNING: self.log_warn, |
| self.INFO_LLM: self.log_info_llm, |
| self.INFO_USER: self.log_info_user |
| } |
|
|
| selected_log = self.all_logs[level] |
|
|
| return self._extract_log(selected_log) |
| |
| def _extract_log( |
| self, |
| log_content: List[LogEntry], |
| ) -> Dict[str,Any]: |
| """ |
| Extract log content into string format. |
| |
| Args: |
| log_content: List of log entries |
| |
| Returns: |
| Formatted log string with each log entry on a separate line |
| """ |
| if not log_content: |
| return {} |
| |
| log_lines: List[str] = [] |
| extra_data_list: List[Dict[str,Any]] = [] |
| for entry in log_content: |
| log_line = f"[{entry.timestamp}] {entry.message}" |
|
|
| if entry.artifact_id: |
| log_line += f" [artifact_id: {entry.artifact_id}]" |
| |
| log_lines.append(log_line) |
| extra_data_list.append(entry.extra_data) |
| result: Dict[str,Any] = { |
| "log_lines": "\n".join(log_lines), |
| "extra_data_list": extra_data_list |
| } |
| return result |
| |
| def _get_preview_urls( |
| self, |
| extra_data_list: List[Dict[str,Any]], |
| ) -> List[str]: |
| preview_urls: List[str] = [] |
| for extra_data in extra_data_list: |
| preview_urls.extend([str(url) for url in extra_data.get('preview_urls', [])]) |
| return preview_urls |
|
|
| def get_summary( |
| self, |
| artifact_id: str, |
| compress_log: bool=True, |
| **kwargs: Dict[str,Any], |
| ) -> Dict[str,Any]: |
| summary: Dict[str,Any] = {} |
| preview_urls: List[str] = [] |
| if self.summary_levels is None: |
| return summary |
|
|
| for level in self.summary_levels: |
| summary_log = self.get_logs_by_level(level, compress_log) |
| log_lines = summary_log.get('log_lines', "") |
| extra_data_list = summary_log.get('extra_data_list', []) |
| preview_urls.extend(self._get_preview_urls(extra_data_list)) |
| summary[level] = log_lines |
| |
| summary['preview_urls'] = preview_urls |
| summary['artifact_id'] = artifact_id |
| return summary |
|
|
| def clear(self): |
| """Clear all logs""" |
| self.log_error.clear() |
| self.log_warn.clear() |
| self.log_info_llm.clear() |
| self.log_info_user.clear() |
| self.log_debug.clear() |
| self.artifact_warnings.clear() |
| self.artifact_errors.clear() |