Spaces:
Running
Running
""" | |
ROI chart implementations. | |
""" | |
import plotly.graph_objects as go | |
import pandas as pd | |
import logging | |
from datetime import datetime | |
from typing import Dict, Any, Optional, Tuple | |
from ..config.constants import DATE_RANGES, Y_AXIS_RANGES, FILE_PATHS | |
from .base_chart import BaseChart | |
logger = logging.getLogger(__name__) | |
class ROIChart(BaseChart):
    """Chart for ROI visualizations."""

    def create_chart(self, df: pd.DataFrame, **kwargs) -> go.Figure:
        """Create the ROI time series chart.

        Rows of ``df`` whose ``metric_type`` is ``'ROI'`` are aggregated to
        daily medians, filtered against APR data fetched through
        ``self.data_processor``, saved to CSV, and then plotted per agent
        together with a moving-average line. The finished figure is also
        written to the HTML/PNG paths configured in ``FILE_PATHS``.

        Args:
            df: Input frame. This method reads the ``metric_type``,
                ``timestamp``, ``agent_id`` and ``agent_name`` columns and
                passes the ``roi`` column name to the processing helpers.
            **kwargs: Not used by this method.

        Returns:
            The populated figure, or the result of ``_create_empty_chart``
            when no ROI rows remain after the input check, the metric
            filter, the daily aggregation or the high-APR filter.
        """
        if df.empty:
            return self._create_empty_chart("No ROI data available")

        # Filter for ROI data only
        roi_data = df[df['metric_type'] == 'ROI'].copy()
        if roi_data.empty:
            return self._create_empty_chart("No ROI data available")

        # ROI data is already in percentage format from corrected CSV,
        # so no conversion is needed.

        # Apply daily median aggregation to reduce outliers
        roi_data = self.data_processor.aggregate_daily_medians(roi_data, ['roi'])
        if roi_data.empty:
            return self._create_empty_chart("No ROI data available after aggregation")

        # High-APR filtering needs APR data as its reference, so fetch it and
        # run it through the same daily aggregation before filtering.
        apr_df, _ = self.data_processor.fetch_apr_data_from_db()
        if not apr_df.empty:
            apr_processed = apr_df[apr_df['metric_type'] == 'APR'].copy()
            apr_processed = self.data_processor.aggregate_daily_medians(
                apr_processed, ['apr', 'adjusted_apr']
            )
            # Only the filtered ROI frame is needed here.
            _, roi_data = self.data_processor.filter_high_apr_values(apr_processed, roi_data)
            if roi_data.empty:
                return self._create_empty_chart("No ROI data available after high APR filtering")

        # Save processed ROI data for verification (after all processing steps)
        processed_csv_path = self.data_processor.save_to_csv(
            roi_data, FILE_PATHS['roi_processed_csv']
        )
        if processed_csv_path:
            logger.info("Saved processed ROI data to %s for verification", processed_csv_path)
            logger.info(
                "Processed ROI data contains %d rows after agent exclusion, "
                "daily median aggregation, and high APR filtering",
                len(roi_data),
            )

        # Filter outliers (disabled but keeping for compatibility)
        roi_data = self._filter_outliers(roi_data, 'roi')

        # Time range actually covered by the data
        min_time = roi_data['timestamp'].min()
        max_time = roi_data['timestamp'].max()

        # Average runtime for the title: days between the fixed start date and
        # each agent's last report, averaged over agents. One groupby pass
        # replaces re-masking the frame per agent; sort=False keeps agents in
        # first-appearance order. Rows with a missing agent_id are skipped by
        # groupby instead of raising on an empty per-agent slice.
        fixed_start_date = DATE_RANGES['feb_start']
        last_reports = roi_data.groupby('agent_id', sort=False)['timestamp'].max()
        runtime_days = [
            (last_report - fixed_start_date).total_seconds() / (24 * 3600)
            for last_report in last_reports
        ]
        avg_runtime = sum(runtime_days) / len(runtime_days) if runtime_days else 0

        # Create figure
        fig = self._create_base_figure()

        # Add background shapes
        y_range = Y_AXIS_RANGES['roi']
        self._add_background_shapes(fig, min_time, max_time, y_range['min'], y_range['max'])
        self._add_zero_line(fig, min_time, max_time)

        # Calculate moving averages
        avg_roi_data = self._calculate_moving_average(roi_data, 'roi')

        # Add individual agent data points
        unique_agents = roi_data['agent_name'].unique()
        color_map = self._get_color_map(unique_agents)
        self._add_agent_data_points(fig, roi_data, 'roi', color_map)

        # Add ROI moving average line
        self._add_moving_average_line(
            fig, avg_roi_data, 'roi',
            'Average ROI (3d window)',
            self.colors['roi'],
            width=2
        )

        # Update layout and axes
        title = f"Modius Agents ROI (over avg. {avg_runtime:.1f} days runtime)"
        self._update_layout(
            fig,
            title=title,
            y_axis_title="ROI [%]",
            y_range=[y_range['min'], y_range['max']]
        )

        # Use auto-range for both x-axis and y-axis to show all available data
        self._update_axes(
            fig,
            x_range=None,  # Let plotly auto-determine the best x-axis range
            y_auto=True  # Let plotly auto-determine the best y-axis range
        )

        # Save chart
        self._save_chart(
            fig,
            FILE_PATHS['roi_graph_html'],
            FILE_PATHS['roi_graph_png']
        )

        return fig
def generate_roi_visualizations(data_processor=None) -> Tuple[go.Figure, Optional[str]]:
    """Build the ROI chart from freshly fetched data.

    Args:
        data_processor: Processor used to fetch the data and handed to
            ``ROIChart``. A new ``DataProcessor`` is created when this is
            ``None``.

    Returns:
        The ``(figure, csv_path)`` pair produced by
        ``ROIChart.generate_visualization``.
    """
    # Function-scope import, presumably to avoid a circular import at module
    # load time -- confirm before moving it to the top of the file.
    from ..data.data_processor import DataProcessor

    processor = DataProcessor() if data_processor is None else data_processor

    # The fetch returns a pair; only the second (ROI) frame is charted here.
    _apr_frame, roi_frame = processor.fetch_apr_data_from_db()

    chart = ROIChart(processor)
    figure, saved_csv = chart.generate_visualization(
        roi_frame,
        csv_filename=FILE_PATHS['roi_csv'],
    )
    return figure, saved_csv