Spaces:
Sleeping
Sleeping
| """ | |
| Tests for Models API Server | |
| Comprehensive test suite for FastAPI inference server. | |
| """ | |
| import pytest | |
| import asyncio | |
| from fastapi.testclient import TestClient | |
| import sys | |
| import os | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) | |
| from src.inference.api_server import app | |
class TestAPIServer:
    """Endpoint-level tests for the Models API server.

    Each test drives the FastAPI ``app`` in-process through a ``TestClient``
    supplied by the ``client`` fixture and checks the status code and the
    shape of the JSON (or text) payload returned by one endpoint.
    """

    @pytest.fixture
    def client(self):
        """Return a ``TestClient`` bound to the application under test."""
        return TestClient(app)

    @pytest.fixture
    def sample_contracts(self):
        """Return two contracts that differ in id, value, supplier and date.

        The second contract's value is ten times the first one's while the
        description and organ are identical.
        """
        return [
            {
                "id": "CT001",
                "description": "Aquisição de computadores",
                "value": 50000.0,
                "supplier": "Tech Company A",
                "date": "2024-01-15",
                "organ": "Ministry of Education",
            },
            {
                "id": "CT002",
                "description": "Aquisição de computadores",
                "value": 500000.0,
                "supplier": "Tech Company B",
                "date": "2024-01-20",
                "organ": "Ministry of Education",
            },
        ]

    def test_root_endpoint(self, client):
        """``GET /`` returns the API identity, status and model listing."""
        response = client.get("/")
        assert response.status_code == 200

        data = response.json()
        assert data["api"] == "Cidadão.AI Models"
        assert data["version"] == "1.0.0"
        assert data["status"] == "operational"
        assert "anomaly_detector" in data["models"]
        assert "endpoints" in data

    def test_health_check(self, client):
        """``GET /health`` reports a healthy server with models loaded."""
        response = client.get("/health")
        assert response.status_code == 200

        data = response.json()
        assert data["status"] == "healthy"
        assert data["models_loaded"] is True
        assert "anomaly_detector" in data["models"]

    def test_detect_anomalies_endpoint(self, client, sample_contracts):
        """``POST /v1/detect-anomalies`` returns a well-formed result."""
        response = client.post(
            "/v1/detect-anomalies",
            json={"contracts": sample_contracts, "threshold": 0.7},
        )
        assert response.status_code == 200

        data = response.json()
        for key in (
            "anomalies",
            "total_analyzed",
            "anomalies_found",
            "confidence_score",
            "model_version",
        ):
            assert key in data

        assert data["total_analyzed"] == len(sample_contracts)
        assert isinstance(data["anomalies"], list)
        assert 0 <= data["confidence_score"] <= 1

    def test_analyze_patterns_endpoint(self, client):
        """``POST /v1/analyze-patterns`` returns patterns and insights."""
        response = client.post(
            "/v1/analyze-patterns",
            json={
                "data": {
                    "contracts": [{"value": 100000}, {"value": 200000}],
                    "period": "2024-Q1",
                },
                "analysis_type": "temporal",
            },
        )
        assert response.status_code == 200

        data = response.json()
        for key in ("patterns", "pattern_count", "confidence", "insights"):
            assert key in data

        assert isinstance(data["patterns"], list)
        assert data["pattern_count"] >= 0
        assert 0 <= data["confidence"] <= 1
        assert isinstance(data["insights"], list)

    def test_analyze_spectral_endpoint(self, client):
        """``POST /v1/analyze-spectral`` returns the spectrum fields."""
        response = client.post(
            "/v1/analyze-spectral",
            json={
                "time_series": [100, 200, 150, 300, 250, 400, 350],
                "sampling_rate": 1.0,
            },
        )
        assert response.status_code == 200

        data = response.json()
        for key in (
            "frequencies",
            "amplitudes",
            "dominant_frequency",
            "periodic_patterns",
        ):
            assert key in data

        assert isinstance(data["frequencies"], list)
        assert isinstance(data["amplitudes"], list)
        assert isinstance(data["dominant_frequency"], float)
        assert isinstance(data["periodic_patterns"], list)

    def test_metrics_endpoint(self, client):
        """``GET /metrics`` exposes the expected metric names."""
        # Make some requests first, before the metrics page is read.
        client.get("/")
        client.get("/health")

        response = client.get("/metrics")
        assert response.status_code == 200

        metrics = response.text
        assert "cidadao_models_requests_total" in metrics
        assert "cidadao_models_request_duration_seconds" in metrics
        assert "cidadao_models_anomalies_total" in metrics

    def test_invalid_endpoint(self, client):
        """An unknown path returns 404."""
        response = client.get("/invalid/endpoint")
        assert response.status_code == 404

    def test_empty_contracts_handling(self, client):
        """An empty contract list is accepted and yields an empty result."""
        response = client.post(
            "/v1/detect-anomalies",
            json={"contracts": [], "threshold": 0.7},
        )
        assert response.status_code == 200

        data = response.json()
        assert data["total_analyzed"] == 0
        assert data["anomalies_found"] == 0
        assert data["anomalies"] == []

    def test_invalid_request_format(self, client):
        """Malformed request bodies are rejected with 422."""
        # Missing required field: no "contracts" key at all.
        response = client.post(
            "/v1/detect-anomalies",
            json={"threshold": 0.7},
        )
        assert response.status_code == 422  # Validation error

        # Invalid data type: "contracts" is a string instead of a list.
        response = client.post(
            "/v1/detect-anomalies",
            json={"contracts": "not_a_list", "threshold": 0.7},
        )
        assert response.status_code == 422

    def test_cors_headers(self, client):
        """Responses to cross-origin requests carry the wildcard CORS header."""
        # An Origin header is sent so the request counts as cross-origin.
        # NOTE(review): assumes the server uses Starlette-style CORS
        # middleware, which adds CORS headers only when Origin is present.
        response = client.options("/", headers={"Origin": "https://example.com"})
        assert "access-control-allow-origin" in response.headers
        assert response.headers["access-control-allow-origin"] == "*"

    # NOTE(review): the parametrization for this test is not visible in the
    # source, although the signature needs ``num_contracts`` and ``max_time``.
    # The load sizes and time budgets (seconds) below are placeholders;
    # confirm them against the service's real performance targets.
    @pytest.mark.parametrize(
        "num_contracts,max_time",
        [
            (10, 1.0),
            (100, 2.0),
            (1000, 5.0),
        ],
    )
    def test_performance_requirements(self, client, num_contracts, max_time):
        """Anomaly detection on ``num_contracts`` items finishes in time.

        Args:
            num_contracts: Number of synthetic contracts to submit.
            max_time: Upper bound, in seconds, for the whole request.
        """
        import time

        # Generate test data: values grow linearly, suppliers cycle over 20
        # names, organs over 5, and the day of month stays within 01-28.
        contracts = [
            {
                "id": f"CT{i:06d}",
                "description": f"Contract {i}",
                "value": 50000.0 + (i * 100),
                "supplier": f"Company {i % 20}",
                "date": f"2024-01-{(i % 28) + 1:02d}",
                "organ": f"Ministry {i % 5}",
            }
            for i in range(num_contracts)
        ]

        # perf_counter is monotonic, so a system clock adjustment during the
        # request cannot distort the measured duration.
        start_time = time.perf_counter()
        response = client.post(
            "/v1/detect-anomalies",
            json={"contracts": contracts, "threshold": 0.7},
        )
        elapsed_time = time.perf_counter() - start_time

        assert response.status_code == 200
        assert elapsed_time < max_time
class TestAPIServerIntegration:
    """Integration tests that chain several API endpoints together.

    Requests are issued in-process through the ``TestClient`` provided by
    the ``client`` fixture.
    """

    @pytest.fixture
    def client(self):
        """Return a ``TestClient`` bound to the application under test."""
        return TestClient(app)

    def test_full_workflow(self, client):
        """Run detection, pattern analysis and spectral analysis in sequence."""
        # Step 1: Detect anomalies.
        # Ten contracts share the same value except index 5, which is ten
        # times larger and is the one intended anomaly.
        contracts = [
            {
                "id": f"CT{i:03d}",
                "description": f"Contract {i}",
                "value": 100000.0 if i != 5 else 1000000.0,  # One anomaly
                "supplier": f"Company {i % 3}",
                "date": f"2024-01-{i+1:02d}",
                "organ": "Ministry A",
            }
            for i in range(10)
        ]

        # No "threshold" is sent here, unlike the other detection requests.
        # Presumably the server applies a default; verify against the API.
        anomaly_response = client.post(
            "/v1/detect-anomalies",
            json={"contracts": contracts},
        )
        assert anomaly_response.status_code == 200
        anomaly_data = anomaly_response.json()
        assert anomaly_data["anomalies_found"] > 0

        # Step 2: Analyze patterns in the data, passing the detection output
        # along with the original contracts.
        pattern_response = client.post(
            "/v1/analyze-patterns",
            json={
                "data": {
                    "anomaly_results": anomaly_data,
                    "contracts": contracts,
                },
                "analysis_type": "temporal",
            },
        )
        assert pattern_response.status_code == 200
        pattern_data = pattern_response.json()
        assert len(pattern_data["patterns"]) > 0

        # Step 3: Perform spectral analysis on the contract values.
        values = [c["value"] for c in contracts]
        spectral_response = client.post(
            "/v1/analyze-spectral",
            json={"time_series": values, "sampling_rate": 1.0},
        )
        assert spectral_response.status_code == 200
        spectral_data = spectral_response.json()
        assert spectral_data["dominant_frequency"] >= 0

    def test_concurrent_requests(self, client):
        """Ten detection requests issued from ten threads all succeed."""
        import concurrent.futures

        def make_request(request_id):
            """POST one single-contract request tagged with ``request_id``."""
            return client.post(
                "/v1/detect-anomalies",
                json={
                    "contracts": [
                        {
                            "id": f"REQ{request_id}-CT001",
                            "description": "Test contract",
                            "value": 100000.0,
                            "supplier": "Test Company",
                            "date": "2024-01-01",
                            "organ": "Test Ministry",
                        }
                    ]
                },
            )

        # Make 10 concurrent requests, one worker thread per request.
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(make_request, i) for i in range(10)]
            # future.result() re-raises any exception from the worker, so a
            # failed request fails the test here rather than being ignored.
            results = [
                future.result()
                for future in concurrent.futures.as_completed(futures)
            ]

        # All should succeed
        assert all(r.status_code == 200 for r in results)
        assert len(results) == 10