#!/usr/bin/env python3
"""
Service Management Tool for Unified AI Services
Helps start, stop, monitor, and troubleshoot individual services
"""

import os
import sys
import time
import signal
import socket
import subprocess
import asyncio
import json
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import argparse

import httpx
import psutil

# Import configuration if available
try:
    from configs import get_config, validate_environment
    config = get_config()
except ImportError:
    print("⚠️ Could not import configs. Using default values.")
    config = None

# Order used by "start all" / "restart all": the backend services first,
# then the unified app that coordinates them.
STARTUP_ORDER = ["ocr", "rag", "ner", "unified"]


def _is_listening_on(conn, port: int) -> bool:
    """Return True if psutil connection *conn* is a LISTEN socket bound to *port*.

    ``laddr`` may be an empty tuple for unbound sockets, so it is checked
    before ``.port`` is read. Only LISTEN sockets count: TIME_WAIT or
    ESTABLISHED entries that merely carry the port number (for example the
    ones left behind right after a server was stopped) do not mean that a
    server currently owns the port.
    """
    return (
        bool(conn.laddr)
        and conn.laddr.port == port
        and conn.status == psutil.CONN_LISTEN
    )


def _process_connections(proc: "psutil.Process") -> list:
    """Return the inet connections of *proc*, across psutil versions.

    psutil 6.0 renamed ``Process.connections()`` to ``net_connections()``
    and deprecated the old name; use whichever this psutil provides.
    """
    getter = getattr(proc, "net_connections", None) or proc.connections
    return getter(kind="inet")


class ServiceManager:
    """Manages individual services for development and troubleshooting"""

    def __init__(self):
        # Child processes started by *this* instance, keyed by service name.
        # A fresh CLI invocation starts empty, so stop/status for services
        # launched by an earlier invocation go through the port-based paths.
        self.processes: Dict[str, subprocess.Popen] = {}
        self.service_configs = {
            "ner": {
                "script": "services/ner_service.py",
                "port": 8500,
                "description": "Named Entity Recognition with relationship extraction"
            },
            "ocr": {
                "script": "services/ocr_service.py",
                "port": 8400,
                "description": "Optical Character Recognition with document processing"
            },
            "rag": {
                "script": "services/rag_service.py",
                "port": 8401,
                "description": "Retrieval-Augmented Generation with vector search"
            },
            "unified": {
                "script": "app.py",
                "port": 8000,
                "description": "Unified application coordinating all services"
            }
        }

        # Update ports from config if available
        if config:
            self.service_configs["ner"]["port"] = config.ner.PORT
            self.service_configs["ocr"]["port"] = config.ocr.PORT
            self.service_configs["rag"]["port"] = config.rag.PORT
            self.service_configs["unified"]["port"] = config.MAIN_PORT

    def print_header(self, title: str):
        """Print formatted header"""
        print("\n" + "=" * 60)
        print(f" {title}")
        print("=" * 60)

    def print_service_info(self, service_name: str):
        """Print service information (silently ignores unknown names)."""
        if service_name not in self.service_configs:
            return

        service = self.service_configs[service_name]
        print(f"📍 {service_name.upper()} Service")
        print(f" Description: {service['description']}")
        print(f" Script: {service['script']}")
        print(f" Port: {service['port']}")
        print(f" URL: http://localhost:{service['port']}")

    def is_port_in_use(self, port: int) -> bool:
        """Return True if some process is listening on *port* on this machine.

        Uses psutil's system-wide connection table. When that is not
        permitted (``psutil.net_connections`` can raise AccessDenied for
        unprivileged users on some platforms), falls back to a TCP connect
        probe against localhost instead of reporting the port as free.
        """
        try:
            return any(
                _is_listening_on(conn, port)
                for conn in psutil.net_connections(kind="inet")
            )
        except (psutil.Error, OSError):
            pass  # fall through to the connect probe

        try:
            with socket.create_connection(("localhost", port), timeout=0.5):
                return True
        except OSError:
            return False

    async def check_service_health(self, service_name: str) -> Tuple[bool, Optional[Dict]]:
        """Probe ``GET /health`` on the service's port.

        Returns ``(True, <parsed JSON body>)`` on HTTP 200, otherwise
        ``(False, {"error": ...})`` describing the HTTP status or the
        exception raised (connection refused, timeout, non-JSON body, ...).
        Unknown service names yield ``(False, None)``.
        """
        if service_name not in self.service_configs:
            return False, None

        port = self.service_configs[service_name]["port"]

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"http://localhost:{port}/health",
                    timeout=5.0
                )
                if response.status_code == 200:
                    return True, response.json()
                return False, {"error": f"HTTP {response.status_code}"}
        except Exception as e:
            return False, {"error": str(e)}

    def start_service(self, service_name: str) -> bool:
        """Start *service_name* as a child process running its script.

        The script path is resolved relative to the current working directory.
        Returns True if the child is still alive 2 seconds after launch. The
        child is placed in its own process group (POSIX: new session) so that
        stop_service can signal the whole group.
        """
        if service_name not in self.service_configs:
            print(f"❌ Unknown service: {service_name}")
            return False

        service = self.service_configs[service_name]
        script_path = service["script"]
        port = service["port"]

        # Check if script exists
        if not Path(script_path).exists():
            print(f"❌ Service script not found: {script_path}")
            return False

        # Check if port is already in use
        if self.is_port_in_use(port):
            print(f"⚠️ Port {port} is already in use. Service may already be running.")
            return False

        # Check if service is already running in our process list
        existing = self.processes.get(service_name)
        if existing is not None and existing.poll() is None:
            print(f"⚠️ {service_name} service is already running (PID: {existing.pid})")
            return False

        try:
            print(f"🚀 Starting {service_name} service...")
            print(f" Script: {script_path}")
            print(f" Port: {port}")

            if sys.platform == "win32":
                # A new process group is required for CTRL_BREAK_EVENT in stop_service
                process = subprocess.Popen(
                    [sys.executable, script_path],
                    creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
                )
            else:
                # start_new_session=True calls setsid() in the child, like the
                # former preexec_fn=os.setsid, without preexec_fn's thread-safety caveats
                process = subprocess.Popen(
                    [sys.executable, script_path],
                    start_new_session=True,
                )

            self.processes[service_name] = process

            # Wait a moment for startup
            time.sleep(2)

            if process.poll() is None:
                print(f"✅ {service_name} service started successfully (PID: {process.pid})")
                return True

            # The child already exited; do not keep a dead handle around
            del self.processes[service_name]
            print(f"❌ {service_name} service failed to start")
            return False

        except Exception as e:
            print(f"❌ Failed to start {service_name} service: {e}")
            return False

    def stop_service(self, service_name: str) -> bool:
        """Stop *service_name*.

        Prefers the process this manager started (signalling its whole
        process group). Otherwise falls back to terminating the process found
        listening on the service's port, which is the path taken when the
        service was started by a different invocation of this tool.
        """
        if service_name not in self.service_configs:
            print(f"❌ Unknown service: {service_name}")
            return False

        port = self.service_configs[service_name]["port"]

        process = self.processes.get(service_name)
        if process is not None:
            if process.poll() is None:
                return self._stop_managed_process(service_name, process)
            # Already exited; drop the stale handle and fall back to the port scan
            del self.processes[service_name]

        return self._stop_process_on_port(service_name, port)

    def _stop_managed_process(self, service_name: str, process: subprocess.Popen) -> bool:
        """Gracefully stop a process started by start_service; kill its group after 10 s."""
        try:
            print(f"🛑 Stopping {service_name} service (PID: {process.pid})...")

            pgid = None
            if sys.platform == "win32":
                process.send_signal(signal.CTRL_BREAK_EVENT)
            else:
                pgid = os.getpgid(process.pid)
                os.killpg(pgid, signal.SIGTERM)

            try:
                process.wait(timeout=10)
                print(f"✅ {service_name} service stopped")
            except subprocess.TimeoutExpired:
                print(f"⚠️ Force killing {service_name} service...")
                if pgid is None:
                    process.kill()
                else:
                    # Kill the whole group, not only the leader, so no member is orphaned
                    try:
                        os.killpg(pgid, signal.SIGKILL)
                    except ProcessLookupError:
                        pass  # group exited between the timeout and the kill
                process.wait()  # reap the child so it does not linger as a zombie

            del self.processes[service_name]
            return True

        except Exception as e:
            print(f"❌ Error stopping {service_name} service: {e}")
            return False

    def _stop_process_on_port(self, service_name: str, port: int) -> bool:
        """Terminate the first process listening on *port*; kill it if still alive after 5 s."""
        try:
            for proc in psutil.process_iter():
                try:
                    if not any(_is_listening_on(conn, port) for conn in _process_connections(proc)):
                        continue
                    print(f"🛑 Found process using port {port} (PID: {proc.pid})")
                    proc.terminate()
                    try:
                        proc.wait(timeout=5)
                        print(f"✅ Process {proc.pid} terminated")
                    except psutil.TimeoutExpired:
                        proc.kill()
                        print(f"✅ Process {proc.pid} killed")
                    return True
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    # Process vanished or belongs to another user; keep scanning
                    continue
        except Exception as e:
            print(f"❌ Error finding process on port {port}: {e}")

        print(f"⚠️ No running {service_name} service found")
        return False

    def stop_all_services(self):
        """Stop all managed services"""
        print("🛑 Stopping all services...")
        for service_name in self.service_configs:
            self.stop_service(service_name)

    async def get_service_status(self, service_name: str) -> Dict:
        """Collect port, managed-process, resource and health information for a service."""
        if service_name not in self.service_configs:
            return {"status": "unknown", "error": "Unknown service"}

        service = self.service_configs[service_name]
        port = service["port"]

        status = {
            "name": service_name,
            "description": service["description"],
            "port": port,
            "script": service["script"],
            "managed_process": False,
            "port_in_use": self.is_port_in_use(port),
            "health_check": False,
            "health_data": None
        }

        # Only processes started by this manager instance are known here
        process = self.processes.get(service_name)
        if process is not None and process.poll() is None:
            status["managed_process"] = True
            status["pid"] = process.pid

            try:
                proc = psutil.Process(process.pid)
                # Without an interval, the first cpu_percent() call on a new
                # Process object always returns 0.0, so sample briefly instead
                status["cpu_percent"] = proc.cpu_percent(interval=0.1)
                status["memory_mb"] = proc.memory_info().rss / 1024 / 1024
                created = proc.create_time()
                status["create_time"] = created
                status["uptime"] = time.time() - created
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass  # resource figures are best-effort

        # Check health endpoint
        health_ok, health_data = await self.check_service_health(service_name)
        status["health_check"] = health_ok
        status["health_data"] = health_data

        return status

    async def status_all_services(self):
        """Show status of all services"""
        self.print_header("Service Status Overview")

        for service_name in self.service_configs:
            status = await self.get_service_status(service_name)

            print(f"\n📊 {service_name.upper()} Service")
            print(f" Port: {status['port']}")
            print(f" Script: {status['script']}")

            if status["managed_process"]:
                print(f" ✅ Managed process running (PID: {status.get('pid', 'unknown')})")
                if 'uptime' in status:
                    uptime_str = f"{status['uptime']:.0f} seconds"
                    print(f" ⏱️ Uptime: {uptime_str}")
                if 'cpu_percent' in status:
                    print(f" 💻 CPU: {status['cpu_percent']:.1f}%")
                if 'memory_mb' in status:
                    print(f" 🧠 Memory: {status['memory_mb']:.1f} MB")
            elif status["port_in_use"]:
                print(" ⚠️ Port in use (external process)")
            else:
                print(" ❌ Not running")

            health = status["health_data"]
            if status["health_check"]:
                print(" ✅ Health check: OK")
                if isinstance(health, dict) and "status" in health:
                    print(f" Status: {health['status']}")
            else:
                print(" ❌ Health check: Failed")
                if isinstance(health, dict) and "error" in health:
                    print(f" Error: {health['error']}")

    async def test_service(self, service_name: str):
        """Run the basic liveness checks and then the service-specific smoke test."""
        if service_name not in self.service_configs:
            print(f"❌ Unknown service: {service_name}")
            return

        self.print_header(f"Testing {service_name.upper()} Service")

        status = await self.get_service_status(service_name)

        # Basic status
        if not status["port_in_use"]:
            print("❌ Service is not running")
            return

        if not status["health_check"]:
            print("❌ Health check failed")
            if status["health_data"]:
                print(f" Error: {status['health_data']}")
            return

        print("✅ Service is running and healthy")

        # Service-specific tests
        port = status["port"]
        if service_name == "ner":
            await self.test_ner_service(port)
        elif service_name == "ocr":
            await self.test_ocr_service(port)
        elif service_name == "rag":
            await self.test_rag_service(port)
        elif service_name == "unified":
            await self.test_unified_service(port)

    async def test_ner_service(self, port: int):
        """POST a short sample text to ``/analyze/text`` and report entity/relationship counts."""
        print("\n🧪 Testing NER functionality...")

        try:
            test_data = {
                "text": "John Smith works at Microsoft in Seattle.",
                "extract_relationships": True,
                "include_embeddings": False,
                "generate_graph_files": False
            }

            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"http://localhost:{port}/analyze/text",
                    json=test_data,
                    timeout=30.0
                )

                if response.status_code == 200:
                    result = response.json()
                    if result.get("success"):
                        entities = result.get("entities", [])
                        relationships = result.get("relationships", [])
                        print(" ✅ NER analysis successful")
                        print(f" 📊 Found {len(entities)} entities, {len(relationships)} relationships")
                    else:
                        print(f" ❌ NER analysis failed: {result.get('error', 'Unknown error')}")
                else:
                    print(f" ❌ NER test failed: HTTP {response.status_code}")

        except Exception as e:
            print(f" ❌ NER test error: {e}")

    async def test_ocr_service(self, port: int):
        """Check that the OCR service answers ``/health`` (no file-based test is run)."""
        print("\n🧪 Testing OCR functionality...")

        try:
            async with httpx.AsyncClient() as client:
                # Test health endpoint (OCR doesn't have complex test without files)
                response = await client.get(f"http://localhost:{port}/health")

                if response.status_code == 200:
                    print(" ✅ OCR service is responsive")
                else:
                    print(f" ❌ OCR test failed: HTTP {response.status_code}")

        except Exception as e:
            print(f" ❌ OCR test error: {e}")

    async def test_rag_service(self, port: int):
        """List up to 5 documents via ``/documents`` and report how many came back."""
        print("\n🧪 Testing RAG functionality...")

        try:
            async with httpx.AsyncClient() as client:
                # Test document listing
                response = await client.get(f"http://localhost:{port}/documents?limit=5")

                if response.status_code == 200:
                    result = response.json()
                    documents = result.get("documents", [])
                    print(" ✅ RAG service is responsive")
                    print(f" 📊 Found {len(documents)} documents in database")
                else:
                    print(f" ❌ RAG test failed: HTTP {response.status_code}")

        except Exception as e:
            print(f" ❌ RAG test error: {e}")

    async def test_unified_service(self, port: int):
        """Query ``/services`` on the unified app and report how many services it lists."""
        print("\n🧪 Testing Unified functionality...")

        try:
            async with httpx.AsyncClient() as client:
                # Test service discovery
                response = await client.get(f"http://localhost:{port}/services")

                if response.status_code == 200:
                    result = response.json()
                    services = result.get("services", {})
                    print(" ✅ Unified service is responsive")
                    print(f" 📊 Discovered {len(services)} services")
                else:
                    print(f" ❌ Unified test failed: HTTP {response.status_code}")

        except Exception as e:
            print(f" ❌ Unified test error: {e}")

    def list_services(self):
        """List all available services"""
        self.print_header("Available Services")

        for service_name, service in self.service_configs.items():
            print(f"\n📍 {service_name}")
            print(f" Description: {service['description']}")
            print(f" Script: {service['script']}")
            print(f" Port: {service['port']}")
            print(f" URL: http://localhost:{service['port']}")


async def _start_in_order(manager: ServiceManager) -> None:
    """Start every service in STARTUP_ORDER, pausing 3 s after each successful start."""
    for service in STARTUP_ORDER:
        if manager.start_service(service):
            # Give the service time to boot before the services that follow it start
            await asyncio.sleep(3)
        else:
            print(f"⚠️ Failed to start {service}, continuing with other services...")


async def main():
    """Main function with command line interface"""
    parser = argparse.ArgumentParser(
        description="Service Management Tool for Unified AI Services",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python manage_services.py start ner      # Start NER service
  python manage_services.py stop all       # Stop all services
  python manage_services.py status         # Show status of all services
  python manage_services.py test rag       # Test RAG service
  python manage_services.py list           # List available services
        """
    )

    parser.add_argument(
        "action",
        choices=["start", "stop", "restart", "status", "test", "list"],
        help="Action to perform"
    )

    parser.add_argument(
        "service",
        nargs="?",
        choices=["ner", "ocr", "rag", "unified", "all"],
        help="Service to act on (use 'all' for all services)"
    )

    args = parser.parse_args()

    manager = ServiceManager()

    # Handle actions that don't require a service argument
    if args.action == "list":
        manager.list_services()
        return

    if args.action == "status":
        await manager.status_all_services()
        return

    # Validate service argument for other actions
    if not args.service:
        print("❌ Service argument is required for this action")
        parser.print_help()
        return

    # Handle service-specific actions. Pauses use asyncio.sleep so the event
    # loop is never blocked inside this coroutine.
    if args.action == "start":
        if args.service == "all":
            await _start_in_order(manager)
        else:
            manager.start_service(args.service)

    elif args.action == "stop":
        if args.service == "all":
            manager.stop_all_services()
        else:
            manager.stop_service(args.service)

    elif args.action == "restart":
        if args.service == "all":
            print("🔄 Restarting all services...")
            manager.stop_all_services()
            await asyncio.sleep(2)
            await _start_in_order(manager)
        else:
            print(f"🔄 Restarting {args.service} service...")
            manager.stop_service(args.service)
            await asyncio.sleep(2)
            manager.start_service(args.service)

    elif args.action == "test":
        if args.service == "all":
            for service_name in manager.service_configs:
                await manager.test_service(service_name)
                print()  # Add spacing between tests
        else:
            await manager.test_service(args.service)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n🛑 Operation cancelled by user")
    except Exception as e:
        print(f"\n❌ Error: {e}")
        sys.exit(1)