careflow / utils /response_parser.py
omgy's picture
Upload 14 files
81a42c2 verified
"""
Response Parser Utility for CareFlow Nexus
Handles parsing and validation of Gemini AI responses
"""
import json
import logging
import re
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
class ResponseParser:
"""Utility class for parsing and validating AI responses"""
@staticmethod
def extract_json(text: str) -> Optional[Dict[str, Any]]:
"""
Extract JSON from text response (handles various formats)
Args:
text: Text containing JSON
Returns:
Parsed JSON dictionary or None
"""
if not text:
return None
# Try direct JSON parse first
try:
return json.loads(text.strip())
except json.JSONDecodeError:
pass
# Try to find JSON in markdown code blocks
patterns = [
r"```json\s*(\{.*?\})\s*```", # ```json {...} ```
r"```\s*(\{.*?\})\s*```", # ``` {...} ```
r"```json\s*(\[.*?\])\s*```", # ```json [...] ```
r"```\s*(\[.*?\])\s*```", # ``` [...] ```
]
for pattern in patterns:
matches = re.findall(pattern, text, re.DOTALL)
if matches:
try:
return json.loads(matches[0])
except json.JSONDecodeError:
continue
# Try to find any JSON object or array in the text
json_object_pattern = r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}"
json_array_pattern = r"\[[^\[\]]*(?:\[[^\[\]]*\][^\[\]]*)*\]"
for pattern in [json_object_pattern, json_array_pattern]:
matches = re.findall(pattern, text, re.DOTALL)
for match in matches:
try:
parsed = json.loads(match)
# Verify it's a meaningful JSON (not just empty)
if parsed:
return parsed
except json.JSONDecodeError:
continue
logger.warning("Could not extract valid JSON from response")
return None
@staticmethod
def validate_required_fields(
data: Dict[str, Any], required_fields: List[str]
) -> tuple[bool, List[str]]:
"""
Validate that dictionary contains required fields
Args:
data: Dictionary to validate
required_fields: List of required field names
Returns:
Tuple of (is_valid, missing_fields)
"""
if not isinstance(data, dict):
return False, required_fields
missing = [field for field in required_fields if field not in data]
return len(missing) == 0, missing
@staticmethod
def sanitize_response(data: Dict[str, Any]) -> Dict[str, Any]:
"""
Clean and normalize response data
Args:
data: Raw response data
Returns:
Sanitized dictionary
"""
if not isinstance(data, dict):
return {}
sanitized = {}
for key, value in data.items():
# Clean key (remove special chars, lowercase)
clean_key = key.strip().lower().replace(" ", "_")
# Clean value based on type
if isinstance(value, str):
sanitized[clean_key] = value.strip()
elif isinstance(value, dict):
sanitized[clean_key] = ResponseParser.sanitize_response(value)
elif isinstance(value, list):
sanitized[clean_key] = [
ResponseParser.sanitize_response(item)
if isinstance(item, dict)
else item
for item in value
]
else:
sanitized[clean_key] = value
return sanitized
@staticmethod
def validate_score(score: Any, min_val: int = 0, max_val: int = 100) -> int:
"""
Validate and normalize score to range
Args:
score: Score value (any type)
min_val: Minimum valid score
max_val: Maximum valid score
Returns:
Validated score within range
"""
try:
score_int = int(float(score))
return max(min_val, min(max_val, score_int))
except (ValueError, TypeError):
logger.warning(f"Invalid score value: {score}, returning 0")
return 0
@staticmethod
def parse_bed_allocation_response(response: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse and validate bed allocation response
Args:
response: Raw response from AI
Returns:
Validated and structured response
"""
try:
recommendations = response.get("recommendations", [])
if not isinstance(recommendations, list):
recommendations = []
parsed_recs = []
for rec in recommendations[:3]: # Top 3 only
if not isinstance(rec, dict):
continue
parsed_rec = {
"bed_id": rec.get("bed_id", ""),
"bed_number": rec.get("bed_number", ""),
"ward": rec.get("ward", ""),
"score": ResponseParser.validate_score(rec.get("score", 0)),
"reasoning": rec.get("reasoning", "No reasoning provided"),
"pros": rec.get("pros", [])
if isinstance(rec.get("pros"), list)
else [],
"cons": rec.get("cons", [])
if isinstance(rec.get("cons"), list)
else [],
}
parsed_recs.append(parsed_rec)
return {
"recommendations": parsed_recs,
"overall_confidence": ResponseParser.validate_score(
response.get("overall_confidence", 50)
),
"considerations": response.get("considerations", ""),
}
except Exception as e:
logger.error(f"Error parsing bed allocation response: {e}")
return {
"recommendations": [],
"overall_confidence": 0,
"considerations": "",
}
@staticmethod
def parse_requirement_extraction_response(
response: Dict[str, Any],
) -> Dict[str, Any]:
"""
Parse and validate requirement extraction response
Args:
response: Raw response from AI
Returns:
Validated requirements dictionary
"""
try:
return {
"needs_oxygen": bool(response.get("needs_oxygen", False)),
"needs_ventilator": bool(response.get("needs_ventilator", False)),
"needs_cardiac_monitor": bool(
response.get("needs_cardiac_monitor", False)
),
"needs_isolation": bool(response.get("needs_isolation", False)),
"preferred_ward": response.get("preferred_ward"),
"proximity_preference": ResponseParser.validate_score(
response.get("proximity_preference", 5), 1, 10
),
"special_considerations": response.get("special_considerations", [])
if isinstance(response.get("special_considerations"), list)
else [],
"confidence": ResponseParser.validate_score(
response.get("confidence", 50)
),
"reasoning": response.get("reasoning", ""),
}
except Exception as e:
logger.error(f"Error parsing requirement extraction response: {e}")
return {
"needs_oxygen": False,
"needs_ventilator": False,
"needs_cardiac_monitor": False,
"needs_isolation": False,
"preferred_ward": None,
"proximity_preference": 5,
"special_considerations": [],
"confidence": 0,
"reasoning": "Error parsing response",
}
@staticmethod
def parse_staff_assignment_response(response: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse and validate staff assignment response
Args:
response: Raw response from AI
Returns:
Validated assignment dictionary
"""
try:
alternatives = response.get("alternatives", [])
if not isinstance(alternatives, list):
alternatives = []
return {
"recommended_staff_id": response.get("recommended_staff_id", ""),
"staff_name": response.get("staff_name", ""),
"reasoning": response.get("reasoning", "No reasoning provided"),
"workload_impact": response.get("workload_impact", ""),
"concerns": response.get("concerns", [])
if isinstance(response.get("concerns"), list)
else [],
"alternatives": alternatives[:2], # Top 2 alternatives
"confidence": ResponseParser.validate_score(
response.get("confidence", 50)
),
}
except Exception as e:
logger.error(f"Error parsing staff assignment response: {e}")
return {
"recommended_staff_id": "",
"staff_name": "",
"reasoning": "Error parsing response",
"workload_impact": "",
"concerns": [],
"alternatives": [],
"confidence": 0,
}
@staticmethod
def parse_state_analysis_response(response: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse and validate state analysis response
Args:
response: Raw response from AI
Returns:
Validated analysis dictionary
"""
try:
return {
"critical_alerts": response.get("critical_alerts", [])
if isinstance(response.get("critical_alerts"), list)
else [],
"bottlenecks": response.get("bottlenecks", [])
if isinstance(response.get("bottlenecks"), list)
else [],
"capacity_forecast": response.get("capacity_forecast", {})
if isinstance(response.get("capacity_forecast"), dict)
else {},
"recommendations": response.get("recommendations", [])
if isinstance(response.get("recommendations"), list)
else [],
}
except Exception as e:
logger.error(f"Error parsing state analysis response: {e}")
return {
"critical_alerts": [],
"bottlenecks": [],
"capacity_forecast": {},
"recommendations": [],
}
@staticmethod
def combine_scores(
rule_score: float, ai_score: float, rule_weight: float = 0.5
) -> float:
"""
Combine rule-based and AI scores with weights
Args:
rule_score: Rule-based score (0-100)
ai_score: AI-generated score (0-100)
rule_weight: Weight for rule score (0-1), AI gets (1-rule_weight)
Returns:
Combined score
"""
ai_weight = 1.0 - rule_weight
combined = (rule_score * rule_weight) + (ai_score * ai_weight)
return round(combined, 2)
@staticmethod
def format_error_response(
error_message: str, error_type: str = "general"
) -> Dict[str, Any]:
"""
Format error into standard response structure
Args:
error_message: Error message
error_type: Type of error
Returns:
Error response dictionary
"""
return {
"success": False,
"error": True,
"error_type": error_type,
"message": error_message,
"data": None,
}
@staticmethod
def format_success_response(data: Any, message: str = "Success") -> Dict[str, Any]:
"""
Format success response
Args:
data: Response data
message: Success message
Returns:
Success response dictionary
"""
return {"success": True, "error": False, "message": message, "data": data}