Spaces:
Sleeping
Sleeping
# src/data/emr_update.py
import os
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from src.data.connection import get_database
from src.models.emr import ExtractedData
from src.utils.logger import logger
class EMRUpdateService:
    """Service for creating, reading, updating and deleting EMR records.

    All persistence goes through ``self.db.emr_records``. The calls used here
    (awaitable ``insert_one``/``update_one``/``find_one``/``delete_one``,
    ``async for`` over a cursor, ``aggregate(...).to_list``) look like a
    Motor-style async MongoDB collection -- NOTE(review): confirm against
    ``get_database``.
    """

    def __init__(self):
        """Bind the service to the handle returned by ``get_database()``."""
        self.db = get_database()

    @staticmethod
    def _serialize_extracted_data(extracted_data: "ExtractedData") -> Dict[str, Any]:
        """Convert extracted medical data into a plain dict for storage.

        Missing list fields become ``[]``, missing notes become ``""`` and
        missing vital signs become ``None``.
        """
        vitals = extracted_data.vital_signs
        return {
            "diagnosis": extracted_data.diagnosis or [],
            "symptoms": extracted_data.symptoms or [],
            "medications": [
                {
                    "name": med.name,
                    "dosage": med.dosage,
                    "frequency": med.frequency,
                    "duration": med.duration,
                }
                for med in extracted_data.medications or []
            ],
            "vital_signs": {
                "blood_pressure": vitals.blood_pressure,
                "heart_rate": vitals.heart_rate,
                "temperature": vitals.temperature,
                "respiratory_rate": vitals.respiratory_rate,
                "oxygen_saturation": vitals.oxygen_saturation,
            } if vitals else None,
            "lab_results": [
                {
                    "test_name": lab.test_name,
                    "value": lab.value,
                    "unit": lab.unit,
                    "reference_range": lab.reference_range,
                }
                for lab in extracted_data.lab_results or []
            ],
            "procedures": extracted_data.procedures or [],
            "notes": extracted_data.notes or "",
        }

    @staticmethod
    def _non_empty_flag(field_path: str) -> Dict[str, Any]:
        """Build a ``$cond`` yielding 1 when the array at *field_path* has items, else 0.

        A missing or null field is treated as an empty array via ``$ifNull``.
        """
        return {
            "$cond": [
                {"$gt": [{"$size": {"$ifNull": [field_path, []]}}, 0]},
                1,
                0,
            ]
        }

    async def save_document_analysis(
        self,
        patient_id: str,
        filename: str,
        file_content: bytes,
        extracted_data: "ExtractedData",
        confidence_score: float,
        original_message: Optional[str] = None
    ) -> str:
        """
        Save document analysis results to the EMR database.

        Args:
            patient_id: The ID of the patient
            filename: The name of the uploaded file
            file_content: The binary content of the file (accepted for
                interface compatibility; this method does not read or store it)
            extracted_data: The extracted medical data
            confidence_score: The confidence score of the analysis
            original_message: Optional original message (for chat-based entries);
                defaults to ``"Document upload: <filename>"`` when empty

        Returns:
            The EMR ID of the created record

        Raises:
            RuntimeError: If the insert result carries no ``inserted_id``.
            Exception: Any error raised by the database layer is logged and re-raised.
        """
        try:
            # Generate unique EMR ID
            emr_id = str(uuid.uuid4())
            # One timestamp for both fields so a fresh record has
            # created_at == updated_at.
            now = datetime.now(timezone.utc)
            emr_record = {
                "emr_id": emr_id,
                "patient_id": patient_id,
                "original_message": original_message or f"Document upload: {filename}",
                "extracted_data": self._serialize_extracted_data(extracted_data),
                "confidence_score": confidence_score,
                "source": "document_upload",
                "filename": filename,
                "created_at": now,
                "updated_at": now,
            }
            # Save to database
            result = await self.db.emr_records.insert_one(emr_record)
            if not result.inserted_id:
                raise RuntimeError("Failed to insert EMR record")
            logger().info(
                f"Successfully saved document analysis for patient {patient_id}, EMR ID: {emr_id}"
            )
            return emr_id
        except Exception as e:
            logger().error(f"Error saving document analysis: {e}")
            raise

    async def update_emr_record(
        self,
        emr_id: str,
        extracted_data: "ExtractedData",
        confidence_score: Optional[float] = None
    ) -> bool:
        """
        Update an existing EMR record with new extracted data.

        Args:
            emr_id: The EMR record ID to update
            extracted_data: The updated extracted medical data
            confidence_score: Optional new confidence score; the stored score
                is left untouched when this is ``None``

        Returns:
            True if a record with this ID was found and the update applied,
            False if no record matched

        Raises:
            Exception: Any error raised by the database layer is logged and re-raised.
        """
        try:
            update_data: Dict[str, Any] = {
                "extracted_data": self._serialize_extracted_data(extracted_data),
                "updated_at": datetime.now(timezone.utc),
            }
            if confidence_score is not None:
                update_data["confidence_score"] = confidence_score
            # Update the record
            result = await self.db.emr_records.update_one(
                {"emr_id": emr_id},
                {"$set": update_data}
            )
            # matched_count, not modified_count: a record that exists but
            # ends up unchanged by the $set must not be reported as missing.
            if result.matched_count > 0:
                logger().info(f"Successfully updated EMR record {emr_id}")
                return True
            logger().warning(f"No EMR record found with ID {emr_id}")
            return False
        except Exception as e:
            logger().error(f"Error updating EMR record {emr_id}: {e}")
            raise

    async def get_emr_record(self, emr_id: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve an EMR record by ID.

        Args:
            emr_id: The EMR record ID

        Returns:
            The EMR record (with ``_id`` converted to ``str``) if found,
            None otherwise

        Raises:
            Exception: Any error raised by the database layer is logged and re-raised.
        """
        try:
            record = await self.db.emr_records.find_one({"emr_id": emr_id})
            if record:
                # Convert ObjectId to string for JSON serialization
                record["_id"] = str(record["_id"])
            return record
        except Exception as e:
            logger().error(f"Error retrieving EMR record {emr_id}: {e}")
            raise

    async def delete_emr_record(self, emr_id: str) -> bool:
        """
        Delete an EMR record.

        Args:
            emr_id: The EMR record ID to delete

        Returns:
            True if a record was deleted, False if none matched

        Raises:
            Exception: Any error raised by the database layer is logged and re-raised.
        """
        try:
            result = await self.db.emr_records.delete_one({"emr_id": emr_id})
            if result.deleted_count > 0:
                logger().info(f"Successfully deleted EMR record {emr_id}")
                return True
            logger().warning(f"No EMR record found with ID {emr_id}")
            return False
        except Exception as e:
            logger().error(f"Error deleting EMR record {emr_id}: {e}")
            raise

    async def get_patient_emr_records(
        self,
        patient_id: str,
        limit: int = 100,
        skip: int = 0
    ) -> List[Dict[str, Any]]:
        """
        Retrieve EMR records for a specific patient, newest first.

        Args:
            patient_id: The patient ID
            limit: Maximum number of records to return
            skip: Number of records to skip

        Returns:
            List of EMR records sorted by ``created_at`` descending, each with
            ``_id`` converted to ``str``

        Raises:
            Exception: Any error raised by the database layer is logged and re-raised.
        """
        try:
            cursor = self.db.emr_records.find(
                {"patient_id": patient_id}
            ).sort("created_at", -1).skip(skip).limit(limit)
            records = []
            async for record in cursor:
                # Convert ObjectId to string for JSON serialization
                record["_id"] = str(record["_id"])
                records.append(record)
            return records
        except Exception as e:
            logger().error(f"Error retrieving EMR records for patient {patient_id}: {e}")
            raise

    async def get_patient_emr_statistics(self, patient_id: str) -> Dict[str, Any]:
        """
        Get EMR statistics for a patient.

        Args:
            patient_id: The patient ID

        Returns:
            Dictionary with ``total_entries``, ``avg_confidence``,
            ``diagnosis_count`` (records with at least one diagnosis) and
            ``medication_count`` (records with at least one medication).
            All values are zero when the patient has no records, and
            ``avg_confidence`` is 0.0 when the aggregation yields no average.

        Raises:
            Exception: Any error raised by the database layer is logged and re-raised.
        """
        try:
            pipeline = [
                {"$match": {"patient_id": patient_id}},
                {
                    "$group": {
                        "_id": None,
                        "total_entries": {"$sum": 1},
                        "avg_confidence": {"$avg": "$confidence_score"},
                        "diagnosis_count": {
                            "$sum": self._non_empty_flag("$extracted_data.diagnosis")
                        },
                        "medication_count": {
                            "$sum": self._non_empty_flag("$extracted_data.medications")
                        },
                    }
                },
            ]
            rows = await self.db.emr_records.aggregate(pipeline).to_list(1)
            stats = rows[0] if rows else {}
            return {
                "total_entries": stats.get("total_entries", 0),
                # The key can be present with a null value, so a .get()
                # default alone would leak None to callers.
                "avg_confidence": stats.get("avg_confidence") or 0.0,
                "diagnosis_count": stats.get("diagnosis_count", 0),
                "medication_count": stats.get("medication_count", 0),
            }
        except Exception as e:
            logger().error(f"Error retrieving EMR statistics for patient {patient_id}: {e}")
            raise