gauravlochab
chore: load data from CSV files instead of the API to avoid the rate limiter error
175e92c
""" | |
CSV Loading Functions for Hugging Face Space Deployment | |
This module provides functions to load preprocessed data from CSV files | |
instead of making API calls, which helps avoid rate limiting issues. | |
""" | |
import pandas as pd | |
import logging | |
from datetime import datetime | |
from typing import Tuple, Optional | |
import os | |
logger = logging.getLogger(__name__) | |
def load_apr_data_from_csv() -> Tuple[pd.DataFrame, Optional[str]]:
    """
    Load APR data from CSV files.

    Returns:
        Tuple of (DataFrame, csv_file_path), or (empty DataFrame, None) if the file doesn't exist
    """
    csv_file = "optimus_apr_values.csv"

    try:
        if not os.path.exists(csv_file):
            logger.warning(f"APR CSV file not found: {csv_file}")
            return pd.DataFrame(), None

        # Load the CSV file
        df = pd.read_csv(csv_file)

        # Convert the timestamp column back to datetime
        if 'timestamp' in df.columns:
            df['timestamp'] = pd.to_datetime(df['timestamp'])

        # Ensure proper data types
        if 'apr' in df.columns:
            df['apr'] = df['apr'].astype(float)
        if 'adjusted_apr' in df.columns:
            df['adjusted_apr'] = pd.to_numeric(df['adjusted_apr'], errors='coerce')
        if 'agent_id' in df.columns:
            df['agent_id'] = df['agent_id'].astype(str)
        if 'is_dummy' in df.columns:
            df['is_dummy'] = df['is_dummy'].astype(bool)

        logger.info(f"Successfully loaded {len(df)} APR records from {csv_file}")

        # Log data freshness
        if not df.empty and 'timestamp' in df.columns:
            latest_timestamp = df['timestamp'].max()
            oldest_timestamp = df['timestamp'].min()
            logger.info(f"APR data range: {oldest_timestamp} to {latest_timestamp}")

            # Check how fresh the data is; match the timezone-awareness of the data
            # so naive and aware datetimes are never mixed in the subtraction
            now = datetime.now(tz=latest_timestamp.tzinfo)
            hours_old = (now - latest_timestamp).total_seconds() / 3600
            logger.info(f"Latest APR data is {hours_old:.1f} hours old")

        return df, csv_file

    except Exception as e:
        logger.error(f"Error loading APR data from CSV: {e}")
        return pd.DataFrame(), None

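# Example usage (illustrative sketch, not called anywhere in this module): the loaders
# return both the DataFrame and the path that was read, so callers can log the source
# and degrade gracefully when no CSV has been committed to the Space yet.
#
#     apr_df, apr_csv = load_apr_data_from_csv()
#     if apr_csv is None:
#         logger.warning("No APR CSV available; charts will be empty")
#     else:
#         real_agents = apr_df[~apr_df['is_dummy']] if 'is_dummy' in apr_df.columns else apr_df
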
def load_roi_data_from_csv() -> Tuple[pd.DataFrame, Optional[str]]:
    """
    Load ROI data from CSV files.

    Returns:
        Tuple of (DataFrame, csv_file_path), or (empty DataFrame, None) if the file doesn't exist
    """
    csv_file = "optimus_roi_values.csv"

    try:
        if not os.path.exists(csv_file):
            logger.warning(f"ROI CSV file not found: {csv_file}")
            return pd.DataFrame(), None

        # Load the CSV file
        df = pd.read_csv(csv_file)

        # Convert the timestamp column back to datetime
        if 'timestamp' in df.columns:
            df['timestamp'] = pd.to_datetime(df['timestamp'])

        # Ensure proper data types
        if 'roi' in df.columns:
            df['roi'] = df['roi'].astype(float)
        if 'agent_id' in df.columns:
            df['agent_id'] = df['agent_id'].astype(str)
        if 'is_dummy' in df.columns:
            df['is_dummy'] = df['is_dummy'].astype(bool)

        logger.info(f"Successfully loaded {len(df)} ROI records from {csv_file}")

        # Log data freshness
        if not df.empty and 'timestamp' in df.columns:
            latest_timestamp = df['timestamp'].max()
            oldest_timestamp = df['timestamp'].min()
            logger.info(f"ROI data range: {oldest_timestamp} to {latest_timestamp}")

            # Check how fresh the data is; match the timezone-awareness of the data
            # so naive and aware datetimes are never mixed in the subtraction
            now = datetime.now(tz=latest_timestamp.tzinfo)
            hours_old = (now - latest_timestamp).total_seconds() / 3600
            logger.info(f"Latest ROI data is {hours_old:.1f} hours old")

        return df, csv_file

    except Exception as e:
        logger.error(f"Error loading ROI data from CSV: {e}")
        return pd.DataFrame(), None

def load_statistics_from_csv() -> pd.DataFrame:
    """
    Load statistics data from CSV file.

    Returns:
        DataFrame with statistics, or an empty DataFrame if the file doesn't exist
    """
    csv_file = "optimus_apr_statistics.csv"

    try:
        if not os.path.exists(csv_file):
            logger.warning(f"Statistics CSV file not found: {csv_file}")
            return pd.DataFrame()

        # Load the CSV file
        df = pd.read_csv(csv_file)
        logger.info(f"Successfully loaded statistics from {csv_file}")
        return df

    except Exception as e:
        logger.error(f"Error loading statistics from CSV: {e}")
        return pd.DataFrame()

def check_csv_data_availability() -> dict:
    """
    Check which CSV files are available and their basic info.

    Returns:
        Dictionary with availability status and file info
    """
    files_info = {}

    # Check APR data
    apr_file = "optimus_apr_values.csv"
    if os.path.exists(apr_file):
        try:
            df = pd.read_csv(apr_file)
            files_info['apr'] = {
                'available': True,
                'file': apr_file,
                'records': len(df),
                'size_mb': os.path.getsize(apr_file) / (1024 * 1024),
                'modified': datetime.fromtimestamp(os.path.getmtime(apr_file))
            }
        except Exception as e:
            files_info['apr'] = {'available': False, 'error': str(e)}
    else:
        files_info['apr'] = {'available': False, 'error': 'File not found'}

    # Check ROI data
    roi_file = "optimus_roi_values.csv"
    if os.path.exists(roi_file):
        try:
            df = pd.read_csv(roi_file)
            files_info['roi'] = {
                'available': True,
                'file': roi_file,
                'records': len(df),
                'size_mb': os.path.getsize(roi_file) / (1024 * 1024),
                'modified': datetime.fromtimestamp(os.path.getmtime(roi_file))
            }
        except Exception as e:
            files_info['roi'] = {'available': False, 'error': str(e)}
    else:
        files_info['roi'] = {'available': False, 'error': 'File not found'}

    # Check statistics data
    stats_file = "optimus_apr_statistics.csv"
    if os.path.exists(stats_file):
        try:
            df = pd.read_csv(stats_file)
            files_info['statistics'] = {
                'available': True,
                'file': stats_file,
                'records': len(df),
                'size_mb': os.path.getsize(stats_file) / (1024 * 1024),
                'modified': datetime.fromtimestamp(os.path.getmtime(stats_file))
            }
        except Exception as e:
            files_info['statistics'] = {'available': False, 'error': str(e)}
    else:
        files_info['statistics'] = {'available': False, 'error': 'File not found'}

    return files_info

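# Example usage (illustrative sketch): the availability dict returned above can drive a
# startup summary; the keys ('apr', 'roi', 'statistics') and fields ('available',
# 'records', 'size_mb', 'error') follow check_csv_data_availability().
#
#     for name, status in check_csv_data_availability().items():
#         if status['available']:
#             logger.info(f"{name}: {status['records']} records ({status['size_mb']:.2f} MB)")
#         else:
#             logger.warning(f"{name}: unavailable ({status['error']})")
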
def get_data_freshness_info() -> dict:
    """
    Get information about how fresh the CSV data is.

    Returns:
        Dictionary with freshness information
    """
    info = {}

    try:
        # Check APR data freshness
        apr_df, _ = load_apr_data_from_csv()
        if not apr_df.empty and 'timestamp' in apr_df.columns:
            latest_apr = apr_df['timestamp'].max()
            # Match the timezone-awareness of the data so the subtraction is valid
            now = datetime.now(tz=latest_apr.tzinfo)
            hours_old = (now - latest_apr).total_seconds() / 3600
            info['apr'] = {
                'latest_data': latest_apr,
                'hours_old': hours_old,
                'is_fresh': hours_old < 24  # Consider fresh if less than 24 hours old
            }

        # Check ROI data freshness
        roi_df, _ = load_roi_data_from_csv()
        if not roi_df.empty and 'timestamp' in roi_df.columns:
            latest_roi = roi_df['timestamp'].max()
            now = datetime.now(tz=latest_roi.tzinfo)
            hours_old = (now - latest_roi).total_seconds() / 3600
            info['roi'] = {
                'latest_data': latest_roi,
                'hours_old': hours_old,
                'is_fresh': hours_old < 24  # Consider fresh if less than 24 hours old
            }

    except Exception as e:
        logger.error(f"Error checking data freshness: {e}")
        info['error'] = str(e)

    return info
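
# Hedged, self-contained smoke test: run this module directly to see what the CSV
# helpers report. It assumes the optimus_*.csv files (when present) sit in the current
# working directory and is illustrative only, not part of the deployed Space logic.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    print("CSV availability:")
    for name, status in check_csv_data_availability().items():
        print(f"  {name}: {status}")

    print("Data freshness:")
    for name, status in get_data_freshness_info().items():
        print(f"  {name}: {status}")

    apr_df, _ = load_apr_data_from_csv()
    roi_df, _ = load_roi_data_from_csv()
    stats_df = load_statistics_from_csv()
    print(f"Loaded {len(apr_df)} APR rows, {len(roi_df)} ROI rows, {len(stats_df)} statistics rows")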