Spaces:
Running
Running
| import functools | |
| from typing import Any, Callable | |
| from pathlib import Path | |
| from .logger import get_logger | |
| from .exceptions import FragmentaError, map_common_exception | |
| from .api_responses import APIResponse | |
| logger = get_logger(__name__) | |
| def enhanced_print(message: str, component: str = "Legacy", level: str = "info"): | |
| comp_logger = get_logger(component) | |
| print(message) | |
| log_method = getattr(comp_logger, level.lower(), comp_logger.info) | |
| log_method(f"[MIGRATED] {message}") | |
| def migrate_exception_handling(func: Callable) -> Callable: | |
| def wrapper(*args, **kwargs): | |
| try: | |
| return func(*args, **kwargs) | |
| except FragmentaError: | |
| raise | |
| except Exception as e: | |
| fragmenta_error = map_common_exception(e, context=func.__name__) | |
| logger.error(f"Mapped exception in {func.__name__}: {e} -> {fragmenta_error}") | |
| raise fragmenta_error | |
| return wrapper | |
| def enhance_api_endpoint(func: Callable) -> Callable: | |
| def wrapper(*args, **kwargs): | |
| try: | |
| result = func(*args, **kwargs) | |
| if isinstance(result, tuple) and len(result) == 2: | |
| response_data, status_code = result | |
| if status_code >= 400: | |
| return APIResponse.error(response_data, status_code), status_code | |
| else: | |
| return APIResponse.success(response_data), status_code | |
| if isinstance(result, dict) and 'error' in result: | |
| return APIResponse.error(result['error'], 400), 400 | |
| return APIResponse.success(result), 200 | |
| except FragmentaError as e: | |
| error_response = APIResponse.error(e) | |
| return error_response, error_response['error']['code'] | |
| except Exception as e: | |
| logger.exception(f"Unexpected error in {func.__name__}") | |
| error_response = APIResponse.error( | |
| "Internal server error", | |
| status_code=500, | |
| details={"function": func.__name__} | |
| ) | |
| return error_response, 500 | |
| return wrapper | |
| def log_performance_metrics(func: Callable) -> Callable: | |
| def wrapper(*args, **kwargs): | |
| import time | |
| func_logger = get_logger(func.__module__ or "Unknown") | |
| start_time = time.time() | |
| func_logger.debug(f"Starting {func.__name__}") | |
| try: | |
| result = func(*args, **kwargs) | |
| elapsed = time.time() - start_time | |
| func_logger.info(f"{func.__name__} completed in {elapsed:.2f}s") | |
| return result | |
| except Exception as e: | |
| elapsed = time.time() - start_time | |
| func_logger.error(f"{func.__name__} failed after {elapsed:.2f}s: {e}") | |
| raise | |
| return wrapper | |
| def safe_file_operation(func: Callable) -> Callable: | |
| def wrapper(*args, **kwargs): | |
| func_logger = get_logger(func.__module__ or "FileOps") | |
| try: | |
| return func(*args, **kwargs) | |
| except FileNotFoundError as e: | |
| func_logger.error(f"File not found in {func.__name__}: {e}") | |
| raise FragmentaError(f"File not found: {e}") | |
| except PermissionError as e: | |
| func_logger.error(f"Permission denied in {func.__name__}: {e}") | |
| raise FragmentaError(f"Permission denied: {e}") | |
| except OSError as e: | |
| func_logger.error(f"OS error in {func.__name__}: {e}") | |
| raise FragmentaError(f"File system error: {e}") | |
| except Exception as e: | |
| func_logger.error(f"Unexpected error in file operation {func.__name__}: {e}") | |
| raise | |
| return wrapper | |
| def migrate_print_statements_example(): | |
| enhanced_print("Starting Fragmenta...", "DesktopApp", "info") | |
| enhanced_print("Backend server starting on http://127.0.0.1:5001", "DesktopApp", "info") | |
| app_logger = get_logger("DesktopApp") | |
| app_logger.info("Starting Fragmenta...") | |
| app_logger.info(" Backend server starting on http://127.0.0.1:5001") | |
| def migrate_exception_handling_example(): | |
| def some_operation(): | |
| raise FileNotFoundError("Model file not found") | |
| def find_print_statements(directory: Path): | |
| print_statements = [] | |
| for py_file in directory.rglob("*.py"): | |
| try: | |
| with open(py_file, 'r', encoding='utf-8') as f: | |
| lines = f.readlines() | |
| for line_num, line in enumerate(lines, 1): | |
| if 'print(' in line and not line.strip().startswith('#'): | |
| print_statements.append({ | |
| 'file': py_file, | |
| 'line': line_num, | |
| 'content': line.strip() | |
| }) | |
| except Exception as e: | |
| logger.warning(f"Could not scan {py_file}: {e}") | |
| return print_statements | |
| def create_migration_report(project_root: Path): | |
| logger.info("Creating migration report...") | |
| print_statements = find_print_statements(project_root) | |
| report = { | |
| 'print_statements': len(print_statements), | |
| 'files_with_prints': len(set(ps['file'] for ps in print_statements)), | |
| 'sample_prints': print_statements[:10], | |
| } | |
| logger.info(f"Migration Report:") | |
| logger.info(f" - Print statements found: {report['print_statements']}") | |
| logger.info(f" - Files with prints: {report['files_with_prints']}") | |
| return report |