LiamKhoaLe's picture
Upd EMR parser and updater services (OCR+VLM)
55b641e
# src/data/emr_update.py
import os
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional
from src.data.connection import get_database
from src.models.emr import ExtractedData
from src.utils.logger import logger
class EMRUpdateService:
    """Service for updating EMR records with document analysis results.

    Every operation targets the ``emr_records`` collection of the database
    handle returned by ``get_database()``. Records are addressed by the
    application-level ``emr_id`` field (a UUID string), not by ``_id``.
    """

    def __init__(self):
        self.db = get_database()

    @staticmethod
    def _serialize_extracted_data(extracted_data: ExtractedData) -> Dict[str, Any]:
        """Convert extracted medical data into a plain dict for storage.

        Shared by the insert and update paths so both persist the same shape.
        Missing list fields become ``[]``, missing notes become ``""`` and
        missing vital signs are stored as ``None``.

        Args:
            extracted_data: The extracted medical data

        Returns:
            A dictionary holding only plain values, lists and nested dicts
        """
        vitals = extracted_data.vital_signs
        return {
            "diagnosis": extracted_data.diagnosis or [],
            "symptoms": extracted_data.symptoms or [],
            "medications": [
                {
                    "name": med.name,
                    "dosage": med.dosage,
                    "frequency": med.frequency,
                    "duration": med.duration,
                }
                for med in extracted_data.medications or []
            ],
            "vital_signs": {
                "blood_pressure": vitals.blood_pressure,
                "heart_rate": vitals.heart_rate,
                "temperature": vitals.temperature,
                "respiratory_rate": vitals.respiratory_rate,
                "oxygen_saturation": vitals.oxygen_saturation,
            } if vitals else None,
            "lab_results": [
                {
                    "test_name": lab.test_name,
                    "value": lab.value,
                    "unit": lab.unit,
                    "reference_range": lab.reference_range,
                }
                for lab in extracted_data.lab_results or []
            ],
            "procedures": extracted_data.procedures or [],
            "notes": extracted_data.notes or "",
        }

    @staticmethod
    def _normalize_statistics(rows: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Turn the aggregation output into a statistics dict with safe defaults.

        Args:
            rows: Result rows of the statistics aggregation (zero or one row)

        Returns:
            Dictionary with ``total_entries``, ``avg_confidence``,
            ``diagnosis_count`` and ``medication_count``
        """
        stats = rows[0] if rows else {}
        # `or` rather than a .get() default: the average key can be present
        # with a None value when no record carries a numeric confidence score.
        return {
            "total_entries": stats.get("total_entries") or 0,
            "avg_confidence": stats.get("avg_confidence") or 0.0,
            "diagnosis_count": stats.get("diagnosis_count") or 0,
            "medication_count": stats.get("medication_count") or 0,
        }

    async def save_document_analysis(
        self,
        patient_id: str,
        filename: str,
        file_content: bytes,
        extracted_data: ExtractedData,
        confidence_score: float,
        original_message: Optional[str] = None
    ) -> str:
        """
        Save document analysis results to the EMR database.

        Args:
            patient_id: The ID of the patient
            filename: The name of the uploaded file
            file_content: The binary content of the file (accepted for
                interface compatibility; this method does not store it)
            extracted_data: The extracted medical data
            confidence_score: The confidence score of the analysis
            original_message: Optional original message (for chat-based
                entries); defaults to ``"Document upload: <filename>"``

        Returns:
            The EMR ID of the created record

        Raises:
            RuntimeError: If the insert reports no inserted ID
            Exception: Any database error is logged and re-raised
        """
        try:
            # Generate unique EMR ID
            emr_id = str(uuid.uuid4())
            # One timestamp so created_at and updated_at match on creation
            now = datetime.utcnow()

            # Prepare the EMR record
            emr_record = {
                "emr_id": emr_id,
                "patient_id": patient_id,
                "original_message": original_message or f"Document upload: {filename}",
                "extracted_data": self._serialize_extracted_data(extracted_data),
                "confidence_score": confidence_score,
                "source": "document_upload",
                "filename": filename,
                "created_at": now,
                "updated_at": now,
            }

            # Save to database
            result = await self.db.emr_records.insert_one(emr_record)
            if not result.inserted_id:
                raise RuntimeError("Failed to insert EMR record")

            logger().info(f"Successfully saved document analysis for patient {patient_id}, EMR ID: {emr_id}")
            return emr_id
        except Exception as e:
            logger().error(f"Error saving document analysis: {e}")
            raise

    async def update_emr_record(
        self,
        emr_id: str,
        extracted_data: ExtractedData,
        confidence_score: Optional[float] = None
    ) -> bool:
        """
        Update an existing EMR record with new extracted data.

        The stored ``extracted_data`` is replaced as a whole and ``updated_at``
        is refreshed; ``confidence_score`` is only overwritten when provided.

        Args:
            emr_id: The EMR record ID to update
            extracted_data: The updated extracted medical data
            confidence_score: Optional new confidence score

        Returns:
            True if a record with this ID was found and updated, False if no
            such record exists

        Raises:
            Exception: Any database error is logged and re-raised
        """
        try:
            # Prepare update data
            update_data = {
                "extracted_data": self._serialize_extracted_data(extracted_data),
                "updated_at": datetime.utcnow(),
            }
            if confidence_score is not None:
                update_data["confidence_score"] = confidence_score

            # Update the record
            result = await self.db.emr_records.update_one(
                {"emr_id": emr_id},
                {"$set": update_data}
            )

            # matched_count tells whether the record exists; modified_count
            # would also be 0 for an existing record whose values are unchanged.
            if result.matched_count > 0:
                logger().info(f"Successfully updated EMR record {emr_id}")
                return True

            logger().warning(f"No EMR record found with ID {emr_id}")
            return False
        except Exception as e:
            logger().error(f"Error updating EMR record {emr_id}: {e}")
            raise

    async def get_emr_record(self, emr_id: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve an EMR record by ID.

        Args:
            emr_id: The EMR record ID

        Returns:
            The EMR record (with ``_id`` converted to a string) if found,
            None otherwise

        Raises:
            Exception: Any database error is logged and re-raised
        """
        try:
            record = await self.db.emr_records.find_one({"emr_id": emr_id})
            if record:
                # Convert ObjectId to string for JSON serialization
                record["_id"] = str(record["_id"])
            return record
        except Exception as e:
            logger().error(f"Error retrieving EMR record {emr_id}: {e}")
            raise

    async def delete_emr_record(self, emr_id: str) -> bool:
        """
        Delete an EMR record.

        Args:
            emr_id: The EMR record ID to delete

        Returns:
            True if a record was deleted, False if no record has this ID

        Raises:
            Exception: Any database error is logged and re-raised
        """
        try:
            result = await self.db.emr_records.delete_one({"emr_id": emr_id})
            if result.deleted_count > 0:
                logger().info(f"Successfully deleted EMR record {emr_id}")
                return True

            logger().warning(f"No EMR record found with ID {emr_id}")
            return False
        except Exception as e:
            logger().error(f"Error deleting EMR record {emr_id}: {e}")
            raise

    async def get_patient_emr_records(
        self,
        patient_id: str,
        limit: int = 100,
        skip: int = 0
    ) -> List[Dict[str, Any]]:
        """
        Retrieve EMR records for a specific patient, newest first.

        Args:
            patient_id: The patient ID
            limit: Maximum number of records to return
            skip: Number of records to skip

        Returns:
            List of EMR records sorted by ``created_at`` descending, each with
            ``_id`` converted to a string

        Raises:
            Exception: Any database error is logged and re-raised
        """
        try:
            cursor = self.db.emr_records.find(
                {"patient_id": patient_id}
            ).sort("created_at", -1).skip(skip).limit(limit)

            records = []
            async for record in cursor:
                # Convert ObjectId to string for JSON serialization
                record["_id"] = str(record["_id"])
                records.append(record)
            return records
        except Exception as e:
            logger().error(f"Error retrieving EMR records for patient {patient_id}: {e}")
            raise

    async def get_patient_emr_statistics(self, patient_id: str) -> Dict[str, Any]:
        """
        Get EMR statistics for a patient.

        ``diagnosis_count`` and ``medication_count`` count the records that
        have at least one diagnosis / medication, not the individual items.

        Args:
            patient_id: The patient ID

        Returns:
            Dictionary containing ``total_entries``, ``avg_confidence``,
            ``diagnosis_count`` and ``medication_count``; all zero when the
            patient has no records

        Raises:
            Exception: Any database error is logged and re-raised
        """
        try:
            pipeline = [
                {"$match": {"patient_id": patient_id}},
                {
                    "$group": {
                        "_id": None,
                        "total_entries": {"$sum": 1},
                        "avg_confidence": {"$avg": "$confidence_score"},
                        "diagnosis_count": {
                            "$sum": {
                                "$cond": [
                                    {"$gt": [{"$size": {"$ifNull": ["$extracted_data.diagnosis", []]}}, 0]},
                                    1,
                                    0
                                ]
                            }
                        },
                        "medication_count": {
                            "$sum": {
                                "$cond": [
                                    {"$gt": [{"$size": {"$ifNull": ["$extracted_data.medications", []]}}, 0]},
                                    1,
                                    0
                                ]
                            }
                        }
                    }
                }
            ]
            result = await self.db.emr_records.aggregate(pipeline).to_list(1)
            return self._normalize_statistics(result)
        except Exception as e:
            logger().error(f"Error retrieving EMR statistics for patient {patient_id}: {e}")
            raise