| |
| """ |
| End-to-End Testing for FRED ML System |
| Tests the complete workflow: Streamlit → Lambda → S3 → Reports |
| """ |
|
|
| import pytest |
| import boto3 |
| import json |
| import time |
| import os |
| import sys |
| from datetime import datetime, timedelta |
| from pathlib import Path |
| import requests |
| import subprocess |
| import tempfile |
| import shutil |
|
|
| |
| project_root = Path(__file__).parent.parent.parent |
| sys.path.append(str(project_root)) |
|
|
| |
|
|
| class TestFredMLEndToEnd: |
| """End-to-end test suite for FRED ML system""" |
| |
| @pytest.fixture(scope="class") |
| def aws_clients(self): |
| """Initialize AWS clients""" |
| return { |
| 's3': boto3.client('s3', region_name='us-west-2'), |
| 'lambda': boto3.client('lambda', region_name='us-west-2'), |
| 'ssm': boto3.client('ssm', region_name='us-west-2') |
| } |
| |
| @pytest.fixture(scope="class") |
| def test_config(self): |
| """Test configuration""" |
| return { |
| 's3_bucket': 'fredmlv1', |
| 'lambda_function': 'fred-ml-processor', |
| 'region': 'us-west-2', |
| 'test_indicators': ['GDP', 'UNRATE'], |
| 'test_start_date': '2024-01-01', |
| 'test_end_date': '2024-01-31' |
| } |
| |
| @pytest.fixture(scope="class") |
| def test_report_id(self): |
| """Generate unique test report ID""" |
| return f"test_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}" |
| |
| def test_01_aws_credentials(self, aws_clients): |
| """Test AWS credentials and permissions""" |
| print("\n🔐 Testing AWS credentials...") |
| |
| |
| try: |
| response = aws_clients['s3'].list_objects_v2( |
| Bucket='fredmlv1', |
| MaxKeys=1 |
| ) |
| print("✅ S3 access verified") |
| except Exception as e: |
| pytest.fail(f"❌ S3 access failed: {e}") |
| |
| |
| try: |
| response = aws_clients['lambda'].list_functions(MaxItems=1) |
| print("✅ Lambda access verified") |
| except Exception as e: |
| pytest.fail(f"❌ Lambda access failed: {e}") |
| |
| |
| try: |
| response = aws_clients['ssm'].describe_parameters(MaxResults=1) |
| print("✅ SSM access verified") |
| except Exception as e: |
| pytest.fail(f"❌ SSM access failed: {e}") |
| |
| def test_02_s3_bucket_exists(self, aws_clients, test_config): |
| """Test S3 bucket exists and is accessible""" |
| print("\n📦 Testing S3 bucket...") |
| |
| try: |
| response = aws_clients['s3'].head_bucket(Bucket=test_config['s3_bucket']) |
| print(f"✅ S3 bucket '{test_config['s3_bucket']}' exists and is accessible") |
| except Exception as e: |
| pytest.fail(f"❌ S3 bucket access failed: {e}") |
| |
| def test_03_lambda_function_exists(self, aws_clients, test_config): |
| """Test Lambda function exists""" |
| print("\n⚡ Testing Lambda function...") |
| |
| try: |
| response = aws_clients['lambda'].get_function( |
| FunctionName=test_config['lambda_function'] |
| ) |
| print(f"✅ Lambda function '{test_config['lambda_function']}' exists") |
| print(f" Runtime: {response['Configuration']['Runtime']}") |
| print(f" Memory: {response['Configuration']['MemorySize']} MB") |
| print(f" Timeout: {response['Configuration']['Timeout']} seconds") |
| except Exception as e: |
| pytest.fail(f"❌ Lambda function not found: {e}") |
| |
| def test_04_fred_api_key_configured(self, aws_clients): |
| """Test FRED API key is configured in SSM""" |
| print("\n🔑 Testing FRED API key...") |
| |
| try: |
| response = aws_clients['ssm'].get_parameter( |
| Name='/fred-ml/api-key', |
| WithDecryption=True |
| ) |
| api_key = response['Parameter']['Value'] |
| |
| if api_key and api_key != 'your-fred-api-key-here': |
| print("✅ FRED API key is configured") |
| else: |
| pytest.fail("❌ FRED API key not properly configured") |
| except Exception as e: |
| pytest.fail(f"❌ FRED API key not found in SSM: {e}") |
| |
| def test_05_lambda_function_invocation(self, aws_clients, test_config, test_report_id): |
| """Test Lambda function invocation with test data""" |
| print("\n🚀 Testing Lambda function invocation...") |
| |
| |
| test_payload = { |
| 'indicators': test_config['test_indicators'], |
| 'start_date': test_config['test_start_date'], |
| 'end_date': test_config['test_end_date'], |
| 'options': { |
| 'visualizations': True, |
| 'correlation': True, |
| 'forecasting': False, |
| 'statistics': True |
| } |
| } |
| |
| try: |
| |
| response = aws_clients['lambda'].invoke( |
| FunctionName=test_config['lambda_function'], |
| InvocationType='RequestResponse', |
| Payload=json.dumps(test_payload) |
| ) |
| |
| |
| response_payload = json.loads(response['Payload'].read().decode('utf-8')) |
| |
| if response['StatusCode'] == 200 and response_payload.get('status') == 'success': |
| print("✅ Lambda function executed successfully") |
| print(f" Report ID: {response_payload.get('report_id')}") |
| print(f" Report Key: {response_payload.get('report_key')}") |
| return response_payload |
| else: |
| pytest.fail(f"❌ Lambda function failed: {response_payload}") |
| |
| except Exception as e: |
| pytest.fail(f"❌ Lambda invocation failed: {e}") |
| |
| def test_06_s3_report_storage(self, aws_clients, test_config, test_report_id): |
| """Test S3 report storage""" |
| print("\n📄 Testing S3 report storage...") |
| |
| try: |
| |
| response = aws_clients['s3'].list_objects_v2( |
| Bucket=test_config['s3_bucket'], |
| Prefix='reports/' |
| ) |
| |
| if 'Contents' in response: |
| print(f"✅ Found {len(response['Contents'])} report(s) in S3") |
| |
| |
| latest_report = max(response['Contents'], key=lambda x: x['LastModified']) |
| print(f" Latest report: {latest_report['Key']}") |
| print(f" Size: {latest_report['Size']} bytes") |
| print(f" Last modified: {latest_report['LastModified']}") |
| |
| |
| report_response = aws_clients['s3'].get_object( |
| Bucket=test_config['s3_bucket'], |
| Key=latest_report['Key'] |
| ) |
| |
| report_data = json.loads(report_response['Body'].read().decode('utf-8')) |
| |
| |
| required_fields = ['report_id', 'timestamp', 'indicators', 'statistics', 'data'] |
| for field in required_fields: |
| assert field in report_data, f"Missing required field: {field}" |
| |
| print("✅ Report structure is valid") |
| print(f" Indicators: {report_data['indicators']}") |
| print(f" Data points: {len(report_data['data'])}") |
| |
| return latest_report['Key'] |
| else: |
| pytest.fail("❌ No reports found in S3") |
| |
| except Exception as e: |
| pytest.fail(f"❌ S3 report verification failed: {e}") |
| |
| def test_07_s3_visualization_storage(self, aws_clients, test_config): |
| """Test S3 visualization storage""" |
| print("\n📊 Testing S3 visualization storage...") |
| |
| try: |
| |
| response = aws_clients['s3'].list_objects_v2( |
| Bucket=test_config['s3_bucket'], |
| Prefix='visualizations/' |
| ) |
| |
| if 'Contents' in response: |
| print(f"✅ Found {len(response['Contents'])} visualization(s) in S3") |
| |
| |
| visualization_types = ['time_series.png', 'correlation.png'] |
| for viz_type in visualization_types: |
| viz_objects = [obj for obj in response['Contents'] if viz_type in obj['Key']] |
| if viz_objects: |
| print(f" ✅ {viz_type}: {len(viz_objects)} file(s)") |
| else: |
| print(f" ⚠️ {viz_type}: No files found") |
| |
| return True |
| else: |
| print("⚠️ No visualizations found in S3 (this might be expected for test runs)") |
| return True |
| |
| except Exception as e: |
| pytest.fail(f"❌ S3 visualization verification failed: {e}") |
| |
| def test_08_streamlit_frontend_simulation(self, test_config): |
| """Simulate Streamlit frontend functionality""" |
| print("\n🎨 Testing Streamlit frontend simulation...") |
| |
| try: |
| |
| sys.path.append(str(project_root / 'frontend')) |
| |
| |
| from frontend.app import load_config |
| config = load_config() |
| |
| assert config['s3_bucket'] == test_config['s3_bucket'], "S3 bucket mismatch" |
| assert config['lambda_function'] == test_config['lambda_function'], "Lambda function mismatch" |
| |
| print("✅ Streamlit configuration is correct") |
| |
| |
| from frontend.app import init_aws_clients |
| s3_client, lambda_client = init_aws_clients() |
| |
| if s3_client and lambda_client: |
| print("✅ AWS clients initialized successfully") |
| else: |
| pytest.fail("❌ Failed to initialize AWS clients") |
| |
| return True |
| |
| except Exception as e: |
| pytest.fail(f"❌ Streamlit frontend simulation failed: {e}") |
| |
| def test_09_data_quality_verification(self, aws_clients, test_config): |
| """Verify data quality and completeness""" |
| print("\n🔍 Testing data quality...") |
| |
| try: |
| |
| response = aws_clients['s3'].list_objects_v2( |
| Bucket=test_config['s3_bucket'], |
| Prefix='reports/' |
| ) |
| |
| if 'Contents' in response: |
| latest_report = max(response['Contents'], key=lambda x: x['LastModified']) |
| |
| |
| report_response = aws_clients['s3'].get_object( |
| Bucket=test_config['s3_bucket'], |
| Key=latest_report['Key'] |
| ) |
| |
| report_data = json.loads(report_response['Body'].read().decode('utf-8')) |
| |
| |
| assert len(report_data['data']) > 0, "No data points found" |
| assert len(report_data['statistics']) > 0, "No statistics found" |
| |
| |
| for indicator in test_config['test_indicators']: |
| assert indicator in report_data['indicators'], f"Missing indicator: {indicator}" |
| |
| |
| assert report_data['start_date'] == test_config['test_start_date'], "Start date mismatch" |
| assert report_data['end_date'] == test_config['test_end_date'], "End date mismatch" |
| |
| print("✅ Data quality verification passed") |
| print(f" Data points: {len(report_data['data'])}") |
| print(f" Indicators: {report_data['indicators']}") |
| print(f" Date range: {report_data['start_date']} to {report_data['end_date']}") |
| |
| return True |
| else: |
| pytest.fail("❌ No reports found for data quality verification") |
| |
| except Exception as e: |
| pytest.fail(f"❌ Data quality verification failed: {e}") |
| |
| def test_10_performance_metrics(self, aws_clients, test_config): |
| """Test performance metrics""" |
| print("\n⚡ Testing performance metrics...") |
| |
| try: |
| |
| end_time = datetime.now() |
| start_time = end_time - timedelta(hours=1) |
| |
| cloudwatch = boto3.client('cloudwatch', region_name=test_config['region']) |
| |
| |
| response = cloudwatch.get_metric_statistics( |
| Namespace='AWS/Lambda', |
| MetricName='Invocations', |
| Dimensions=[{'Name': 'FunctionName', 'Value': test_config['lambda_function']}], |
| StartTime=start_time, |
| EndTime=end_time, |
| Period=300, |
| Statistics=['Sum'] |
| ) |
| |
| if response['Datapoints']: |
| invocations = sum(point['Sum'] for point in response['Datapoints']) |
| print(f"✅ Lambda invocations: {invocations}") |
| else: |
| print("⚠️ No Lambda invocation metrics found") |
| |
| |
| response = cloudwatch.get_metric_statistics( |
| Namespace='AWS/Lambda', |
| MetricName='Duration', |
| Dimensions=[{'Name': 'FunctionName', 'Value': test_config['lambda_function']}], |
| StartTime=start_time, |
| EndTime=end_time, |
| Period=300, |
| Statistics=['Average', 'Maximum'] |
| ) |
| |
| if response['Datapoints']: |
| avg_duration = sum(point['Average'] for point in response['Datapoints']) / len(response['Datapoints']) |
| max_duration = max(point['Maximum'] for point in response['Datapoints']) |
| print(f"✅ Average duration: {avg_duration:.2f}ms") |
| print(f"✅ Maximum duration: {max_duration:.2f}ms") |
| else: |
| print("⚠️ No Lambda duration metrics found") |
| |
| return True |
| |
| except Exception as e: |
| print(f"⚠️ Performance metrics test failed: {e}") |
| return True |
| |
| def test_11_error_handling(self, aws_clients, test_config): |
| """Test error handling scenarios""" |
| print("\n🚨 Testing error handling...") |
| |
| try: |
| |
| invalid_payload = { |
| 'indicators': ['INVALID_INDICATOR'], |
| 'start_date': '2024-01-01', |
| 'end_date': '2024-01-31', |
| 'options': { |
| 'visualizations': False, |
| 'correlation': False, |
| 'statistics': True |
| } |
| } |
| |
| response = aws_clients['lambda'].invoke( |
| FunctionName=test_config['lambda_function'], |
| InvocationType='RequestResponse', |
| Payload=json.dumps(invalid_payload) |
| ) |
| |
| response_payload = json.loads(response['Payload'].read().decode('utf-8')) |
| |
| |
| if response['StatusCode'] == 200: |
| print("✅ Error handling works correctly") |
| else: |
| print(f"⚠️ Unexpected response: {response_payload}") |
| |
| return True |
| |
| except Exception as e: |
| print(f"⚠️ Error handling test failed: {e}") |
| return True |
| |
| def test_12_cleanup_test_data(self, aws_clients, test_config, test_report_id): |
| """Clean up test data (optional)""" |
| print("\n🧹 Testing cleanup...") |
| |
| try: |
| |
| response = aws_clients['s3'].list_objects_v2( |
| Bucket=test_config['s3_bucket'], |
| Prefix=f'reports/{test_report_id}/' |
| ) |
| |
| if 'Contents' in response: |
| print(f"Found {len(response['Contents'])} test objects to clean up") |
| |
| |
| for obj in response['Contents']: |
| aws_clients['s3'].delete_object( |
| Bucket=test_config['s3_bucket'], |
| Key=obj['Key'] |
| ) |
| |
| print("✅ Test data cleaned up") |
| else: |
| print("✅ No test data to clean up") |
| |
| return True |
| |
| except Exception as e: |
| print(f"⚠️ Cleanup failed: {e}") |
| return True |
|
|
| def run_e2e_tests(): |
| """Run all end-to-end tests""" |
| print("🚀 Starting FRED ML End-to-End Tests") |
| print("=" * 50) |
| |
| |
| pytest.main([ |
| __file__, |
| '-v', |
| '--tb=short', |
| '--disable-warnings' |
| ]) |
|
|
| if __name__ == "__main__": |
| run_e2e_tests() |