""" | |
Data processing utilities for the Modius Agent Performance application. | |
""" | |
import json | |
import pandas as pd | |
import logging | |
import os | |
from datetime import datetime, timedelta | |
from typing import List, Dict, Any, Optional, Tuple | |
from ..config.constants import DATA_CONFIG, DATE_RANGES, CHART_CONFIG | |
from .models import AgentMetric, AgentStatistics | |
from .api_client import get_api_client, get_agent_name | |
logger = logging.getLogger(__name__) | |


class DataProcessor:
    """Handles data processing and transformation for agent metrics."""

    def __init__(self):
        self.api_client = get_api_client()

    def extract_apr_value(self, attr: Dict[str, Any]) -> Dict[str, Any]:
        """Extract APR value, adjusted APR value, ROI value, volume, and timestamp from JSON value."""
        try:
            agent_id = attr.get("agent_id", "unknown")
            logger.debug(f"Extracting APR value for agent {agent_id}")

            # The APR value is stored in the json_value field
            if attr["json_value"] is None:
                logger.debug(f"Agent {agent_id}: json_value is None")
                return {
                    "apr": None, "adjusted_apr": None, "roi": None,
                    "volume": None, "timestamp": None, "agent_id": agent_id,
                    "agent_hash": None, "is_dummy": False
                }

            # If json_value is a string, parse it
            if isinstance(attr["json_value"], str):
                logger.debug(f"Agent {agent_id}: json_value is string, parsing")
                json_data = json.loads(attr["json_value"])
            else:
                json_data = attr["json_value"]

            apr = json_data.get("apr")
            adjusted_apr = json_data.get("adjusted_apr")
            timestamp = json_data.get("timestamp")
            volume = json_data.get("volume")

            # ROI extraction from calculation_metrics.f_i_ratio
            roi = None
            if "calculation_metrics" in json_data and json_data["calculation_metrics"] is not None:
                if isinstance(json_data["calculation_metrics"], dict):
                    f_i_ratio = json_data["calculation_metrics"].get("f_i_ratio")
                    if f_i_ratio is not None:
                        roi = f_i_ratio
                        logger.debug(f"Agent {agent_id}: ROI extracted from f_i_ratio: {roi}")
                    else:
                        logger.debug(f"Agent {agent_id}: No f_i_ratio found in calculation_metrics")
                else:
                    logger.debug(f"Agent {agent_id}: calculation_metrics is not a dict: {type(json_data['calculation_metrics'])}")
            else:
                logger.debug(f"Agent {agent_id}: No calculation_metrics found")

            # Try to extract volume from portfolio_snapshot if it's not directly in json_data
            if volume is None and "portfolio_snapshot" in json_data and json_data["portfolio_snapshot"] is not None:
                portfolio = json_data["portfolio_snapshot"].get("portfolio")
                if portfolio and isinstance(portfolio, dict):
                    volume = portfolio.get("volume")

            # Extract agent_hash from json_data or portfolio_snapshot
            agent_hash = json_data.get("agent_hash")
            if agent_hash is None and "portfolio_snapshot" in json_data and json_data["portfolio_snapshot"] is not None:
                portfolio = json_data["portfolio_snapshot"].get("portfolio")
                if portfolio and isinstance(portfolio, dict):
                    agent_hash = portfolio.get("agent_hash")

            logger.debug(f"Agent {agent_id}: Raw values - APR: {apr}, adjusted APR: {adjusted_apr}, ROI: {roi}, volume: {volume}, timestamp: {timestamp}, agent_hash: {agent_hash}")

            # Convert timestamp to datetime if it exists
            timestamp_dt = None
            if timestamp:
                try:
                    timestamp_dt = datetime.fromtimestamp(timestamp)
                except (ValueError, TypeError, OSError) as e:
                    logger.warning(f"Invalid timestamp {timestamp} for agent {agent_id}: {e}")
                    timestamp_dt = None

            result = {
                "apr": apr,
                "adjusted_apr": adjusted_apr,
                "roi": roi,
                "volume": volume,
                "timestamp": timestamp_dt,
                "agent_id": agent_id,
                "agent_hash": agent_hash,
                "is_dummy": False
            }
            logger.debug(f"Agent {agent_id}: Extracted result: {result}")
            return result
        except (json.JSONDecodeError, KeyError, TypeError) as e:
            logger.error(f"Error parsing JSON value: {e} for agent_id: {attr.get('agent_id')}")
            logger.error(f"Problematic json_value: {attr.get('json_value')}")
            return {
                "apr": None, "adjusted_apr": None, "roi": None,
                "volume": None, "timestamp": None, "agent_id": attr.get('agent_id'),
                "agent_hash": None, "is_dummy": False
            }
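
    # Illustrative shape of the json_value payload that extract_apr_value expects.
    # The field names mirror the lookups above; the sample values are hypothetical:
    #
    #   {
    #       "apr": 8.4,
    #       "adjusted_apr": 7.1,
    #       "timestamp": 1718000000,
    #       "calculation_metrics": {"f_i_ratio": 0.034},
    #       "portfolio_snapshot": {"portfolio": {"volume": 12500.0, "agent_hash": "<hash>"}}
    #   }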

    def load_corrected_data_from_csv(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Load corrected APR and ROI data from CSV file."""
        logger.info("==== Loading corrected data from CSV ====")

        try:
            # Get the CSV file path
            csv_filename = CHART_CONFIG.get('corrected_data_file', 'corrected_apr_roi_data.csv')
            csv_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), csv_filename)

            if not os.path.exists(csv_path):
                logger.error(f"Corrected data file not found: {csv_path}")
                return pd.DataFrame([]), pd.DataFrame([])

            # Read the CSV file
            df = pd.read_csv(csv_path)
            logger.info(f"Loaded {len(df)} rows from {csv_path}")

            # Parse dates and convert to datetime
            df['Date'] = pd.to_datetime(df['Date'], format='%Y-%m-%d')

            # Build APR and ROI entries for every row
            apr_data_list = []
            roi_data_list = []

            for _, row in df.iterrows():
                timestamp = row['Date']
                apr_value = row['APR']
                roi_value = row['ROI']
                adjusted_apr_value = row['Adjusted_APR'] if 'Adjusted_APR' in df.columns else None

                # Create APR entry
                apr_entry = {
                    'apr': apr_value,
                    'adjusted_apr': adjusted_apr_value,  # Now available in corrected data
                    'roi': None,
                    'volume': None,
                    'timestamp': timestamp,
                    'agent_id': 'corrected_data',
                    'agent_name': 'Corrected Manual Data',
                    'agent_hash': None,
                    'is_dummy': False,
                    'metric_type': 'APR'
                }
                apr_data_list.append(apr_entry)

                # Create ROI entry
                roi_entry = {
                    'roi': roi_value,
                    'timestamp': timestamp,
                    'agent_id': 'corrected_data',
                    'agent_name': 'Corrected Manual Data',
                    'is_dummy': False,
                    'metric_type': 'ROI'
                }
                roi_data_list.append(roi_entry)

            # Convert to DataFrames
            apr_df = pd.DataFrame(apr_data_list)
            roi_df = pd.DataFrame(roi_data_list)

            logger.info(f"Created APR DataFrame with {len(apr_df)} rows (using all available dates)")
            logger.info(f"Created ROI DataFrame with {len(roi_df)} rows (using all available dates)")

            if not apr_df.empty:
                logger.info(f"APR date range: {apr_df['timestamp'].min()} to {apr_df['timestamp'].max()}")
                logger.info(f"APR statistics: min={apr_df['apr'].min():.2f}, max={apr_df['apr'].max():.2f}, mean={apr_df['apr'].mean():.2f}")

            if not roi_df.empty:
                logger.info(f"ROI date range: {roi_df['timestamp'].min()} to {roi_df['timestamp'].max()}")
                logger.info(f"ROI statistics: min={roi_df['roi'].min():.2f}, max={roi_df['roi'].max():.2f}, mean={roi_df['roi'].mean():.2f}")

            return apr_df, roi_df

        except Exception as e:
            logger.error(f"Error loading corrected data from CSV: {e}")
            logger.exception("Exception traceback:")
            return pd.DataFrame([]), pd.DataFrame([])
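
    # Expected layout of the corrected-data CSV read above (column names taken from the
    # row lookups; sample values are hypothetical):
    #
    #   Date,APR,ROI,Adjusted_APR
    #   2024-01-01,5.2,0.8,4.9
    #   2024-01-02,5.6,0.9,5.1
    #
    # Adjusted_APR is optional; when that column is absent, adjusted_apr is stored as None.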

    def fetch_apr_data_from_db(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Fetch APR data from database using the API."""
        # Check if we should use corrected data instead
        if CHART_CONFIG.get('use_corrected_data', False):
            logger.info("Using corrected CSV data instead of database")
            return self.load_corrected_data_from_csv()

        logger.info("==== Starting APR data fetch ====")

        try:
            # Step 1: Find the Modius agent type
            logger.info("Finding Modius agent type")
            modius_type_response = self.api_client.get_agent_type_by_name(DATA_CONFIG["agent_type_name"])
            if not modius_type_response.is_success():
                logger.error("Modius agent type not found, using placeholder data")
                return pd.DataFrame([]), pd.DataFrame([])

            modius_type = modius_type_response.get_agent_type()
            type_id = modius_type.type_id
            logger.info(f"Found Modius agent type with ID: {type_id}")

            # Step 2: Find the APR attribute definition
            logger.info("Finding APR attribute definition")
            apr_attr_response = self.api_client.get_attribute_definition_by_name(DATA_CONFIG["attribute_name"])
            if not apr_attr_response.is_success():
                logger.error("APR attribute definition not found, using placeholder data")
                return pd.DataFrame([]), pd.DataFrame([])

            apr_attr_def = apr_attr_response.get_attribute_definition()
            attr_def_id = apr_attr_def.attr_def_id
            logger.info(f"Found APR attribute definition with ID: {attr_def_id}")

            # Step 3: Get all agents of type Modius
            logger.info(f"Getting all agents of type Modius (type_id: {type_id})")
            agents_response = self.api_client.get_agents_by_type(type_id)
            if not agents_response.is_success():
                logger.error("No agents of type 'Modius' found")
                return pd.DataFrame([]), pd.DataFrame([])

            modius_agents_info = agents_response.get_agents()
            modius_agents = [
                {
                    "agent_id": agent.agent_id,
                    "agent_name": agent.agent_name,
                    "type_id": agent.type_id
                }
                for agent in modius_agents_info
            ]

            # Filter out excluded agents
            excluded_agents = DATA_CONFIG.get("excluded_agents", [])
            if excluded_agents:
                original_count = len(modius_agents)
                modius_agents = [agent for agent in modius_agents if agent["agent_id"] not in excluded_agents]
                filtered_count = original_count - len(modius_agents)
                logger.info(f"Filtered out {filtered_count} excluded agents: {excluded_agents}")

            logger.info(f"Found {len(modius_agents)} Modius agents after filtering")
            logger.debug(f"Modius agents: {[{'agent_id': a['agent_id'], 'agent_name': a['agent_name']} for a in modius_agents]}")

            # Step 4: Fetch all APR values for Modius agents
            logger.info(f"Fetching APR values for all Modius agents (attr_def_id: {attr_def_id})")
            apr_attributes = self.api_client.get_attribute_values_by_type_and_attr(modius_agents, attr_def_id)
            if not apr_attributes:
                logger.error("No APR values found for 'Modius' agents")
                return pd.DataFrame([]), pd.DataFrame([])

            logger.info(f"Found {len(apr_attributes)} APR attributes total")

            # Step 5: Extract APR and ROI data
            logger.info("Extracting APR and ROI data from attributes")
            apr_data_list = []
            roi_data_list = []

            for attr in apr_attributes:
                data = self.extract_apr_value(attr)
                if data["timestamp"] is not None:
                    # Get agent name
                    agent_name = get_agent_name(attr["agent_id"], modius_agents)
                    data["agent_name"] = agent_name
                    data["is_dummy"] = False

                    # Process APR data
                    if data["apr"] is not None:
                        # Filter APR values based on exclusion list
                        if data["apr"] not in DATA_CONFIG.get("exclude_apr_values", []):
                            apr_entry = data.copy()
                            apr_entry["metric_type"] = "APR"
                            logger.debug(f"Agent {agent_name} ({attr['agent_id']}): APR value: {data['apr']}")
                            apr_data_list.append(apr_entry)
                        else:
                            logger.debug(f"Skipping APR value for agent {agent_name}: {data['apr']} (excluded value)")

                    # Process ROI data
                    if data["roi"] is not None:
                        # Filter ROI values if needed (similar to APR filtering)
                        roi_entry = {
                            "roi": data["roi"],
                            "timestamp": data["timestamp"],
                            "agent_id": data["agent_id"],
                            "agent_name": agent_name,
                            "is_dummy": False,
                            "metric_type": "ROI"
                        }
                        logger.debug(f"Agent {agent_name} ({attr['agent_id']}): ROI value: {data['roi']}")
                        roi_data_list.append(roi_entry)
                    else:
                        logger.debug(f"Agent {agent_name} ({attr['agent_id']}): No ROI data available")

            logger.info(f"Extracted {len(apr_data_list)} valid APR data points and {len(roi_data_list)} valid ROI data points")

            # Convert to DataFrames
            apr_df = pd.DataFrame(apr_data_list) if apr_data_list else pd.DataFrame([])
            roi_df = pd.DataFrame(roi_data_list) if roi_data_list else pd.DataFrame([])

            # Log the resulting dataframes
            logger.info(f"Created APR DataFrame with {len(apr_df)} rows")
            if not apr_df.empty:
                logger.info(f"APR DataFrame columns: {apr_df.columns.tolist()}")
                logger.info(f"APR statistics: min={apr_df['apr'].min()}, max={apr_df['apr'].max()}, mean={apr_df['apr'].mean()}")

            logger.info(f"Created ROI DataFrame with {len(roi_df)} rows")
            if not roi_df.empty:
                logger.info(f"ROI DataFrame columns: {roi_df.columns.tolist()}")
                logger.info(f"ROI statistics: min={roi_df['roi'].min()}, max={roi_df['roi'].max()}, mean={roi_df['roi'].mean()}")

            return apr_df, roi_df

        except Exception as e:
            logger.error(f"Error fetching APR data: {e}")
            logger.exception("Exception traceback:")
            return pd.DataFrame([]), pd.DataFrame([])
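
    # For reference, the non-empty DataFrames returned above carry these columns
    # (taken from the entry dictionaries built in Step 5; column order may vary):
    #   apr_df: apr, adjusted_apr, roi, volume, timestamp, agent_id, agent_hash,
    #           is_dummy, agent_name, metric_type ("APR")
    #   roi_df: roi, timestamp, agent_id, agent_name, is_dummy, metric_type ("ROI")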

    def filter_outliers(self, df: pd.DataFrame, column: str, threshold: Optional[float] = None) -> pd.DataFrame:
        """Filter outliers from a DataFrame based on a threshold."""
        if df.empty or column not in df.columns:
            return df

        # Use configured outlier threshold
        threshold = threshold or DATA_CONFIG.get("outlier_threshold", 200)

        # Filter out outliers based on threshold
        outlier_mask = (df[column] > threshold) | (df[column] < -threshold)
        filtered_df = df[~outlier_mask]

        outliers_removed = len(df) - len(filtered_df)
        if outliers_removed > 0:
            logger.info(f"Filtered {outliers_removed} outliers from {column} column using threshold ±{threshold}")
        else:
            logger.info(f"No outliers found in {column} column using threshold ±{threshold}")

        return filtered_df
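
    # Example (hypothetical values): with the fallback threshold of 200, filter_outliers
    # keeps rows where -200 <= value <= 200, so [150, 250, -300, 10] -> [150, 10].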

    def calculate_moving_average(self, df: pd.DataFrame, value_column: str, window_days: Optional[int] = None) -> pd.DataFrame:
        """Calculate a time-windowed moving average for a DataFrame."""
        if df.empty:
            return df

        window_days = window_days or CHART_CONFIG["moving_average_window_days"]
        time_window = pd.Timedelta(days=window_days)
        aggregation_method = CHART_CONFIG.get("timestamp_aggregation_method", "mean")

        # Sort by timestamp
        df_sorted = df.sort_values('timestamp')

        # Group by timestamp and calculate aggregation (mean or median based on config)
        if aggregation_method == "median":
            aggregated_data = df_sorted.groupby('timestamp')[value_column].median().reset_index()
            logger.info("Using median aggregation for timestamp grouping")
        else:
            aggregated_data = df_sorted.groupby('timestamp')[value_column].mean().reset_index()
            logger.info("Using mean aggregation for timestamp grouping")

        aggregated_data = aggregated_data.sort_values('timestamp')

        # Calculate moving average (still using mean for the moving window)
        aggregated_data['moving_avg'] = None
        for i, row in aggregated_data.iterrows():
            current_time = row['timestamp']
            window_start = current_time - time_window

            # Get all data points within the time window
            window_data = df_sorted[
                (df_sorted['timestamp'] >= window_start) &
                (df_sorted['timestamp'] <= current_time)
            ]

            if not window_data.empty:
                aggregated_data.at[i, 'moving_avg'] = window_data[value_column].mean()
            else:
                aggregated_data.at[i, 'moving_avg'] = row[value_column]

        logger.info(f"Calculated {window_days}-day moving average with {len(aggregated_data)} points using {aggregation_method} timestamp aggregation")
        return aggregated_data
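
    # Sketch of the logic above (hypothetical numbers): with window_days=7, the value
    # reported at timestamp T is the mean (or median) of the raw points recorded exactly
    # at T, while 'moving_avg' at T is the mean of every raw point whose timestamp lies
    # in the inclusive trailing window [T - 7 days, T].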

    def generate_statistics(self, df: pd.DataFrame) -> pd.DataFrame:
        """Generate statistics from the data."""
        if df.empty:
            return pd.DataFrame()

        # Get unique agents
        unique_agents = df['agent_id'].unique()
        stats_list = []

        # Generate per-agent statistics
        for agent_id in unique_agents:
            agent_data = df[df['agent_id'] == agent_id]
            agent_name = agent_data['agent_name'].iloc[0]

            # APR statistics
            apr_data = agent_data[agent_data['metric_type'] == 'APR']
            real_apr = apr_data[apr_data['is_dummy'] == False]

            # Check if adjusted_apr exists and has non-null values
            has_adjusted_apr = 'adjusted_apr' in apr_data.columns and apr_data['adjusted_apr'].notna().any()

            stats = {
                'agent_id': agent_id,
                'agent_name': agent_name,
                'total_points': len(agent_data),
                'apr_points': len(apr_data),
                'real_apr_points': len(real_apr),
                'avg_apr': apr_data['apr'].mean() if not apr_data.empty else None,
                'max_apr': apr_data['apr'].max() if not apr_data.empty else None,
                'min_apr': apr_data['apr'].min() if not apr_data.empty else None,
                'avg_adjusted_apr': apr_data['adjusted_apr'].mean() if has_adjusted_apr else None,
                'max_adjusted_apr': apr_data['adjusted_apr'].max() if has_adjusted_apr else None,
                'min_adjusted_apr': apr_data['adjusted_apr'].min() if has_adjusted_apr else None,
                'latest_timestamp': agent_data['timestamp'].max().strftime('%Y-%m-%d %H:%M:%S') if not agent_data.empty and pd.notna(agent_data['timestamp'].max()) else None
            }
            stats_list.append(stats)

        # Generate overall statistics
        apr_only = df[df['metric_type'] == 'APR']
        has_adjusted_apr_overall = 'adjusted_apr' in apr_only.columns and apr_only['adjusted_apr'].notna().any()

        overall_stats = {
            'agent_id': 'ALL',
            'agent_name': 'All Agents',
            'total_points': len(df),
            'apr_points': len(apr_only),
            'real_apr_points': len(apr_only[apr_only['is_dummy'] == False]),
            'avg_apr': apr_only['apr'].mean() if not apr_only.empty else None,
            'max_apr': apr_only['apr'].max() if not apr_only.empty else None,
            'min_apr': apr_only['apr'].min() if not apr_only.empty else None,
            'avg_adjusted_apr': apr_only['adjusted_apr'].mean() if has_adjusted_apr_overall else None,
            'max_adjusted_apr': apr_only['adjusted_apr'].max() if has_adjusted_apr_overall else None,
            'min_adjusted_apr': apr_only['adjusted_apr'].min() if has_adjusted_apr_overall else None,
            'latest_timestamp': df['timestamp'].max().strftime('%Y-%m-%d %H:%M:%S') if not df.empty and pd.notna(df['timestamp'].max()) else None
        }
        stats_list.append(overall_stats)

        return pd.DataFrame(stats_list)

    def calculate_daily_volume_changes(self, df: pd.DataFrame) -> pd.DataFrame:
        """Calculate daily volume totals and cumulative volume with 7-day SMA."""
        if df.empty or 'volume' not in df.columns:
            logger.warning("No volume data available for daily calculation")
            return pd.DataFrame()

        # Filter out excluded agents
        excluded_agents = DATA_CONFIG.get("excluded_agents", [])
        if excluded_agents:
            original_count = len(df)
            df = df[~df['agent_id'].isin(excluded_agents)]
            filtered_count = original_count - len(df)
            logger.info(f"Filtered out {filtered_count} records from excluded agents {excluded_agents} for volume calculation")

        # Filter for records with volume data
        volume_data = df[df['volume'].notna()].copy()
        if volume_data.empty:
            logger.warning("No valid volume data found")
            return pd.DataFrame()

        # Ensure volume is float and timestamp is datetime
        volume_data['volume'] = volume_data['volume'].astype(float)
        volume_data = volume_data.sort_values('timestamp')

        logger.info(f"Processing {len(volume_data)} volume records for daily aggregation")

        # Group by date (day) and sum volume across all agents
        volume_data['date'] = volume_data['timestamp'].dt.date
        daily_sums = volume_data.groupby('date')['volume'].sum().reset_index()
        daily_sums = daily_sums.sort_values('date')

        # Convert date back to datetime for easier plotting
        daily_sums['date'] = pd.to_datetime(daily_sums['date'])

        # Calculate cumulative volume (each day adds to the previous day's total)
        daily_sums['cumulative_volume'] = daily_sums['volume'].cumsum()

        # Calculate day-over-day percentage changes for the cumulative volume
        daily_sums['prev_cumulative'] = daily_sums['cumulative_volume'].shift(1)
        daily_sums['day_over_day_pct'] = ((daily_sums['cumulative_volume'] / daily_sums['prev_cumulative']) - 1) * 100

        # Set first day to 0% change
        daily_sums.loc[0, 'day_over_day_pct'] = 0.0

        # Fill any NaN or infinite values with 0
        daily_sums['day_over_day_pct'] = daily_sums['day_over_day_pct'].replace([float('inf'), float('-inf')], 0.0)
        daily_sums['day_over_day_pct'] = daily_sums['day_over_day_pct'].fillna(0.0)

        # Calculate 7-day Simple Moving Average (SMA) of percentage changes
        daily_sums['sma_7d'] = daily_sums['day_over_day_pct'].rolling(window=7, min_periods=1).mean()

        logger.info(f"Calculated daily changes for {len(daily_sums)} days")
        logger.info(f"Daily volume range: {daily_sums['volume'].min():.2f} to {daily_sums['volume'].max():.2f}")
        logger.info(f"Cumulative volume range: {daily_sums['cumulative_volume'].min():.2f} to {daily_sums['cumulative_volume'].max():.2f}")
        logger.info(f"Day-over-day percentage range: {daily_sums['day_over_day_pct'].min():.1f}% to {daily_sums['day_over_day_pct'].max():.1f}%")
        logger.info(f"7-day SMA range: {daily_sums['sma_7d'].min():.1f}% to {daily_sums['sma_7d'].max():.1f}%")

        return daily_sums[['date', 'volume', 'cumulative_volume', 'day_over_day_pct', 'sma_7d']]
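
    # Worked example for the daily calculation (hypothetical volumes): daily sums of
    # 100, 150 and 50 give cumulative volumes 100, 250, 300; day-over-day changes are
    # 0% (first day), (250/100 - 1) * 100 = 150%, and (300/250 - 1) * 100 = 20%.
    # The sma_7d column is the 7-day rolling mean of those percentages.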

    def calculate_weekly_volume_changes(self, df: pd.DataFrame) -> pd.DataFrame:
        """Calculate weekly volume averages and week-over-week percentage changes."""
        if df.empty or 'volume' not in df.columns:
            logger.warning("No volume data available for weekly calculation")
            return pd.DataFrame()

        # Filter out excluded agents
        excluded_agents = DATA_CONFIG.get("excluded_agents", [])
        if excluded_agents:
            original_count = len(df)
            df = df[~df['agent_id'].isin(excluded_agents)]
            filtered_count = original_count - len(df)
            logger.info(f"Filtered out {filtered_count} records from excluded agents {excluded_agents} for weekly volume calculation")

        # Filter for records with volume data
        volume_data = df[df['volume'].notna()].copy()
        if volume_data.empty:
            logger.warning("No valid volume data found")
            return pd.DataFrame()

        # Ensure volume is float and timestamp is datetime
        volume_data['volume'] = volume_data['volume'].astype(float)
        volume_data = volume_data.sort_values('timestamp')

        logger.info(f"Processing {len(volume_data)} volume records for weekly aggregation")

        # Group into weekly buckets ('W-MON' periods are anchored to end on Monday) and take each week's start
        volume_data['week_start'] = volume_data['timestamp'].dt.to_period('W-MON').dt.start_time

        # Sum volume for each week across all agents
        weekly_sums = volume_data.groupby('week_start')['volume'].sum().reset_index()
        weekly_sums = weekly_sums.sort_values('week_start')

        # Calculate weekly averages (sum ÷ 7)
        weekly_sums['weekly_avg_volume'] = weekly_sums['volume'] / 7

        # Calculate week-over-week percentage changes
        weekly_sums['prev_week_avg'] = weekly_sums['weekly_avg_volume'].shift(1)
        weekly_sums['week_over_week_pct'] = ((weekly_sums['weekly_avg_volume'] / weekly_sums['prev_week_avg']) - 1) * 100

        # Set first week to 0% change as requested
        weekly_sums.loc[0, 'week_over_week_pct'] = 0.0

        # Fill any NaN values with 0
        weekly_sums['week_over_week_pct'] = weekly_sums['week_over_week_pct'].fillna(0.0)

        logger.info(f"Calculated weekly changes for {len(weekly_sums)} weeks")
        logger.info(f"Week-over-week percentage range: {weekly_sums['week_over_week_pct'].min():.1f}% to {weekly_sums['week_over_week_pct'].max():.1f}%")

        return weekly_sums[['week_start', 'weekly_avg_volume', 'week_over_week_pct']]
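
    # Worked example for the weekly calculation (hypothetical volumes): weekly sums of
    # 700 and 1400 give weekly averages of 100 and 200 (sum / 7), so the second week's
    # week-over-week change is (200/100 - 1) * 100 = 100%; the first week is pinned to 0%.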

    def aggregate_daily_medians(self, df: pd.DataFrame, value_columns: List[str]) -> pd.DataFrame:
        """
        Aggregate multiple values per day for each agent by taking the median.

        This reduces outliers and provides one clean data point per agent per day.

        Args:
            df: DataFrame with agent data
            value_columns: List of columns to aggregate (e.g., ['apr', 'adjusted_apr'] or ['roi'])

        Returns:
            DataFrame with daily median values for each agent
        """
        if df.empty:
            logger.info("No data to aggregate")
            return df

        # Create a copy to avoid modifying the original
        data = df.copy()

        # Add date column for grouping
        data['date'] = data['timestamp'].dt.date

        # Group by agent and date, then calculate median for each value column
        grouping_cols = ['agent_id', 'agent_name', 'date']

        # Prepare aggregation dictionary
        agg_dict = {}
        for col in value_columns:
            if col in data.columns:
                agg_dict[col] = 'median'

        # Add other columns that should be preserved (take first value per group)
        preserve_cols = ['metric_type', 'is_dummy', 'agent_hash']
        for col in preserve_cols:
            if col in data.columns:
                agg_dict[col] = 'first'

        if not agg_dict:
            logger.warning("No valid columns to aggregate")
            return pd.DataFrame()

        # Perform aggregation
        aggregated = data.groupby(grouping_cols).agg(agg_dict).reset_index()

        # Convert date back to datetime and set as timestamp
        aggregated['timestamp'] = pd.to_datetime(aggregated['date'])
        aggregated = aggregated.drop('date', axis=1)

        # Sort by agent and timestamp
        aggregated = aggregated.sort_values(['agent_id', 'timestamp'])

        # Log aggregation results
        original_count = len(data)
        aggregated_count = len(aggregated)
        reduction_pct = ((original_count - aggregated_count) / original_count * 100) if original_count > 0 else 0

        logger.info("Daily median aggregation completed:")
        logger.info(f"  Original data points: {original_count}")
        logger.info(f"  Aggregated data points: {aggregated_count}")
        logger.info(f"  Data reduction: {reduction_pct:.1f}%")
        logger.info(f"  Aggregated columns: {list(agg_dict.keys())}")

        # Log per-agent statistics
        agent_stats = data.groupby('agent_id').size()
        aggregated_stats = aggregated.groupby('agent_id').size()

        for agent_id in agent_stats.index:
            original = agent_stats[agent_id]
            final = aggregated_stats.get(agent_id, 0)
            agent_name = data[data['agent_id'] == agent_id]['agent_name'].iloc[0]
            logger.debug(f"  Agent {agent_name} ({agent_id}): {original} → {final} points")

        return aggregated
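
    # Example of the daily-median aggregation (hypothetical values): if one agent reports
    # APR values of 10, 12 and 300 on the same calendar day, the aggregated frame keeps a
    # single row for that agent and day with apr = 12, which damps the 300 outlier.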

    def filter_high_apr_values(self, apr_df: pd.DataFrame, roi_df: Optional[pd.DataFrame] = None) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Filter out APR values above the threshold and corresponding ROI values.

        Forward fill gaps to maintain continuity in time series.

        Args:
            apr_df: DataFrame with APR data
            roi_df: DataFrame with ROI data (optional)

        Returns:
            Tuple of (filtered_apr_df, filtered_roi_df)
        """
        max_apr_threshold = DATA_CONFIG.get("max_apr_threshold", 400)

        if apr_df.empty:
            logger.info("No APR data to filter")
            return apr_df, roi_df if roi_df is not None else pd.DataFrame()

        # Create copies to avoid modifying originals
        apr_filtered = apr_df.copy()
        roi_filtered = roi_df.copy() if roi_df is not None and not roi_df.empty else pd.DataFrame()

        # Identify high APR values (both regular and adjusted APR)
        high_apr_mask = pd.Series(False, index=apr_filtered.index)

        if 'apr' in apr_filtered.columns:
            high_apr_mask |= (apr_filtered['apr'] > max_apr_threshold)

        if 'adjusted_apr' in apr_filtered.columns:
            high_apr_mask |= (apr_filtered['adjusted_apr'] > max_apr_threshold)

        high_apr_count = high_apr_mask.sum()

        if high_apr_count == 0:
            logger.info(f"No APR values above {max_apr_threshold}% threshold found")
            return apr_filtered, roi_filtered

        logger.info(f"Found {high_apr_count} APR values above {max_apr_threshold}% threshold")

        # Get the high APR records for ROI filtering
        high_apr_records = apr_filtered[high_apr_mask][['agent_id', 'timestamp']].copy()

        # Remove high APR values
        apr_filtered = apr_filtered[~high_apr_mask].copy()

        # Forward fill APR gaps for each agent
        apr_filled_count = 0
        for agent_id in apr_filtered['agent_id'].unique():
            agent_mask = apr_filtered['agent_id'] == agent_id
            agent_data = apr_filtered[agent_mask].sort_values('timestamp')

            if len(agent_data) > 0:
                # Forward fill APR values
                if 'apr' in apr_filtered.columns:
                    filled_apr = agent_data['apr'].ffill()
                    apr_filled_count += (filled_apr != agent_data['apr']).sum()
                    apr_filtered.loc[agent_mask, 'apr'] = filled_apr

                # Forward fill adjusted APR values
                if 'adjusted_apr' in apr_filtered.columns:
                    filled_adj_apr = agent_data['adjusted_apr'].ffill()
                    apr_filtered.loc[agent_mask, 'adjusted_apr'] = filled_adj_apr

        logger.info(f"Forward filled {apr_filled_count} APR gaps")

        # Filter corresponding ROI values if ROI data is provided
        if not roi_filtered.empty and not high_apr_records.empty:
            # Remove ROI values for the same agent-timestamp combinations
            roi_before_count = len(roi_filtered)

            # Merge to identify which ROI records to remove
            roi_to_remove = roi_filtered.merge(
                high_apr_records,
                on=['agent_id', 'timestamp'],
                how='inner'
            )
            roi_removed_count = len(roi_to_remove)

            if roi_removed_count > 0:
                # Remove the matching ROI records
                roi_filtered = roi_filtered.merge(
                    high_apr_records,
                    on=['agent_id', 'timestamp'],
                    how='left',
                    indicator=True
                )
                roi_filtered = roi_filtered[roi_filtered['_merge'] == 'left_only'].drop('_merge', axis=1)

                # Forward fill ROI gaps for each agent
                roi_filled_count = 0
                for agent_id in roi_filtered['agent_id'].unique():
                    agent_mask = roi_filtered['agent_id'] == agent_id
                    agent_data = roi_filtered[agent_mask].sort_values('timestamp')

                    if len(agent_data) > 0 and 'roi' in roi_filtered.columns:
                        filled_roi = agent_data['roi'].ffill()
                        roi_filled_count += (filled_roi != agent_data['roi']).sum()
                        roi_filtered.loc[agent_mask, 'roi'] = filled_roi

                logger.info(f"Removed {roi_removed_count} corresponding ROI values")
                logger.info(f"Forward filled {roi_filled_count} ROI gaps")
            else:
                logger.info("No corresponding ROI values to remove")

        # Log final statistics
        logger.info("High APR filtering completed:")
        logger.info(f"  APR threshold: {max_apr_threshold}%")
        logger.info(f"  APR values removed: {high_apr_count}")
        logger.info(f"  Final APR data points: {len(apr_filtered)}")
        if not roi_filtered.empty:
            logger.info(f"  Final ROI data points: {len(roi_filtered)}")

        return apr_filtered, roi_filtered
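
    # Example of the high-APR filtering (hypothetical values, fallback threshold 400%):
    # an agent's APR series [120, 950, 135] loses the 950 row, the ROI row sharing that
    # agent_id and timestamp is dropped as well, and any remaining NaNs in apr,
    # adjusted_apr or roi are then forward filled per agent.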

    def save_to_csv(self, df: pd.DataFrame, filename: str) -> Optional[str]:
        """Save DataFrame to CSV file."""
        if df.empty:
            logger.error(f"No data to save to {filename}")
            return None

        try:
            df.to_csv(filename, index=False)
            logger.info(f"Data saved to {filename}")
            return filename
        except Exception as e:
            logger.error(f"Error saving data to {filename}: {e}")
            return None
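

# Minimal usage sketch, not part of the processing pipeline above. It assumes the
# package's config and API client are available and that this module is executed as
# part of its package (e.g. via "python -m ..."), since it uses relative imports.
# The output filenames are illustrative only.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    processor = DataProcessor()
    apr_df, roi_df = processor.fetch_apr_data_from_db()

    if not apr_df.empty:
        # Drop implausibly high APR readings and their matching ROI rows
        apr_df, roi_df = processor.filter_high_apr_values(apr_df, roi_df)

        # Collapse to one median data point per agent per day, then summarize
        apr_daily = processor.aggregate_daily_medians(apr_df, ['apr', 'adjusted_apr'])
        stats_df = processor.generate_statistics(apr_df)

        processor.save_to_csv(apr_daily, "apr_daily_medians.csv")
        processor.save_to_csv(stats_df, "agent_statistics.csv")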