# fhirflame/src/workflow_orchestrator.py
# (origin metadata: author leksval, "initial commit", a963d65)
"""
FhirFlame Workflow Orchestrator
Model-agnostic orchestrator that respects user preferences for OCR and LLM models
"""
import asyncio
import time
import os
from typing import Dict, Any, Optional, Union
from .file_processor import local_processor
from .codellama_processor import CodeLlamaProcessor
from .monitoring import monitor
class WorkflowOrchestrator:
    """Model-agnostic workflow orchestrator for medical document processing.

    Pipeline: document bytes or raw text -> OCR text extraction -> LLM
    entity extraction -> optional FHIR bundle generation -> optional FHIR
    validation. The OCR backend (Mistral API vs. local) and the LLM model
    are chosen per call from caller preference combined with runtime
    availability (e.g. whether MISTRAL_API_KEY is set in the environment).
    """

    def __init__(self) -> None:
        # Shared local OCR/file processor (module-level instance imported
        # from .file_processor) and the default LLM processor.
        self.local_processor = local_processor
        self.codellama_processor = CodeLlamaProcessor()
        # Mistral OCR is only considered "available" when an API key exists.
        self.mistral_api_key = os.getenv("MISTRAL_API_KEY")
        # Available models configuration
        # NOTE: all three entries currently route to the same CodeLlama
        # processor instance; only "codellama" is a distinct backend today.
        self.available_models = {
            "codellama": {
                "processor": self.codellama_processor,
                "name": "CodeLlama 13B-Instruct",
                "available": True
            },
            "huggingface": {
                "processor": self.codellama_processor,  # Will be enhanced processor in app.py
                "name": "HuggingFace API",
                "available": True
            },
            "nlp_basic": {
                "processor": self.codellama_processor,  # Basic fallback
                "name": "NLP Basic Processing",
                "available": True
            }
            # Future models can be added here
        }
        # OCR method registry consumed by get_workflow_status() / UI layers.
        self.available_ocr_methods = {
            "mistral": {
                "name": "Mistral OCR API",
                "available": bool(self.mistral_api_key),
                "requires_api": True
            },
            "local": {
                "name": "Local OCR Processor",
                "available": True,
                "requires_api": False
            }
        }

    @monitor.track_operation("complete_document_workflow")
    async def process_complete_workflow(
        self,
        document_bytes: Optional[bytes] = None,
        medical_text: Optional[str] = None,
        user_id: str = "workflow-user",
        filename: str = "medical_document",
        document_type: str = "clinical_note",
        use_mistral_ocr: Optional[bool] = None,
        use_advanced_llm: bool = True,
        llm_model: str = "codellama",
        generate_fhir: bool = True
    ) -> Dict[str, Any]:
        """
        Complete workflow: Document → OCR → Entity Extraction → FHIR Generation

        Args:
            document_bytes: Document content as bytes
            medical_text: Direct text input (alternative to document_bytes)
            user_id: User identifier for tracking
            filename: Original filename for metadata
            document_type: Type of medical document
            use_mistral_ocr: Whether to use Mistral OCR API vs local OCR
                (None = auto-select Mistral when an API key is configured)
            use_advanced_llm: Whether to use advanced LLM processing
            llm_model: Which LLM model to use (currently supports 'codellama')
            generate_fhir: Whether to generate FHIR bundles

        Returns:
            Integrated result dict with keys: workflow_metadata,
            text_extraction, medical_analysis, fhir_bundle, fhir_validation,
            status, processing_mode.

        Raises:
            ValueError: if neither document_bytes nor medical_text is given.
        """
        workflow_start = time.time()
        extracted_text = None
        ocr_method_used = None
        llm_processing_result = None
        # Stage 1: Text Extraction
        if document_bytes:
            ocr_start_time = time.time()
            # Auto-select Mistral if available and not explicitly disabled
            if use_mistral_ocr is None:
                use_mistral_ocr = bool(self.mistral_api_key)
            # Choose OCR method based on user preference and availability
            if use_mistral_ocr and self.mistral_api_key:
                monitor.log_event("workflow_stage_start", {
                    "stage": "mistral_ocr_extraction",
                    "document_size": len(document_bytes),
                    "filename": filename
                })
                # Use Mistral OCR for text extraction
                # NOTE(review): _extract_with_mistral is defined in
                # file_processor — assumed to return extracted text as str.
                extracted_text = await self.local_processor._extract_with_mistral(document_bytes)
                ocr_processing_time = time.time() - ocr_start_time
                ocr_method_used = "mistral_api"
                # Log Mistral OCR processing
                monitor.log_mistral_ocr_processing(
                    document_size=len(document_bytes),
                    extraction_time=ocr_processing_time,
                    success=True,
                    text_length=len(extracted_text)
                )
            else:
                # Use local processor
                result = await self.local_processor.process_document(
                    document_bytes, user_id, filename
                )
                extracted_text = result.get('extracted_text', '')
                ocr_method_used = "local_processor"
        elif medical_text:
            # Direct text input
            extracted_text = medical_text
            ocr_method_used = "direct_input"
        else:
            raise ValueError("Either document_bytes or medical_text must be provided")
        # Stage 2: Medical Entity Extraction
        if use_advanced_llm and llm_model in self.available_models:
            model_config = self.available_models[llm_model]
            if model_config["available"]:
                monitor.log_event("workflow_stage_start", {
                    "stage": "llm_entity_extraction",
                    "model": llm_model,
                    "text_length": len(extracted_text),
                    "ocr_method": ocr_method_used
                })
                # Prepare source metadata
                source_metadata = {
                    "extraction_method": ocr_method_used,
                    "original_filename": filename,
                    "document_size": len(document_bytes) if document_bytes else None,
                    "workflow_stage": "post_ocr_extraction" if document_bytes else "direct_text_input",
                    "llm_model": llm_model
                }
                # DEBUG: before entity extraction call
                monitor.log_event("entity_extraction_pre_call", {
                    "provider": llm_model,
                    "text_snippet": extracted_text[:100]
                })
                # The processor is expected to return a dict containing
                # extracted_data, extraction_results, metadata and
                # (optionally) fhir_bundle — see consumption below.
                llm_processing_result = await model_config["processor"].process_document(
                    medical_text=extracted_text,
                    document_type=document_type,
                    extract_entities=True,
                    generate_fhir=generate_fhir,
                    source_metadata=source_metadata
                )
                # DEBUG: after entity extraction call
                monitor.log_event("entity_extraction_post_call", {
                    "provider": llm_model,
                    "extraction_results": llm_processing_result.get("extraction_results", {}),
                    "fhir_bundle_present": "fhir_bundle" in llm_processing_result
                })
            else:
                # Model not available, use basic processing
                # (same result shape as the processor output, zeroed out)
                llm_processing_result = {
                    "extracted_data": '{"error": "Advanced LLM not available"}',
                    "extraction_results": {
                        "entities_found": 0,
                        "quality_score": 0.0
                    },
                    "metadata": {
                        "model_used": "none",
                        "processing_time": 0.0
                    }
                }
        else:
            # Basic text processing without advanced LLM
            llm_processing_result = {
                "extracted_data": f'{{"text_length": {len(extracted_text)}, "processing_mode": "basic"}}',
                "extraction_results": {
                    "entities_found": 0,
                    "quality_score": 0.5
                },
                "metadata": {
                    "model_used": "basic_processor",
                    "processing_time": 0.1
                }
            }
        # Stage 3: FHIR Validation (if FHIR bundle was generated)
        fhir_validation_result = None
        if generate_fhir and llm_processing_result.get('fhir_bundle'):
            # Local import keeps the validator dependency lazy — only paid
            # for when a bundle actually exists.
            from .fhir_validator import FhirValidator
            validator = FhirValidator()
            monitor.log_event("workflow_stage_start", {
                "stage": "fhir_validation",
                "bundle_generated": True
            })
            fhir_validation_result = validator.validate_fhir_bundle(llm_processing_result['fhir_bundle'])
            monitor.log_event("fhir_validation_complete", {
                "is_valid": fhir_validation_result['is_valid'],
                "compliance_score": fhir_validation_result['compliance_score'],
                "validation_level": fhir_validation_result['validation_level']
            })
        # Stage 4: Workflow Results Assembly
        workflow_time = time.time() - workflow_start
        # Determine completed stages
        # NOTE(review): "fhir_generation" is recorded whenever it was
        # requested, even if the processor produced no bundle — verify
        # this is the intended reporting semantics.
        stages_completed = ["text_extraction"]
        if use_advanced_llm:
            stages_completed.append("entity_extraction")
        if generate_fhir:
            stages_completed.append("fhir_generation")
        if fhir_validation_result:
            stages_completed.append("fhir_validation")
        integrated_result = {
            "workflow_metadata": {
                "total_processing_time": workflow_time,
                "mistral_ocr_used": ocr_method_used == "mistral_api",
                "ocr_method": ocr_method_used,
                "llm_model": llm_model if use_advanced_llm else "none",
                "advanced_llm_used": use_advanced_llm,
                "fhir_generated": generate_fhir,
                "stages_completed": stages_completed,
                "user_id": user_id,
                "filename": filename,
                "document_type": document_type
            },
            "text_extraction": {
                # Preview is truncated to 500 chars; full length reported separately.
                "extracted_text": extracted_text[:500] + "..." if len(extracted_text) > 500 else extracted_text,
                "full_text_length": len(extracted_text),
                "extraction_method": ocr_method_used
            },
            "medical_analysis": {
                "entities_found": llm_processing_result["extraction_results"]["entities_found"],
                "quality_score": llm_processing_result["extraction_results"]["quality_score"],
                "model_used": llm_processing_result["metadata"]["model_used"],
                "extracted_data": llm_processing_result["extracted_data"]
            },
            "fhir_bundle": llm_processing_result.get("fhir_bundle") if generate_fhir else None,
            "fhir_validation": fhir_validation_result,
            "status": "success",
            "processing_mode": "integrated_workflow"
        }
        # Log workflow completion
        monitor.log_workflow_summary(
            documents_processed=1,
            successful_documents=1,
            total_time=workflow_time,
            average_time=workflow_time,
            monitoring_active=monitor.langfuse is not None
        )
        # Log OCR workflow integration if OCR was used
        if ocr_method_used in ["mistral_api", "local_processor"]:
            monitor.log_ocr_workflow_integration(
                ocr_method=ocr_method_used,
                agent_processing_time=llm_processing_result["metadata"]["processing_time"],
                total_workflow_time=workflow_time,
                entities_found=llm_processing_result["extraction_results"]["entities_found"]
            )
        monitor.log_event("complete_workflow_success", {
            "total_time": workflow_time,
            "ocr_method": ocr_method_used,
            "llm_model": llm_model if use_advanced_llm else "none",
            "entities_found": llm_processing_result["extraction_results"]["entities_found"],
            "fhir_generated": generate_fhir and "fhir_bundle" in llm_processing_result,
            "processing_pipeline": f"{ocr_method_used} → {llm_model if use_advanced_llm else 'basic'} → {'fhir' if generate_fhir else 'no-fhir'}"
        })
        return integrated_result

    def get_workflow_status(self) -> Dict[str, Any]:
        """Get current workflow configuration and available models.

        Returns a snapshot of OCR/LLM registries, API-key and monitoring
        status, plus the defaults the UI should pre-select.
        """
        monitoring_status = monitor.get_monitoring_status()
        return {
            "available_ocr_methods": self.available_ocr_methods,
            "available_llm_models": self.available_models,
            "mistral_api_key_configured": bool(self.mistral_api_key),
            "monitoring_enabled": monitoring_status["langfuse_enabled"],
            "monitoring_status": monitoring_status,
            "default_configuration": {
                # Prefer Mistral OCR automatically when a key is configured.
                "ocr_method": "mistral" if self.mistral_api_key else "local",
                "llm_model": "codellama",
                "generate_fhir": True
            }
        }

    def get_available_models(self) -> Dict[str, Any]:
        """Get list of available models for UI dropdowns.

        Returns value/label/available triples suitable for direct binding
        to dropdown widgets.
        """
        return {
            "ocr_methods": [
                {"value": "mistral", "label": "Mistral OCR API", "available": bool(self.mistral_api_key)},
                {"value": "local", "label": "Local OCR Processor", "available": True}
            ],
            "llm_models": [
                {"value": "codellama", "label": "CodeLlama 13B-Instruct", "available": True},
                {"value": "basic", "label": "Basic Text Processing", "available": True}
            ]
        }
# Global workflow orchestrator instance — module-level singleton shared by
# importers (instantiated at import time; constructs a CodeLlamaProcessor).
workflow_orchestrator = WorkflowOrchestrator()