| | """ |
| | API helper utilities for reliable data fetching with retry logic |
| | """ |
| | import time |
| | import logging |
| | import functools |
| | import numpy as np |
| | from typing import Any, Dict, Optional, Callable, TypeVar, cast, Union |
| | import pandas as pd |
| | import requests |
| | from tenacity import ( |
| | retry, |
| | stop_after_attempt, |
| | wait_exponential, |
| | retry_if_exception_type, |
| | RetryError |
| | ) |
| |
|
| | |
# Module-level logger used by the helpers in this file for warnings and errors.
logger = logging.getLogger("api_helpers")

# Generic type variable standing for the return type of a wrapped callable.
T = TypeVar("T")
| |
|
def validate_dataframe(df: pd.DataFrame, required_columns: list, min_rows: int = 1) -> bool:
    """
    Check that a DataFrame has enough rows and all required columns.

    Args:
        df: DataFrame to inspect (``None`` is accepted and treated as invalid)
        required_columns: Column names that must all be present in ``df``
        min_rows: Smallest acceptable number of rows

    Returns:
        True when every check passes, False otherwise (a warning is logged
        describing the first failed check)
    """
    # A missing or empty frame is reported as having zero rows.
    unusable = df is None or df.empty
    row_count = 0 if unusable else len(df)

    if unusable or row_count < min_rows:
        logger.warning(f"DataFrame validation failed: empty or too few rows (expected {min_rows}, got {row_count})")
        return False

    # Collect absent columns in the order the caller listed them.
    available = df.columns
    absent = []
    for name in required_columns:
        if name not in available:
            absent.append(name)

    if absent:
        logger.warning(f"DataFrame validation failed: missing columns {absent}")
        return False

    return True
| |
|
def convert_numpy_types(obj: Any) -> Any:
    """
    Convert numpy/pandas values to native Python types for JSON serialization

    Handles numpy booleans, integers and floats, numpy arrays, pandas
    DataFrames (as a list of row dicts) and Series (as a dict), and recurses
    into dicts, lists and plain tuples. Dict keys are left unchanged.

    Args:
        obj: Object that might contain numpy types

    Returns:
        Object with numpy types converted to Python types; anything not
        recognised is returned unchanged
    """
    # np.bool_ is neither a Python bool nor an np.integer, so it needs its
    # own branch; without it the value stays a numpy scalar.
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        as_list = obj.tolist()
        # tolist() already yields native scalars for numeric dtypes; only
        # object arrays can still carry numpy values inside.
        return convert_numpy_types(as_list) if obj.dtype.kind == 'O' else as_list
    if isinstance(obj, pd.DataFrame):
        # Recurse so cells holding arrays or numpy scalars are converted too.
        return convert_numpy_types(obj.to_dict(orient='records'))
    if isinstance(obj, pd.Series):
        return convert_numpy_types(obj.to_dict())
    if isinstance(obj, dict):
        return {k: convert_numpy_types(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [convert_numpy_types(item) for item in obj]
    # Exact-type check on purpose: rebuilding a namedtuple (or any other
    # tuple subclass) as a plain tuple would silently change its type.
    if type(obj) is tuple:
        return tuple(convert_numpy_types(item) for item in obj)
    return obj
| |
|
def safe_api_call(
    func: Callable[..., T],
    max_retries: int = 3,
    backoff_factor: float = 2.0,
    timeout: int = 30,
    expected_exceptions: tuple = (requests.exceptions.RequestException,),
    validation_func: Optional[Callable[[T], bool]] = None
) -> Callable[..., Dict[str, Any]]:
    """
    Decorator for safely making API calls with retries and error handling

    Because ``func`` is the first positional parameter, use it bare
    (``@safe_api_call``) for the defaults, or call it directly to customise:
    ``fetch = safe_api_call(fetch, max_retries=5)``.

    Args:
        func: Function to wrap
        max_retries: Maximum number of attempts before giving up
        backoff_factor: Minimum wait between attempts; the maximum wait is
            ten times this value
        timeout: Fallback timeout, applied only when the caller explicitly
            passes ``timeout=None``; a call that omits ``timeout`` is left
            untouched
        expected_exceptions: Exceptions to retry on
        validation_func: Optional function to validate the response

    Returns:
        Wrapped function that never raises ``Exception`` subclasses and
        instead returns a dict with the keys ``success``, ``data`` and
        ``error``
    """
    # Build the retrying callable once at decoration time rather than on
    # every call. ``reraise`` is deliberately left at its default so that
    # exhausted retries surface as RetryError; with ``reraise=True`` tenacity
    # re-raises the last exception and the RetryError handler below could
    # never run.
    retrying_func = retry(
        stop=stop_after_attempt(max_retries),
        wait=wait_exponential(multiplier=1, min=backoff_factor, max=backoff_factor * 10),
        retry=retry_if_exception_type(expected_exceptions)
    )(func)

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        """
        Wrapper function that adds retry logic and error handling

        Returns:
            Dictionary with either successful data or error information
        """
        try:
            # Substitute the default only for an explicit ``timeout=None``;
            # the keyword is never injected into calls that did not pass it.
            if 'timeout' in kwargs and kwargs['timeout'] is None:
                kwargs['timeout'] = timeout

            result = retrying_func(*args, **kwargs)

            if validation_func and not validation_func(result):
                return {
                    "success": False,
                    "error": "Data validation failed",
                    "data": None
                }

            # Make the payload JSON-friendly before handing it back.
            result = convert_numpy_types(result)

            return {
                "success": True,
                "data": result,
                "error": None
            }

        except RetryError as e:
            # Every attempt failed with one of ``expected_exceptions``;
            # report the exception raised by the final attempt.
            original_error = e.last_attempt.exception()
            logger.error(f"Max retries exceeded in {func.__name__}: {str(original_error)}")
            return {
                "success": False,
                "error": f"Max retries exceeded: {str(original_error)}",
                "data": None
            }

        except Exception as e:
            # Non-retryable failures (including errors raised by validation
            # or conversion) are reported, not propagated.
            logger.error(f"Error in {func.__name__}: {str(e)}", exc_info=True)
            return {
                "success": False,
                "error": str(e),
                "data": None
            }

    return wrapper
| |
|
def with_exponential_backoff(
    max_retries: int = 3,
    backoff_factor: float = 2.0,
    expected_exceptions: tuple = (Exception,)
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """
    Decorator for adding exponential backoff retry logic to any function

    The wrapped function is always called at least once. After the k-th
    failed attempt (k starting at 1) the wrapper sleeps
    ``backoff_factor ** (k - 1)`` seconds before trying again.

    Args:
        max_retries: Maximum number of attempts; values below 1 are treated
            as 1 so the function is never skipped
        backoff_factor: Base of the exponential wait between attempts
        expected_exceptions: Exceptions to retry on; anything else propagates
            immediately

    Returns:
        Decorator function

    Raises:
        The last exception raised by the wrapped function once all attempts
        are used up.
    """
    # Previously a non-positive ``max_retries`` skipped the loop entirely and
    # returned None without ever calling the function.
    total_attempts = max(1, max_retries)

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            """
            Wrapper function that adds retry logic

            Returns:
                Result of the original function
            """
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except expected_exceptions as e:
                    attempt += 1
                    if attempt >= total_attempts:
                        # Out of attempts: surface the original exception
                        # with its traceback intact.
                        raise

                    wait_time = backoff_factor ** (attempt - 1)
                    logger.warning(f"Attempt {attempt}/{total_attempts} failed: {str(e)}. Retrying in {wait_time:.1f} seconds...")
                    time.sleep(wait_time)

        return wrapper

    return decorator
| |
|
def handle_api_result(
    result: Dict[str, Any],
    default_value: T,
    error_prefix: str = "API Error"
) -> Union[T, Dict[str, Any]]:
    """
    Unpack the result dictionary produced by a safe_api_call wrapped function

    Args:
        result: Dictionary carrying the ``success``, ``data`` and ``error`` keys
        default_value: Value used when ``data`` is absent on success, and
            returned under ``data`` in the error dictionary on failure
        error_prefix: Text placed before the error message

    Returns:
        The ``data`` entry when ``success`` is truthy; otherwise a dictionary
        with the prefixed ``error`` message and ``data`` set to ``default_value``
    """
    # Failure first: log it and hand back the error envelope.
    if not result.get("success", False):
        reason = result.get('error', 'Unknown error')
        message = f"{error_prefix}: {reason}"
        logger.error(message)
        return {"error": message, "data": default_value}

    return result.get("data", default_value)