#!/usr/bin/env python3 """ Unified AI Services - Interactive Demo Demonstrates the capabilities of the unified system with real examples """ import asyncio import httpx import json import time import sys from typing import Dict, Any, Optional # Demo configuration UNIFIED_URL = "http://localhost:8000" TIMEOUT = 60 # Demo data DEMO_TEXTS = { "thai_crime": """ คดีอาญาที่สำคัญ: การฆาตกรรมที่กรุงเทพมหานคร เมื่อวันที่ 15 ตุลาคม 2567 เวลา 14:30 น. นายสมชาย ใจดี อายุ 45 ปี อาชีพนักธุรกิจ ถูกพบเสียชีวิตที่คอนโดมิเนียม เดอะ ริเวอร์ ซิตี้ ชั้น 25 ผู้ต้องสงสัย: นางสาวมณี รักเงิน อายุ 32 ปี เป็นเลขานุการของผู้เสียชีวิต หลักฐาน: พบสารพิษในแก้วน้ำ เงินจำนวน 500,000 บาท หายไปจากตู้เซฟ กล้องวงจรปิดบันทึกเหตุการณ์ได้ ตำรวจสถานีทองหล่อทำการสืบสวน พบว่าผู้ต้องสงสัยมีหนี้สินจำนวนมาก """, "english_business": """ Corporate Investigation Report - Tech Acquisition On October 20, 2024, Microsoft Corporation announced the acquisition of AI startup InnovateTech for $2.5 billion USD. Key Personnel: - CEO Sarah Johnson of InnovateTech - VP Acquisitions David Chen at Microsoft - Investment banker Lisa Rodriguez from Goldman Sachs The deal includes: - 150 AI researchers and engineers - Proprietary machine learning algorithms - Patents portfolio worth $800 million - Office locations in San Francisco and Seattle The acquisition strengthens Microsoft's position in the AI market and provides access to advanced natural language processing technology. """, "mixed_content": """ International Business Partnership บริษัท ไทยเทค จำกัด (ThaiTech Ltd.) Partnership Agreement between: - ThaiTech Limited (Thailand) - Singapore AI Solutions Pte Ltd (Singapore) - Tokyo Innovation Corp (Japan) ข้อตกลงความร่วมมือ: Investment: $10 million USD (approximately 350 million Thai Baht) Duration: 5 years (2024-2029) Focus: Artificial Intelligence and Machine Learning Key Locations: - Bangkok, Thailand (Head Office) - สิงคโปร์ (Singapore Regional Office) - Tokyo, Japan (R&D Center) Expected Revenue: $50 million USD by 2027 """ } class UnifiedDemo: """Interactive demo for the unified AI services""" def __init__(self): self.session = None self.demo_results = {} async def __aenter__(self): self.session = httpx.AsyncClient(timeout=TIMEOUT) return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self.session: await self.session.aclose() def print_header(self, title: str): """Print formatted header""" print("\n" + "=" * 70) print(f" {title}") print("=" * 70) def print_section(self, title: str): """Print section header""" print(f"\n📋 {title}") print("-" * 50) async def check_system_health(self) -> bool: """Check if the unified system is healthy""" try: response = await self.session.get(f"{UNIFIED_URL}/health") if response.status_code == 200: data = response.json() status = data.get("status") services = data.get("services", []) print(f"🏥 System Health: {status}") for service in services: health_icon = "✅" if service.get("health") else "❌" print(f" {health_icon} {service.get('name', 'unknown')}: {service.get('status', 'unknown')}") healthy_services = [s for s in services if s.get("health")] if len(healthy_services) >= 3: # At least 3 services should be healthy print("✅ System is ready for demo!") return True else: print("❌ System is not ready. Please ensure all services are running.") return False else: print(f"❌ Health check failed: HTTP {response.status_code}") return False except Exception as e: print(f"❌ Cannot connect to unified system: {e}") print("\n💡 Make sure the unified application is running:") print(" python app.py") return False async def demo_unified_analysis(self, text: str, title: str) -> Optional[Dict[str, Any]]: """Demonstrate unified analysis capabilities""" self.print_section(f"Unified Analysis: {title}") try: print(f"📝 Analyzing text ({len(text)} characters)...") print(f" Text preview: {text[:100]}...") request_data = { "text": text, "extract_relationships": True, "include_embeddings": False, "include_summary": True, "generate_graph_files": True, "export_formats": ["neo4j", "json"], "enable_rag_indexing": True, "rag_title": f"Demo: {title}", "rag_keywords": ["demo", "analysis", "test"], "rag_metadata": {"demo": True, "category": title.lower()} } start_time = time.time() response = await self.session.post(f"{UNIFIED_URL}/analyze/unified", json=request_data) processing_time = time.time() - start_time if response.status_code == 200: data = response.json() if data.get("success"): service_calls = data.get("service_calls", []) ner_analysis = data.get("ner_analysis", {}) rag_document = data.get("rag_document", {}) print(f"✅ Analysis completed in {processing_time:.2f} seconds") print(f"📞 Service calls: {', '.join(service_calls)}") # NER Results if ner_analysis: entities = ner_analysis.get("entities", []) relationships = ner_analysis.get("relationships", []) language = ner_analysis.get("language", "unknown") print(f"\n🏷️ NER Analysis Results:") print(f" Language detected: {language}") print(f" Entities found: {len(entities)}") print(f" Relationships found: {len(relationships)}") # Show top entities by type entity_types = {} for entity in entities: entity_type = entity.get("label", "UNKNOWN") if entity_type not in entity_types: entity_types[entity_type] = [] entity_types[entity_type].append(entity.get("text", "")) print(f"\n 📊 Entity breakdown:") for entity_type, entity_list in sorted(entity_types.items()): print(f" {entity_type}: {len(entity_list)} entities") # Show a few examples examples = entity_list[:3] if examples: print(f" Examples: {', '.join(examples)}") # Show relationships if relationships: print(f"\n 🔗 Relationship examples:") for rel in relationships[:3]: source = rel.get("source_entity", "Unknown") target = rel.get("target_entity", "Unknown") rel_type = rel.get("relationship_type", "unknown") confidence = rel.get("confidence", 0) print(f" {source} → {target} ({rel_type}, {confidence:.2f})") else: print(f" ⚠️ No relationships found") # RAG Results if rag_document: print(f"\n💾 RAG Indexing Results:") print(f" Document ID: {rag_document.get('document_id', 'N/A')}") print(f" Total chunks: {rag_document.get('total_chunks', 0)}") print(f" Status: Document indexed for search") else: print(f"\n⚠️ RAG indexing was not performed") # Store results for later use self.demo_results[title] = data return data else: print(f"❌ Analysis failed: {data.get('error', 'Unknown error')}") return None else: print(f"❌ Request failed: HTTP {response.status_code}") print(f" Response: {response.text[:200]}") return None except Exception as e: print(f"❌ Analysis error: {e}") return None async def demo_combined_search(self): """Demonstrate combined search capabilities""" self.print_section("Combined Search with NER Enhancement") search_queries = [ "murder investigation Thailand", "Microsoft acquisition business", "artificial intelligence partnership" ] for query in search_queries: try: print(f"\n🔍 Searching for: '{query}'") request_data = { "query": query, "limit": 3, "similarity_threshold": 0.1, "include_ner_analysis": True, "ner_export_formats": ["json"] } start_time = time.time() response = await self.session.post(f"{UNIFIED_URL}/search/combined", json=request_data) search_time = time.time() - start_time if response.status_code == 200: data = response.json() if data.get("success"): search_results = data.get("search_results", {}) results = search_results.get("results", []) ner_analyses = search_results.get("ner_analyses", []) print(f" ✅ Search completed in {search_time:.2f} seconds") print(f" 📊 Found {len(results)} results") for i, result in enumerate(results): chunk = result.get("chunk", {}) similarity = result.get("similarity_score", 0) doc_info = result.get("document_info", {}) print(f"\n 📄 Result {i+1} (similarity: {similarity:.3f}):") print(f" Title: {doc_info.get('title', 'Untitled')}") print(f" Content: {chunk.get('content', '')[:100]}...") if ner_analyses: print(f"\n 🏷️ NER analysis performed on top {len(ner_analyses)} results") for ner_data in ner_analyses: ner_result = ner_data.get("ner_analysis", {}) if ner_result.get("success"): entities = ner_result.get("entities", []) relationships = ner_result.get("relationships", []) print(f" Result {ner_data.get('result_index', 0)}: {len(entities)} entities, {len(relationships)} relationships") else: print(f" ❌ Search failed: {data.get('error', 'Unknown error')}") else: print(f" ❌ Search failed: HTTP {response.status_code}") except Exception as e: print(f" ❌ Search error: {e}") async def demo_service_proxies(self): """Demonstrate service proxy functionality""" self.print_section("Service Proxy Demonstration") # Test NER proxy try: print("🧪 Testing NER service proxy...") test_data = { "text": "Quick test: Apple Inc. CEO Tim Cook visited Tokyo, Japan.", "extract_relationships": True, "include_embeddings": False, "generate_graph_files": False } response = await self.session.post(f"{UNIFIED_URL}/ner/analyze/text", json=test_data) if response.status_code == 200: result = response.json() if result.get("success"): entities = result.get("entities", []) print(f" ✅ NER proxy working: found {len(entities)} entities") else: print(f" ❌ NER proxy failed: {result.get('error', 'Unknown error')}") else: print(f" ❌ NER proxy failed: HTTP {response.status_code}") except Exception as e: print(f" ❌ NER proxy error: {e}") # Test RAG proxy try: print("🧪 Testing RAG service proxy...") response = await self.session.get(f"{UNIFIED_URL}/rag/documents?limit=3") if response.status_code == 200: result = response.json() documents = result.get("documents", []) print(f" ✅ RAG proxy working: found {len(documents)} documents") else: print(f" ❌ RAG proxy failed: HTTP {response.status_code}") except Exception as e: print(f" ❌ RAG proxy error: {e}") # Test OCR proxy try: print("🧪 Testing OCR service proxy...") response = await self.session.get(f"{UNIFIED_URL}/ocr/health") if response.status_code == 200: print(f" ✅ OCR proxy working: health check passed") else: print(f" ❌ OCR proxy failed: HTTP {response.status_code}") except Exception as e: print(f" ❌ OCR proxy error: {e}") async def demo_service_discovery(self): """Demonstrate service discovery""" self.print_section("Service Discovery") try: response = await self.session.get(f"{UNIFIED_URL}/services") if response.status_code == 200: data = response.json() services = data.get("services", {}) unified = data.get("unified", {}) print(f"🔍 Service discovery successful:") print(f" Unified endpoint: {unified.get('url', 'N/A')}") for service_name, service_info in services.items(): endpoints = service_info.get("endpoints", []) description = service_info.get("description", "No description") url = service_info.get("url", "N/A") print(f"\n 📡 {service_name.upper()} Service:") print(f" URL: {url}") print(f" Description: {description}") print(f" Endpoints: {len(endpoints)} available") # Show a few example endpoints for endpoint in endpoints[:3]: print(f" • {endpoint}") if len(endpoints) > 3: print(f" • ... and {len(endpoints) - 3} more") else: print(f"❌ Service discovery failed: HTTP {response.status_code}") except Exception as e: print(f"❌ Service discovery error: {e}") def print_demo_summary(self): """Print summary of demo results""" self.print_section("Demo Summary") if not self.demo_results: print("No analysis results to summarize.") return total_entities = 0 total_relationships = 0 languages_detected = set() for title, data in self.demo_results.items(): ner_analysis = data.get("ner_analysis", {}) if ner_analysis: entities = ner_analysis.get("entities", []) relationships = ner_analysis.get("relationships", []) language = ner_analysis.get("language", "unknown") total_entities += len(entities) total_relationships += len(relationships) languages_detected.add(language) print(f"📊 {title}:") print(f" Language: {language}") print(f" Entities: {len(entities)}") print(f" Relationships: {len(relationships)}") print(f"\n🎯 Overall Demo Statistics:") print(f" Total analyses: {len(self.demo_results)}") print(f" Total entities extracted: {total_entities}") print(f" Total relationships found: {total_relationships}") print(f" Languages detected: {', '.join(languages_detected)}") print(f"\n✨ Capabilities Demonstrated:") print(f" ✅ Multi-language NER analysis (Thai + English)") print(f" ✅ Relationship extraction and mapping") print(f" ✅ RAG document indexing") print(f" ✅ Combined search with NER enhancement") print(f" ✅ Service proxy functionality") print(f" ✅ Unified workflow coordination") print(f" ✅ Real-time processing and analysis") async def run_interactive_demo(self): """Run the complete interactive demo""" self.print_header("Unified AI Services - Interactive Demo") print("This demo will showcase the capabilities of the unified AI system:") print("• Multi-language NER analysis with relationship extraction") print("• RAG document indexing and vector search") print("• Combined workflows and service coordination") print("• Service proxy functionality") print("• Real-time health monitoring") # Check system health print("\n🔍 Checking system health...") if not await self.check_system_health(): print("\n❌ Demo cannot proceed - system is not healthy") return False # Demo 1: Unified Analysis self.print_header("Demo 1: Unified Analysis Capabilities") for title, text in DEMO_TEXTS.items(): await self.demo_unified_analysis(text, title.replace("_", " ").title()) # Small delay between analyses await asyncio.sleep(1) # Demo 2: Combined Search self.print_header("Demo 2: Combined Search with NER Enhancement") await self.demo_combined_search() # Demo 3: Service Proxies self.print_header("Demo 3: Service Proxy Functionality") await self.demo_service_proxies() # Demo 4: Service Discovery self.print_header("Demo 4: Service Discovery") await self.demo_service_discovery() # Summary self.print_header("Demo Complete") self.print_demo_summary() print(f"\n🎉 Demo completed successfully!") print(f"📖 For more information, visit: http://localhost:8000/docs") return True async def main(): """Main demo function""" print("🎬 Unified AI Services - Interactive Demo") print("=" * 50) if len(sys.argv) > 1: unified_url = sys.argv[1] global UNIFIED_URL UNIFIED_URL = unified_url print(f"🎯 Demo target: {UNIFIED_URL}") print("\nMake sure the unified application is running:") print(" python app.py") # Wait for user confirmation try: input("\nPress Enter to start the demo (or Ctrl+C to cancel)...") except KeyboardInterrupt: print("\nDemo cancelled.") return async with UnifiedDemo() as demo: success = await demo.run_interactive_demo() if success: print(f"\n🏆 Demo completed successfully!") print(f"The unified AI services are working perfectly.") else: print(f"\n⚠️ Demo encountered some issues.") print(f"Please check the system health and try again.") if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\n\n🛑 Demo interrupted by user") except Exception as e: print(f"\n❌ Demo failed: {e}") sys.exit(1)