SB-PoC / demo.py
Chirapath's picture
First draft coding project
963ae98 verified
#!/usr/bin/env python3
"""
Unified AI Services - Interactive Demo
Demonstrates the capabilities of the unified system with real examples
"""
import asyncio
import httpx
import json
import time
import sys
from typing import Dict, Any, Optional
# Demo configuration
# Base URL of the unified service; every request in this file is built as
# f"{UNIFIED_URL}/<path>". main() rebinds this global when a URL is passed as
# the first command-line argument.
UNIFIED_URL = "http://localhost:8000"
# Passed as `timeout=` to httpx.AsyncClient in UnifiedDemo.__aenter__
# (httpx interprets a bare number as seconds).
TIMEOUT = 60
# Demo data
# Sample documents keyed by a short identifier. run_interactive_demo() sends
# each text to /analyze/unified and derives the display title from the key
# (underscores replaced by spaces, then title-cased).
# NOTE: the string bodies are runtime payload - do not re-indent them.
DEMO_TEXTS = {
# Thai-language narrative of a murder case in Bangkok (victim, suspect,
# evidence, investigating police station).
"thai_crime": """
คดีอาญาที่สำคัญ: การฆาตกรรมที่กรุงเทพมหานคร
เมื่อวันที่ 15 ตุลาคม 2567 เวลา 14:30 น.
นายสมชาย ใจดี อายุ 45 ปี อาชีพนักธุรกิจ
ถูกพบเสียชีวิตที่คอนโดมิเนียม เดอะ ริเวอร์ ซิตี้ ชั้น 25
ผู้ต้องสงสัย: นางสาวมณี รักเงิน อายุ 32 ปี
เป็นเลขานุการของผู้เสียชีวิต
หลักฐาน: พบสารพิษในแก้วน้ำ
เงินจำนวน 500,000 บาท หายไปจากตู้เซฟ
กล้องวงจรปิดบันทึกเหตุการณ์ได้
ตำรวจสถานีทองหล่อทำการสืบสวน
พบว่าผู้ต้องสงสัยมีหนี้สินจำนวนมาก
""",
# English-language report of a corporate acquisition (organisations, people,
# monetary amounts, locations).
"english_business": """
Corporate Investigation Report - Tech Acquisition
On October 20, 2024, Microsoft Corporation announced the acquisition
of AI startup InnovateTech for $2.5 billion USD.
Key Personnel:
- CEO Sarah Johnson of InnovateTech
- VP Acquisitions David Chen at Microsoft
- Investment banker Lisa Rodriguez from Goldman Sachs
The deal includes:
- 150 AI researchers and engineers
- Proprietary machine learning algorithms
- Patents portfolio worth $800 million
- Office locations in San Francisco and Seattle
The acquisition strengthens Microsoft's position in the AI market
and provides access to advanced natural language processing technology.
""",
# Mixed Thai/English partnership agreement text.
"mixed_content": """
International Business Partnership
บริษัท ไทยเทค จำกัด (ThaiTech Ltd.)
Partnership Agreement between:
- ThaiTech Limited (Thailand)
- Singapore AI Solutions Pte Ltd (Singapore)
- Tokyo Innovation Corp (Japan)
ข้อตกลงความร่วมมือ:
Investment: $10 million USD (approximately 350 million Thai Baht)
Duration: 5 years (2024-2029)
Focus: Artificial Intelligence and Machine Learning
Key Locations:
- Bangkok, Thailand (Head Office)
- สิงคโปร์ (Singapore Regional Office)
- Tokyo, Japan (R&D Center)
Expected Revenue: $50 million USD by 2027
"""
}
class UnifiedDemo:
    """Interactive demo client for the unified AI services.

    Use as an async context manager: entering opens an ``httpx.AsyncClient``
    stored on ``self.session`` and exiting closes it.  Every ``demo_*``
    method prints its progress to stdout and reports failures inline rather
    than raising, so one failing step does not abort the rest of the demo.

    Attributes:
        session: HTTP client used for all requests; ``None`` outside the
            ``async with`` block.
        demo_results: Successful ``/analyze/unified`` responses, keyed by the
            display title given to ``demo_unified_analysis``.
    """

    def __init__(self, base_url: Optional[str] = None):
        """Create a demo runner.

        Args:
            base_url: Root URL of the unified service.  When omitted, the
                module-level ``UNIFIED_URL`` is read at request time, so a
                later override of that global is still honoured.
        """
        self.session = None
        self.demo_results = {}
        self._base_url = base_url

    @property
    def base_url(self) -> str:
        """Root URL used for requests, without a trailing slash."""
        root = self._base_url if self._base_url is not None else UNIFIED_URL
        # A trailing slash would otherwise yield "//health"-style paths.
        return root.rstrip("/")

    def _url(self, path: str) -> str:
        """Return the absolute URL for a service path such as ``/health``."""
        return f"{self.base_url}{path}"

    async def __aenter__(self):
        self.session = httpx.AsyncClient(timeout=TIMEOUT)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.aclose()
            # Drop the closed client so it cannot be reused by accident.
            self.session = None

    def print_header(self, title: str):
        """Print formatted header"""
        print("\n" + "=" * 70)
        print(f" {title}")
        print("=" * 70)

    def print_section(self, title: str):
        """Print section header"""
        print(f"\n📋 {title}")
        print("-" * 50)

    async def check_system_health(self, min_healthy_services: int = 3) -> bool:
        """Query ``/health`` and report whether the system is ready.

        Args:
            min_healthy_services: Number of services that must report a
                truthy ``health`` flag for the system to count as ready.

        Returns:
            ``True`` when the endpoint answers HTTP 200 and enough services
            are healthy; ``False`` otherwise, including on any request or
            decoding error.
        """
        try:
            response = await self.session.get(self._url("/health"))
            if response.status_code != 200:
                print(f"❌ Health check failed: HTTP {response.status_code}")
                return False

            data = response.json()
            services = data.get("services") or []
            print(f"🏥 System Health: {data.get('status')}")

            healthy_count = 0
            for service in services:
                is_healthy = bool(service.get("health"))
                healthy_count += is_healthy
                health_icon = "✅" if is_healthy else "❌"
                print(f" {health_icon} {service.get('name', 'unknown')}: {service.get('status', 'unknown')}")

            if healthy_count >= min_healthy_services:
                print("✅ System is ready for demo!")
                return True
            print("❌ System is not ready. Please ensure all services are running.")
            return False
        except Exception as e:
            # Demo boundary: connection, decoding and schema problems are all
            # reported the same way instead of crashing the script.
            print(f"❌ Cannot connect to unified system: {e}")
            print("\n💡 Make sure the unified application is running:")
            print(" python app.py")
            return False

    async def demo_unified_analysis(self, text: str, title: str) -> Optional[Dict[str, Any]]:
        """Send ``text`` to ``/analyze/unified`` and print the outcome.

        Args:
            text: Document to analyse.
            title: Display title; also used for the RAG title/category and as
                the key under which the response is stored.

        Returns:
            The decoded response when the service reports success (it is also
            stored in ``self.demo_results[title]``), otherwise ``None``.
        """
        self.print_section(f"Unified Analysis: {title}")
        try:
            print(f"📝 Analyzing text ({len(text)} characters)...")
            print(f" Text preview: {text[:100]}...")
            request_data = {
                "text": text,
                "extract_relationships": True,
                "include_embeddings": False,
                "include_summary": True,
                "generate_graph_files": True,
                "export_formats": ["neo4j", "json"],
                "enable_rag_indexing": True,
                "rag_title": f"Demo: {title}",
                "rag_keywords": ["demo", "analysis", "test"],
                "rag_metadata": {"demo": True, "category": title.lower()},
            }

            # perf_counter is monotonic, so the elapsed time cannot be skewed
            # by wall-clock adjustments.
            started = time.perf_counter()
            response = await self.session.post(self._url("/analyze/unified"), json=request_data)
            processing_time = time.perf_counter() - started

            if response.status_code != 200:
                print(f"❌ Request failed: HTTP {response.status_code}")
                print(f" Response: {response.text[:200]}")
                return None

            data = response.json()
            if not data.get("success"):
                print(f"❌ Analysis failed: {data.get('error', 'Unknown error')}")
                return None

            service_calls = data.get("service_calls") or []
            print(f"✅ Analysis completed in {processing_time:.2f} seconds")
            print(f"📞 Service calls: {', '.join(map(str, service_calls))}")

            ner_analysis = data.get("ner_analysis") or {}
            if ner_analysis:
                self._print_ner_results(ner_analysis)

            rag_document = data.get("rag_document") or {}
            if rag_document:
                self._print_rag_results(rag_document)
            else:
                print("\n⚠️ RAG indexing was not performed")

            # Store results for later use
            self.demo_results[title] = data
            return data
        except Exception as e:
            print(f"❌ Analysis error: {e}")
            return None

    def _print_ner_results(self, ner_analysis: Dict[str, Any]) -> None:
        """Print entity and relationship statistics for one NER result."""
        entities = ner_analysis.get("entities") or []
        relationships = ner_analysis.get("relationships") or []
        language = ner_analysis.get("language") or "unknown"

        print("\n🏷️ NER Analysis Results:")
        print(f" Language detected: {language}")
        print(f" Entities found: {len(entities)}")
        print(f" Relationships found: {len(relationships)}")

        # Group entity texts by label; null fields fall back to placeholders
        # so sorting and joining below cannot fail on mixed types.
        entity_types = {}
        for entity in entities:
            label = entity.get("label") or "UNKNOWN"
            entity_types.setdefault(label, []).append(str(entity.get("text") or ""))

        print("\n 📊 Entity breakdown:")
        for entity_type, entity_list in sorted(entity_types.items()):
            print(f" {entity_type}: {len(entity_list)} entities")
            print(f" Examples: {', '.join(entity_list[:3])}")

        if not relationships:
            print(" ⚠️ No relationships found")
            return

        print("\n 🔗 Relationship examples:")
        for rel in relationships[:3]:
            source = rel.get("source_entity", "Unknown")
            target = rel.get("target_entity", "Unknown")
            rel_type = rel.get("relationship_type", "unknown")
            confidence = rel.get("confidence") or 0
            # The arrow keeps source and target from running together.
            print(f" {source} → {target} ({rel_type}, {confidence:.2f})")

    def _print_rag_results(self, rag_document: Dict[str, Any]) -> None:
        """Print the indexing details returned for one RAG document."""
        print("\n💾 RAG Indexing Results:")
        print(f" Document ID: {rag_document.get('document_id', 'N/A')}")
        print(f" Total chunks: {rag_document.get('total_chunks', 0)}")
        print(" Status: Document indexed for search")

    async def demo_combined_search(self):
        """Run three sample queries against ``/search/combined``.

        Each query is handled independently: a failure is printed and the
        loop continues with the next query.
        """
        self.print_section("Combined Search with NER Enhancement")
        search_queries = [
            "murder investigation Thailand",
            "Microsoft acquisition business",
            "artificial intelligence partnership",
        ]
        for query in search_queries:
            try:
                print(f"\n🔍 Searching for: '{query}'")
                request_data = {
                    "query": query,
                    "limit": 3,
                    "similarity_threshold": 0.1,
                    "include_ner_analysis": True,
                    "ner_export_formats": ["json"],
                }
                started = time.perf_counter()
                response = await self.session.post(self._url("/search/combined"), json=request_data)
                search_time = time.perf_counter() - started

                if response.status_code != 200:
                    print(f" ❌ Search failed: HTTP {response.status_code}")
                    continue

                data = response.json()
                if not data.get("success"):
                    print(f" ❌ Search failed: {data.get('error', 'Unknown error')}")
                    continue

                self._print_search_results(data.get("search_results") or {}, search_time)
            except Exception as e:
                print(f" ❌ Search error: {e}")

    def _print_search_results(self, search_results: Dict[str, Any], search_time: float) -> None:
        """Print the hits and per-hit NER statistics of one search response."""
        results = search_results.get("results") or []
        ner_analyses = search_results.get("ner_analyses") or []

        print(f" ✅ Search completed in {search_time:.2f} seconds")
        print(f" 📊 Found {len(results)} results")

        for position, result in enumerate(results, start=1):
            chunk = result.get("chunk") or {}
            doc_info = result.get("document_info") or {}
            similarity = result.get("similarity_score") or 0
            print(f"\n 📄 Result {position} (similarity: {similarity:.3f}):")
            print(f" Title: {doc_info.get('title', 'Untitled')}")
            print(f" Content: {(chunk.get('content') or '')[:100]}...")

        if not ner_analyses:
            return

        print(f"\n 🏷️ NER analysis performed on top {len(ner_analyses)} results")
        for ner_data in ner_analyses:
            ner_result = ner_data.get("ner_analysis") or {}
            if ner_result.get("success"):
                entities = ner_result.get("entities") or []
                relationships = ner_result.get("relationships") or []
                print(f" Result {ner_data.get('result_index', 0)}: {len(entities)} entities, {len(relationships)} relationships")

    async def demo_service_proxies(self):
        """Probe the NER, RAG and OCR proxy routes, one after another."""
        self.print_section("Service Proxy Demonstration")
        await self._probe_ner_proxy()
        await self._probe_rag_proxy()
        await self._probe_ocr_proxy()

    async def _probe_ner_proxy(self) -> None:
        """POST a short sentence to ``/ner/analyze/text`` and report the result."""
        try:
            print("🧪 Testing NER service proxy...")
            test_data = {
                "text": "Quick test: Apple Inc. CEO Tim Cook visited Tokyo, Japan.",
                "extract_relationships": True,
                "include_embeddings": False,
                "generate_graph_files": False,
            }
            response = await self.session.post(self._url("/ner/analyze/text"), json=test_data)
            if response.status_code != 200:
                print(f" ❌ NER proxy failed: HTTP {response.status_code}")
                return
            result = response.json()
            if result.get("success"):
                entities = result.get("entities") or []
                print(f" ✅ NER proxy working: found {len(entities)} entities")
            else:
                print(f" ❌ NER proxy failed: {result.get('error', 'Unknown error')}")
        except Exception as e:
            print(f" ❌ NER proxy error: {e}")

    async def _probe_rag_proxy(self) -> None:
        """GET ``/rag/documents?limit=3`` and report how many documents came back."""
        try:
            print("🧪 Testing RAG service proxy...")
            response = await self.session.get(self._url("/rag/documents?limit=3"))
            if response.status_code == 200:
                documents = response.json().get("documents") or []
                print(f" ✅ RAG proxy working: found {len(documents)} documents")
            else:
                print(f" ❌ RAG proxy failed: HTTP {response.status_code}")
        except Exception as e:
            print(f" ❌ RAG proxy error: {e}")

    async def _probe_ocr_proxy(self) -> None:
        """GET ``/ocr/health`` and report whether it answered HTTP 200."""
        try:
            print("🧪 Testing OCR service proxy...")
            response = await self.session.get(self._url("/ocr/health"))
            if response.status_code == 200:
                print(" ✅ OCR proxy working: health check passed")
            else:
                print(f" ❌ OCR proxy failed: HTTP {response.status_code}")
        except Exception as e:
            print(f" ❌ OCR proxy error: {e}")

    async def demo_service_discovery(self):
        """Fetch ``/services`` and print each advertised service."""
        self.print_section("Service Discovery")
        try:
            response = await self.session.get(self._url("/services"))
            if response.status_code != 200:
                print(f"❌ Service discovery failed: HTTP {response.status_code}")
                return

            data = response.json()
            services = data.get("services") or {}
            unified = data.get("unified") or {}
            print("🔍 Service discovery successful:")
            print(f" Unified endpoint: {unified.get('url', 'N/A')}")

            for service_name, service_info in services.items():
                endpoints = service_info.get("endpoints") or []
                print(f"\n 📡 {service_name.upper()} Service:")
                print(f" URL: {service_info.get('url', 'N/A')}")
                print(f" Description: {service_info.get('description', 'No description')}")
                print(f" Endpoints: {len(endpoints)} available")
                # Show a few example endpoints
                for endpoint in endpoints[:3]:
                    print(f" • {endpoint}")
                if len(endpoints) > 3:
                    print(f" • ... and {len(endpoints) - 3} more")
        except Exception as e:
            print(f"❌ Service discovery error: {e}")

    def print_demo_summary(self):
        """Print per-analysis and aggregate statistics from ``demo_results``."""
        self.print_section("Demo Summary")
        if not self.demo_results:
            print("No analysis results to summarize.")
            return

        total_entities = 0
        total_relationships = 0
        languages_detected = set()
        for title, data in self.demo_results.items():
            ner_analysis = data.get("ner_analysis") or {}
            if not ner_analysis:
                continue
            entities = ner_analysis.get("entities") or []
            relationships = ner_analysis.get("relationships") or []
            # A null language must not reach str.join() below.
            language = ner_analysis.get("language") or "unknown"
            total_entities += len(entities)
            total_relationships += len(relationships)
            languages_detected.add(language)
            print(f"📊 {title}:")
            print(f" Language: {language}")
            print(f" Entities: {len(entities)}")
            print(f" Relationships: {len(relationships)}")

        print("\n🎯 Overall Demo Statistics:")
        print(f" Total analyses: {len(self.demo_results)}")
        print(f" Total entities extracted: {total_entities}")
        print(f" Total relationships found: {total_relationships}")
        # Sorted so the output is stable from run to run (set order is not).
        print(f" Languages detected: {', '.join(sorted(map(str, languages_detected)))}")

        print("\n✨ Capabilities Demonstrated:")
        print(" ✅ Multi-language NER analysis (Thai + English)")
        print(" ✅ Relationship extraction and mapping")
        print(" ✅ RAG document indexing")
        print(" ✅ Combined search with NER enhancement")
        print(" ✅ Service proxy functionality")
        print(" ✅ Unified workflow coordination")
        print(" ✅ Real-time processing and analysis")

    async def run_interactive_demo(self):
        """Run the complete interactive demo.

        Steps: health check, unified analysis of every ``DEMO_TEXTS`` entry,
        combined search, proxy probes, service discovery, summary.

        Returns:
            ``False`` when the health check fails or any unified analysis
            fails; ``True`` otherwise.  Search, proxy and discovery failures
            are printed inline but do not affect the return value.
        """
        self.print_header("Unified AI Services - Interactive Demo")
        print("This demo will showcase the capabilities of the unified AI system:")
        print("• Multi-language NER analysis with relationship extraction")
        print("• RAG document indexing and vector search")
        print("• Combined workflows and service coordination")
        print("• Service proxy functionality")
        print("• Real-time health monitoring")

        # Check system health
        print("\n🔍 Checking system health...")
        if not await self.check_system_health():
            print("\n❌ Demo cannot proceed - system is not healthy")
            return False

        # Demo 1: Unified Analysis
        self.print_header("Demo 1: Unified Analysis Capabilities")
        failed_analyses = 0
        for title, text in DEMO_TEXTS.items():
            outcome = await self.demo_unified_analysis(text, title.replace("_", " ").title())
            if outcome is None:
                failed_analyses += 1
            # Small delay between analyses
            await asyncio.sleep(1)

        # Demo 2: Combined Search
        self.print_header("Demo 2: Combined Search with NER Enhancement")
        await self.demo_combined_search()

        # Demo 3: Service Proxies
        self.print_header("Demo 3: Service Proxy Functionality")
        await self.demo_service_proxies()

        # Demo 4: Service Discovery
        self.print_header("Demo 4: Service Discovery")
        await self.demo_service_discovery()

        # Summary
        self.print_header("Demo Complete")
        self.print_demo_summary()

        all_succeeded = failed_analyses == 0
        if all_succeeded:
            print("\n🎉 Demo completed successfully!")
        else:
            # Do not claim success when analyses were reported as failed above.
            print(f"\n⚠️ Demo finished, but {failed_analyses} of {len(DEMO_TEXTS)} analyses failed.")
        # Point at the service actually used, not a hard-coded localhost URL.
        print(f"📖 For more information, visit: {self.base_url}/docs")
        return all_succeeded
async def main():
    """Command-line entry point for the demo.

    Reads an optional service base URL from ``sys.argv[1]`` and stores it in
    the module-level ``UNIFIED_URL``, asks the user to confirm, then runs
    ``UnifiedDemo.run_interactive_demo`` and prints a closing message that
    reflects its boolean result.

    Returns:
        ``None``.  Returns early, before any request is made, when the user
        cancels the confirmation prompt with Ctrl+C.
    """
    # Declared up front: the statement applies to the whole function anyway.
    global UNIFIED_URL

    print("🎬 Unified AI Services - Interactive Demo")
    print("=" * 50)

    if len(sys.argv) > 1:
        # A trailing slash would produce "//health"-style request URLs, and
        # an empty argument would blank the target, so normalise and ignore
        # empty values.
        requested_url = sys.argv[1].strip().rstrip("/")
        if requested_url:
            UNIFIED_URL = requested_url

    print(f"🎯 Demo target: {UNIFIED_URL}")
    print("\nMake sure the unified application is running:")
    print(" python app.py")

    # Wait for user confirmation
    try:
        input("\nPress Enter to start the demo (or Ctrl+C to cancel)...")
    except KeyboardInterrupt:
        print("\nDemo cancelled.")
        return
    except EOFError:
        # stdin is closed or not interactive, so nobody can confirm.
        # Proceed instead of aborting with an EOF error.
        print("\nNo interactive input available; starting the demo.")

    async with UnifiedDemo() as demo:
        success = await demo.run_interactive_demo()

    if success:
        print("\n🏆 Demo completed successfully!")
        print("The unified AI services are working perfectly.")
    else:
        print("\n⚠️ Demo encountered some issues.")
        print("Please check the system health and try again.")
if __name__ == "__main__":
    # Script entry: run the async main() and map the outcome to an exit code.
    exit_code = 0
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C during the demo is a normal way to stop; exit code stays 0.
        print("\n\n🛑 Demo interrupted by user")
    except Exception as exc:
        print(f"\n❌ Demo failed: {exc}")
        exit_code = 1
    if exit_code:
        sys.exit(exit_code)