| | """
|
| | Agent HTTP Server for CareFlow Nexus Backend Integration
|
| | Exposes AI agents as HTTP endpoints that the backend can call
|
| | Matches the API contract expected by backend/app/services/agents.py
|
| | """
|
| |
|
| | import asyncio
|
| | import logging
|
| | import sys
|
| | from datetime import datetime
|
| | from typing import Any, Dict, List, Optional
|
| |
|
| | from allocator_agent import BedAllocatorAgent
|
| | from communicator_agent import CommunicatorAgent
|
| | from config import config
|
| | from fastapi import FastAPI, HTTPException
|
| | from fastapi.middleware.cors import CORSMiddleware
|
| | from memory_agent import MemoryAgent
|
| | from pydantic import BaseModel
|
| | from services.firebase_service import FirebaseService
|
| | from services.gemini_service import GeminiService
|
| |
|
| |
|
| | logging.basicConfig(
|
| | level=logging.INFO,
|
| | format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
| | handlers=[
|
| | logging.StreamHandler(sys.stdout),
|
| | logging.FileHandler(f"agent_server_{datetime.now().strftime('%Y%m%d')}.log"),
|
| | ],
|
| | )
|
| | logger = logging.getLogger(__name__)
|
| |
|
| |
|
| | app = FastAPI(
|
| | title="CareFlow AI Agent Server",
|
| | description="AI Agent Microservice for Hospital Bed Management",
|
| | version="1.0.0",
|
| | docs_url="/docs",
|
| | redoc_url="/redoc",
|
| | )
|
| |
|
| |
|
| | app.add_middleware(
|
| | CORSMiddleware,
|
| | allow_origins=["*"],
|
| | allow_credentials=True,
|
| | allow_methods=["*"],
|
| | allow_headers=["*"],
|
| | )
|
| |
|
| |
|
| | firebase_service: Optional[FirebaseService] = None
|
| | gemini_service: Optional[GeminiService] = None
|
| | memory_agent: Optional[MemoryAgent] = None
|
| | bed_allocator_agent: Optional[BedAllocatorAgent] = None
|
| | communicator_agent: Optional[CommunicatorAgent] = None
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | class BedAgentRequest(BaseModel):
|
| | """Request model for bed assignment agent"""
|
| |
|
| | patient: Dict[str, Any]
|
| | doctor_input: Dict[str, Any]
|
| | available_beds: List[Dict[str, Any]]
|
| |
|
| |
|
| | class BedAgentResponse(BaseModel):
|
| | """Response model for bed assignment agent"""
|
| |
|
| | recommended_bed_id: Optional[str]
|
| | reason: str
|
| | recommendations: List[Dict[str, Any]]
|
| | confidence: int
|
| |
|
| |
|
| | class CleanerAgentRequest(BaseModel):
|
| | """Request model for cleaner assignment agent"""
|
| |
|
| | bed_id: str
|
| | available_cleaners: List[Dict[str, Any]]
|
| |
|
| |
|
| | class CleanerAgentResponse(BaseModel):
|
| | """Response model for cleaner assignment agent"""
|
| |
|
| | selected_cleaner_id: Optional[str]
|
| | reason: str
|
| |
|
| |
|
| | class NurseAgentRequest(BaseModel):
|
| | """Request model for nurse assignment agent"""
|
| |
|
| | patient: Dict[str, Any]
|
| | bed: Dict[str, Any]
|
| | available_nurses: List[Dict[str, Any]]
|
| |
|
| |
|
| | class NurseAgentResponse(BaseModel):
|
| | """Response model for nurse assignment agent"""
|
| |
|
| | selected_nurse_id: Optional[str]
|
| | reason: str
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | @app.on_event("startup")
|
| | async def startup_event():
|
| | """Initialize all AI agents on server startup"""
|
| | global \
|
| | firebase_service, \
|
| | gemini_service, \
|
| | memory_agent, \
|
| | bed_allocator_agent, \
|
| | communicator_agent
|
| |
|
| | try:
|
| | logger.info("=" * 60)
|
| | logger.info("CareFlow AI Agent Server Starting...")
|
| | logger.info("=" * 60)
|
| |
|
| |
|
| | logger.info("Validating configuration...")
|
| | if not config.validate():
|
| | raise Exception("Configuration validation failed")
|
| | logger.info("✓ Configuration valid")
|
| |
|
| |
|
| | logger.info("Initializing Firebase service...")
|
| | firebase_service = FirebaseService(
|
| | service_account_path=config.firebase.service_account_path
|
| | )
|
| | logger.info("✓ Firebase service initialized")
|
| |
|
| |
|
| | logger.info("Initializing Gemini AI service...")
|
| | gemini_service = GeminiService(
|
| | api_key=config.gemini.api_key,
|
| | model_name=config.gemini.model_name,
|
| | )
|
| | logger.info("✓ Gemini AI service initialized")
|
| |
|
| |
|
| | logger.info("Initializing Memory Agent...")
|
| | memory_agent = MemoryAgent(
|
| | firebase_service=firebase_service,
|
| | gemini_service=gemini_service,
|
| | refresh_interval=config.agent.state_refresh_interval,
|
| | )
|
| | await memory_agent.initialize()
|
| | logger.info("✓ Memory Agent initialized")
|
| |
|
| |
|
| | logger.info("Initializing Bed Allocator Agent...")
|
| | bed_allocator_agent = BedAllocatorAgent(
|
| | firebase_service=firebase_service,
|
| | gemini_service=gemini_service,
|
| | memory_agent=memory_agent,
|
| | rule_weight=config.agent.rule_weight,
|
| | )
|
| | logger.info("✓ Bed Allocator Agent initialized")
|
| |
|
| |
|
| | logger.info("Initializing Communicator Agent...")
|
| | communicator_agent = CommunicatorAgent(
|
| | firebase_service=firebase_service,
|
| | gemini_service=gemini_service,
|
| | memory_agent=memory_agent,
|
| | max_staff_workload=config.agent.max_staff_workload,
|
| | )
|
| | logger.info("✓ Communicator Agent initialized")
|
| |
|
| | logger.info("=" * 60)
|
| | logger.info("✓ ALL AGENTS READY")
|
| | logger.info("=" * 60)
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Failed to initialize agents: {e}")
|
| | raise
|
| |
|
| |
|
| | @app.on_event("shutdown")
|
| | async def shutdown_event():
|
| | """Cleanup on server shutdown"""
|
| | logger.info("Shutting down AI Agent Server...")
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | def _extract_basic_requirements(doctor_input: Dict[str, Any]) -> Dict[str, Any]:
|
| | """Extract basic requirements from doctor input"""
|
| | diagnosis = doctor_input.get("diagnosis", "").lower()
|
| | special_instructions = doctor_input.get("special_instructions", "").lower()
|
| |
|
| | combined_text = f"{diagnosis} {special_instructions}"
|
| |
|
| | requirements = {
|
| | "needs_oxygen": any(
|
| | word in combined_text for word in ["oxygen", "o2", "respiratory"]
|
| | ),
|
| | "needs_ventilator": any(
|
| | word in combined_text for word in ["ventilator", "intubation"]
|
| | ),
|
| | "needs_cardiac_monitor": any(
|
| | word in combined_text for word in ["cardiac", "heart", "monitor"]
|
| | ),
|
| | "needs_isolation": any(
|
| | word in combined_text for word in ["isolation", "infectious", "contagious"]
|
| | ),
|
| | }
|
| |
|
| | return requirements
|
| |
|
| |
|
| | def _infer_severity(diagnosis: str) -> str:
|
| | """Infer severity from diagnosis text"""
|
| | diagnosis_lower = diagnosis.lower()
|
| |
|
| | if any(
|
| | word in diagnosis_lower for word in ["critical", "severe", "emergency", "acute"]
|
| | ):
|
| | return "critical"
|
| | elif any(word in diagnosis_lower for word in ["moderate", "significant"]):
|
| | return "high"
|
| | elif any(word in diagnosis_lower for word in ["mild", "minor"]):
|
| | return "low"
|
| | else:
|
| | return "moderate"
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | @app.get("/health")
|
| | async def health_check():
|
| | """Health check endpoint"""
|
| | agents_ready = all(
|
| | [
|
| | firebase_service is not None,
|
| | gemini_service is not None,
|
| | memory_agent is not None,
|
| | bed_allocator_agent is not None,
|
| | communicator_agent is not None,
|
| | ]
|
| | )
|
| |
|
| | return {
|
| | "status": "healthy" if agents_ready else "initializing",
|
| | "agents_ready": agents_ready,
|
| | "timestamp": datetime.now().isoformat(),
|
| | }
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | @app.post("/agent/bed-assignment", response_model=BedAgentResponse)
|
| | async def call_bed_agent(request: BedAgentRequest):
|
| | """
|
| | Bed Assignment Agent Endpoint
|
| |
|
| | Returns bed recommendations based on patient diagnosis and available beds.
|
| | Uses hybrid AI + rule-based scoring.
|
| |
|
| | Input:
|
| | {
|
| | "patient": {...},
|
| | "doctor_input": {
|
| | "diagnosis": "Pneumonia",
|
| | "special_instructions": "Oxygen support"
|
| | },
|
| | "available_beds": [...]
|
| | }
|
| |
|
| | Output:
|
| | {
|
| | "recommended_bed_id": "bed22",
|
| | "reason": "Supports oxygen",
|
| | "recommendations": [top 3 beds with scores],
|
| | "confidence": 85
|
| | }
|
| | """
|
| | try:
|
| | if not bed_allocator_agent:
|
| | raise HTTPException(status_code=503, detail="Agents not initialized")
|
| |
|
| | patient = request.patient
|
| | doctor_input = request.doctor_input
|
| | patient_id = patient.get("patient_id")
|
| |
|
| | logger.info(f"Bed assignment request for patient: {patient_id}")
|
| |
|
| |
|
| | await firebase_service.update_patient(
|
| | patient_id,
|
| | {
|
| | "diagnosis": doctor_input.get("diagnosis"),
|
| | "severity": _infer_severity(doctor_input.get("diagnosis", "")),
|
| | "requirements": _extract_basic_requirements(doctor_input),
|
| | },
|
| | )
|
| |
|
| |
|
| | result = await bed_allocator_agent.process({"patient_id": patient_id})
|
| |
|
| | if not result.get("success"):
|
| | logger.warning(f"No suitable beds found for patient {patient_id}")
|
| | return BedAgentResponse(
|
| | recommended_bed_id=None,
|
| | reason=result.get("message", "No suitable beds found"),
|
| | recommendations=[],
|
| | confidence=0,
|
| | )
|
| |
|
| | data = result.get("data", {})
|
| | recommendations = data.get("recommendations", [])
|
| |
|
| |
|
| | response = BedAgentResponse(
|
| | recommended_bed_id=recommendations[0].get("bed_id")
|
| | if recommendations
|
| | else None,
|
| | reason=recommendations[0].get("reasoning")
|
| | if recommendations
|
| | else "No beds available",
|
| | recommendations=[
|
| | {
|
| | "bed_id": rec.get("bed_id"),
|
| | "bed_number": rec.get("bed_number"),
|
| | "ward": rec.get("ward"),
|
| | "score": rec.get("score"),
|
| | "reasoning": rec.get("reasoning"),
|
| | "pros": rec.get("pros", []),
|
| | "cons": rec.get("cons", []),
|
| | }
|
| | for rec in recommendations[:3]
|
| | ],
|
| | confidence=data.get("confidence", 0),
|
| | )
|
| |
|
| | logger.info(f"Bed assignment complete: {response.recommended_bed_id}")
|
| | return response
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Error in bed agent: {e}")
|
| | raise HTTPException(status_code=500, detail=f"Error: {str(e)}")
|
| |
|
| |
|
| | @app.post("/agent/cleaner-assignment", response_model=CleanerAgentResponse)
|
| | async def call_cleaner_agent(request: CleanerAgentRequest):
|
| | """
|
| | Cleaner Assignment Agent Endpoint
|
| |
|
| | Assigns a cleaner to prepare a bed.
|
| |
|
| | Input:
|
| | {
|
| | "bed_id": "bed22",
|
| | "available_cleaners": [...]
|
| | }
|
| |
|
| | Output:
|
| | {
|
| | "selected_cleaner_id": "c1",
|
| | "reason": "Least workload"
|
| | }
|
| | """
|
| | try:
|
| | if not communicator_agent:
|
| | raise HTTPException(status_code=503, detail="Agents not initialized")
|
| |
|
| | bed_id = request.bed_id
|
| | available_cleaners = request.available_cleaners
|
| |
|
| | logger.info(f"Cleaner assignment request for bed: {bed_id}")
|
| |
|
| |
|
| | bed = await firebase_service.get_bed(bed_id)
|
| | if not bed:
|
| | raise HTTPException(status_code=404, detail="Bed not found")
|
| |
|
| |
|
| | task_data = {
|
| | "task_type": "cleaning",
|
| | "description": f"Clean bed {bed.get('bed_id')} in {bed.get('ward')}",
|
| | "bed_id": bed_id,
|
| | "priority": "high",
|
| | "role": "cleaner",
|
| | }
|
| |
|
| | result = await communicator_agent.process(
|
| | {"type": "assign_staff", "task_data": task_data}
|
| | )
|
| |
|
| | if result.get("success"):
|
| | assignment = result.get("data", {})
|
| | response = CleanerAgentResponse(
|
| | selected_cleaner_id=assignment.get("staff_id"),
|
| | reason=assignment.get(
|
| | "reasoning", "Selected based on workload and availability"
|
| | ),
|
| | )
|
| | else:
|
| |
|
| | if available_cleaners:
|
| | response = CleanerAgentResponse(
|
| | selected_cleaner_id=available_cleaners[0].get("user_id"),
|
| | reason="First available cleaner",
|
| | )
|
| | else:
|
| | response = CleanerAgentResponse(
|
| | selected_cleaner_id=None,
|
| | reason="No cleaners available",
|
| | )
|
| |
|
| | logger.info(f"Cleaner assignment complete: {response.selected_cleaner_id}")
|
| | return response
|
| |
|
| | except HTTPException:
|
| | raise
|
| | except Exception as e:
|
| | logger.error(f"Error in cleaner agent: {e}")
|
| |
|
| | if request.available_cleaners:
|
| | return CleanerAgentResponse(
|
| | selected_cleaner_id=request.available_cleaners[0].get("user_id"),
|
| | reason=f"Fallback assignment due to error",
|
| | )
|
| | raise HTTPException(status_code=500, detail=f"Error: {str(e)}")
|
| |
|
| |
|
| | @app.post("/agent/nurse-assignment", response_model=NurseAgentResponse)
|
| | async def call_nurse_agent(request: NurseAgentRequest):
|
| | """
|
| | Nurse Assignment Agent Endpoint
|
| |
|
| | Assigns a nurse to prepare a bed for a patient.
|
| |
|
| | Input:
|
| | {
|
| | "patient": {...},
|
| | "bed": {...},
|
| | "available_nurses": [...]
|
| | }
|
| |
|
| | Output:
|
| | {
|
| | "selected_nurse_id": "n1",
|
| | "reason": "ICU trained"
|
| | }
|
| | """
|
| | try:
|
| | if not communicator_agent:
|
| | raise HTTPException(status_code=503, detail="Agents not initialized")
|
| |
|
| | patient = request.patient
|
| | bed = request.bed
|
| | available_nurses = request.available_nurses
|
| |
|
| | logger.info(
|
| | f"Nurse assignment request for patient: {patient.get('patient_id')}"
|
| | )
|
| |
|
| |
|
| | task_data = {
|
| | "task_type": "nursing",
|
| | "description": f"Prepare bed {bed.get('bed_id')} for patient {patient.get('name')}",
|
| | "bed_id": bed.get("bed_id"),
|
| | "patient_id": patient.get("patient_id"),
|
| | "priority": "high",
|
| | "role": "nurse",
|
| | }
|
| |
|
| | result = await communicator_agent.process(
|
| | {"type": "assign_staff", "task_data": task_data}
|
| | )
|
| |
|
| | if result.get("success"):
|
| | assignment = result.get("data", {})
|
| | response = NurseAgentResponse(
|
| | selected_nurse_id=assignment.get("staff_id"),
|
| | reason=assignment.get(
|
| | "reasoning", "Selected based on workload and specialization"
|
| | ),
|
| | )
|
| | else:
|
| |
|
| | if available_nurses:
|
| | response = NurseAgentResponse(
|
| | selected_nurse_id=available_nurses[0].get("user_id"),
|
| | reason="First available nurse",
|
| | )
|
| | else:
|
| | response = NurseAgentResponse(
|
| | selected_nurse_id=None,
|
| | reason="No nurses available",
|
| | )
|
| |
|
| | logger.info(f"Nurse assignment complete: {response.selected_nurse_id}")
|
| | return response
|
| |
|
| | except HTTPException:
|
| | raise
|
| | except Exception as e:
|
| | logger.error(f"Error in nurse agent: {e}")
|
| |
|
| | if request.available_nurses:
|
| | return NurseAgentResponse(
|
| | selected_nurse_id=request.available_nurses[0].get("user_id"),
|
| | reason=f"Fallback assignment",
|
| | )
|
| | raise HTTPException(status_code=500, detail=f"Error: {str(e)}")
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | if __name__ == "__main__":
|
| |
|
| | import os
|
| |
|
| | import uvicorn
|
| |
|
| | port = int(os.getenv("AGENT_SERVER_PORT", "9000"))
|
| |
|
| | logger.info(f"Starting Agent Server on port {port}...")
|
| | uvicorn.run(app, host="0.0.0.0", port=port)
|
| |
|