SB-PoC / manage_services.py
Chirapath's picture
First draft coding project
963ae98 verified
#!/usr/bin/env python3
"""
Service Management Tool for Unified AI Services
Helps start, stop, monitor, and troubleshoot individual services
"""
import os
import sys
import time
import signal
import subprocess
import asyncio
import json
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import argparse
import httpx
import psutil
# Load the shared project configuration when the ``configs`` module can be
# imported. ``config`` stays None otherwise, and ServiceManager then keeps its
# built-in default ports. get_config() runs inside the try as well, so an
# ImportError raised while building the configuration also triggers the
# fallback.
config = None
try:
    from configs import get_config, validate_environment
    config = get_config()
except ImportError:
    print("⚠️ Could not import configs. Using default values.")
class ServiceManager:
    """Manage individual services for development and troubleshooting.

    ``service_configs`` maps each service name to the script that runs it,
    the port it serves on and a short description. Processes launched by this
    instance are tracked in ``processes`` so they can be stopped again;
    services started elsewhere are located through the port they listen on.
    """

    def __init__(self):
        # Processes launched by this instance, keyed by service name.
        self.processes: Dict[str, subprocess.Popen] = {}
        self.service_configs = {
            "ner": {
                "script": "services/ner_service.py",
                "port": 8500,
                "description": "Named Entity Recognition with relationship extraction",
            },
            "ocr": {
                "script": "services/ocr_service.py",
                "port": 8400,
                "description": "Optical Character Recognition with document processing",
            },
            "rag": {
                "script": "services/rag_service.py",
                "port": 8401,
                "description": "Retrieval-Augmented Generation with vector search",
            },
            "unified": {
                "script": "app.py",
                "port": 8000,
                "description": "Unified application coordinating all services",
            },
        }
        # Override the default ports from the module-level ``config`` when the
        # project configuration could be loaded.
        if config:
            self.service_configs["ner"]["port"] = config.ner.PORT
            self.service_configs["ocr"]["port"] = config.ocr.PORT
            self.service_configs["rag"]["port"] = config.rag.PORT
            self.service_configs["unified"]["port"] = config.MAIN_PORT

    def print_header(self, title: str):
        """Print *title* framed by two 60-character rules."""
        print("\n" + "=" * 60)
        print(f" {title}")
        print("=" * 60)

    def print_service_info(self, service_name: str):
        """Print description, script, port and URL of *service_name*.

        Unknown service names are silently ignored.
        """
        if service_name not in self.service_configs:
            return
        service = self.service_configs[service_name]
        print(f"📍 {service_name.upper()} Service")
        print(f" Description: {service['description']}")
        print(f" Script: {service['script']}")
        print(f" Port: {service['port']}")
        print(f" URL: http://localhost:{service['port']}")

    @staticmethod
    def _listens_on(conn, port: int) -> bool:
        """Return True if psutil connection *conn* is a TCP listener on *port*."""
        # ``laddr`` may be an empty tuple for sockets that are not bound.
        return (
            bool(conn.laddr)
            and conn.laddr.port == port
            and conn.status == psutil.CONN_LISTEN
        )

    @staticmethod
    def _process_connections(proc):
        """Return the TCP connections of psutil process *proc*.

        psutil 6.0 renamed ``Process.connections()`` to ``net_connections()``
        and deprecated the old name; use whichever this psutil provides.
        """
        getter = getattr(proc, "net_connections", None) or proc.connections
        return getter(kind="tcp")

    def is_port_in_use(self, port: int) -> bool:
        """Return True if some process is listening for TCP connections on *port*.

        Only sockets in the LISTEN state count, so connections lingering in
        TIME_WAIT after a service stopped do not block a restart. If psutil
        cannot enumerate system connections (it needs root on some
        platforms, e.g. macOS), fall back to a short TCP connect probe to
        ``localhost:port``.
        """
        try:
            return any(
                self._listens_on(conn, port)
                for conn in psutil.net_connections(kind="tcp")
            )
        except (psutil.Error, OSError, NotImplementedError):
            pass  # fall through to the connect probe below

        import socket

        try:
            with socket.create_connection(("localhost", port), timeout=0.5):
                return True
        except OSError:
            return False

    async def check_service_health(self, service_name: str) -> Tuple[bool, Optional[Dict]]:
        """Query ``GET /health`` on the service's port.

        Returns:
            ``(True, parsed_json)`` on HTTP 200; ``(False, {"error": ...})`` on
            any other status or any failure (connection, timeout, bad JSON);
            ``(False, None)`` for an unknown service name.
        """
        if service_name not in self.service_configs:
            return False, None
        port = self.service_configs[service_name]["port"]
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"http://localhost:{port}/health",
                    timeout=5.0,
                )
                if response.status_code == 200:
                    return True, response.json()
                return False, {"error": f"HTTP {response.status_code}"}
        except Exception as e:
            return False, {"error": str(e)}

    def start_service(self, service_name: str) -> bool:
        """Launch the service's script with the current interpreter.

        The child runs in its own process group (POSIX: new session) so that
        stop_service() can signal the whole group.

        Returns:
            True if the process is still alive two seconds after launch;
            False for unknown names, a missing script, an already-running
            service, an occupied port, or a process that exited early.
        """
        if service_name not in self.service_configs:
            print(f"❌ Unknown service: {service_name}")
            return False
        service = self.service_configs[service_name]
        script_path = service["script"]
        port = service["port"]

        # Relative script paths are resolved against the current directory.
        if not Path(script_path).exists():
            print(f"❌ Service script not found: {script_path}")
            return False

        # Check our own process first so a service we launched is reported by
        # PID instead of the vaguer "port in use" message.
        existing = self.processes.get(service_name)
        if existing is not None and existing.poll() is None:
            print(f"⚠️ {service_name} service is already running (PID: {existing.pid})")
            return False

        if self.is_port_in_use(port):
            print(f"⚠️ Port {port} is already in use. Service may already be running.")
            return False

        try:
            print(f"🚀 Starting {service_name} service...")
            print(f" Script: {script_path}")
            print(f" Port: {port}")
            command = [sys.executable, script_path]
            if sys.platform == "win32":
                process = subprocess.Popen(
                    command, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
                )
            else:
                # Equivalent to preexec_fn=os.setsid, but thread-safe.
                process = subprocess.Popen(command, start_new_session=True)
            self.processes[service_name] = process

            # Give the service a moment to crash if it is going to.
            time.sleep(2)
            if process.poll() is None:
                print(f"✅ {service_name} service started successfully (PID: {process.pid})")
                return True
            print(f"❌ {service_name} service failed to start")
            return False
        except Exception as e:
            print(f"❌ Failed to start {service_name} service: {e}")
            return False

    def stop_service(self, service_name: str) -> bool:
        """Stop *service_name*, preferring the process this instance launched.

        If no live managed process exists, every process listening on the
        service's port is terminated instead.

        Returns:
            True if something was stopped, False otherwise.
        """
        if service_name not in self.service_configs:
            print(f"❌ Unknown service: {service_name}")
            return False
        port = self.service_configs[service_name]["port"]

        process = self.processes.get(service_name)
        if process is not None:
            if process.poll() is None:
                return self._stop_managed_process(service_name, process)
            # It already exited on its own; forget it and search by port.
            del self.processes[service_name]

        return self._stop_by_port(service_name, port)

    def _stop_managed_process(self, service_name: str, process: subprocess.Popen) -> bool:
        """Gracefully stop a process we launched; kill its group after 10 s."""
        try:
            print(f"🛑 Stopping {service_name} service (PID: {process.pid})...")
            if sys.platform == "win32":
                pgid = None
                process.send_signal(signal.CTRL_BREAK_EVENT)
            else:
                # Look the group up now: it can't be once the leader is gone.
                pgid = os.getpgid(process.pid)
                os.killpg(pgid, signal.SIGTERM)
            try:
                process.wait(timeout=10)
                print(f"✅ {service_name} service stopped")
            except subprocess.TimeoutExpired:
                print(f"⚠️ Force killing {service_name} service...")
                if pgid is None:
                    process.kill()
                else:
                    # Kill the whole group so child workers don't survive.
                    try:
                        os.killpg(pgid, signal.SIGKILL)
                    except ProcessLookupError:
                        pass  # the group exited in the meantime
                # Reap the killed child so it does not linger as a zombie.
                process.wait(timeout=5)
        except Exception as e:
            print(f"❌ Error stopping {service_name} service: {e}")
            return False
        del self.processes[service_name]
        return True

    def _stop_by_port(self, service_name: str, port: int) -> bool:
        """Terminate every process listening on *port*; kill any still alive after 5 s."""
        signalled = []
        try:
            for proc in psutil.process_iter():
                try:
                    if not any(
                        self._listens_on(conn, port)
                        for conn in self._process_connections(proc)
                    ):
                        continue
                    print(f"🛑 Found process using port {port} (PID: {proc.pid})")
                    proc.terminate()
                    signalled.append(proc)
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue
        except Exception as e:
            print(f"❌ Error finding process on port {port}: {e}")

        if not signalled:
            print(f"⚠️ No running {service_name} service found")
            return False

        gone, alive = psutil.wait_procs(signalled, timeout=5)
        for proc in gone:
            print(f"✅ Process {proc.pid} terminated")
        for proc in alive:
            try:
                proc.kill()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
            print(f"✅ Process {proc.pid} killed")
        return True

    def stop_all_services(self):
        """Call stop_service() for every configured service, in config order."""
        print("🛑 Stopping all services...")
        for service_name in self.service_configs:
            self.stop_service(service_name)

    async def get_service_status(self, service_name: str) -> Dict:
        """Collect process, port and health information for *service_name*.

        Returns a dict with keys name, description, port, script,
        managed_process, port_in_use, health_check and health_data. For a
        live managed process it may also contain pid, cpu_percent, memory_mb,
        create_time and uptime (seconds). Unknown names yield
        ``{"status": "unknown", "error": "Unknown service"}``.
        """
        if service_name not in self.service_configs:
            return {"status": "unknown", "error": "Unknown service"}
        service = self.service_configs[service_name]
        port = service["port"]
        status = {
            "name": service_name,
            "description": service["description"],
            "port": port,
            "script": service["script"],
            "managed_process": False,
            "port_in_use": self.is_port_in_use(port),
            "health_check": False,
            "health_data": None,
        }

        process = self.processes.get(service_name)
        if process is not None and process.poll() is None:
            status["managed_process"] = True
            status["pid"] = process.pid
            try:
                proc = psutil.Process(process.pid)
                # The first cpu_percent() call only primes psutil's counters
                # and always returns 0.0, so sample over a short interval.
                proc.cpu_percent(None)
                await asyncio.sleep(0.1)
                status["cpu_percent"] = proc.cpu_percent(None)
                status["memory_mb"] = proc.memory_info().rss / 1024 / 1024
                create_time = proc.create_time()
                status["create_time"] = create_time
                status["uptime"] = time.time() - create_time
            except psutil.Error:
                pass  # process vanished or is inaccessible; keep partial data

        health_ok, health_data = await self.check_service_health(service_name)
        status["health_check"] = health_ok
        status["health_data"] = health_data
        return status

    async def status_all_services(self):
        """Print a status summary (process, port, health) for every service."""
        self.print_header("Service Status Overview")
        names = list(self.service_configs)
        # The checks are independent, so run them concurrently; gather()
        # returns results in the same order as ``names``.
        statuses = await asyncio.gather(
            *(self.get_service_status(name) for name in names)
        )
        for service_name, status in zip(names, statuses):
            print(f"\n📊 {service_name.upper()} Service")
            print(f" Port: {status['port']}")
            print(f" Script: {status['script']}")
            if status["managed_process"]:
                print(f" ✅ Managed process running (PID: {status.get('pid', 'unknown')})")
                if "uptime" in status:
                    uptime_str = f"{status['uptime']:.0f} seconds"
                    print(f" ⏱️ Uptime: {uptime_str}")
                if "cpu_percent" in status:
                    print(f" 💻 CPU: {status['cpu_percent']:.1f}%")
                if "memory_mb" in status:
                    print(f" 🧠 Memory: {status['memory_mb']:.1f} MB")
            elif status["port_in_use"]:
                print(" ⚠️ Port in use (external process)")
            else:
                print(" ❌ Not running")

            if status["health_check"]:
                print(" ✅ Health check: OK")
                health = status["health_data"]
                if health and isinstance(health, dict) and "status" in health:
                    print(f" Status: {health['status']}")
            else:
                print(" ❌ Health check: Failed")
                if status["health_data"] and "error" in status["health_data"]:
                    print(f" Error: {status['health_data']['error']}")

    async def test_service(self, service_name: str):
        """Verify *service_name* is listening and healthy, then run its functional test."""
        if service_name not in self.service_configs:
            print(f"❌ Unknown service: {service_name}")
            return
        self.print_header(f"Testing {service_name.upper()} Service")
        status = await self.get_service_status(service_name)

        if not status["port_in_use"]:
            print("❌ Service is not running")
            return
        if not status["health_check"]:
            print("❌ Health check failed")
            if status["health_data"]:
                print(f" Error: {status['health_data']}")
            return
        print("✅ Service is running and healthy")

        port = status["port"]
        if service_name == "ner":
            await self.test_ner_service(port)
        elif service_name == "ocr":
            await self.test_ocr_service(port)
        elif service_name == "rag":
            await self.test_rag_service(port)
        elif service_name == "unified":
            await self.test_unified_service(port)

    async def test_ner_service(self, port: int):
        """POST a short sample sentence to ``/analyze/text`` and report the counts."""
        print("\n🧪 Testing NER functionality...")
        try:
            test_data = {
                "text": "John Smith works at Microsoft in Seattle.",
                "extract_relationships": True,
                "include_embeddings": False,
                "generate_graph_files": False,
            }
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"http://localhost:{port}/analyze/text",
                    json=test_data,
                    timeout=30.0,
                )
                if response.status_code == 200:
                    result = response.json()
                    if result.get("success"):
                        entities = result.get("entities", [])
                        relationships = result.get("relationships", [])
                        print(" ✅ NER analysis successful")
                        print(f" 📊 Found {len(entities)} entities, {len(relationships)} relationships")
                    else:
                        print(f" ❌ NER analysis failed: {result.get('error', 'Unknown error')}")
                else:
                    print(f" ❌ NER test failed: HTTP {response.status_code}")
        except Exception as e:
            print(f" ❌ NER test error: {e}")

    async def test_ocr_service(self, port: int):
        """Check that the OCR service answers ``/health`` (no sample file is sent)."""
        print("\n🧪 Testing OCR functionality...")
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(f"http://localhost:{port}/health")
                if response.status_code == 200:
                    print(" ✅ OCR service is responsive")
                else:
                    print(f" ❌ OCR test failed: HTTP {response.status_code}")
        except Exception as e:
            print(f" ❌ OCR test error: {e}")

    async def test_rag_service(self, port: int):
        """List up to five documents from ``/documents`` and report the count."""
        print("\n🧪 Testing RAG functionality...")
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(f"http://localhost:{port}/documents?limit=5")
                if response.status_code == 200:
                    result = response.json()
                    documents = result.get("documents", [])
                    print(" ✅ RAG service is responsive")
                    print(f" 📊 Found {len(documents)} documents in database")
                else:
                    print(f" ❌ RAG test failed: HTTP {response.status_code}")
        except Exception as e:
            print(f" ❌ RAG test error: {e}")

    async def test_unified_service(self, port: int):
        """Query ``/services`` on the unified app and report how many it lists."""
        print("\n🧪 Testing Unified functionality...")
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(f"http://localhost:{port}/services")
                if response.status_code == 200:
                    result = response.json()
                    services = result.get("services", {})
                    print(" ✅ Unified service is responsive")
                    print(f" 📊 Discovered {len(services)} services")
                else:
                    print(f" ❌ Unified test failed: HTTP {response.status_code}")
        except Exception as e:
            print(f" ❌ Unified test error: {e}")

    def list_services(self):
        """Print the configuration of every known service."""
        self.print_header("Available Services")
        for service_name, service in self.service_configs.items():
            print(f"\n📍 {service_name}")
            print(f" Description: {service['description']}")
            print(f" Script: {service['script']}")
            print(f" Port: {service['port']}")
            print(f" URL: http://localhost:{service['port']}")
async def main():
    """Parse the command line and dispatch to a ServiceManager action.

    Returns:
        A process exit status. 0 means every requested start/stop/restart
        succeeded, 1 means at least one failed, and 2 means the service
        argument was missing. Actions that report no outcome (list, status,
        test, stop all) return 0.
    """
    parser = argparse.ArgumentParser(
        description="Service Management Tool for Unified AI Services",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python manage_services.py start ner # Start NER service
python manage_services.py stop all # Stop all services
python manage_services.py status # Show status of all services
python manage_services.py test rag # Test RAG service
python manage_services.py list # List available services
""",
    )
    parser.add_argument(
        "action",
        choices=["start", "stop", "restart", "status", "test", "list"],
        help="Action to perform",
    )
    parser.add_argument(
        "service",
        nargs="?",
        choices=["ner", "ocr", "rag", "unified", "all"],
        help="Service to act on (use 'all' for all services)",
    )
    args = parser.parse_args()
    manager = ServiceManager()

    # Order used when starting "all" services (the original "dependency
    # order"): backing services first, the unified app last.
    startup_order = ["ocr", "rag", "ner", "unified"]

    # Actions that don't require a service argument.
    if args.action == "list":
        manager.list_services()
        return 0
    if args.action == "status":
        await manager.status_all_services()
        return 0

    if not args.service:
        print("❌ Service argument is required for this action")
        parser.print_help()
        return 2

    if args.action == "start":
        if args.service != "all":
            return 0 if manager.start_service(args.service) else 1
        all_ok = True
        for service in startup_order:
            if manager.start_service(service):
                # Wait a moment between services.
                await asyncio.sleep(3)
            else:
                all_ok = False
                print(f"⚠️ Failed to start {service}, continuing with other services...")
        return 0 if all_ok else 1

    if args.action == "stop":
        if args.service == "all":
            manager.stop_all_services()
            return 0
        return 0 if manager.stop_service(args.service) else 1

    if args.action == "restart":
        if args.service != "all":
            print(f"🔄 Restarting {args.service} service...")
            # Stopping may legitimately find nothing; only the start decides.
            manager.stop_service(args.service)
            await asyncio.sleep(2)
            return 0 if manager.start_service(args.service) else 1
        print("🔄 Restarting all services...")
        manager.stop_all_services()
        await asyncio.sleep(2)
        all_ok = True
        for service in startup_order:
            if not manager.start_service(service):
                all_ok = False
            await asyncio.sleep(3)
        return 0 if all_ok else 1

    if args.action == "test":
        if args.service == "all":
            for service_name in manager.service_configs:
                await manager.test_service(service_name)
                print()  # Add spacing between tests
        else:
            await manager.test_service(args.service)
        return 0

    return 0
if __name__ == "__main__":
    try:
        # main() may return an exit status; sys.exit(None) exits with 0.
        exit_code = asyncio.run(main())
    except KeyboardInterrupt:
        print("\n🛑 Operation cancelled by user")
        # 128 + SIGINT (2): the conventional status of an interrupted command.
        sys.exit(130)
    except Exception as e:
        print(f"\n❌ Error: {e}")
        sys.exit(1)
    sys.exit(exit_code)