#!/usr/bin/env python3
"""
Comprehensive Test Suite for Unified AI Services

Tests the unified application and all integrated services (NER, OCR, RAG).
Combines functionality from test_rag.py and test_ner.py with new unified tests.
"""

import asyncio
import httpx
import json
import io
import sys
import time
import tempfile
import os
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
import uuid as python_uuid

# Import configuration
try:
    from configs import get_config, validate_environment
    config = get_config()
except ImportError:
    print("⚠️ Could not import configs. Using default values.")
    config = None

# Test configuration
UNIFIED_URL = "http://localhost:8000"  # Main unified app
NER_URL = "http://localhost:8500"      # Direct NER service
OCR_URL = "http://localhost:8400"      # Direct OCR service
RAG_URL = "http://localhost:8401"      # Direct RAG service
TEST_TIMEOUT = 300

# Test data (from original test files)
THAI_CYANIDE_MURDER_CASE = """
เหตุฆาตกรรมด้วยไซยาไนด์ พ.ศ. 2566

คดีฆาตกรรมต่อเนื่องที่สั่นสะเทือนสังคมไทย เกิดขึ้นระหว่างเดือนเมษายน-ตุลาคม พ.ศ. 2566
โดยมีนางสาวสาริณี ชัยวัฒน์ หรือ "แอม ไซยาไนด์" อายุ 36 ปี เป็นผู้ต้องหา

รายละเอียดคดี:
ผู้ต้องหาได้ทำการวางยาพิษไซยาไนด์ (Potassium Cyanide) ในอาหารและเครื่องดื่มของเหยื่อหลายราย
เหยื่อรายแรกคือ นางสิริพร บุญลาภวนิช อายุ 32 ปี เสียชีวิตเมื่อวันที่ 14 เมษายน 2566 ที่จังหวัดกาญจนบุรี
เหยื่อรายที่สอง นายสุรชัย อยู่คงคลัง อายุ 45 ปี เสียชีวิตเมื่อวันที่ 2 พฤษภาคม 2566 ที่จังหวัดราชบุรี

การสืบสวน:
ตำรวจภูธรภาค 7 ร่วมกับ สำนักงานตำรวจแห่งชาติ ทำการสืบสวน
พบหลักฐานจากกล้องวงจรปิด (CCTV) ในหลายพื้นที่
ตรวจพบสารไซยาไนด์ในร่างกายเหยื่อทุกราย

การจับกุม:
วันที่ 3 ตุลาคม 2566 ตำรวจจับกุมตัวผู้ต้องหาได้ที่โรงแรมเดอะ บายแซด ตั้งอยู่ที่ ถนนรามคำแหง กรุงเทพมหานคร
พบเอกสารปลอม บัตรประชาชนปลอม และวัตถุพยานสำคัญอื่นๆ
ยึดทรัพย์สินที่ได้จากการกระทำผิด มูลค่ารวมกว่า 2 ล้านบาท
"""

ENGLISH_CYBERSECURITY_CASE = """
Major Cybersecurity Incident Report - Operation Digital Shield

Incident Overview:
On October 15, 2024, CyberDefense Corp, a leading cybersecurity firm headquartered in Austin, Texas,
detected a sophisticated Advanced Persistent Threat (APT) targeting critical infrastructure across Southeast Asia.

Key Personnel:
- Dr. Sarah Chen, Chief Security Officer at CyberDefense Corp
- Agent Michael Rodriguez, FBI Cyber Division
- Captain Lisa Thompson, US Cyber Command

Technical Details:
The attackers used a custom malware strain called "DeepStrike" developed by the Shadow Dragon group.
Primary attack vector: spear-phishing emails containing weaponized PDF documents.
Estimated financial damage: $50 million USD across affected organizations.
"""

TEST_URLS = [
    "https://httpbin.org/html",
    "https://httpbin.org/json"
]


class TestResult:
    """Class to track test results"""

    def __init__(self):
        self.total_tests = 0
        self.passed_tests = 0
        self.failed_tests = 0
        self.test_results = []
        self.warnings = []

    def add_result(self, test_name: str, passed: bool, message: str = "", details: Dict = None):
        """Add a test result"""
        self.total_tests += 1
        if passed:
            self.passed_tests += 1
            print(f"✅ {test_name}")
            if message:
                print(f"   {message}")
        else:
            self.failed_tests += 1
            print(f"❌ {test_name}: {message}")

        self.test_results.append({
            'test_name': test_name,
            'passed': passed,
            'message': message,
            'details': details or {}
        })

    def add_warning(self, test_name: str, message: str):
        """Add a warning (doesn't count as pass/fail)"""
        print(f"⚠️ {test_name}: {message}")
        self.warnings.append({
            'test_name': test_name,
            'message': message
        })

    def print_summary(self):
        """Print test summary"""
        print("\n" + "=" * 60)
        print("UNIFIED SYSTEM TEST SUMMARY")
        print("=" * 60)
        print(f"Total Tests: {self.total_tests}")
        print(f"Passed: {self.passed_tests}")
        print(f"Failed: {self.failed_tests}")
        print(f"Warnings: {len(self.warnings)}")
        if self.total_tests > 0:
            print(f"Success Rate: {(self.passed_tests / self.total_tests * 100):.1f}%")
        else:
            print("Success Rate: 0%")

        if self.failed_tests > 0:
            print("\n❌ FAILED TESTS:")
            for result in self.test_results:
                if not result['passed']:
                    print(f"  - {result['test_name']}: {result['message']}")

        if self.warnings:
            print("\n⚠️ WARNINGS:")
            for warning in self.warnings:
                print(f"  - {warning['test_name']}: {warning['message']}")


class UnifiedSystemTester:
    """Main test class for unified system"""

    def __init__(self):
        self.result = TestResult()
        self.session = None
        self.created_documents = []  # Track for cleanup
        self.created_analyses = []   # Track for cleanup

    async def __aenter__(self):
        self.session = httpx.AsyncClient(timeout=TEST_TIMEOUT)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.aclose()

    async def make_request(self, method: str, url: str, **kwargs) -> httpx.Response:
        """Make HTTP request with error handling"""
        try:
            response = await self.session.request(method, url, **kwargs)
            return response
        except httpx.RequestError as e:
            raise Exception(f"Request failed: {e}")

    async def test_unified_app_health(self):
        """Test 1: Unified Application Health Check"""
        print("🔍 Test 1: Unified Application Health Check")

        try:
            response = await self.make_request('GET', f"{UNIFIED_URL}/health")

            if response.status_code == 200:
                data = response.json()
                status = data.get("status")
                services = data.get("services", [])

                healthy_services = [s for s in services if s.get("health")]
                total_services = len(services)

                if status in ["healthy", "degraded"] and healthy_services:
                    message = f"Status: {status}, Services: {len(healthy_services)}/{total_services} healthy"
                    for service in services:
                        service_status = "✅" if service.get("health") else "❌"
                        message += f"\n   {service_status} {service.get('name')}: {service.get('status')} ({service.get('response_time', 0):.3f}s)"

                    self.result.add_result(
                        "Unified App Health Check",
                        True,
                        message,
                        data
                    )
                    return True
                else:
                    self.result.add_result(
                        "Unified App Health Check",
                        False,
                        f"System unhealthy: {data}"
                    )
                    return False
            else:
                self.result.add_result(
                    "Unified App Health Check",
                    False,
                    f"HTTP {response.status_code}: {response.text}"
                )
                return False

        except Exception as e:
            # Provide detailed diagnostics for connection failures
            if "connection" in str(e).lower():
                print(f"\n🔍 Connection Diagnostics:")
                print(f"   Unified App URL: {UNIFIED_URL}")
                print(f"   Error: {e}")
                print(f"\n💡 Possible Issues:")
                print(f"   1. Unified app is not running")
                print(f"   2. Wrong host/port in configuration")
                print(f"   3. Services failed to start")
                print(f"\n🚀 To Start Unified App:")
                print(f"   python app.py")

            self.result.add_result(
                "Unified App Health Check",
                False,
                str(e)
            )
            return False

    async def test_individual_service_health(self):
        """Test 2: Individual Service Health Checks"""
        print("🔍 Test 2: Individual Service Health Checks")

        services = [
            ("NER", NER_URL),
            ("OCR", OCR_URL),
            ("RAG", RAG_URL)
        ]

        all_healthy = True
        service_statuses = {}

        for service_name, service_url in services:
            try:
                response = await self.make_request('GET', f"{service_url}/health")

                if response.status_code == 200:
                    data = response.json()
                    status = data.get("status", "unknown")
                    service_statuses[service_name] = {
                        "healthy": True,
                        "status": status,
                        "details": data
                    }
                    print(f"   ✅ {service_name}: {status}")
                else:
                    service_statuses[service_name] = {
                        "healthy": False,
                        "status": f"HTTP {response.status_code}",
                        "details": None
                    }
                    print(f"   ❌ {service_name}: HTTP {response.status_code}")
                    all_healthy = False

            except Exception as e:
                service_statuses[service_name] = {
                    "healthy": False,
                    "status": f"Error: {e}",
                    "details": None
                }
                print(f"   ❌ {service_name}: {e}")
                all_healthy = False

        self.result.add_result(
            "Individual Service Health",
            all_healthy,
            f"Services healthy: {sum(1 for s in service_statuses.values() if s['healthy'])}/{len(services)}",
            service_statuses
        )
        return all_healthy

    async def test_unified_analysis_text(self):
        """Test 3: Unified Analysis with Text"""
        print("🔍 Test 3: Unified Analysis with Text")

        try:
            request_data = {
                "text": THAI_CYANIDE_MURDER_CASE,
                "extract_relationships": True,
                "include_embeddings": False,
                "include_summary": True,
                "generate_graph_files": True,
                "export_formats": ["neo4j", "json"],
                "enable_rag_indexing": True,
                "rag_title": "Cyanide Murder Case Analysis",
                "rag_keywords": ["cyanide", "murder", "investigation", "thai"],
                "rag_metadata": {"test": True, "case_type": "criminal"}
            }

            response = await self.make_request('POST', f"{UNIFIED_URL}/analyze/unified", json=request_data)

            if response.status_code == 200:
                data = response.json()

                if data.get("success"):
                    service_calls = data.get("service_calls", [])
                    ner_analysis = data.get("ner_analysis", {})
                    rag_document = data.get("rag_document", {})
                    processing_time = data.get("processing_time", 0)

                    # Validate NER analysis
                    entities = ner_analysis.get("entities", [])
                    relationships = ner_analysis.get("relationships", [])

                    # Track analysis for cleanup
                    if ner_analysis.get("analysis_id"):
                        self.created_analyses.append(ner_analysis["analysis_id"])
                    if rag_document and rag_document.get("document_id"):
                        self.created_documents.append(rag_document["document_id"])

                    message = f"Service calls: {', '.join(service_calls)}"
                    message += f"\n   Processing time: {processing_time:.2f}s"
                    message += f"\n   NER entities: {len(entities)}"
                    message += f"\n   NER relationships: {len(relationships)}"

                    if rag_document:
                        message += f"\n   RAG document ID: {rag_document.get('document_id', 'N/A')}"
                        message += f"\n   RAG chunks: {rag_document.get('total_chunks', 0)}"

                    # Check if we got expected service calls
                    expected_calls = ["ner_text"]
                    if "enable_rag_indexing" in request_data and request_data["enable_rag_indexing"]:
                        expected_calls.append("rag_upload")

                    all_expected_calls = all(call in service_calls for call in expected_calls)

                    self.result.add_result(
                        "Unified Analysis (Text)",
                        all_expected_calls and entities and len(service_calls) > 0,
                        message,
                        data
                    )
                    return data
                else:
                    self.result.add_result(
                        "Unified Analysis (Text)",
                        False,
                        data.get("error", "Analysis failed")
                    )
                    return None
            else:
                self.result.add_result(
                    "Unified Analysis (Text)",
                    False,
                    f"HTTP {response.status_code}: {response.text[:200]}"
                )
                return None

        except Exception as e:
            self.result.add_result(
                "Unified Analysis (Text)",
                False,
                str(e)
            )
            return None

    async def test_unified_analysis_url(self):
        """Test 4: Unified Analysis with URL"""
        print("🔍 Test 4: Unified Analysis with URL")

        try:
            request_data = {
                "url": "https://httpbin.org/html",
                "extract_relationships": True,
                "include_embeddings": False,
                "include_summary": True,
                "generate_graph_files": False,
                "export_formats": ["json"],
                "enable_rag_indexing": True,
                "rag_title": "Test URL Document",
                "rag_keywords": ["test", "url", "httpbin"],
                "rag_metadata": {"test": True, "source": "httpbin"}
            }

            response = await self.make_request('POST', f"{UNIFIED_URL}/analyze/unified", json=request_data)

            if response.status_code == 200:
                data = response.json()

                if data.get("success"):
                    service_calls = data.get("service_calls", [])
                    ner_analysis = data.get("ner_analysis", {})
                    rag_document = data.get("rag_document", {})

                    # Track for cleanup
                    if ner_analysis.get("analysis_id"):
                        self.created_analyses.append(ner_analysis["analysis_id"])
                    if rag_document and rag_document.get("document_id"):
                        self.created_documents.append(rag_document["document_id"])

                    message = f"Service calls: {', '.join(service_calls)}"
                    message += f"\n   NER analysis ID: {ner_analysis.get('analysis_id', 'N/A')}"
                    if rag_document:
                        message += f"\n   RAG document ID: {rag_document.get('document_id', 'N/A')}"

                    # Check for expected service calls
                    has_ner_url = "ner_url" in service_calls
                    has_rag_url = "rag_url" in service_calls

                    self.result.add_result(
                        "Unified Analysis (URL)",
                        has_ner_url and len(service_calls) > 0,
                        message,
                        data
                    )
                    return data
                else:
                    self.result.add_result(
                        "Unified Analysis (URL)",
                        False,
                        data.get("error", "URL analysis failed")
                    )
                    return None
            else:
                self.result.add_result(
                    "Unified Analysis (URL)",
                    False,
                    f"HTTP {response.status_code}: {response.text[:200]}"
                )
                return None

        except Exception as e:
            self.result.add_result(
                "Unified Analysis (URL)",
                False,
                str(e)
            )
            return None

    async def test_combined_search(self):
        """Test 5: Combined Search with NER Analysis"""
        print("🔍 Test 5: Combined Search with NER Analysis")

        # Wait a moment for indexing to complete
        await asyncio.sleep(2)

        try:
            request_data = {
                "query": "investigation murder case",
                "limit": 5,
                "similarity_threshold": 0.1,  # Lower threshold for better results
                "include_ner_analysis": True,
                "ner_export_formats": ["json"]
            }

            response = await self.make_request('POST', f"{UNIFIED_URL}/search/combined", json=request_data)

            if response.status_code == 200:
                data = response.json()

                if data.get("success"):
                    service_calls = data.get("service_calls", [])
                    search_results = data.get("search_results", {})
                    results = search_results.get("results", [])
                    ner_analyses = search_results.get("ner_analyses", [])

                    message = f"Service calls: {', '.join(service_calls)}"
                    message += f"\n   Search results: {len(results)}"
                    message += f"\n   NER analyses: {len(ner_analyses)}"
                    message += f"\n   Processing time: {data.get('processing_time', 0):.2f}s"

                    # Check for expected service calls
                    has_rag_search = "rag_search" in service_calls
                    has_ner_analysis = any("ner_text_" in call for call in service_calls)

                    success = has_rag_search and len(service_calls) > 0

                    if len(results) == 0:
                        self.result.add_warning(
                            "Combined Search",
                            "No search results found - may need more indexed content"
                        )

                    self.result.add_result(
                        "Combined Search",
                        success,
                        message,
                        data
                    )
                    return data
                else:
                    self.result.add_result(
                        "Combined Search",
                        False,
                        data.get("error", "Search failed")
                    )
                    return None
            else:
                self.result.add_result(
                    "Combined Search",
                    False,
                    f"HTTP {response.status_code}: {response.text[:200]}"
                )
                return None

        except Exception as e:
            self.result.add_result(
                "Combined Search",
                False,
                str(e)
            )
            return None

    async def test_service_proxies(self):
        """Test 6: Service Proxy Endpoints"""
        print("🔍 Test 6: Service Proxy Endpoints")

        proxy_tests = []

        # Test NER proxy
        try:
            ner_data = {
                "text": "Test entity recognition with John Smith working at Microsoft in Seattle.",
                "extract_relationships": True,
                "include_embeddings": False,
                "generate_graph_files": False
            }

            response = await self.make_request('POST', f"{UNIFIED_URL}/ner/analyze/text", json=ner_data)

            if response.status_code == 200:
                result = response.json()
                if result.get("success"):
                    entities = result.get("entities", [])
                    proxy_tests.append(("NER Proxy", True, f"Found {len(entities)} entities"))

                    # Track for cleanup
                    if result.get("analysis_id"):
                        self.created_analyses.append(result["analysis_id"])
                else:
                    proxy_tests.append(("NER Proxy", False, "Analysis failed"))
            else:
                proxy_tests.append(("NER Proxy", False, f"HTTP {response.status_code}"))

        except Exception as e:
            proxy_tests.append(("NER Proxy", False, str(e)))

        # Test OCR proxy
        try:
            response = await self.make_request('GET', f"{UNIFIED_URL}/ocr/health")

            if response.status_code == 200:
                proxy_tests.append(("OCR Proxy", True, "Health check passed"))
            else:
                proxy_tests.append(("OCR Proxy", False, f"HTTP {response.status_code}"))

        except Exception as e:
            proxy_tests.append(("OCR Proxy", False, str(e)))

        # Test RAG proxy
        try:
            response = await self.make_request('GET', f"{UNIFIED_URL}/rag/documents?limit=5")

            if response.status_code == 200:
                result = response.json()
                documents = result.get("documents", [])
                proxy_tests.append(("RAG Proxy", True, f"Found {len(documents)} documents"))
            else:
                proxy_tests.append(("RAG Proxy", False, f"HTTP {response.status_code}"))

        except Exception as e:
            proxy_tests.append(("RAG Proxy", False, str(e)))

        # Evaluate proxy tests
        passed_proxies = sum(1 for _, passed, _ in proxy_tests if passed)
        total_proxies = len(proxy_tests)

        for test_name, passed, message in proxy_tests:
            print(f"   {'✅' if passed else '❌'} {test_name}: {message}")

        self.result.add_result(
            "Service Proxies",
            passed_proxies == total_proxies,
            f"Proxies working: {passed_proxies}/{total_proxies}",
            {"proxy_results": proxy_tests}
        )
        return passed_proxies > 0

    async def test_file_upload_unified(self):
        """Test 7: File Upload through Unified Interface"""
        print("🔍 Test 7: File Upload through Unified Interface")

        try:
            # Create test document
            test_content = """
Technical Report: Advanced AI Systems

This report examines the integration of Named Entity Recognition (NER),
Optical Character Recognition (OCR), and Retrieval-Augmented Generation (RAG)
systems in a unified architecture.

Key Personnel:
- Dr. Alice Johnson, Lead AI Researcher at TechCorp
- Prof. Bob Smith, University of Technology
- Sarah Wilson, Data Scientist

Technical Components:
- Azure OpenAI for embeddings and language processing
- PostgreSQL with vector extensions for data storage
- FastAPI for microservice architecture

The system processes documents through multiple stages:
1. OCR extraction for scanned documents
2. NER analysis for entity and relationship extraction
3. RAG indexing for searchable knowledge base

Testing conducted on October 15, 2024 showed 95% accuracy.
Total budget: $250,000 for the complete implementation.
"""

            # Test through NER proxy (file upload)
            file_content = test_content.encode('utf-8')
            files = {"file": ("test_report.txt", io.BytesIO(file_content), "text/plain")}
            data = {
                "extract_relationships": "true",
                "include_embeddings": "false",
                "include_summary": "true",
                "generate_graph_files": "true",
                "export_formats": "neo4j,json"
            }

            response = await self.make_request(
                'POST',
                f"{UNIFIED_URL}/ner/analyze/file",
                files=files,
                data=data
            )

            if response.status_code == 200:
                result = response.json()

                if result.get("success"):
                    entities = result.get("entities", [])
                    relationships = result.get("relationships", [])

                    # Track for cleanup
                    if result.get("analysis_id"):
                        self.created_analyses.append(result["analysis_id"])

                    message = "File processed successfully"
                    message += f"\n   Entities: {len(entities)}"
                    message += f"\n   Relationships: {len(relationships)}"
                    message += f"\n   Language: {result.get('language', 'unknown')}"

                    # Look for expected entities
                    person_entities = [e for e in entities if e.get('label') == 'PERSON']
                    org_entities = [e for e in entities if e.get('label') == 'ORGANIZATION']
                    money_entities = [e for e in entities if e.get('label') == 'MONEY']

                    message += f"\n   People found: {len(person_entities)}"
                    message += f"\n   Organizations found: {len(org_entities)}"
                    message += f"\n   Money amounts found: {len(money_entities)}"

                    success = len(entities) > 0 and result.get("analysis_id")

                    self.result.add_result(
                        "File Upload (Unified)",
                        success,
                        message,
                        result
                    )
                    return result
                else:
                    self.result.add_result(
                        "File Upload (Unified)",
                        False,
                        result.get("error", "File analysis failed")
                    )
                    return None
            else:
                self.result.add_result(
                    "File Upload (Unified)",
                    False,
                    f"HTTP {response.status_code}: {response.text[:200]}"
                )
                return None

        except Exception as e:
            self.result.add_result(
                "File Upload (Unified)",
                False,
                str(e)
            )
            return None

    async def test_service_discovery(self):
        """Test 8: Service Discovery and Listing"""
        print("🔍 Test 8: Service Discovery and Listing")

        try:
            response = await self.make_request('GET', f"{UNIFIED_URL}/services")

            if response.status_code == 200:
                data = response.json()

                services = data.get("services", {})
                unified = data.get("unified", {})

                expected_services = ["ner", "ocr", "rag"]
                found_services = list(services.keys())

                message = f"Services discovered: {', '.join(found_services)}"
                message += f"\n   Unified endpoint: {unified.get('url', 'N/A')}"

                for service_name, service_info in services.items():
                    endpoints = service_info.get("endpoints", [])
                    message += f"\n   {service_name}: {len(endpoints)} endpoints"

                all_expected_found = all(service in found_services for service in expected_services)

                self.result.add_result(
                    "Service Discovery",
                    all_expected_found,
                    message,
                    data
                )
                return data
            else:
                self.result.add_result(
                    "Service Discovery",
                    False,
                    f"HTTP {response.status_code}"
                )
                return None

        except Exception as e:
            self.result.add_result(
                "Service Discovery",
                False,
                str(e)
            )
            return None

    async def test_system_performance(self):
        """Test 9: System Performance and Reliability"""
        print("🔍 Test 9: System Performance and Reliability")

        try:
            # Test multiple concurrent requests
            tasks = []
            test_texts = [
                "Performance test with Apple Inc and CEO Tim Cook in California.",
                "Reliability testing of Microsoft Azure services in Seattle.",
                "Load testing with Google Cloud Platform and AI systems."
            ]

            start_time = time.time()

            for i, text in enumerate(test_texts):
                task = self.make_request(
                    'POST',
                    f"{UNIFIED_URL}/ner/analyze/text",
                    json={
                        "text": text,
                        "extract_relationships": True,
                        "include_embeddings": False,
                        "generate_graph_files": False
                    }
                )
                tasks.append(task)

            # Execute concurrent requests
            responses = await asyncio.gather(*tasks, return_exceptions=True)
            total_time = time.time() - start_time

            # Analyze results
            successful_requests = 0
            total_entities = 0

            for i, response in enumerate(responses):
                if isinstance(response, Exception):
                    continue

                if response.status_code == 200:
                    result = response.json()
                    if result.get("success"):
                        successful_requests += 1
                        entities = result.get("entities", [])
                        total_entities += len(entities)

                        # Track for cleanup
                        if result.get("analysis_id"):
                            self.created_analyses.append(result["analysis_id"])

            avg_time_per_request = total_time / len(test_texts)

            message = f"Concurrent requests: {successful_requests}/{len(test_texts)} successful"
            message += f"\n   Total time: {total_time:.2f}s"
            message += f"\n   Avg time per request: {avg_time_per_request:.2f}s"
            message += f"\n   Total entities found: {total_entities}"

            # Performance criteria
            performance_ok = (
                successful_requests >= len(test_texts) * 0.8 and  # 80% success rate
                avg_time_per_request < 10.0                       # Under 10 seconds per request
            )

            self.result.add_result(
                "System Performance",
                performance_ok,
                message,
                {
                    "successful_requests": successful_requests,
                    "total_requests": len(test_texts),
                    "total_time": total_time,
                    "avg_time_per_request": avg_time_per_request,
                    "total_entities": total_entities
                }
            )
            return performance_ok

        except Exception as e:
            self.result.add_result(
                "System Performance",
                False,
                str(e)
            )
            return False

    async def test_error_handling(self):
        """Test 10: Error Handling and Resilience"""
        print("🔍 Test 10: Error Handling and Resilience")

        error_tests = []

        # Test 1: Invalid unified analysis request
        try:
            response = await self.make_request(
                'POST',
                f"{UNIFIED_URL}/analyze/unified",
                json={"invalid": "data"}
            )

            if response.status_code in [400, 422]:  # Expected validation error
                error_tests.append(("Invalid Request Handling", True, "Properly rejected invalid data"))
            else:
                error_tests.append(("Invalid Request Handling", False, f"Unexpected status: {response.status_code}"))

        except Exception as e:
            error_tests.append(("Invalid Request Handling", False, str(e)))

        # Test 2: Empty text analysis
        try:
            response = await self.make_request(
                'POST',
                f"{UNIFIED_URL}/ner/analyze/text",
                json={"text": "", "extract_relationships": True}
            )

            if response.status_code in [400, 422]:  # Expected validation error
                error_tests.append(("Empty Text Handling", True, "Properly rejected empty text"))
            else:
                result = response.json()
                if not result.get("success"):
                    error_tests.append(("Empty Text Handling", True, "Failed gracefully"))
                else:
                    error_tests.append(("Empty Text Handling", False, "Should have failed"))

        except Exception as e:
            error_tests.append(("Empty Text Handling", False, str(e)))

        # Test 3: Invalid URL
        try:
            response = await self.make_request(
                'POST',
                f"{UNIFIED_URL}/analyze/unified",
                json={
                    "url": "https://invalid-url-that-does-not-exist-12345.com",
                    "extract_relationships": True
                }
            )

            if response.status_code == 200:
                result = response.json()
                if not result.get("success"):
                    error_tests.append(("Invalid URL Handling", True, "Failed gracefully with invalid URL"))
                else:
                    error_tests.append(("Invalid URL Handling", False, "Should have failed"))
            else:
                error_tests.append(("Invalid URL Handling", True, f"Rejected invalid URL (HTTP {response.status_code})"))

        except Exception as e:
            error_tests.append(("Invalid URL Handling", False, str(e)))

        # Evaluate error handling tests
        passed_error_tests = sum(1 for _, passed, _ in error_tests if passed)
        total_error_tests = len(error_tests)

        for test_name, passed, message in error_tests:
            print(f"   {'✅' if passed else '❌'} {test_name}: {message}")

        self.result.add_result(
            "Error Handling",
            passed_error_tests >= total_error_tests * 0.8,  # 80% success rate
            f"Error tests passed: {passed_error_tests}/{total_error_tests}",
            {"error_test_results": error_tests}
        )
        return passed_error_tests > 0

    async def cleanup_test_data(self):
        """Clean up test data"""
        print("\n🧹 Cleaning up test data...")

        cleanup_count = 0
        cleanup_errors = 0

        # Clean up NER analyses
        for analysis_id in self.created_analyses:
            try:
                # Try direct service first
                response = await self.make_request('DELETE', f"{NER_URL}/analysis/{analysis_id}")
                if response.status_code in [200, 404]:  # 404 is OK (already deleted)
                    cleanup_count += 1
                else:
                    cleanup_errors += 1
            except Exception as e:
                cleanup_errors += 1
                print(f"   ⚠️ Failed to cleanup analysis {analysis_id[:8]}...: {e}")

        # Clean up RAG documents
        for document_id in self.created_documents:
            try:
                # Try through unified proxy
                response = await self.make_request('DELETE', f"{UNIFIED_URL}/rag/documents/{document_id}")
                if response.status_code in [200, 404]:  # 404 is OK (already deleted)
                    cleanup_count += 1
                else:
                    cleanup_errors += 1
            except Exception as e:
                cleanup_errors += 1
                print(f"   ⚠️ Failed to cleanup document {document_id[:8]}...: {e}")

        if cleanup_count > 0:
            print(f"   ✅ Cleaned up {cleanup_count} test items")
        if cleanup_errors > 0:
            print(f"   ⚠️ Failed to cleanup {cleanup_errors} items")

    async def run_comprehensive_tests(self):
        """Run all comprehensive unified system tests"""
        print("🚀 Unified AI Services - Comprehensive Test Suite")
        print("Testing: NER + OCR + RAG Integration with Unified Workflows")
        print("=" * 80)

        start_time = time.time()

        # Test sequence
        tests = [
            ("Unified App Health", self.test_unified_app_health),
            ("Individual Service Health", self.test_individual_service_health),
            ("Unified Analysis (Text)", self.test_unified_analysis_text),
            ("Unified Analysis (URL)", self.test_unified_analysis_url),
            ("Combined Search", self.test_combined_search),
            ("Service Proxies", self.test_service_proxies),
            ("File Upload (Unified)", self.test_file_upload_unified),
            ("Service Discovery", self.test_service_discovery),
            ("System Performance", self.test_system_performance),
            ("Error Handling", self.test_error_handling)
        ]

        for test_name, test_func in tests:
            print("\n" + "=" * 80)
            try:
                await test_func()
            except Exception as e:
                print(f"❌ {test_name} failed with exception: {e}")
                self.result.add_result(test_name, False, f"Exception: {e}")

        # Cleanup
        print("\n" + "=" * 80)
        await self.cleanup_test_data()

        # Final summary
        total_time = time.time() - start_time

        print("\n" + "=" * 80)
        print("📊 UNIFIED SYSTEM COMPREHENSIVE TEST RESULTS")
        print("=" * 80)

        self.result.print_summary()

        print("\nTEST EXECUTION:")
        print(f"Total Time: {total_time:.2f} seconds")
        print(f"Tests Created: NER analyses: {len(self.created_analyses)}, RAG documents: {len(self.created_documents)}")

        passed = self.result.passed_tests
        total = self.result.total_tests

        if passed == total:
            print("\n🎉 ALL UNIFIED SYSTEM TESTS PASSED!")
            print("✅ Unified application is fully operational")
            print("✅ All services are integrated and working")
            print("✅ Combined workflows are functional")
            print("✅ Service proxies are working")
            print("✅ Error handling is robust")

            print("\n🎯 UNIFIED SYSTEM CAPABILITIES VERIFIED:")
            print("   • NER + OCR + RAG service integration")
            print("   • Unified analysis workflows")
            print("   • Combined search with NER enhancement")
            print("   • Service proxy functionality")
            print("   • Multi-language support")
            print("   • Concurrent request handling")
            print("   • Comprehensive error handling")
            print("   • Real-time service health monitoring")
        else:
            print("\n⚠️ SOME UNIFIED SYSTEM TESTS FAILED")
            print(f"❌ {self.result.failed_tests} out of {total} tests failed")
            print("\n🔧 TROUBLESHOOTING STEPS:")
            print("1. Check that all services are running:")
            print(f"   • NER Service: {NER_URL}/health")
            print(f"   • OCR Service: {OCR_URL}/health")
            print(f"   • RAG Service: {RAG_URL}/health")
            print(f"   • Unified App: {UNIFIED_URL}/health")
            print("2. Verify configuration in .env file")
            print("3. Check service logs for errors")
            print("4. Ensure all dependencies are installed")
            print("5. Verify database connectivity")

        return passed == total


async def main():
    """Main test runner"""
    # The global declaration must come before any use of UNIFIED_URL in this function.
    global UNIFIED_URL

    if len(sys.argv) > 1:
        unified_url = sys.argv[1]
    else:
        unified_url = UNIFIED_URL

    # Update global URL
    UNIFIED_URL = unified_url

    print("🧪 Unified AI Services - Comprehensive Test Suite")
    print(f"📡 Testing unified system at: {UNIFIED_URL}")
    print("🔗 Expected services:")
    print(f"   • NER Service: {NER_URL}")
    print(f"   • OCR Service: {OCR_URL}")
    print(f"   • RAG Service: {RAG_URL}")
    print(f"   • Unified App: {UNIFIED_URL}")
    print("\nMake sure the unified application is running before starting tests.")
    print("Start command: python app.py")

    # Wait for user confirmation
    input("\nPress Enter to start unified system tests...")

    async with UnifiedSystemTester() as tester:
        success = await tester.run_comprehensive_tests()

        if success:
            print("\n🏆 UNIFIED SYSTEM VERIFICATION COMPLETE!")
            print("✅ All services are integrated and operational")
            print("✅ Combined workflows are working perfectly")
            print("✅ Ready for production deployment")
            sys.exit(0)
        else:
            print("\n🔧 UNIFIED SYSTEM NEEDS ATTENTION")
            print("❌ Some functionality is not working correctly")
            print("📋 Review the test results above for specific issues")
            sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())
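
# ---------------------------------------------------------------------------
# Optional programmatic usage — a minimal sketch, assuming this file is saved
# as test_unified.py (adjust the module name to match the actual filename).
# UnifiedSystemTester can be driven from another script to run a subset of
# checks, e.g. only the health probes, without the interactive prompt in main():
#
#   import asyncio
#   from test_unified import UnifiedSystemTester
#
#   async def quick_health_check():
#       # Runs only Tests 1 and 2 against the configured service URLs,
#       # then prints the usual pass/fail summary.
#       async with UnifiedSystemTester() as tester:
#           await tester.test_unified_app_health()
#           await tester.test_individual_service_health()
#           tester.result.print_summary()
#
#   asyncio.run(quick_health_check())
# ---------------------------------------------------------------------------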