"""
Unified AI Services Application

Coordinates NER, OCR, and RAG services with combined workflows
"""

import asyncio
import subprocess
import signal
import sys
import os
import time
import json
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any, Union
from contextlib import asynccontextmanager
from datetime import datetime
import tempfile
import io

import httpx
import uvicorn
from fastapi import FastAPI, File, UploadFile, HTTPException, Form, BackgroundTasks, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel, HttpUrl
import psutil

from configs import get_config, validate_environment

config = get_config()

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

service_processes: Dict[str, subprocess.Popen] = {}
service_health: Dict[str, bool] = {}


class ServiceStatus(BaseModel):
    name: str
    status: str
    port: int
    health: bool
    uptime: Optional[float] = None
    response_time: Optional[float] = None


class UnifiedAnalysisRequest(BaseModel):
    text: Optional[str] = None
    url: Optional[HttpUrl] = None
    extract_relationships: bool = True
    include_embeddings: bool = True
    include_summary: bool = True
    generate_graph_files: bool = True
    export_formats: List[str] = ["neo4j", "json", "graphml"]
    enable_rag_indexing: bool = False
    rag_title: Optional[str] = None
    rag_keywords: Optional[List[str]] = None
    rag_metadata: Optional[Dict[str, Any]] = None


class CombinedSearchRequest(BaseModel):
    query: str
    limit: int = 10
    similarity_threshold: float = 0.2
    include_ner_analysis: bool = True
    ner_export_formats: List[str] = ["json"]


class UnifiedResponse(BaseModel):
    success: bool
    service_calls: List[str]
    ner_analysis: Optional[Dict[str, Any]] = None
    rag_document: Optional[Dict[str, Any]] = None
    search_results: Optional[Dict[str, Any]] = None
    processing_time: float
    error: Optional[str] = None

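# The models above define the unified API's request/response contracts:
# ServiceStatus backs /health, UnifiedAnalysisRequest drives /analyze/unified,
# CombinedSearchRequest drives /search/combined, and UnifiedResponse is the
# common envelope returned by both combined endpoints.
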
async def start_service(service_name: str, script_path: str, port: int) -> bool:
    """Start a service as a subprocess"""
    try:
        logger.info(f"Starting {service_name} service on port {port}")

        if is_port_in_use(port):
            logger.warning(f"Port {port} is already in use. Assuming {service_name} is already running.")
            return True

        if sys.platform == "win32":
            process = subprocess.Popen(
                [sys.executable, script_path],
                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
            )
        else:
            process = subprocess.Popen(
                [sys.executable, script_path],
                preexec_fn=os.setsid
            )

        service_processes[service_name] = process

        for _ in range(30):
            await asyncio.sleep(1)
            if await check_service_health(service_name, port):
                logger.info(f"{service_name} service started successfully")
                service_health[service_name] = True
                return True

        logger.error(f"{service_name} service failed to start within timeout")
        return False

    except Exception as e:
        logger.error(f"Failed to start {service_name} service: {e}")
        return False

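# Note: start_service launches each child in its own process group
# (CREATE_NEW_PROCESS_GROUP on Windows, os.setsid elsewhere) so that
# stop_all_services can signal the whole group, and confirms startup by
# polling the child's /health endpoint for up to 30 seconds.
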
def is_port_in_use(port: int) -> bool:
    """Check if a port is already in use"""
    try:
        for conn in psutil.net_connections():
            if conn.laddr.port == port:
                return True
        return False
    except Exception:
        return False


async def check_service_health(service_name: str, port: int) -> bool:
    """Check if a service is healthy"""
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"http://localhost:{port}/health",
                timeout=5.0
            )
            return response.status_code == 200
    except Exception:
        return False


async def get_service_status(service_name: str, port: int) -> ServiceStatus:
    """Get detailed status of a service"""
    start_time = time.time()
    health = await check_service_health(service_name, port)
    response_time = time.time() - start_time

    uptime = None
    if service_name in service_processes:
        process = service_processes[service_name]
        if process.poll() is None:
            try:
                uptime = time.time() - psutil.Process(process.pid).create_time()
            except Exception:
                uptime = None

    return ServiceStatus(
        name=service_name,
        status="running" if health else "down",
        port=port,
        health=health,
        uptime=uptime,
        response_time=response_time
    )

async def stop_all_services():
    """Stop all managed services"""
    logger.info("Stopping all services...")

    for service_name, process in service_processes.items():
        try:
            if process.poll() is None:
                logger.info(f"Stopping {service_name}...")

                if sys.platform == "win32":
                    process.send_signal(signal.CTRL_BREAK_EVENT)
                else:
                    os.killpg(os.getpgid(process.pid), signal.SIGTERM)

                try:
                    process.wait(timeout=10)
                except subprocess.TimeoutExpired:
                    logger.warning(f"Force killing {service_name}")
                    process.kill()

                logger.info(f"{service_name} stopped")
        except Exception as e:
            logger.error(f"Error stopping {service_name}: {e}")


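# Thin HTTP clients for the three backing services. The three helpers below
# are identical apart from the base URL; a single generic helper taking the
# base URL as a parameter would be a straightforward consolidation.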
async def call_ner_service(endpoint: str, method: str = "GET", **kwargs) -> Dict[str, Any]:
    """Call NER service endpoint"""
    try:
        async with httpx.AsyncClient(timeout=300.0) as client:
            url = f"{config.NER_SERVICE_URL}{endpoint}"
            response = await client.request(method, url, **kwargs)

            if response.status_code == 200:
                return response.json()
            else:
                raise HTTPException(status_code=response.status_code, detail=response.text)

    except httpx.RequestError as e:
        raise HTTPException(status_code=503, detail=f"NER service unavailable: {e}")


async def call_ocr_service(endpoint: str, method: str = "GET", **kwargs) -> Dict[str, Any]:
    """Call OCR service endpoint"""
    try:
        async with httpx.AsyncClient(timeout=300.0) as client:
            url = f"{config.OCR_SERVICE_URL}{endpoint}"
            response = await client.request(method, url, **kwargs)

            if response.status_code == 200:
                return response.json()
            else:
                raise HTTPException(status_code=response.status_code, detail=response.text)

    except httpx.RequestError as e:
        raise HTTPException(status_code=503, detail=f"OCR service unavailable: {e}")


async def call_rag_service(endpoint: str, method: str = "GET", **kwargs) -> Dict[str, Any]:
    """Call RAG service endpoint"""
    try:
        async with httpx.AsyncClient(timeout=300.0) as client:
            url = f"{config.RAG_SERVICE_URL}{endpoint}"
            response = await client.request(method, url, **kwargs)

            if response.status_code == 200:
                return response.json()
            else:
                raise HTTPException(status_code=response.status_code, detail=response.text)

    except httpx.RequestError as e:
        raise HTTPException(status_code=503, detail=f"RAG service unavailable: {e}")


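# Lifespan hook: start the child services before the app begins serving
# requests and shut them down again when the application exits.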
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan management"""
    logger.info("Starting Unified AI Services Application")

    config.print_configuration_summary()

    if not validate_environment():
        logger.error("Environment validation failed. Please check your configuration.")
        raise RuntimeError("Invalid environment configuration")

    service_definitions = [
        ("ocr", "services/ocr_service.py", config.ocr.PORT),
        ("rag", "services/rag_service.py", config.rag.PORT),
        ("ner", "services/ner_service.py", config.ner.PORT)
    ]

    started_services = []
    for service_name, script_path, port in service_definitions:
        if os.path.exists(script_path):
            success = await start_service(service_name, script_path, port)
            if success:
                started_services.append(service_name)
            else:
                logger.error(f"Failed to start {service_name} service")
        else:
            logger.warning(f"Service script not found: {script_path}")

    if len(started_services) == 0:
        logger.error("No services could be started")
        raise RuntimeError("Failed to start any services")

    logger.info(f"Started {len(started_services)} services: {', '.join(started_services)}")

    yield

    await stop_all_services()
    logger.info("Unified AI Services Application shutdown complete")


app = FastAPI(
    title="Unified AI Services",
    description="Coordinated NER, OCR, and RAG services with combined workflows",
    version="1.0.0",
    lifespan=lifespan
)

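# CORS configuration: ALLOWED_ORIGINS is either "*" or a JSON-encoded list of
# allowed origins; if the value cannot be parsed, fall back to allowing all.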
allowed_origins = config.ner.ALLOWED_ORIGINS
if allowed_origins != "*":
    try:
        allowed_origins = json.loads(allowed_origins)
    except (ValueError, TypeError):
        allowed_origins = ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/")
|
|
async def root():
|
|
return {
|
|
"message": "Unified AI Services",
|
|
"version": "1.0.0",
|
|
"services": {
|
|
"ner": f"{config.NER_SERVICE_URL}",
|
|
"ocr": f"{config.OCR_SERVICE_URL}",
|
|
"rag": f"{config.RAG_SERVICE_URL}"
|
|
},
|
|
"unified_endpoints": {
|
|
"status": "/status",
|
|
"analyze": "/analyze",
|
|
"search": "/search",
|
|
"combined": "/combined/*"
|
|
}
|
|
}
|
|
|
|
@app.get("/health")
|
|
async def unified_health():
|
|
"""Unified health check for all services"""
|
|
services = [
|
|
("ner", config.ner.PORT),
|
|
("ocr", config.ocr.PORT),
|
|
("rag", config.rag.PORT)
|
|
]
|
|
|
|
service_statuses = []
|
|
overall_healthy = True
|
|
|
|
for service_name, port in services:
|
|
status = await get_service_status(service_name, port)
|
|
service_statuses.append(status.dict())
|
|
if not status.health:
|
|
overall_healthy = False
|
|
|
|
return {
|
|
"status": "healthy" if overall_healthy else "degraded",
|
|
"services": service_statuses,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"configuration": {
|
|
"ner_url": config.NER_SERVICE_URL,
|
|
"ocr_url": config.OCR_SERVICE_URL,
|
|
"rag_url": config.RAG_SERVICE_URL
|
|
}
|
|
}
|
|
|
|
@app.get("/status")
|
|
async def detailed_status():
|
|
"""Detailed status of all services"""
|
|
services = [
|
|
("ner", config.ner.PORT),
|
|
("ocr", config.ocr.PORT),
|
|
("rag", config.rag.PORT)
|
|
]
|
|
|
|
detailed_statuses = {}
|
|
|
|
for service_name, port in services:
|
|
try:
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.get(f"http://localhost:{port}/health", timeout=10.0)
|
|
if response.status_code == 200:
|
|
detailed_statuses[service_name] = response.json()
|
|
else:
|
|
detailed_statuses[service_name] = {"status": "error", "error": f"HTTP {response.status_code}"}
|
|
except Exception as e:
|
|
detailed_statuses[service_name] = {"status": "unreachable", "error": str(e)}
|
|
|
|
return {
|
|
"unified_app": {
|
|
"status": "running",
|
|
"port": config.MAIN_PORT,
|
|
"uptime": time.time() - start_time if 'start_time' in globals() else 0
|
|
},
|
|
"services": detailed_statuses,
|
|
"configuration_valid": validate_environment()
|
|
}
|
|
|
|
|
|
@app.post("/analyze/unified")
|
|
async def unified_analysis(request: UnifiedAnalysisRequest):
|
|
"""Unified analysis combining NER and optional RAG indexing"""
|
|
start_time = time.time()
|
|
service_calls = []
|
|
|
|
try:
|
|
|
|
ner_data = {
|
|
"text": request.text,
|
|
"url": str(request.url) if request.url else None,
|
|
"extract_relationships": request.extract_relationships,
|
|
"include_embeddings": request.include_embeddings,
|
|
"include_summary": request.include_summary,
|
|
"generate_graph_files": request.generate_graph_files,
|
|
"export_formats": request.export_formats
|
|
}
|
|
|
|
|
|
ner_data = {k: v for k, v in ner_data.items() if v is not None}
|
|
|
|
if request.text:
|
|
ner_result = await call_ner_service("/analyze/text", "POST", json=ner_data)
|
|
service_calls.append("ner_text")
|
|
elif request.url:
|
|
ner_result = await call_ner_service("/analyze/url", "POST", json=ner_data)
|
|
service_calls.append("ner_url")
|
|
else:
|
|
raise HTTPException(status_code=400, detail="Either text or url must be provided")
|
|
|
|
|
|
rag_result = None
|
|
if request.enable_rag_indexing and ner_result.get("success"):
|
|
try:
|
|
rag_data = {
|
|
"title": request.rag_title or f"NER Analysis {ner_result.get('analysis_id', 'unknown')}",
|
|
"keywords": request.rag_keywords or ner_result.get("keywords", []),
|
|
"metadata": {
|
|
**(request.rag_metadata or {}),
|
|
"ner_analysis_id": ner_result.get("analysis_id"),
|
|
"entity_count": len(ner_result.get("entities", [])),
|
|
"relationship_count": len(ner_result.get("relationships", []))
|
|
}
|
|
}
|
|
|
|
if request.text:
|
|
|
|
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
|
|
f.write(request.text)
|
|
temp_path = f.name
|
|
|
|
try:
|
|
with open(temp_path, 'rb') as f:
|
|
files = {"file": ("ner_analysis.txt", f, "text/plain")}
|
|
form_data = {
|
|
"title": rag_data["title"],
|
|
"keywords": json.dumps(rag_data["keywords"]),
|
|
"metadata": json.dumps(rag_data["metadata"])
|
|
}
|
|
|
|
async with httpx.AsyncClient(timeout=300.0) as client:
|
|
response = await client.post(
|
|
f"{config.RAG_SERVICE_URL}/documents/upload",
|
|
files=files,
|
|
data=form_data
|
|
)
|
|
if response.status_code == 200:
|
|
rag_result = response.json()
|
|
service_calls.append("rag_upload")
|
|
finally:
|
|
os.unlink(temp_path)
|
|
|
|
elif request.url:
|
|
async with httpx.AsyncClient(timeout=300.0) as client:
|
|
response = await client.post(
|
|
f"{config.RAG_SERVICE_URL}/documents/url",
|
|
json={
|
|
"url": str(request.url),
|
|
**rag_data,
|
|
"extract_images": True
|
|
}
|
|
)
|
|
if response.status_code == 200:
|
|
rag_result = response.json()
|
|
service_calls.append("rag_url")
|
|
|
|
except Exception as e:
|
|
logger.warning(f"RAG indexing failed: {e}")
|
|
|
|
|
|
processing_time = time.time() - start_time
|
|
|
|
return UnifiedResponse(
|
|
success=True,
|
|
service_calls=service_calls,
|
|
ner_analysis=ner_result,
|
|
rag_document=rag_result,
|
|
processing_time=processing_time
|
|
)
|
|
|
|
except Exception as e:
|
|
processing_time = time.time() - start_time
|
|
logger.error(f"Unified analysis failed: {e}")
|
|
|
|
return UnifiedResponse(
|
|
success=False,
|
|
service_calls=service_calls,
|
|
processing_time=processing_time,
|
|
error=str(e)
|
|
)
|
|
|
|
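# Illustrative client call (a sketch, not part of the service): assuming the
# unified app listens on its configured port (8000 below is only an example;
# the real value comes from config.MAIN_PORT), the endpoint above can be
# exercised with httpx roughly like this:
#
#   import asyncio
#   import httpx
#
#   async def demo():
#       async with httpx.AsyncClient(timeout=300.0) as client:
#           r = await client.post(
#               "http://localhost:8000/analyze/unified",
#               json={"text": "Acme Corp hired Jane Doe.", "enable_rag_indexing": False},
#           )
#           print(r.json()["service_calls"])
#
#   asyncio.run(demo())

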
@app.post("/search/combined")
|
|
async def combined_search(request: CombinedSearchRequest):
|
|
"""Combined search using RAG with optional NER analysis of results"""
|
|
start_time = time.time()
|
|
service_calls = []
|
|
|
|
try:
|
|
|
|
search_data = {
|
|
"query": request.query,
|
|
"limit": request.limit,
|
|
"similarity_threshold": request.similarity_threshold
|
|
}
|
|
|
|
search_result = await call_rag_service("/search", "POST", json=search_data)
|
|
service_calls.append("rag_search")
|
|
|
|
|
|
ner_results = []
|
|
if request.include_ner_analysis and search_result.get("results"):
|
|
for i, result in enumerate(search_result["results"][:3]):
|
|
chunk_content = result.get("chunk", {}).get("content", "")
|
|
if chunk_content:
|
|
try:
|
|
ner_data = {
|
|
"text": chunk_content,
|
|
"extract_relationships": True,
|
|
"include_embeddings": False,
|
|
"include_summary": False,
|
|
"generate_graph_files": False,
|
|
"export_formats": request.ner_export_formats
|
|
}
|
|
|
|
ner_result = await call_ner_service("/analyze/text", "POST", json=ner_data)
|
|
ner_results.append({
|
|
"result_index": i,
|
|
"ner_analysis": ner_result
|
|
})
|
|
service_calls.append(f"ner_text_{i}")
|
|
|
|
except Exception as e:
|
|
logger.warning(f"NER analysis failed for result {i}: {e}")
|
|
|
|
processing_time = time.time() - start_time
|
|
|
|
return UnifiedResponse(
|
|
success=True,
|
|
service_calls=service_calls,
|
|
search_results={
|
|
**search_result,
|
|
"ner_analyses": ner_results
|
|
},
|
|
processing_time=processing_time
|
|
)
|
|
|
|
except Exception as e:
|
|
processing_time = time.time() - start_time
|
|
logger.error(f"Combined search failed: {e}")
|
|
|
|
return UnifiedResponse(
|
|
success=False,
|
|
service_calls=service_calls,
|
|
processing_time=processing_time,
|
|
error=str(e)
|
|
)
|
|
|
|
|
|
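# Catch-all proxies: forward /ner/*, /ocr/* and /rag/* requests to the backing
# services, passing through query parameters, JSON bodies and multipart uploads.
# Note that the upstream status code is not propagated: the proxied body comes
# back as parsed JSON (or text) with HTTP 200, and only connection failures
# surface as 503.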
@app.api_route("/ner/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
|
|
async def ner_proxy(path: str, request):
|
|
"""Proxy requests to NER service"""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=300.0) as client:
|
|
url = f"{config.NER_SERVICE_URL}/{path}"
|
|
|
|
|
|
if request.method == "GET":
|
|
response = await client.get(url, params=request.query_params)
|
|
else:
|
|
|
|
content_type = request.headers.get("content-type", "")
|
|
|
|
if "multipart/form-data" in content_type:
|
|
|
|
form = await request.form()
|
|
files = {}
|
|
data = {}
|
|
|
|
for key, value in form.items():
|
|
if hasattr(value, 'read'):
|
|
files[key] = (value.filename, await value.read(), value.content_type)
|
|
else:
|
|
data[key] = value
|
|
|
|
response = await client.request(request.method, url, files=files, data=data)
|
|
else:
|
|
|
|
body = await request.body()
|
|
response = await client.request(
|
|
request.method,
|
|
url,
|
|
content=body,
|
|
headers={k: v for k, v in request.headers.items() if k.lower() != "host"}
|
|
)
|
|
|
|
|
|
return response.json() if response.headers.get("content-type", "").startswith("application/json") else response.text
|
|
|
|
except httpx.RequestError as e:
|
|
raise HTTPException(status_code=503, detail=f"NER service unavailable: {e}")
|
|
|
|
@app.api_route("/ocr/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
|
|
async def ocr_proxy(path: str, request):
|
|
"""Proxy requests to OCR service"""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=300.0) as client:
|
|
url = f"{config.OCR_SERVICE_URL}/{path}"
|
|
|
|
|
|
if request.method == "GET":
|
|
response = await client.get(url, params=request.query_params)
|
|
else:
|
|
|
|
content_type = request.headers.get("content-type", "")
|
|
|
|
if "multipart/form-data" in content_type:
|
|
|
|
form = await request.form()
|
|
files = {}
|
|
data = {}
|
|
|
|
for key, value in form.items():
|
|
if hasattr(value, 'read'):
|
|
files[key] = (value.filename, await value.read(), value.content_type)
|
|
else:
|
|
data[key] = value
|
|
|
|
response = await client.request(request.method, url, files=files, data=data)
|
|
else:
|
|
|
|
body = await request.body()
|
|
response = await client.request(
|
|
request.method,
|
|
url,
|
|
content=body,
|
|
headers={k: v for k, v in request.headers.items() if k.lower() != "host"}
|
|
)
|
|
|
|
|
|
return response.json() if response.headers.get("content-type", "").startswith("application/json") else response.text
|
|
|
|
except httpx.RequestError as e:
|
|
raise HTTPException(status_code=503, detail=f"OCR service unavailable: {e}")
|
|
|
|
@app.api_route("/rag/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
|
|
async def rag_proxy(path: str, request):
|
|
"""Proxy requests to RAG service"""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=300.0) as client:
|
|
url = f"{config.RAG_SERVICE_URL}/{path}"
|
|
|
|
|
|
if request.method == "GET":
|
|
response = await client.get(url, params=request.query_params)
|
|
else:
|
|
|
|
content_type = request.headers.get("content-type", "")
|
|
|
|
if "multipart/form-data" in content_type:
|
|
|
|
form = await request.form()
|
|
files = {}
|
|
data = {}
|
|
|
|
for key, value in form.items():
|
|
if hasattr(value, 'read'):
|
|
files[key] = (value.filename, await value.read(), value.content_type)
|
|
else:
|
|
data[key] = value
|
|
|
|
response = await client.request(request.method, url, files=files, data=data)
|
|
else:
|
|
|
|
body = await request.body()
|
|
response = await client.request(
|
|
request.method,
|
|
url,
|
|
content=body,
|
|
headers={k: v for k, v in request.headers.items() if k.lower() != "host"}
|
|
)
|
|
|
|
|
|
return response.json() if response.headers.get("content-type", "").startswith("application/json") else response.text
|
|
|
|
except httpx.RequestError as e:
|
|
raise HTTPException(status_code=503, detail=f"RAG service unavailable: {e}")
|
|
|
|
|
|
@app.get("/analyze/text")
|
|
@app.post("/analyze/text")
|
|
async def analyze_text_direct(request=None):
|
|
"""Direct access to NER text analysis"""
|
|
if request:
|
|
return await call_ner_service("/analyze/text", "POST", json=await request.json())
|
|
else:
|
|
return {"message": "Use POST method with text data"}
|
|
|
|
@app.get("/documents")
|
|
async def list_documents():
|
|
"""Direct access to RAG document listing"""
|
|
return await call_rag_service("/documents", "GET")
|
|
|
|
@app.post("/search")
|
|
async def search_direct(request):
|
|
"""Direct access to RAG search"""
|
|
return await call_rag_service("/search", "POST", json=await request.json())
|
|
|
|
|
|
@app.get("/services")
|
|
async def list_services():
|
|
"""List all available services and their endpoints"""
|
|
return {
|
|
"services": {
|
|
"ner": {
|
|
"url": config.NER_SERVICE_URL,
|
|
"description": "Named Entity Recognition with relationship extraction",
|
|
"endpoints": [
|
|
"/analyze/text", "/analyze/file", "/analyze/url", "/analyze/multi",
|
|
"/download/{analysis_id}/{file_type}", "/statistics", "/entity-types", "/relationship-types"
|
|
]
|
|
},
|
|
"ocr": {
|
|
"url": config.OCR_SERVICE_URL,
|
|
"description": "Optical Character Recognition with document processing",
|
|
"endpoints": [
|
|
"/ocr/upload", "/ocr/url", "/ocr/analyze"
|
|
]
|
|
},
|
|
"rag": {
|
|
"url": config.RAG_SERVICE_URL,
|
|
"description": "Retrieval-Augmented Generation with vector search",
|
|
"endpoints": [
|
|
"/documents/upload", "/documents/url", "/search", "/documents", "/documents/{id}"
|
|
]
|
|
}
|
|
},
|
|
"unified": {
|
|
"url": f"http://localhost:{config.MAIN_PORT}",
|
|
"description": "Unified interface for combined workflows",
|
|
"endpoints": [
|
|
"/analyze/unified", "/search/combined", "/ner/*", "/ocr/*", "/rag/*"
|
|
]
|
|
}
|
|
}
|
|
|
|
|
|
def signal_handler(signum, frame):
    """Handle shutdown signals"""
    logger.info(f"Received signal {signum}, initiating graceful shutdown...")
    asyncio.create_task(stop_all_services())


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

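# Note: when the app is launched via uvicorn.run() below, uvicorn installs its
# own SIGINT/SIGTERM handlers, so the handlers registered above may never fire;
# shutdown is still handled by the lifespan hook calling stop_all_services().
# start_time below is the module-level uptime reference reported by /status.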
start_time = time.time()

if __name__ == "__main__":
    print("Starting Unified AI Services Application")
    print("=" * 50)

    if not validate_environment():
        print("Configuration validation failed!")
        print("Please check your .env file and ensure all required services are configured.")
        sys.exit(1)

    print(f"Main application will run on: http://{config.MAIN_HOST}:{config.MAIN_PORT}")
    print("Services will be started automatically:")
    print(f"  • NER Service: http://localhost:{config.ner.PORT}")
    print(f"  • OCR Service: http://localhost:{config.ocr.PORT}")
    print(f"  • RAG Service: http://localhost:{config.rag.PORT}")
    print("")
    print("Available endpoints:")
    print("  • Main API: /")
    print("  • Health Check: /health")
    print("  • Unified Analysis: /analyze/unified")
    print("  • Combined Search: /search/combined")
    print("  • Service Proxies: /ner/*, /ocr/*, /rag/*")
    print("")
    print("API Documentation: /docs")
    print("")

    try:
        uvicorn.run(
            "app:app",
            host=config.MAIN_HOST,
            port=config.MAIN_PORT,
            reload=config.ner.DEBUG,
            log_level="info"
        )
    except KeyboardInterrupt:
        print("\nShutting down gracefully...")
    finally:
        pass