Spaces:
Sleeping
Sleeping
| import os | |
| import pandas as pd | |
| from datetime import datetime | |
| from typing import Dict, Any, List, Union | |
| from pathlib import Path | |
| from ..config import config | |
| def format_number_to_2_decimals(value: Union[int, float, str, None]) -> Union[str, None]: | |
| """ | |
| Format numeric values to 2 decimal places. | |
| Args: | |
| value: The value to format | |
| Returns: | |
| Formatted string with 2 decimals, or original value if not numeric | |
| """ | |
| if value is None: | |
| return None | |
| # Keep string values like 'N/A' unchanged | |
| if isinstance(value, str): | |
| return value | |
| # Format numeric values to 2 decimal places | |
| if isinstance(value, (int, float)): | |
| return f"{value:.2f}" | |
| return value | |
| def save_workflow_to_csv(workflow_result: Dict[str, Any], date: str) -> bool: | |
| """ | |
| Save workflow results to a single CSV file with each row representing a different date. | |
| Args: | |
| workflow_result: Complete workflow result containing all analysis data | |
| date: Date string for the analysis | |
| Returns: | |
| bool: True if saved successfully, False otherwise | |
| """ | |
| try: | |
| if not workflow_result.get('success', False): | |
| print(f"No data to save to CSV.") | |
| return False | |
| # Extract results from workflow | |
| results = workflow_result.get('results', {}) | |
| symbols = workflow_result.get('symbols', []) | |
| if not symbols: | |
| print(f"No symbols found in workflow result.") | |
| return False | |
| symbol = symbols[0] # We process one symbol at a time | |
| # Get portfolio manager results - Access through symbol key | |
| portfolio_results = results.get('portfolio_manager', {}) | |
| if not portfolio_results or symbol not in portfolio_results: | |
| print(f"No portfolio manager results found for {symbol}.") | |
| return False | |
| symbol_portfolio_result = portfolio_results[symbol] | |
| # Check if portfolio analysis was successful | |
| if not symbol_portfolio_result.get('success', False): | |
| print(f"Portfolio analysis failed for {symbol}: {symbol_portfolio_result.get('error', 'Unknown error')}") | |
| return False | |
| # Get data collection results for current price | |
| data_results = results.get('data_collection', {}) | |
| market_data = data_results.get('market_data', {}) if data_results else {} | |
| current_price = market_data.get('current_price', 'N/A') | |
| # Get technical analysis results | |
| tech_results = results.get('technical_analysis', {}) | |
| tech_indicators = {} | |
| if tech_results and tech_results.get('success'): | |
| indicators_data = tech_results.get('indicators', {}) | |
| tech_indicators = indicators_data.get('technical_indicators', {}) | |
| # Helper function to get last value from indicator lists | |
| def get_last_value(data): | |
| if isinstance(data, list) and data: | |
| return data[-1] | |
| elif isinstance(data, (int, float)): | |
| return data | |
| return None | |
| # Extract individual technical indicators | |
| sma = get_last_value(tech_indicators.get('SMA', [])) | |
| rsi = get_last_value(tech_indicators.get('RSI', [])) | |
| adx = get_last_value(tech_indicators.get('ADX', [])) | |
| cci = get_last_value(tech_indicators.get('CCI', [])) | |
| # MACD components | |
| macd_data = tech_indicators.get('MACD', {}) | |
| macd_line = get_last_value(macd_data.get('macd', [])) if isinstance(macd_data, dict) else None | |
| macd_signal = get_last_value(macd_data.get('signal', [])) if isinstance(macd_data, dict) else None | |
| macd_histogram = get_last_value(macd_data.get('histogram', [])) if isinstance(macd_data, dict) else None | |
| # Bollinger Bands components | |
| bb_data = tech_indicators.get('BBANDS', {}) | |
| bb_upper = get_last_value(bb_data.get('upper', [])) if isinstance(bb_data, dict) else None | |
| bb_middle = get_last_value(bb_data.get('middle', [])) if isinstance(bb_data, dict) else None | |
| bb_lower = get_last_value(bb_data.get('lower', [])) if isinstance(bb_data, dict) else None | |
| # Get news intelligence results for NLP features | |
| news_results = results.get('news_intelligence', {}) | |
| nlp_features = {} | |
| if news_results and news_results.get('success'): | |
| nlp_features = news_results.get('nlp_features', {}) | |
| # Create comprehensive CSV data with all indicators and NLP features | |
| # Apply 2-decimal formatting ONLY to technical/financial fields | |
| csv_data = { | |
| # Basic data | |
| 'date': date, | |
| 'symbol': symbol, | |
| 'close': format_number_to_2_decimals(current_price), | |
| 'trading_signal': symbol_portfolio_result.get('trading_signal', 'N/A'), | |
| 'confidence_level': symbol_portfolio_result.get('confidence_level', 'N/A'), # Keep original precision | |
| 'position_size': symbol_portfolio_result.get('position_size', 'N/A'), # Keep original precision | |
| # Technical indicators - format to 2 decimals | |
| 'sma': format_number_to_2_decimals(sma), | |
| 'rsi': format_number_to_2_decimals(rsi), | |
| 'adx': format_number_to_2_decimals(adx), | |
| 'cci': format_number_to_2_decimals(cci), | |
| # MACD components - format to 2 decimals | |
| 'macd_line': format_number_to_2_decimals(macd_line), | |
| 'macd_signal': format_number_to_2_decimals(macd_signal), | |
| 'macd_histogram': format_number_to_2_decimals(macd_histogram), | |
| # Bollinger Bands components - format to 2 decimals | |
| 'bb_upper': format_number_to_2_decimals(bb_upper), | |
| 'bb_middle': format_number_to_2_decimals(bb_middle), | |
| 'bb_lower': format_number_to_2_decimals(bb_lower), | |
| # NLP features from news intelligence - keep original precision | |
| 'news_relevance': nlp_features.get('news_relevance'), | |
| 'sentiment': nlp_features.get('sentiment'), | |
| 'price_impact_potential': nlp_features.get('price_impact_potential'), | |
| 'trend_direction': nlp_features.get('trend_direction'), | |
| 'earnings_impact': nlp_features.get('earnings_impact'), | |
| 'investor_confidence': nlp_features.get('investor_confidence'), | |
| 'risk_profile_change': nlp_features.get('risk_profile_change') | |
| } | |
| # Create CSV file path - SINGLE FILE for all dates | |
| csv_path = config.csv_output_path | |
| os.makedirs(csv_path, exist_ok=True) | |
| csv_file = os.path.join(csv_path, 'daily_analysis.csv') | |
| # Check if file exists and load existing data | |
| file_exists = os.path.exists(csv_file) | |
| if file_exists: | |
| # Load existing data and append new row | |
| try: | |
| existing_df = pd.read_csv(csv_file) | |
| new_df = pd.DataFrame([csv_data]) | |
| combined_df = pd.concat([existing_df, new_df], ignore_index=True) | |
| # Remove duplicate entries for same date/symbol, keeping the latest | |
| combined_df = combined_df.drop_duplicates(subset=['date', 'symbol'], keep='last') | |
| # Sort by date descending (newest first) | |
| combined_df = combined_df.sort_values(by='date', ascending=False) | |
| except pd.errors.EmptyDataError: | |
| # File exists but is empty | |
| combined_df = pd.DataFrame([csv_data]) | |
| else: | |
| # Create new file | |
| combined_df = pd.DataFrame([csv_data]) | |
| # Save to CSV | |
| combined_df.to_csv(csv_file, index=False) | |
| print(f"CSV saved successfully: {csv_file}") | |
| print(f"Total rows in CSV: {len(combined_df)}") | |
| return True | |
| except Exception as e: | |
| print(f"Error saving CSV: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return False | |
| def save_workflow_to_symbol_csv(workflow_result: Dict[str, Any], date: str, data_dir: str = "./output/csv") -> bool: | |
| """ | |
| Save workflow results to a per-symbol CSV under data_dir with filename | |
| pattern: daily_analysis_{SYMBOL}.csv. | |
| The file will include at least: date, symbol, close, trading_signal, | |
| confidence_level, position_size, and selected indicators and NLP features. | |
| Args: | |
| workflow_result: Workflow result dict | |
| date: Analysis date (YYYY-MM-DD) | |
| data_dir: Target directory for symbol CSV files (default ./data) | |
| Returns: | |
| bool: True if saved successfully | |
| """ | |
| try: | |
| if not workflow_result.get('success', False): | |
| return False | |
| results = workflow_result.get('results', {}) | |
| symbols = workflow_result.get('symbols', []) | |
| if not symbols: | |
| return False | |
| symbol = symbols[0].upper() | |
| portfolio_results = results.get('portfolio_manager', {}) | |
| if not portfolio_results or symbol not in portfolio_results: | |
| return False | |
| symbol_portfolio_result = portfolio_results[symbol] | |
| if not symbol_portfolio_result.get('success', False): | |
| return False | |
| data_results = results.get('data_collection', {}) | |
| market_data = data_results.get('market_data', {}) if data_results else {} | |
| current_price = market_data.get('current_price', 'N/A') | |
| tech_results = results.get('technical_analysis', {}) | |
| tech_indicators = {} | |
| if tech_results and tech_results.get('success'): | |
| indicators_data = tech_results.get('indicators', {}) | |
| tech_indicators = indicators_data.get('technical_indicators', {}) | |
| def get_last_value(data): | |
| if isinstance(data, list) and data: | |
| return data[-1] | |
| elif isinstance(data, (int, float)): | |
| return data | |
| return None | |
| sma = get_last_value(tech_indicators.get('SMA', [])) | |
| rsi = get_last_value(tech_indicators.get('RSI', [])) | |
| adx = get_last_value(tech_indicators.get('ADX', [])) | |
| cci = get_last_value(tech_indicators.get('CCI', [])) | |
| macd_data = tech_indicators.get('MACD', {}) | |
| macd_line = get_last_value(macd_data.get('macd', [])) if isinstance(macd_data, dict) else None | |
| macd_signal = get_last_value(macd_data.get('signal', [])) if isinstance(macd_data, dict) else None | |
| macd_histogram = get_last_value(macd_data.get('histogram', [])) if isinstance(macd_data, dict) else None | |
| bb_data = tech_indicators.get('BBANDS', {}) | |
| bb_upper = get_last_value(bb_data.get('upper', [])) if isinstance(bb_data, dict) else None | |
| bb_middle = get_last_value(bb_data.get('middle', [])) if isinstance(bb_data, dict) else None | |
| bb_lower = get_last_value(bb_data.get('lower', [])) if isinstance(bb_data, dict) else None | |
| news_results = results.get('news_intelligence', {}) | |
| nlp_features = news_results.get('nlp_features', {}) if news_results and news_results.get('success') else {} | |
| csv_row = { | |
| 'date': date, | |
| 'symbol': symbol, | |
| 'close': format_number_to_2_decimals(current_price), | |
| 'trading_signal': symbol_portfolio_result.get('trading_signal', 'N/A'), | |
| 'confidence_level': symbol_portfolio_result.get('confidence_level', 'N/A'), | |
| 'position_size': symbol_portfolio_result.get('position_size', 'N/A'), | |
| 'sma': format_number_to_2_decimals(sma), | |
| 'rsi': format_number_to_2_decimals(rsi), | |
| 'adx': format_number_to_2_decimals(adx), | |
| 'cci': format_number_to_2_decimals(cci), | |
| 'macd_line': format_number_to_2_decimals(macd_line), | |
| 'macd_signal': format_number_to_2_decimals(macd_signal), | |
| 'macd_histogram': format_number_to_2_decimals(macd_histogram), | |
| 'bb_upper': format_number_to_2_decimals(bb_upper), | |
| 'bb_middle': format_number_to_2_decimals(bb_middle), | |
| 'bb_lower': format_number_to_2_decimals(bb_lower), | |
| 'news_relevance': nlp_features.get('news_relevance'), | |
| 'sentiment': nlp_features.get('sentiment'), | |
| 'price_impact_potential': nlp_features.get('price_impact_potential'), | |
| 'trend_direction': nlp_features.get('trend_direction'), | |
| 'earnings_impact': nlp_features.get('earnings_impact'), | |
| 'investor_confidence': nlp_features.get('investor_confidence'), | |
| 'risk_profile_change': nlp_features.get('risk_profile_change'), | |
| } | |
| # Ensure directory and file path | |
| Path(data_dir).mkdir(parents=True, exist_ok=True) | |
| symbol_csv = Path(data_dir) / f"daily_analysis_{symbol}.csv" | |
| # Append/update row | |
| if symbol_csv.exists(): | |
| try: | |
| existing_df = pd.read_csv(symbol_csv) | |
| new_df = pd.DataFrame([csv_row]) | |
| combined = pd.concat([existing_df, new_df], ignore_index=True) | |
| combined = combined.drop_duplicates(subset=['date'], keep='last') | |
| combined = combined.sort_values(by='date', ascending=True) | |
| except pd.errors.EmptyDataError: | |
| combined = pd.DataFrame([csv_row]) | |
| else: | |
| combined = pd.DataFrame([csv_row]) | |
| combined.to_csv(symbol_csv, index=False) | |
| print(f"Symbol CSV saved: {symbol_csv}") | |
| return True | |
| except Exception as e: | |
| print(f"Error saving symbol CSV: {e}") | |
| return False | |