|
|
|
"""
|
|
Service Management Tool for Unified AI Services
|
|
Helps start, stop, monitor, and troubleshoot individual services
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import signal
|
|
import subprocess
|
|
import asyncio
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
import argparse
|
|
|
|
import httpx
|
|
import psutil
|
|
|
|
|
|
# Load the project configuration when it is importable; otherwise fall back
# to ``config = None`` so ServiceManager keeps its built-in default ports.
try:
    from configs import get_config, validate_environment

    config = get_config()
except ImportError:
    print("⚠️ Could not import configs. Using default values.")
    config = None
|
|
|
|
class ServiceManager:
    """Manages individual services for development and troubleshooting.

    Every service is described by an entry in ``service_configs`` (script
    path, port, description).  Processes launched through
    :meth:`start_service` are remembered in ``processes`` for the lifetime of
    this object only; anything else is located by the port it uses.
    """

    # Seconds to wait after spawning before checking the child is still alive.
    _STARTUP_GRACE_SECONDS = 2
    # Seconds a managed process gets to exit after the polite stop signal.
    _STOP_TIMEOUT_SECONDS = 10
    # Seconds an unmanaged process found by port gets after terminate().
    _PORT_OWNER_TIMEOUT_SECONDS = 5
    # Sampling window between the two cpu_percent() calls.
    _CPU_SAMPLE_SECONDS = 0.1

    def __init__(self):
        """Build the service table, overriding ports from ``config`` if set."""
        # Popen handles of services started by this manager, keyed by name.
        self.processes: Dict[str, subprocess.Popen] = {}
        self.service_configs = {
            "ner": {
                "script": "services/ner_service.py",
                "port": 8500,
                "description": "Named Entity Recognition with relationship extraction",
            },
            "ocr": {
                "script": "services/ocr_service.py",
                "port": 8400,
                "description": "Optical Character Recognition with document processing",
            },
            "rag": {
                "script": "services/rag_service.py",
                "port": 8401,
                "description": "Retrieval-Augmented Generation with vector search",
            },
            "unified": {
                "script": "app.py",
                "port": 8000,
                "description": "Unified application coordinating all services",
            },
        }

        # Ports from the project configuration win over the defaults above.
        if config:
            self.service_configs["ner"]["port"] = config.ner.PORT
            self.service_configs["ocr"]["port"] = config.ocr.PORT
            self.service_configs["rag"]["port"] = config.rag.PORT
            self.service_configs["unified"]["port"] = config.MAIN_PORT

    def print_header(self, title: str):
        """Print ``title`` framed by two 60-character rules."""
        print("\n" + "=" * 60)
        print(f" {title}")
        print("=" * 60)

    def print_service_info(self, service_name: str):
        """Print description, script, port and URL of a known service.

        Unknown service names are ignored silently.
        """
        if service_name not in self.service_configs:
            return

        service = self.service_configs[service_name]
        print(f"📋 {service_name.upper()} Service")
        print(f" Description: {service['description']}")
        print(f" Script: {service['script']}")
        print(f" Port: {service['port']}")
        print(f" URL: http://localhost:{service['port']}")

    def is_port_in_use(self, port: int) -> bool:
        """Return True if some socket on this machine is listening on ``port``.

        Only LISTEN sockets count: connections lingering after a service was
        stopped (e.g. TIME_WAIT) must not prevent it from being restarted.
        Returns False when the connection table cannot be read (psutil needs
        elevated privileges for it on some platforms).
        """
        try:
            connections = psutil.net_connections()
        except (psutil.Error, OSError, RuntimeError):
            # Best effort: without the table we cannot tell, so report "free".
            return False

        return any(
            conn.status == psutil.CONN_LISTEN
            and conn.laddr  # laddr is an empty tuple for unbound sockets
            and conn.laddr.port == port
            for conn in connections
        )

    async def check_service_health(self, service_name: str) -> Tuple[bool, Optional[Dict]]:
        """Query ``/health`` of a service.

        Returns:
            ``(True, payload)`` on HTTP 200, ``(False, {"error": ...})`` on
            any other status or failure, ``(False, None)`` for an unknown
            service name.
        """
        if service_name not in self.service_configs:
            return False, None

        port = self.service_configs[service_name]["port"]

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"http://localhost:{port}/health",
                    timeout=5.0,
                )
            if response.status_code == 200:
                return True, response.json()
            return False, {"error": f"HTTP {response.status_code}"}
        except Exception as e:
            # Any transport or JSON decoding failure means "not healthy".
            return False, {"error": str(e)}

    def start_service(self, service_name: str) -> bool:
        """Start a specific service as a detached child process.

        Returns:
            True if the process is still alive after the startup grace
            period; False if the service is unknown, its script is missing,
            it is already running, its port is taken, or the launch failed.
        """
        if service_name not in self.service_configs:
            print(f"❌ Unknown service: {service_name}")
            return False

        service = self.service_configs[service_name]
        script_path = service["script"]
        port = service["port"]

        if not Path(script_path).exists():
            print(f"❌ Service script not found: {script_path}")
            return False

        # Check our own handle before the port: a running managed service
        # also occupies the port, and the PID message is the more useful one.
        existing = self.processes.get(service_name)
        if existing is not None and existing.poll() is None:
            print(f"⚠️ {service_name} service is already running (PID: {existing.pid})")
            return False

        if self.is_port_in_use(port):
            print(f"⚠️ Port {port} is already in use. Service may already be running.")
            return False

        print(f"🚀 Starting {service_name} service...")
        print(f" Script: {script_path}")
        print(f" Port: {port}")

        # Give the child its own process group/session so stop_service can
        # signal it together with anything it spawns.
        if sys.platform == "win32":
            popen_kwargs = {"creationflags": subprocess.CREATE_NEW_PROCESS_GROUP}
        else:
            popen_kwargs = {"start_new_session": True}

        try:
            process = subprocess.Popen([sys.executable, script_path], **popen_kwargs)
        except (OSError, subprocess.SubprocessError) as e:
            print(f"❌ Failed to start {service_name} service: {e}")
            return False

        self.processes[service_name] = process

        # A script that crashes on import exits almost immediately.
        time.sleep(self._STARTUP_GRACE_SECONDS)

        if process.poll() is None:
            print(f"✅ {service_name} service started successfully (PID: {process.pid})")
            return True

        # Do not keep a handle to a process that is already dead.
        self.processes.pop(service_name, None)
        print(f"❌ {service_name} service failed to start")
        return False

    def stop_service(self, service_name: str) -> bool:
        """Stop a specific service.

        A process started by this manager is signalled directly; otherwise
        the process using the service's port is looked up and terminated.

        Returns:
            True if a process was stopped, False otherwise.
        """
        if service_name not in self.service_configs:
            print(f"❌ Unknown service: {service_name}")
            return False

        port = self.service_configs[service_name]["port"]

        process = self.processes.get(service_name)
        if process is not None:
            if process.poll() is None:
                return self._stop_managed_process(service_name, process)
            # The process exited on its own; forget the stale handle.
            del self.processes[service_name]

        if self._stop_process_on_port(port):
            return True

        print(f"⚠️ No running {service_name} service found")
        return False

    def _stop_managed_process(self, service_name: str, process) -> bool:
        """Politely stop a process we launched, force-killing it on timeout."""
        print(f"🛑 Stopping {service_name} service (PID: {process.pid})...")
        try:
            if sys.platform == "win32":
                process.send_signal(signal.CTRL_BREAK_EVENT)
            else:
                os.killpg(os.getpgid(process.pid), signal.SIGTERM)

            try:
                process.wait(timeout=self._STOP_TIMEOUT_SECONDS)
                print(f"✅ {service_name} service stopped")
            except subprocess.TimeoutExpired:
                print(f"⚠️ Force killing {service_name} service...")
                if sys.platform == "win32":
                    process.kill()
                else:
                    # Kill the whole group, not only the group leader.
                    os.killpg(os.getpgid(process.pid), signal.SIGKILL)
                # Reap the child so it does not linger as a zombie.
                process.wait(timeout=self._PORT_OWNER_TIMEOUT_SECONDS)
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            print(f"❌ Error stopping {service_name} service: {e}")
            return False

        self.processes.pop(service_name, None)
        return True

    @staticmethod
    def _process_connections(proc):
        """Return a process's inet connections on both old and new psutil.

        psutil 6 renamed ``Process.connections()`` to ``net_connections()``.
        """
        getter = getattr(proc, "net_connections", None) or proc.connections
        return getter()

    def _stop_process_on_port(self, port: int) -> bool:
        """Terminate the first process with a socket bound to ``port``."""
        own_pid = os.getpid()
        try:
            for proc in psutil.process_iter(["pid", "name"]):
                if proc.pid == own_pid:
                    continue  # never terminate the manager itself
                try:
                    uses_port = any(
                        conn.laddr and conn.laddr.port == port
                        for conn in self._process_connections(proc)
                    )
                    if not uses_port:
                        continue

                    print(f"🔍 Found process using port {port} (PID: {proc.pid})")
                    proc.terminate()
                    try:
                        proc.wait(timeout=self._PORT_OWNER_TIMEOUT_SECONDS)
                        print(f"✅ Process {proc.pid} terminated")
                    except psutil.TimeoutExpired:
                        proc.kill()
                        print(f"✅ Process {proc.pid} killed")
                    return True
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    # Process vanished or is not ours to inspect; keep looking.
                    continue
        except (psutil.Error, OSError) as e:
            print(f"❌ Error finding process on port {port}: {e}")
        return False

    def stop_all_services(self):
        """Stop all managed services"""
        print("🛑 Stopping all services...")

        for service_name in self.service_configs:
            self.stop_service(service_name)

    async def get_service_status(self, service_name: str) -> Dict:
        """Get detailed service status.

        Returns:
            A dict with ``name``, ``description``, ``port``, ``script``,
            ``managed_process``, ``port_in_use``, ``health_check`` and
            ``health_data``; for a running managed process also ``pid`` and,
            when readable, ``cpu_percent``, ``memory_mb``, ``create_time``
            and ``uptime``.  Unknown services yield
            ``{"status": "unknown", "error": "Unknown service"}``.
        """
        if service_name not in self.service_configs:
            return {"status": "unknown", "error": "Unknown service"}

        service = self.service_configs[service_name]
        port = service["port"]

        status = {
            "name": service_name,
            "description": service["description"],
            "port": port,
            "script": service["script"],
            "managed_process": False,
            "port_in_use": self.is_port_in_use(port),
            "health_check": False,
            "health_data": None,
        }

        process = self.processes.get(service_name)
        if process is not None and process.poll() is None:
            status["managed_process"] = True
            status["pid"] = process.pid
            try:
                proc = psutil.Process(process.pid)
                # The first cpu_percent() call only sets the baseline and
                # returns 0.0; the second one reports usage over the window.
                proc.cpu_percent(None)
                await asyncio.sleep(self._CPU_SAMPLE_SECONDS)
                status["cpu_percent"] = proc.cpu_percent(None)
                status["memory_mb"] = proc.memory_info().rss / 1024 / 1024
                created = proc.create_time()
                status["create_time"] = created
                status["uptime"] = time.time() - created
            except psutil.Error:
                # Process vanished or is inaccessible: the resource figures
                # are optional, so report whatever was collected so far.
                pass

        health_ok, health_data = await self.check_service_health(service_name)
        status["health_check"] = health_ok
        status["health_data"] = health_data

        return status

    async def status_all_services(self):
        """Show status of all services"""
        self.print_header("Service Status Overview")

        for service_name in self.service_configs:
            status = await self.get_service_status(service_name)

            print(f"\n📋 {service_name.upper()} Service")
            print(f" Port: {status['port']}")
            print(f" Script: {status['script']}")

            if status["managed_process"]:
                print(f" ✅ Managed process running (PID: {status.get('pid', 'unknown')})")
                if "uptime" in status:
                    uptime_str = f"{status['uptime']:.0f} seconds"
                    print(f" ⏱️ Uptime: {uptime_str}")
                if "cpu_percent" in status:
                    print(f" 💻 CPU: {status['cpu_percent']:.1f}%")
                if "memory_mb" in status:
                    print(f" 🧠 Memory: {status['memory_mb']:.1f} MB")
            elif status["port_in_use"]:
                print(" ⚠️ Port in use (external process)")
            else:
                print(" ❌ Not running")

            health = status["health_data"]
            if status["health_check"]:
                print(" ✅ Health check: OK")
                if isinstance(health, dict) and "status" in health:
                    print(f" Status: {health['status']}")
            else:
                print(" ❌ Health check: Failed")
                if health and "error" in health:
                    print(f" Error: {health['error']}")

    async def test_service(self, service_name: str):
        """Run the smoke test of a specific service if it is up and healthy."""
        if service_name not in self.service_configs:
            print(f"❌ Unknown service: {service_name}")
            return

        self.print_header(f"Testing {service_name.upper()} Service")

        status = await self.get_service_status(service_name)

        # Port detection can fail without privileges, so a passing health
        # check is accepted as proof that the service is running.
        if not status["port_in_use"] and not status["health_check"]:
            print("❌ Service is not running")
            return

        if not status["health_check"]:
            print("❌ Health check failed")
            if status["health_data"]:
                print(f" Error: {status['health_data']}")
            return

        print("✅ Service is running and healthy")

        testers = {
            "ner": self.test_ner_service,
            "ocr": self.test_ocr_service,
            "rag": self.test_rag_service,
            "unified": self.test_unified_service,
        }
        tester = testers.get(service_name)
        if tester is not None:
            await tester(status["port"])

    async def test_ner_service(self, port: int):
        """Post a sample sentence to ``/analyze/text`` and report the result."""
        print("\n🧪 Testing NER functionality...")

        test_data = {
            "text": "John Smith works at Microsoft in Seattle.",
            "extract_relationships": True,
            "include_embeddings": False,
            "generate_graph_files": False,
        }

        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"http://localhost:{port}/analyze/text",
                    json=test_data,
                    timeout=30.0,
                )

            if response.status_code != 200:
                print(f" ❌ NER test failed: HTTP {response.status_code}")
                return

            result = response.json()
            if result.get("success"):
                entities = result.get("entities", [])
                relationships = result.get("relationships", [])
                print(" ✅ NER analysis successful")
                print(f" 📊 Found {len(entities)} entities, {len(relationships)} relationships")
            else:
                print(f" ❌ NER analysis failed: {result.get('error', 'Unknown error')}")
        except Exception as e:
            print(f" ❌ NER test error: {e}")

    async def test_ocr_service(self, port: int):
        """Check that the OCR service answers on ``/health``."""
        print("\n🧪 Testing OCR functionality...")

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(f"http://localhost:{port}/health")

            if response.status_code == 200:
                print(" ✅ OCR service is responsive")
            else:
                print(f" ❌ OCR test failed: HTTP {response.status_code}")
        except Exception as e:
            print(f" ❌ OCR test error: {e}")

    async def test_rag_service(self, port: int):
        """List up to five documents through ``/documents``."""
        print("\n🧪 Testing RAG functionality...")

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(f"http://localhost:{port}/documents?limit=5")

            if response.status_code == 200:
                documents = response.json().get("documents", [])
                print(" ✅ RAG service is responsive")
                print(f" 📊 Found {len(documents)} documents in database")
            else:
                print(f" ❌ RAG test failed: HTTP {response.status_code}")
        except Exception as e:
            print(f" ❌ RAG test error: {e}")

    async def test_unified_service(self, port: int):
        """Ask the unified app for its service list through ``/services``."""
        print("\n🧪 Testing Unified functionality...")

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(f"http://localhost:{port}/services")

            if response.status_code == 200:
                services = response.json().get("services", {})
                print(" ✅ Unified service is responsive")
                print(f" 🔍 Discovered {len(services)} services")
            else:
                print(f" ❌ Unified test failed: HTTP {response.status_code}")
        except Exception as e:
            print(f" ❌ Unified test error: {e}")

    def list_services(self):
        """List all available services"""
        self.print_header("Available Services")

        for service_name, service in self.service_configs.items():
            print(f"\n📋 {service_name}")
            print(f" Description: {service['description']}")
            print(f" Script: {service['script']}")
            print(f" Port: {service['port']}")
            print(f" URL: http://localhost:{service['port']}")
|
|
|
|
async def _start_services_in_order(manager) -> None:
    """Start every service in the fixed order ocr, rag, ner, unified."""
    for service in ("ocr", "rag", "ner", "unified"):
        if manager.start_service(service):
            # Let the service settle before launching the next one.
            await asyncio.sleep(3)
        else:
            print(f"⚠️ Failed to start {service}, continuing with other services...")


async def main():
    """Parse the command line and run the requested action.

    ``list`` and ``status`` work without a service argument; ``start``,
    ``stop``, ``restart`` and ``test`` need a service name or ``all``.
    """
    parser = argparse.ArgumentParser(
        description="Service Management Tool for Unified AI Services",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python manage_services.py start ner     # Start NER service
  python manage_services.py stop all      # Stop all services
  python manage_services.py status        # Show status of all services
  python manage_services.py test rag      # Test RAG service
  python manage_services.py list          # List available services
""",
    )

    parser.add_argument(
        "action",
        choices=["start", "stop", "restart", "status", "test", "list"],
        help="Action to perform",
    )

    parser.add_argument(
        "service",
        nargs="?",
        choices=["ner", "ocr", "rag", "unified", "all"],
        help="Service to act on (use 'all' for all services)",
    )

    args = parser.parse_args()

    manager = ServiceManager()

    if args.action == "list":
        manager.list_services()
        return

    if args.action == "status":
        # NOTE(review): a service argument given with `status` is ignored;
        # the overview always covers every service.
        await manager.status_all_services()
        return

    if not args.service:
        print("❌ Service argument is required for this action")
        parser.print_help()
        return

    if args.action == "start":
        if args.service == "all":
            await _start_services_in_order(manager)
        else:
            manager.start_service(args.service)

    elif args.action == "stop":
        if args.service == "all":
            manager.stop_all_services()
        else:
            manager.stop_service(args.service)

    elif args.action == "restart":
        if args.service == "all":
            print("🔄 Restarting all services...")
            manager.stop_all_services()
            # Give the stopped services a moment to release their ports.
            await asyncio.sleep(2)
            await _start_services_in_order(manager)
        else:
            print(f"🔄 Restarting {args.service} service...")
            manager.stop_service(args.service)
            await asyncio.sleep(2)
            manager.start_service(args.service)

    elif args.action == "test":
        if args.service == "all":
            for service_name in manager.service_configs:
                await manager.test_service(service_name)
                print()
        else:
            await manager.test_service(args.service)
|
|
|
|
# Script entry point: run the async CLI.  Ctrl-C is reported as a
# cancellation; any other exception is printed and the process exits with
# status 1.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n👋 Operation cancelled by user")
    except Exception as e:
        print(f"\n❌ Error: {e}")
        sys.exit(1)