Spaces:
Sleeping
Sleeping
| """Tool registry and execution logic for the Pulse physiology environment.""" | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from functools import partial | |
| from typing import Any, Callable | |
| from pulse_physiology_env.models import PulsePhysiologyAction, ToolError, ToolResult | |
| from pulse_physiology_env.patient_state import PatientState | |
| from pulse_physiology_env.tool_catalog import ( | |
| canonicalize_tool_name, | |
| coerce_boolean_argument, | |
| coerce_numeric_argument, | |
| ) | |
| from .pulse_engine_adapter import PulseEngineAdapter | |
| def _unique_tool_names(*groups: list[str]) -> list[str]: | |
| ordered: list[str] = [] | |
| seen: set[str] = set() | |
| for group in groups: | |
| for name in group: | |
| if name not in seen: | |
| seen.add(name) | |
| ordered.append(name) | |
| return ordered | |
# Second group merged into AVAILABLE_TOOL_NAMES.
# NOTE(review): the name suggests these are the tools a spec prioritises;
# nothing in this module consumes that priority directly — confirm with the spec.
SPEC_PRIORITY_TOOL_NAMES = [
    "get_vitals",
    "get_hemodynamics",
    "get_blood_chemistry",
    "get_respiratory_state",
    "get_shock_assessment",
    "perform_needle_decompression",
    "administer_crystalloid_bolus",
    "start_norepinephrine_infusion",
    "administer_epinephrine_bolus",
    "perform_intubation",
    "run_triage_assessment",
    "detect_deterioration",
]
# The 45 clinical tool names (the list below has exactly 45 entries).
# It is the first group merged into AVAILABLE_TOOL_NAMES, so these names
# lead that list in this order.
CLINICAL_45_TOOL_NAMES = [
    "apply_nasal_cannula",
    "apply_simple_mask",
    "apply_nonrebreather_mask",
    "apply_bag_valve_mask",
    "perform_intubation",
    "set_ventilator_tidal_volume",
    "set_ventilator_rate",
    "set_ventilator_fio2",
    "initiate_hemorrhage",
    "apply_tourniquet",
    "apply_wound_packing",
    "apply_direct_pressure",
    "perform_needle_decompression",
    "perform_pericardiocentesis",
    "perform_cpr",
    "induce_cardiac_arrest",
    "apply_pericardial_effusion",
    "administer_epinephrine_bolus",
    "administer_morphine_bolus",
    "administer_ketamine_bolus",
    "administer_midazolam_bolus",
    "administer_lorazepam_bolus",
    "administer_succinylcholine_bolus",
    "administer_phenylephrine_bolus",
    "administer_atropine_bolus",
    "start_norepinephrine_infusion",
    "start_dopamine_infusion",
    "start_phenylephrine_infusion",
    "adjust_infusion_rate",
    "stop_infusion",
    "administer_crystalloid_bolus",
    "administer_blood_transfusion",
    "administer_plasma",
    "activate_massive_transfusion_protocol",
    "auscultate_chest",
    "assess_consciousness_level",
    "check_pain_level",
    "measure_core_temperature",
    "check_end_tidal_co2",
    "calculate_shock_index",
    "order_arterial_blood_gas",
    "order_complete_blood_count",
    "order_basic_metabolic_panel",
    "order_point_of_care_ultrasound",
    "assess_urine_output",
]
# Initial tool set. LEGACY_EXTENDED_TOOL_NAMES starts with these names and
# appends further legacy ones.
INITIAL_TOOL_NAMES = [
    "get_vitals",
    "advance_time",
    "give_oxygen",
    "give_fluids",
    "control_bleeding",
    "position_patient",
    "airway_support",
    "summarize_state",
    "check_deterioration",
    "recommend_next_step",
]
# Initial tool names followed by the additional legacy names, as a new list
# (INITIAL_TOOL_NAMES itself is left untouched).
LEGACY_EXTENDED_TOOL_NAMES = [
    *INITIAL_TOOL_NAMES,
    "give_pressor",
    "needle_decompression",
    "pericardiocentesis",
    "get_respiratory_status",
    "get_blood_gas",
    "get_cbc",
    "get_bmp",
]
# Every advertised tool name exactly once: clinical names first, then any
# spec-priority names not already listed, then any remaining legacy names.
AVAILABLE_TOOL_NAMES = _unique_tool_names(
    CLINICAL_45_TOOL_NAMES,
    SPEC_PRIORITY_TOOL_NAMES,
    LEGACY_EXTENDED_TOOL_NAMES,
)
# Tool names treated as restricted.
# NOTE(review): presumably consulted by the restriction check that
# `_handle_initiate_hemorrhage` runs before acting; that check is not
# visible in this part of the file — confirm.
RESTRICTED_TOOL_NAMES = {
    "initiate_hemorrhage",
    "induce_cardiac_arrest",
    "apply_pericardial_effusion",
}
@dataclass
class ToolExecution:
    """Outcome of a handled tool call.

    The class only declares annotated fields, so it relies on ``@dataclass``
    to generate the keyword constructor, ``__repr__`` and ``__eq__``.
    """

    # Patient state reported alongside the result.
    state: PatientState
    # Result payload of the tool call.
    tool_result: ToolResult
    # Error details; ``None`` is allowed by the annotation.
    error: ToolError | None
| class PulseToolExecutor: | |
| """Executes named tools against a PulseEngineAdapter.""" | |
def __init__(self, adapter: PulseEngineAdapter) -> None:
    """Store the adapter and build the tool-name -> handler dispatch table.

    Args:
        adapter: Engine adapter that every handler reads from and acts on.
    """
    self._adapter = adapter
    # Each value takes the raw argument dict and returns a ToolExecution.
    # Where several tool names share one implementation, `partial` pins the
    # keyword arguments (tool name, device, drug, ...) for that entry.
    # The key order is also the order handed to `canonicalize_tool_name`
    # as `allowed_tools` in `execute`, so keep it stable.
    self._handlers: dict[str, Callable[[dict[str, Any]], ToolExecution]] = {
        # Read-only assessments.
        "get_vitals": self._handle_get_vitals,
        "get_hemodynamics": self._handle_get_hemodynamics,
        "get_blood_chemistry": self._handle_get_blood_chemistry,
        "get_respiratory_state": partial(self._handle_get_respiratory_status, tool_name="get_respiratory_state"),
        "get_respiratory_status": partial(self._handle_get_respiratory_status, tool_name="get_respiratory_status"),
        "get_shock_assessment": self._handle_get_shock_assessment,
        "run_triage_assessment": self._handle_run_triage_assessment,
        "detect_deterioration": partial(self._handle_check_deterioration, tool_name="detect_deterioration"),
        "check_deterioration": partial(self._handle_check_deterioration, tool_name="check_deterioration"),
        "advance_time": self._handle_advance_time,
        # Supplemental oxygen.
        "give_oxygen": self._handle_give_oxygen,
        "apply_nasal_cannula": partial(self._handle_fixed_oxygen, tool_name="apply_nasal_cannula", device="nasal_cannula"),
        "apply_simple_mask": partial(self._handle_fixed_oxygen, tool_name="apply_simple_mask", device="simple_mask"),
        "apply_nonrebreather_mask": partial(self._handle_fixed_oxygen, tool_name="apply_nonrebreather_mask", device="non_rebreather_mask"),
        # Fluids and blood products.
        "give_fluids": self._handle_give_fluids,
        "administer_crystalloid_bolus": self._handle_administer_crystalloid_bolus,
        "administer_blood_transfusion": self._handle_administer_blood_transfusion,
        "administer_plasma": self._handle_administer_plasma,
        "activate_massive_transfusion_protocol": self._handle_activate_massive_transfusion_protocol,
        # Hemorrhage.
        "control_bleeding": self._handle_control_bleeding,
        "apply_tourniquet": self._handle_apply_tourniquet,
        "apply_wound_packing": self._handle_apply_wound_packing,
        "apply_direct_pressure": self._handle_apply_direct_pressure,
        "initiate_hemorrhage": self._handle_initiate_hemorrhage,
        # Positioning, airway and ventilation.
        "position_patient": self._handle_position_patient,
        "airway_support": self._handle_airway_support,
        "apply_bag_valve_mask": self._handle_apply_bag_valve_mask,
        "perform_intubation": self._handle_perform_intubation,
        "set_ventilator_tidal_volume": self._handle_set_ventilator_tidal_volume,
        "set_ventilator_rate": self._handle_set_ventilator_rate,
        "set_ventilator_fio2": self._handle_set_ventilator_fio2,
        # Pressors and infusions.
        "give_pressor": self._handle_give_pressor,
        "start_norepinephrine_infusion": partial(self._handle_start_pressor_infusion, tool_name="start_norepinephrine_infusion", pressor="norepinephrine"),
        "start_dopamine_infusion": partial(self._handle_start_pressor_infusion, tool_name="start_dopamine_infusion", pressor="dopamine"),
        "start_phenylephrine_infusion": partial(self._handle_start_pressor_infusion, tool_name="start_phenylephrine_infusion", pressor="phenylephrine"),
        "adjust_infusion_rate": self._handle_adjust_infusion_rate,
        "stop_infusion": self._handle_stop_infusion,
        # Procedures and induced conditions.
        "needle_decompression": self._handle_needle_decompression,
        "perform_needle_decompression": self._handle_perform_needle_decompression,
        "pericardiocentesis": self._handle_pericardiocentesis,
        "perform_pericardiocentesis": self._handle_perform_pericardiocentesis,
        "perform_cpr": self._handle_perform_cpr,
        "induce_cardiac_arrest": self._handle_induce_cardiac_arrest,
        "apply_pericardial_effusion": self._handle_apply_pericardial_effusion,
        # Bolus drugs: `dose_keys` lists the argument names tried for the dose.
        "administer_epinephrine_bolus": partial(self._handle_bolus_drug, tool_name="administer_epinephrine_bolus", drug_key="epinephrine", dose_keys=("dose_mg", "dose")),
        "administer_morphine_bolus": partial(self._handle_bolus_drug, tool_name="administer_morphine_bolus", drug_key="morphine", dose_keys=("dose_mg", "dose")),
        "administer_ketamine_bolus": partial(self._handle_bolus_drug, tool_name="administer_ketamine_bolus", drug_key="ketamine", dose_keys=("dose_mg_per_kg", "dose")),
        "administer_midazolam_bolus": partial(self._handle_bolus_drug, tool_name="administer_midazolam_bolus", drug_key="midazolam", dose_keys=("dose_mg", "dose")),
        "administer_lorazepam_bolus": partial(self._handle_bolus_drug, tool_name="administer_lorazepam_bolus", drug_key="lorazepam", dose_keys=("dose_mg", "dose")),
        "administer_succinylcholine_bolus": partial(self._handle_bolus_drug, tool_name="administer_succinylcholine_bolus", drug_key="succinylcholine", dose_keys=("dose_mg_per_kg", "dose")),
        "administer_phenylephrine_bolus": partial(self._handle_bolus_drug, tool_name="administer_phenylephrine_bolus", drug_key="phenylephrine", dose_keys=("dose_mcg", "dose")),
        "administer_atropine_bolus": partial(self._handle_bolus_drug, tool_name="administer_atropine_bolus", drug_key="atropine", dose_keys=("dose_mg", "dose")),
        # Summaries and recommendations.
        "summarize_state": self._handle_summarize_state,
        "recommend_next_step": self._handle_recommend_next_step,
        # Labs and diagnostics: the legacy `get_*` and newer `order_*` names
        # reuse the same handlers.
        "get_blood_gas": partial(self._handle_get_blood_gas, tool_name="get_blood_gas"),
        "get_cbc": partial(self._handle_get_cbc, tool_name="get_cbc"),
        "get_bmp": partial(self._handle_get_bmp, tool_name="get_bmp"),
        "order_arterial_blood_gas": partial(self._handle_get_blood_gas, tool_name="order_arterial_blood_gas"),
        "order_complete_blood_count": partial(self._handle_get_cbc, tool_name="order_complete_blood_count"),
        "order_basic_metabolic_panel": partial(self._handle_get_bmp, tool_name="order_basic_metabolic_panel"),
        "order_point_of_care_ultrasound": self._handle_order_point_of_care_ultrasound,
        # Bedside checks.
        "auscultate_chest": self._handle_auscultate_chest,
        "assess_consciousness_level": self._handle_assess_consciousness_level,
        "check_pain_level": self._handle_check_pain_level,
        "measure_core_temperature": self._handle_measure_core_temperature,
        "check_end_tidal_co2": self._handle_check_end_tidal_co2,
        "calculate_shock_index": self._handle_calculate_shock_index,
        "assess_urine_output": self._handle_assess_urine_output,
    }
def available_tools(self) -> list[str]:
    """Return the advertised tool names as a new list.

    A copy is returned so callers cannot mutate the module-level list.
    """
    return [*AVAILABLE_TOOL_NAMES]
def execute(self, action: PulsePhysiologyAction) -> ToolExecution:
    """Resolve the requested tool and run its handler.

    The stripped tool name is canonicalised against the registered handler
    names. Failures are returned as results rather than raised:

    * no handler for the name -> ``UNSUPPORTED_TOOL`` (not retryable)
    * handler raised ``ValueError`` -> ``INVALID_ARGUMENT`` (not retryable)
    * handler raised ``RuntimeError`` -> ``ENGINE_ERROR`` (retryable)

    Any other exception propagates to the caller.
    """
    registry = self._handlers
    tool_name = canonicalize_tool_name(
        action.tool_name.strip(),
        allowed_tools=list(registry),
    )
    handler = registry.get(tool_name)
    if handler is None:
        return self._failure(
            tool_name=tool_name,
            state=self._adapter.get_full_state(),
            code="UNSUPPORTED_TOOL",
            message=f"Unsupported tool '{tool_name}'.",
            retryable=False,
        )

    try:
        # Handlers get their own copy of the arguments.
        return handler(dict(action.arguments))
    except (ValueError, RuntimeError) as exc:
        # ValueError is tested first, matching the original handler order.
        bad_input = isinstance(exc, ValueError)
        return self._failure(
            tool_name=tool_name,
            state=self._adapter.get_full_state(),
            code="INVALID_ARGUMENT" if bad_input else "ENGINE_ERROR",
            message=str(exc),
            retryable=not bad_input,
        )
| def _handle_get_vitals(self, _arguments: dict[str, Any]) -> ToolExecution: | |
| state = self._adapter.get_full_state() | |
| message = ( | |
| f"HR {self._fmt(state.heart_rate_bpm)} bpm, BP " | |
| f"{self._fmt(state.systolic_bp_mmhg)}/{self._fmt(state.diastolic_bp_mmhg)} mmHg, " | |
| f"MAP {self._fmt(state.mean_arterial_pressure_mmhg)} mmHg, SpO2 {self._fmt(state.spo2, precision=3)}, " | |
| f"RR {self._fmt(state.respiration_rate_bpm)}, Temp {self._fmt(state.core_temperature_c)} C, " | |
| f"EtCO2 {self._fmt(state.etco2_mmhg)} mmHg." | |
| ) | |
| return self._success("get_vitals", state, message, previous_state=state) | |
| def _handle_get_hemodynamics(self, _arguments: dict[str, Any]) -> ToolExecution: | |
| state = self._adapter.get_full_state() | |
| hemodynamics = self._adapter.get_hemodynamics_summary() | |
| message = ( | |
| f"CO {self._fmt(hemodynamics['cardiac_output_l_per_min'])} L/min, stroke volume " | |
| f"{self._fmt(hemodynamics['stroke_volume_ml'])} mL, SVR " | |
| f"{self._fmt(hemodynamics['systemic_vascular_resistance_mmhg_min_per_l'])} mmHg*min/L, " | |
| f"MAP {self._fmt(hemodynamics['mean_arterial_pressure_mmhg'])} mmHg." | |
| ) | |
| return self._success("get_hemodynamics", state, message, previous_state=state) | |
| def _handle_get_blood_chemistry(self, _arguments: dict[str, Any]) -> ToolExecution: | |
| state = self._adapter.get_full_state() | |
| chemistry = self._adapter.get_blood_chemistry_summary() | |
| message = ( | |
| f"pH {self._fmt(chemistry['ph'], precision=3)}, lactate {self._fmt(chemistry['lactate_mg_per_dl'])} mg/dL, " | |
| f"hemoglobin {self._fmt(chemistry['hemoglobin_g_per_dl'])} g/dL, base deficit " | |
| f"{self._fmt(chemistry['base_deficit_meq_per_l'])} mEq/L." | |
| ) | |
| return self._success("get_blood_chemistry", state, message, previous_state=state) | |
| def _handle_get_respiratory_status(self, _arguments: dict[str, Any], *, tool_name: str) -> ToolExecution: | |
| state = self._adapter.get_full_state() | |
| message = ( | |
| f"Breath sounds: {state.breath_sounds}; SpO2 {self._fmt(state.spo2, precision=3)}; " | |
| f"EtCO2 {self._fmt(state.etco2_mmhg)} mmHg; tidal volume {self._fmt(state.tidal_volume_ml)} mL; " | |
| f"airway support: {state.airway_support or 'none'}." | |
| ) | |
| return self._success(tool_name, state, message, previous_state=state) | |
| def _handle_get_shock_assessment(self, _arguments: dict[str, Any]) -> ToolExecution: | |
| state = self._adapter.get_full_state() | |
| shock = self._adapter.get_shock_assessment() | |
| message = ( | |
| f"Shock index {self._fmt(shock['shock_index'], precision=2)}, class {shock['shock_class']}, " | |
| f"perfusion {shock['perfusion_status']}, urine output {self._fmt(shock['urine_output_ml_per_hr'])} mL/hr." | |
| ) | |
| return self._success("get_shock_assessment", state, message, previous_state=state) | |
| def _handle_run_triage_assessment(self, _arguments: dict[str, Any]) -> ToolExecution: | |
| state = self._adapter.get_full_state() | |
| shock = self._adapter.get_shock_assessment() | |
| respiratory = "unstable" if state.spo2 is not None and state.spo2 < 0.9 else "acceptable" | |
| if "cardiac_arrest" in state.active_alerts: | |
| triage = "expectant" | |
| elif state.mean_arterial_pressure_mmhg is not None and state.mean_arterial_pressure_mmhg < 55: | |
| triage = "immediate" | |
| elif state.spo2 is not None and state.spo2 < 0.9: | |
| triage = "immediate" | |
| elif state.active_hemorrhages: | |
| triage = "urgent" | |
| else: | |
| triage = "delayed" | |
| alerts = ", ".join(state.active_alerts) if state.active_alerts else "no active alerts" | |
| message = ( | |
| f"Triage {triage}. Respiratory status {respiratory}; shock class {shock['shock_class']}; " | |
| f"mental status {state.mental_status}; alerts {alerts}." | |
| ) | |
| return self._success("run_triage_assessment", state, message, previous_state=state) | |
| def _handle_check_deterioration(self, _arguments: dict[str, Any], *, tool_name: str) -> ToolExecution: | |
| state = self._adapter.get_full_state() | |
| deterioration_domains: list[str] = [] | |
| if state.spo2 is not None and state.spo2 < 0.92: | |
| deterioration_domains.append("respiratory") | |
| if state.shock_index is not None and state.shock_index >= 0.9: | |
| deterioration_domains.append("circulatory") | |
| if state.mental_status in {"pain", "unresponsive"}: | |
| deterioration_domains.append("neurologic") | |
| if state.lactate_trend == "worsening": | |
| deterioration_domains.append("perfusion") | |
| if state.pending_diagnostics: | |
| deterioration_domains.append("diagnostics") | |
| if not deterioration_domains: | |
| message = "No major deterioration flags detected right now." | |
| else: | |
| message = f"Deterioration detected in: {', '.join(dict.fromkeys(deterioration_domains))}." | |
| return self._success(tool_name, state, message, previous_state=state) | |
| def _handle_advance_time(self, arguments: dict[str, Any]) -> ToolExecution: | |
| previous = self._adapter.get_full_state() | |
| seconds = self._read_positive_float(arguments, keys=("seconds", "duration_s", "sim_time_s")) | |
| if seconds <= 0: | |
| raise ValueError("seconds must be greater than zero.") | |
| if seconds > 1800: | |
| raise ValueError("seconds must be 1800 or less for a single tool call.") | |
| state = self._adapter.advance_time(seconds) | |
| return self._success( | |
| "advance_time", | |
| state, | |
| f"Advanced simulation by {seconds:.0f} seconds.", | |
| previous_state=previous, | |
| ) | |
| def _handle_give_oxygen(self, arguments: dict[str, Any]) -> ToolExecution: | |
| previous = self._adapter.get_full_state() | |
| device = str(arguments.get("device") or self._suggest_oxygen_device(previous)) | |
| flow_lpm = self._read_optional_float(arguments, keys=("flow_lpm",)) | |
| monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0 | |
| state = self._adapter.apply_supplemental_oxygen( | |
| device=device, | |
| flow_lpm=flow_lpm, | |
| advance_time_seconds=monitor_seconds, | |
| ) | |
| resolved_flow = state.oxygen_flow_lpm if state.oxygen_flow_lpm is not None else flow_lpm | |
| return self._success( | |
| "give_oxygen", | |
| state, | |
| f"Supplemental oxygen applied via {state.oxygen_device or device} at {self._fmt(resolved_flow)} L/min.", | |
| previous_state=previous, | |
| ) | |
| def _handle_fixed_oxygen(self, arguments: dict[str, Any], *, tool_name: str, device: str) -> ToolExecution: | |
| previous = self._adapter.get_full_state() | |
| flow_lpm = self._read_optional_float(arguments, keys=("flow_rate_lpm", "flow_lpm")) or self._default_oxygen_flow(device) | |
| monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0 | |
| state = self._adapter.apply_supplemental_oxygen( | |
| device=device, | |
| flow_lpm=flow_lpm, | |
| advance_time_seconds=monitor_seconds, | |
| ) | |
| return self._success( | |
| tool_name, | |
| state, | |
| f"Applied {device.replace('_', ' ')} at {self._fmt(flow_lpm)} L/min.", | |
| previous_state=previous, | |
| ) | |
| def _handle_give_fluids(self, arguments: dict[str, Any]) -> ToolExecution: | |
| previous = self._adapter.get_full_state() | |
| fluid_type = str(arguments.get("fluid_type") or arguments.get("fluid") or "saline").strip().lower() | |
| volume_ml = self._read_optional_float(arguments, keys=("volume_ml", "bag_volume_ml")) or 500.0 | |
| rate_ml_per_min = self._read_optional_float(arguments, keys=("rate_ml_per_min",)) or 100.0 | |
| monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0 | |
| compound = self._adapter.resolve_fluid_compound(fluid_type) | |
| state = self._adapter.infuse_compound( | |
| compound=compound, | |
| bag_volume_ml=volume_ml, | |
| rate_ml_per_min=rate_ml_per_min, | |
| advance_time_seconds=monitor_seconds, | |
| ) | |
| return self._success( | |
| "give_fluids", | |
| state, | |
| f"Started {compound} infusion: {volume_ml:.0f} mL at {rate_ml_per_min:.0f} mL/min.", | |
| previous_state=previous, | |
| ) | |
| def _handle_administer_crystalloid_bolus(self, arguments: dict[str, Any]) -> ToolExecution: | |
| previous = self._adapter.get_full_state() | |
| volume_l = self._read_optional_float(arguments, keys=("volume_l",)) | |
| volume_ml = self._read_optional_float(arguments, keys=("volume_ml", "bag_volume_ml")) | |
| resolved_volume_ml = volume_ml if volume_ml is not None else (volume_l * 1000.0 if volume_l is not None else 1000.0) | |
| rate_ml_per_min = self._read_optional_float(arguments, keys=("rate_ml_per_min",)) or 250.0 | |
| monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 120.0 | |
| state = self._adapter.infuse_compound( | |
| compound="Saline", | |
| bag_volume_ml=resolved_volume_ml, | |
| rate_ml_per_min=rate_ml_per_min, | |
| advance_time_seconds=monitor_seconds, | |
| ) | |
| return self._success( | |
| "administer_crystalloid_bolus", | |
| state, | |
| f"Administered crystalloid bolus of {resolved_volume_ml:.0f} mL at {rate_ml_per_min:.0f} mL/min.", | |
| previous_state=previous, | |
| ) | |
| def _handle_administer_blood_transfusion(self, arguments: dict[str, Any]) -> ToolExecution: | |
| previous = self._adapter.get_full_state() | |
| units = self._read_optional_float(arguments, keys=("units",)) or 1.0 | |
| if units <= 0: | |
| raise ValueError("units must be greater than zero.") | |
| bag_volume_ml = units * 300.0 | |
| rate_ml_per_min = self._read_optional_float(arguments, keys=("rate_ml_per_min",)) or 150.0 | |
| monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 120.0 | |
| state = self._adapter.infuse_compound( | |
| compound="PackedRBC", | |
| bag_volume_ml=bag_volume_ml, | |
| rate_ml_per_min=rate_ml_per_min, | |
| advance_time_seconds=monitor_seconds, | |
| ) | |
| return self._success( | |
| "administer_blood_transfusion", | |
| state, | |
| f"Started packed red blood cell transfusion for {units:.1f} unit(s).", | |
| previous_state=previous, | |
| ) | |
| def _handle_administer_plasma(self, arguments: dict[str, Any]) -> ToolExecution: | |
| previous = self._adapter.get_full_state() | |
| if not self._adapter.supports_compound("Plasma"): | |
| return self._failure( | |
| tool_name="administer_plasma", | |
| state=previous, | |
| code="UNSUPPORTED_BY_ENGINE", | |
| message="The local Pulse build does not expose a plasma compound infusion.", | |
| retryable=False, | |
| ) | |
| units = self._read_optional_float(arguments, keys=("units",)) or 1.0 | |
| if units <= 0: | |
| raise ValueError("units must be greater than zero.") | |
| bag_volume_ml = units * 250.0 | |
| rate_ml_per_min = self._read_optional_float(arguments, keys=("rate_ml_per_min",)) or 125.0 | |
| monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 120.0 | |
| state = self._adapter.infuse_compound( | |
| compound="Plasma", | |
| bag_volume_ml=bag_volume_ml, | |
| rate_ml_per_min=rate_ml_per_min, | |
| advance_time_seconds=monitor_seconds, | |
| ) | |
| return self._success( | |
| "administer_plasma", | |
| state, | |
| f"Started plasma transfusion for {units:.1f} unit(s).", | |
| previous_state=previous, | |
| ) | |
| def _handle_activate_massive_transfusion_protocol(self, arguments: dict[str, Any]) -> ToolExecution: | |
| previous = self._adapter.get_full_state() | |
| if not self._adapter.supports_compound("Plasma"): | |
| return self._failure( | |
| tool_name="activate_massive_transfusion_protocol", | |
| state=previous, | |
| code="UNSUPPORTED_BY_ENGINE", | |
| message="Full MTP is unavailable because the local Pulse build does not expose plasma.", | |
| retryable=False, | |
| ) | |
| prbc_units = self._read_optional_float(arguments, keys=("prbc_units",)) or 2.0 | |
| plasma_units = self._read_optional_float(arguments, keys=("plasma_units",)) or 2.0 | |
| rate_ml_per_min = self._read_optional_float(arguments, keys=("rate_ml_per_min",)) or 200.0 | |
| monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 180.0 | |
| self._adapter.infuse_compound( | |
| compound="PackedRBC", | |
| bag_volume_ml=prbc_units * 300.0, | |
| rate_ml_per_min=rate_ml_per_min, | |
| advance_time_seconds=0.0, | |
| ) | |
| state = self._adapter.infuse_compound( | |
| compound="Plasma", | |
| bag_volume_ml=plasma_units * 250.0, | |
| rate_ml_per_min=rate_ml_per_min, | |
| advance_time_seconds=monitor_seconds, | |
| ) | |
| return self._success( | |
| "activate_massive_transfusion_protocol", | |
| state, | |
| f"Activated MTP with {prbc_units:.1f} unit(s) PRBC and {plasma_units:.1f} unit(s) plasma.", | |
| previous_state=previous, | |
| ) | |
| def _handle_control_bleeding(self, arguments: dict[str, Any]) -> ToolExecution: | |
| previous = self._adapter.get_full_state() | |
| active_site = self._resolve_bleeding_site(previous, arguments) | |
| if active_site is None: | |
| return self._failure( | |
| tool_name="control_bleeding", | |
| state=previous, | |
| code="PRECONDITION_FAILED", | |
| message="No active hemorrhage is currently tracked.", | |
| retryable=False, | |
| ) | |
| method = str(arguments.get("method") or self._default_bleeding_method(active_site)).strip().lower().replace("-", "_").replace(" ", "_") | |
| current_flow = previous.active_hemorrhages[active_site] | |
| if method == "tourniquet": | |
| new_flow = 0.0 | |
| elif method == "pressure": | |
| new_flow = current_flow * 0.35 | |
| elif method in {"hemostatic_dressing", "pressure_dressing", "wound_packing"}: | |
| new_flow = current_flow * 0.2 | |
| else: | |
| raise ValueError("method must be one of: tourniquet, pressure, hemostatic_dressing") | |
| monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0 | |
| state = self._adapter.set_hemorrhage( | |
| active_site, | |
| flow_rate_ml_per_min=new_flow, | |
| advance_time_seconds=monitor_seconds, | |
| ) | |
| return self._success( | |
| "control_bleeding", | |
| state, | |
| f"Applied {method} to {active_site.replace('_', ' ')}.", | |
| previous_state=previous, | |
| ) | |
| def _handle_apply_tourniquet(self, arguments: dict[str, Any]) -> ToolExecution: | |
| previous = self._adapter.get_full_state() | |
| limb = self._normalize_site(arguments.get("limb") or arguments.get("site")) | |
| if not limb: | |
| raise ValueError("limb is required and must be one of: left_arm, right_arm, left_leg, right_leg") | |
| if limb not in {"left_arm", "right_arm", "left_leg", "right_leg"}: | |
| raise ValueError("limb must be one of: left_arm, right_arm, left_leg, right_leg") | |
| if limb not in previous.active_hemorrhages: | |
| return self._failure( | |
| tool_name="apply_tourniquet", | |
| state=previous, | |
| code="PRECONDITION_FAILED", | |
| message=f"No active limb hemorrhage is tracked at '{limb}'.", | |
| retryable=False, | |
| ) | |
| state = self._adapter.set_hemorrhage(limb, flow_rate_ml_per_min=0.0, advance_time_seconds=self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0) | |
| return self._success("apply_tourniquet", state, f"Tourniquet applied to {limb.replace('_', ' ')}.", previous_state=previous) | |
| def _handle_apply_wound_packing(self, arguments: dict[str, Any]) -> ToolExecution: | |
| previous = self._adapter.get_full_state() | |
| site = self._resolve_bleeding_site(previous, arguments) | |
| if site is None: | |
| return self._failure( | |
| tool_name="apply_wound_packing", | |
| state=previous, | |
| code="PRECONDITION_FAILED", | |
| message="No active hemorrhage is currently tracked.", | |
| retryable=False, | |
| ) | |
| state = self._adapter.set_hemorrhage( | |
| site, | |
| flow_rate_ml_per_min=previous.active_hemorrhages[site] * 0.2, | |
| advance_time_seconds=self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0, | |
| ) | |
| return self._success("apply_wound_packing", state, f"Wound packing applied to {site.replace('_', ' ')}.", previous_state=previous) | |
| def _handle_apply_direct_pressure(self, arguments: dict[str, Any]) -> ToolExecution: | |
| previous = self._adapter.get_full_state() | |
| site = self._resolve_bleeding_site(previous, arguments) | |
| if site is None: | |
| return self._failure( | |
| tool_name="apply_direct_pressure", | |
| state=previous, | |
| code="PRECONDITION_FAILED", | |
| message="No active hemorrhage is currently tracked.", | |
| retryable=False, | |
| ) | |
| state = self._adapter.set_hemorrhage( | |
| site, | |
| flow_rate_ml_per_min=previous.active_hemorrhages[site] * 0.35, | |
| advance_time_seconds=self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0, | |
| ) | |
| return self._success("apply_direct_pressure", state, f"Direct pressure applied to {site.replace('_', ' ')}.", previous_state=previous) | |
| def _handle_initiate_hemorrhage(self, arguments: dict[str, Any]) -> ToolExecution: | |
| previous = self._adapter.get_full_state() | |
| restriction = self._restricted_tool_failure("initiate_hemorrhage", previous, arguments) | |
| if restriction is not None: | |
| return restriction | |
| compartment = str(arguments.get("compartment") or "").strip() | |
| rate_ml_per_min = self._read_positive_float(arguments, keys=("rate_ml_per_min", "rate")) | |
| hemorrhage_type = str(arguments.get("hemorrhage_type") or "external") | |
| state = self._adapter.set_hemorrhage( | |
| compartment, | |
| flow_rate_ml_per_min=rate_ml_per_min, | |
| hemorrhage_type=hemorrhage_type, | |
| advance_time_seconds=self._read_optional_float(arguments, keys=("monitor_seconds",)) or 30.0, | |
| ) | |
| return self._success( | |
| "initiate_hemorrhage", | |
| state, | |
| f"Initiated {hemorrhage_type} hemorrhage in {compartment} at {rate_ml_per_min:.0f} mL/min.", | |
| previous_state=previous, | |
| ) | |
| def _handle_position_patient(self, arguments: dict[str, Any]) -> ToolExecution: | |
| previous = self._adapter.get_full_state() | |
| position = str(arguments.get("position") or self._suggest_position(previous)) | |
| state = self._adapter.set_patient_position(position) | |
| return self._success( | |
| "position_patient", | |
| state, | |
| f"Patient position set to {state.position.replace('_', ' ')}.", | |
| previous_state=previous, | |
| ) | |
| def _handle_airway_support(self, arguments: dict[str, Any]) -> ToolExecution: | |
| previous = self._adapter.get_full_state() | |
| requested_support = arguments.get("support_type") or arguments.get("mode") | |
| support_type = str(requested_support or self._suggest_airway_support(previous)) | |
| support_key = support_type.strip().lower().replace("-", "_").replace(" ", "_") | |
| if support_key in {"auto", "basic", "default", "standard", "support", "airway_support"}: | |
| support_type = self._suggest_airway_support(previous) | |
| support_key = support_type.strip().lower().replace("-", "_").replace(" ", "_") | |
| monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0 | |
| if support_key in {"bag_valve_mask", "bvm"}: | |
| state = self._adapter.apply_bag_valve_mask( | |
| fio2=self._read_optional_fio2(arguments, keys=("fio2", "fraction_inspired_oxygen")), | |
| peep_cmh2o=self._read_optional_float(arguments, keys=("peep_cmh2o", "peep")), | |
| respiration_rate_bpm=self._read_optional_float(arguments, keys=("respiration_rate_bpm", "rate_bpm")), | |
| inspiratory_expiratory_ratio=self._read_optional_float(arguments, keys=("ie_ratio", "inspiratory_expiratory_ratio")), | |
| squeeze_pressure_cmh2o=self._read_optional_float(arguments, keys=("squeeze_pressure_cmh2o", "pressure_cmh2o")), | |
| squeeze_volume_ml=self._read_optional_float(arguments, keys=("squeeze_volume_ml", "tidal_volume_ml")), | |
| airway_adjunct=arguments.get("airway_adjunct"), | |
| advance_time_seconds=monitor_seconds, | |
| ) | |
| elif support_key == "cpap": | |
| state = self._adapter.apply_cpap( | |
| fio2=self._read_optional_fio2(arguments, keys=("fio2", "fraction_inspired_oxygen")), | |
| peep_cmh2o=self._read_optional_float(arguments, keys=("peep_cmh2o", "peep")), | |
| pressure_support_cmh2o=self._read_optional_float(arguments, keys=("pressure_support_cmh2o", "pressure_support")), | |
| advance_time_seconds=monitor_seconds, | |
| ) | |
| elif support_key in {"pressure_control_ventilation", "pressure_control"}: | |
| state = self._adapter.apply_pressure_control_ventilation( | |
| fio2=self._read_optional_fio2(arguments, keys=("fio2", "fraction_inspired_oxygen")), | |
| peep_cmh2o=self._read_optional_float(arguments, keys=("peep_cmh2o", "peep")), | |
| inspiratory_pressure_cmh2o=self._read_optional_float(arguments, keys=("inspiratory_pressure_cmh2o", "pressure_cmh2o")), | |
| respiration_rate_bpm=self._read_optional_float(arguments, keys=("respiration_rate_bpm", "rate_bpm")), | |
| inspiratory_period_s=self._read_optional_float(arguments, keys=("inspiratory_period_s",)), | |
| advance_time_seconds=monitor_seconds, | |
| ) | |
| elif support_key in {"volume_control_ventilation", "volume_control"}: | |
| state = self._adapter.apply_volume_control_ventilation( | |
| fio2=self._read_optional_fio2(arguments, keys=("fio2", "fraction_inspired_oxygen")), | |
| peep_cmh2o=self._read_optional_float(arguments, keys=("peep_cmh2o", "peep")), | |
| tidal_volume_ml=self._read_optional_float(arguments, keys=("tidal_volume_ml", "squeeze_volume_ml")), | |
| respiration_rate_bpm=self._read_optional_float(arguments, keys=("respiration_rate_bpm", "rate_bpm")), | |
| inspiratory_period_s=self._read_optional_float(arguments, keys=("inspiratory_period_s",)), | |
| advance_time_seconds=monitor_seconds, | |
| ) | |
| else: | |
| state = self._adapter.set_intubation(support_type, advance_time_seconds=monitor_seconds) | |
| return self._success( | |
| "airway_support", | |
| state, | |
| f"Airway support set to {state.airway_support or 'off'}.", | |
| previous_state=previous, | |
| ) | |
def _handle_apply_bag_valve_mask(self, arguments: dict[str, Any]) -> ToolExecution:
    """Start bag-valve-mask ventilation and report the settings that were used.

    Reads the squeeze rate (``rate_bpm`` / ``respiration_rate_bpm``), the
    squeeze volume (``volume_ml`` / ``tidal_volume_ml``) and
    ``monitor_seconds`` from ``arguments``; a missing or falsy value falls
    back to 12.0, 500.0 and 60.0 respectively.  ``fio2``, ``peep_cmh2o`` and
    ``airway_adjunct`` are forwarded to the adapter as read (``None`` when
    they are absent).
    """
    read_float = self._read_optional_float
    # Snapshot first so the result can list what the intervention changed.
    state_before = self._adapter.get_full_state()

    squeeze_rate = read_float(arguments, keys=("rate_bpm", "respiration_rate_bpm"))
    if not squeeze_rate:
        squeeze_rate = 12.0
    squeeze_volume = read_float(arguments, keys=("volume_ml", "tidal_volume_ml"))
    if not squeeze_volume:
        squeeze_volume = 500.0
    observe_s = read_float(arguments, keys=("monitor_seconds",))
    if not observe_s:
        observe_s = 60.0

    settings = {
        "respiration_rate_bpm": squeeze_rate,
        "squeeze_volume_ml": squeeze_volume,
        "fio2": self._read_optional_fio2(arguments, keys=("fio2", "fraction_inspired_oxygen")),
        "peep_cmh2o": read_float(arguments, keys=("peep_cmh2o", "peep")),
        "airway_adjunct": arguments.get("airway_adjunct"),
        "advance_time_seconds": observe_s,
    }
    state_after = self._adapter.apply_bag_valve_mask(**settings)

    rate_text = self._fmt(squeeze_rate)
    volume_text = self._fmt(squeeze_volume)
    summary = f"BVM ventilation started at {rate_text} bpm with {volume_text} mL squeezes."
    return self._success("apply_bag_valve_mask", state_after, summary, previous_state=state_before)
def _handle_perform_intubation(self, arguments: dict[str, Any]) -> ToolExecution:
    """Place an airway of the requested type and let the simulation run.

    The airway type comes from ``airway_type`` or ``kind`` (default
    ``"tracheal"``); ``monitor_seconds`` defaults to 60.0 when it is missing
    or falsy.
    """
    state_before = self._adapter.get_full_state()

    requested = arguments.get("airway_type") or arguments.get("kind") or "tracheal"
    airway_type = str(requested)

    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",))
    if not observe_s:
        observe_s = 60.0

    state_after = self._adapter.set_intubation(airway_type, advance_time_seconds=observe_s)
    readable = airway_type.replace("_", " ")
    return self._success(
        "perform_intubation",
        state_after,
        f"Performed {readable} intubation.",
        previous_state=state_before,
    )
def _handle_set_ventilator_tidal_volume(self, arguments: dict[str, Any]) -> ToolExecution:
    """Apply volume-control ventilation with a new tidal volume.

    ``volume_ml`` / ``tidal_volume_ml`` is required.  FiO2, PEEP and the
    respiratory rate are taken from ``arguments`` when given; a missing or
    falsy value is replaced by the matching field of the adapter's runtime
    object.  ``monitor_seconds`` defaults to 60.0.
    """
    state_before = self._adapter.get_full_state()
    tidal_volume_ml = self._read_positive_float(arguments, keys=("volume_ml", "tidal_volume_ml"))
    current = self._adapter._runtime
    read_float = self._read_optional_float

    fio2 = self._read_optional_fio2(arguments, keys=("fio2",))
    if not fio2:
        fio2 = current.fio2
    peep = read_float(arguments, keys=("peep_cmh2o", "peep"))
    if not peep:
        peep = current.peep_cmh2o
    rate = read_float(arguments, keys=("breaths_per_min", "respiration_rate_bpm", "rate_bpm"))
    if not rate:
        rate = current.ventilator_respiration_rate_bpm
    observe_s = read_float(arguments, keys=("monitor_seconds",))
    if not observe_s:
        observe_s = 60.0

    state_after = self._adapter.apply_volume_control_ventilation(
        fio2=fio2,
        peep_cmh2o=peep,
        tidal_volume_ml=tidal_volume_ml,
        respiration_rate_bpm=rate,
        advance_time_seconds=observe_s,
    )
    summary = f"Ventilator tidal volume set to {self._fmt(tidal_volume_ml)} mL."
    return self._success("set_ventilator_tidal_volume", state_after, summary, previous_state=state_before)
def _handle_set_ventilator_rate(self, arguments: dict[str, Any]) -> ToolExecution:
    """Apply volume-control ventilation with a new respiratory rate.

    ``breaths_per_min`` / ``respiration_rate_bpm`` / ``rate_bpm`` is
    required.  FiO2, PEEP and tidal volume come from ``arguments`` when
    given; a missing or falsy value is replaced by the matching field of the
    adapter's runtime object.  ``monitor_seconds`` defaults to 60.0.
    """
    state_before = self._adapter.get_full_state()
    breaths_per_min = self._read_positive_float(
        arguments, keys=("breaths_per_min", "respiration_rate_bpm", "rate_bpm")
    )
    current = self._adapter._runtime
    read_float = self._read_optional_float

    fio2 = self._read_optional_fio2(arguments, keys=("fio2",))
    if not fio2:
        fio2 = current.fio2
    peep = read_float(arguments, keys=("peep_cmh2o", "peep"))
    if not peep:
        peep = current.peep_cmh2o
    tidal_volume = read_float(arguments, keys=("volume_ml", "tidal_volume_ml"))
    if not tidal_volume:
        tidal_volume = current.ventilator_tidal_volume_ml
    observe_s = read_float(arguments, keys=("monitor_seconds",))
    if not observe_s:
        observe_s = 60.0

    state_after = self._adapter.apply_volume_control_ventilation(
        fio2=fio2,
        peep_cmh2o=peep,
        tidal_volume_ml=tidal_volume,
        respiration_rate_bpm=breaths_per_min,
        advance_time_seconds=observe_s,
    )
    summary = f"Ventilator rate set to {self._fmt(breaths_per_min)} breaths/min."
    return self._success("set_ventilator_rate", state_after, summary, previous_state=state_before)
def _handle_set_ventilator_fio2(self, arguments: dict[str, Any]) -> ToolExecution:
    """Apply volume-control ventilation with a new inspired oxygen fraction.

    ``fraction`` / ``fio2`` / ``fraction_inspired_oxygen`` is required and is
    read through ``_read_positive_fio2``.  PEEP, tidal volume and respiratory
    rate come from ``arguments`` when given; a missing or falsy value is
    replaced by the matching field of the adapter's runtime object.
    ``monitor_seconds`` defaults to 60.0.

    Raises:
        ValueError: if the FiO2 is missing or falls outside 0.21-1.0.
    """
    previous = self._adapter.get_full_state()
    fio2 = self._read_positive_fio2(arguments, keys=("fraction", "fio2", "fraction_inspired_oxygen"))
    # Enforce both ends of the range the message advertises.  The original
    # only rejected values above 1.0, so sub-room-air, negative and NaN
    # fractions reached the engine.  The negated form also rejects NaN.
    if not (0.21 <= fio2 <= 1.0):
        raise ValueError("fio2 must be between 0.21 and 1.0.")
    runtime = self._adapter._runtime
    state = self._adapter.apply_volume_control_ventilation(
        fio2=fio2,
        peep_cmh2o=self._read_optional_float(arguments, keys=("peep_cmh2o", "peep")) or runtime.peep_cmh2o,
        tidal_volume_ml=(
            self._read_optional_float(arguments, keys=("volume_ml", "tidal_volume_ml"))
            or runtime.ventilator_tidal_volume_ml
        ),
        respiration_rate_bpm=(
            self._read_optional_float(arguments, keys=("breaths_per_min", "respiration_rate_bpm", "rate_bpm"))
            or runtime.ventilator_respiration_rate_bpm
        ),
        advance_time_seconds=self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0,
    )
    return self._success(
        "set_ventilator_fio2",
        state,
        f"Ventilator FiO2 set to {fio2:.2f}.",
        previous_state=previous,
    )
def _handle_give_pressor(self, arguments: dict[str, Any]) -> ToolExecution:
    """Start, change or stop a pressor infusion given in mL/min.

    The agent comes from ``pressor`` or ``agent`` (default
    ``"norepinephrine"``).  ``rate_ml_per_min`` and
    ``concentration_ug_per_ml`` are forwarded as read (``None`` when absent);
    ``stop=true`` forces the rate to 0.0.  ``monitor_seconds`` defaults to
    60.0.  A rate of exactly 0.0 is reported as a stopped infusion.
    """
    read_float = self._read_optional_float
    state_before = self._adapter.get_full_state()

    pressor = str(arguments.get("pressor") or arguments.get("agent") or "norepinephrine")
    rate_ml_per_min = read_float(arguments, keys=("rate_ml_per_min",))
    concentration_ug_per_ml = read_float(arguments, keys=("concentration_ug_per_ml",))
    stop_requested = self._read_optional_bool(arguments, keys=("stop",)) is True
    if stop_requested:
        rate_ml_per_min = 0.0
    observe_s = read_float(arguments, keys=("monitor_seconds",))
    if not observe_s:
        observe_s = 60.0

    state_after = self._adapter.set_pressor(
        pressor=pressor,
        concentration_ug_per_ml=concentration_ug_per_ml,
        rate_ml_per_min=rate_ml_per_min,
        advance_time_seconds=observe_s,
    )

    if rate_ml_per_min == 0.0:
        return self._success(
            "give_pressor", state_after, f"Stopped {pressor} infusion.", previous_state=state_before
        )

    # Prefer the rate recorded on the returned state over the requested one.
    infusion_key = pressor.strip().lower().replace("-", "_").replace(" ", "_")
    shown_rate = state_after.active_infusions.get(infusion_key)
    if shown_rate is None:
        shown_rate = rate_ml_per_min
    rate_text = self._fmt(shown_rate)
    if concentration_ug_per_ml is None:
        concentration_text = "default"
    else:
        concentration_text = self._fmt(concentration_ug_per_ml)
    summary = f"Started {pressor} at {rate_text} mL/min with concentration {concentration_text} ug/mL."
    return self._success("give_pressor", state_after, summary, previous_state=state_before)
def _handle_start_pressor_infusion(self, arguments: dict[str, Any], *, tool_name: str, pressor: str) -> ToolExecution:
    """Start a weight-based pressor infusion dosed in mcg/kg/min.

    Fails with ``UNSUPPORTED_BY_ENGINE`` when ``pressor`` has no entry in
    ``PulseEngineAdapter._PRESSOR_CONFIG``.  The dose
    (``dose_mcg_per_kg_per_min`` / ``dose``) is required; the concentration
    falls back to the configured default when missing or falsy, and
    ``monitor_seconds`` defaults to 60.0.

    Raises:
        RuntimeError: if the adapter reports no positive patient weight.
    """
    state_before = self._adapter.get_full_state()
    pressor_table = PulseEngineAdapter._PRESSOR_CONFIG
    if pressor not in pressor_table:
        return self._failure(
            tool_name=tool_name,
            state=state_before,
            code="UNSUPPORTED_BY_ENGINE",
            message=f"The local Pulse build does not expose {pressor}.",
            retryable=False,
        )

    dose = self._read_positive_float(arguments, keys=("dose_mcg_per_kg_per_min", "dose"))
    weight_kg = self._adapter.get_patient_weight_kg()
    if weight_kg is None or weight_kg <= 0:
        raise RuntimeError("Patient weight is unavailable for mcg/kg/min infusion dosing.")

    concentration_ug_per_ml = self._read_optional_float(arguments, keys=("concentration_ug_per_ml",))
    if not concentration_ug_per_ml:
        concentration_ug_per_ml = pressor_table[pressor]["default_concentration_ug_per_ml"]

    # mcg/kg/min * kg / (ug/mL) -> mL/min
    rate_ml_per_min = dose * weight_kg / concentration_ug_per_ml

    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",))
    if not observe_s:
        observe_s = 60.0
    state_after = self._adapter.set_pressor(
        pressor=pressor,
        concentration_ug_per_ml=concentration_ug_per_ml,
        rate_ml_per_min=rate_ml_per_min,
        advance_time_seconds=observe_s,
    )
    summary = f"Started {pressor} infusion at {dose:.2f} mcg/kg/min ({rate_ml_per_min:.2f} mL/min)."
    return self._success(tool_name, state_after, summary, previous_state=state_before)
def _handle_adjust_infusion_rate(self, arguments: dict[str, Any]) -> ToolExecution:
    """Change the dose of a running pressor infusion (mcg/kg/min).

    The drug comes from ``drug_name`` or ``pressor`` and is lower-cased with
    hyphens and spaces turned into underscores.  Drugs without an entry in
    ``PulseEngineAdapter._PRESSOR_CONFIG`` yield an ``UNSUPPORTED_BY_ENGINE``
    failure.  The concentration falls back to the configured default and
    ``monitor_seconds`` to 60.0 when missing or falsy.

    Raises:
        ValueError: if no drug name is given or the new dose is missing.
        RuntimeError: if the adapter reports no positive patient weight.
    """
    state_before = self._adapter.get_full_state()

    requested = arguments.get("drug_name") or arguments.get("pressor") or ""
    drug_name = str(requested).strip().lower().replace("-", "_").replace(" ", "_")
    if not drug_name:
        raise ValueError("drug_name is required.")

    pressor_table = PulseEngineAdapter._PRESSOR_CONFIG
    if drug_name not in pressor_table:
        return self._failure(
            tool_name="adjust_infusion_rate",
            state=state_before,
            code="UNSUPPORTED_BY_ENGINE",
            message=f"Only pressor infusions are adjustable right now. Unsupported infusion '{drug_name}'.",
            retryable=False,
        )

    new_dose = self._read_positive_float(arguments, keys=("new_dose", "dose_mcg_per_kg_per_min", "dose"))
    weight_kg = self._adapter.get_patient_weight_kg()
    if weight_kg is None or weight_kg <= 0:
        raise RuntimeError("Patient weight is unavailable for mcg/kg/min infusion dosing.")

    concentration_ug_per_ml = self._read_optional_float(arguments, keys=("concentration_ug_per_ml",))
    if not concentration_ug_per_ml:
        concentration_ug_per_ml = pressor_table[drug_name]["default_concentration_ug_per_ml"]

    # mcg/kg/min * kg / (ug/mL) -> mL/min
    rate_ml_per_min = new_dose * weight_kg / concentration_ug_per_ml

    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",))
    if not observe_s:
        observe_s = 60.0
    state_after = self._adapter.set_pressor(
        pressor=drug_name,
        concentration_ug_per_ml=concentration_ug_per_ml,
        rate_ml_per_min=rate_ml_per_min,
        advance_time_seconds=observe_s,
    )
    summary = f"Adjusted {drug_name} to {new_dose:.2f} mcg/kg/min ({rate_ml_per_min:.2f} mL/min)."
    return self._success("adjust_infusion_rate", state_after, summary, previous_state=state_before)
def _handle_stop_infusion(self, arguments: dict[str, Any]) -> ToolExecution:
    """Stop a pressor infusion by setting its rate to 0.0 mL/min.

    The drug comes from ``drug_name`` or ``pressor`` and is lower-cased with
    hyphens and spaces turned into underscores.  Drugs without an entry in
    ``PulseEngineAdapter._PRESSOR_CONFIG`` yield an ``UNSUPPORTED_BY_ENGINE``
    failure.  ``monitor_seconds`` defaults to 60.0.

    Raises:
        ValueError: if no drug name is given.
    """
    state_before = self._adapter.get_full_state()

    requested = arguments.get("drug_name") or arguments.get("pressor") or ""
    drug_name = str(requested).strip().lower().replace("-", "_").replace(" ", "_")
    if not drug_name:
        raise ValueError("drug_name is required.")

    pressor_table = PulseEngineAdapter._PRESSOR_CONFIG
    if drug_name not in pressor_table:
        return self._failure(
            tool_name="stop_infusion",
            state=state_before,
            code="UNSUPPORTED_BY_ENGINE",
            message=f"Only pressor infusions are stoppable right now. Unsupported infusion '{drug_name}'.",
            retryable=False,
        )

    default_concentration = pressor_table[drug_name]["default_concentration_ug_per_ml"]
    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",))
    if not observe_s:
        observe_s = 60.0
    state_after = self._adapter.set_pressor(
        pressor=drug_name,
        concentration_ug_per_ml=default_concentration,
        rate_ml_per_min=0.0,
        advance_time_seconds=observe_s,
    )
    return self._success(
        "stop_infusion", state_after, f"Stopped {drug_name} infusion.", previous_state=state_before
    )
def _handle_needle_decompression(self, arguments: dict[str, Any]) -> ToolExecution:
    """Perform a needle decompression on one side of the chest.

    Uses ``arguments["side"]`` when it is truthy, otherwise the side proposed
    by ``_suggest_needle_side`` for the current state.  ``monitor_seconds``
    defaults to 90.0 when missing or falsy.
    """
    state_before = self._adapter.get_full_state()

    chosen = arguments.get("side")
    if not chosen:
        chosen = self._suggest_needle_side(state_before)
    side = str(chosen)

    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",))
    if not observe_s:
        observe_s = 90.0

    state_after = self._adapter.apply_needle_decompression(side, advance_time_seconds=observe_s)
    summary = f"Needle decompression performed on the {side.lower()} chest."
    return self._success("needle_decompression", state_after, summary, previous_state=state_before)
def _handle_perform_needle_decompression(self, arguments: dict[str, Any]) -> ToolExecution:
    """Run ``_handle_needle_decompression`` and relabel its result.

    The returned execution is the same object the delegate produced; only
    ``tool_result.tool_name`` is overwritten with
    ``"perform_needle_decompression"``.
    """
    execution = self._handle_needle_decompression(arguments)
    setattr(execution.tool_result, "tool_name", "perform_needle_decompression")
    return execution
def _handle_pericardiocentesis(self, arguments: dict[str, Any]) -> ToolExecution:
    """Drain the pericardium at the requested rate.

    The drain rate comes from ``drain_rate_ml_per_min`` or
    ``rate_ml_per_min`` (default 150.0); ``monitor_seconds`` defaults to
    180.0.  Missing and falsy values both trigger the defaults.
    """
    read_float = self._read_optional_float
    state_before = self._adapter.get_full_state()

    drain_rate = read_float(arguments, keys=("drain_rate_ml_per_min", "rate_ml_per_min"))
    if not drain_rate:
        drain_rate = 150.0
    observe_s = read_float(arguments, keys=("monitor_seconds",))
    if not observe_s:
        observe_s = 180.0

    state_after = self._adapter.perform_pericardiocentesis(
        drain_rate_ml_per_min=drain_rate,
        advance_time_seconds=observe_s,
    )
    summary = f"Pericardiocentesis performed with drainage at {self._fmt(drain_rate)} mL/min."
    return self._success("pericardiocentesis", state_after, summary, previous_state=state_before)
def _handle_perform_pericardiocentesis(self, arguments: dict[str, Any]) -> ToolExecution:
    """Run ``_handle_pericardiocentesis`` and relabel its result.

    The returned execution is the same object the delegate produced; only
    ``tool_result.tool_name`` is overwritten with
    ``"perform_pericardiocentesis"``.
    """
    execution = self._handle_pericardiocentesis(arguments)
    setattr(execution.tool_result, "tool_name", "perform_pericardiocentesis")
    return execution
def _handle_perform_cpr(self, arguments: dict[str, Any]) -> ToolExecution:
    """Deliver chest compressions for the monitoring window.

    The compression rate (``compression_rate_bpm`` / ``rate_bpm``) and
    ``depth_cm`` fall back to ``PulseEngineAdapter.DEFAULT_CPR_RATE_BPM`` and
    ``PulseEngineAdapter.DEFAULT_CPR_DEPTH_CM`` when missing or falsy;
    ``monitor_seconds`` defaults to 60.0.
    """
    read_float = self._read_optional_float
    state_before = self._adapter.get_full_state()

    rate = read_float(arguments, keys=("compression_rate_bpm", "rate_bpm"))
    if not rate:
        rate = PulseEngineAdapter.DEFAULT_CPR_RATE_BPM
    depth_cm = read_float(arguments, keys=("depth_cm",))
    if not depth_cm:
        depth_cm = PulseEngineAdapter.DEFAULT_CPR_DEPTH_CM
    observe_s = read_float(arguments, keys=("monitor_seconds",))
    if not observe_s:
        observe_s = 60.0

    state_after = self._adapter.perform_cpr(
        compression_rate_bpm=rate,
        depth_cm=depth_cm,
        advance_time_seconds=observe_s,
    )
    rate_text = self._fmt(rate)
    depth_text = self._fmt(depth_cm)
    summary = f"Performed CPR at {rate_text} compressions/min with {depth_text} cm depth."
    return self._success("perform_cpr", state_after, summary, previous_state=state_before)
def _handle_induce_cardiac_arrest(self, arguments: dict[str, Any]) -> ToolExecution:
    """Induce a cardiac arrest, subject to the restricted-tool gate.

    Returns whatever ``_restricted_tool_failure`` produces when it blocks the
    call.  Otherwise the arrest is induced and the simulation advances by
    ``monitor_seconds`` (30.0 when missing or falsy).
    """
    state_before = self._adapter.get_full_state()

    blocked = self._restricted_tool_failure("induce_cardiac_arrest", state_before, arguments)
    if blocked is not None:
        return blocked

    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",))
    if not observe_s:
        observe_s = 30.0
    state_after = self._adapter.induce_cardiac_arrest(advance_time_seconds=observe_s)
    return self._success(
        "induce_cardiac_arrest",
        state_after,
        "Induced cardiac arrest for scenario-authoring.",
        previous_state=state_before,
    )
def _handle_apply_pericardial_effusion(self, arguments: dict[str, Any]) -> ToolExecution:
    """Apply a pericardial effusion, subject to the restricted-tool gate.

    Returns whatever ``_restricted_tool_failure`` produces when it blocks the
    call.  Otherwise the required ``severity`` (0.0-1.0) is multiplied by
    150.0 to get the effusion rate in mL/min, and the simulation advances by
    ``monitor_seconds`` (60.0 when missing or falsy).

    Raises:
        ValueError: if ``severity`` is missing or outside 0.0-1.0.
    """
    previous = self._adapter.get_full_state()
    restriction = self._restricted_tool_failure("apply_pericardial_effusion", previous, arguments)
    if restriction is not None:
        return restriction
    severity = self._read_positive_float(arguments, keys=("severity",))
    # Enforce both ends of the range the message advertises.  The original
    # only rejected values above 1.0, so a negative severity produced a
    # negative effusion rate.  The negated form also rejects NaN.
    if not (0.0 <= severity <= 1.0):
        raise ValueError("severity must be between 0.0 and 1.0.")
    effusion_rate_ml_per_min = severity * 150.0
    state = self._adapter.set_pericardial_effusion(
        effusion_rate_ml_per_min=effusion_rate_ml_per_min,
        advance_time_seconds=self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0,
    )
    return self._success(
        "apply_pericardial_effusion",
        state,
        f"Applied pericardial effusion with severity {severity:.2f}.",
        previous_state=previous,
    )
def _handle_bolus_drug(
    self,
    arguments: dict[str, Any],
    *,
    tool_name: str,
    drug_key: str,
    dose_keys: tuple[str, ...],
) -> ToolExecution:
    """Administer a bolus described by ``PulseEngineAdapter._BOLUS_DRUG_CONFIG``.

    Fails with ``UNSUPPORTED_BY_ENGINE`` when ``drug_key`` has no config
    entry.  The dose is read from the first populated key in ``dose_keys``
    and converted by the adapter's ``compute_drug_dose``.  ``monitor_seconds``
    falls back to the configured ``duration_s`` when missing or falsy.
    """
    state_before = self._adapter.get_full_state()

    settings = PulseEngineAdapter._BOLUS_DRUG_CONFIG.get(drug_key)
    if settings is None:
        return self._failure(
            tool_name=tool_name,
            state=state_before,
            code="UNSUPPORTED_BY_ENGINE",
            message=f"The local Pulse build does not expose {drug_key}.",
            retryable=False,
        )

    dose_unit = settings["dose_argument"]
    dose_input = self._read_positive_float(arguments, keys=dose_keys)
    total_dose = self._adapter.compute_drug_dose(dose=dose_input, dose_unit=dose_unit)

    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",))
    if not observe_s:
        observe_s = settings["duration_s"]

    state_after = self._adapter.administer_substance_bolus(
        drug_name=settings["substance"],
        dose=total_dose,
        dose_unit=dose_unit,
        concentration_value=settings["concentration_value"],
        concentration_unit=settings["concentration_unit"],
        duration_s=settings["duration_s"],
        advance_time_seconds=observe_s,
    )

    # The unit label is the config key with underscores shown as slashes.
    unit_label = dose_unit.replace("_", "/")
    substance = settings["substance"]
    summary = f"Administered {substance} bolus with input dose {dose_input:.2f} {unit_label}."
    return self._success(tool_name, state_after, summary, previous_state=state_before)
def _handle_summarize_state(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Return a one-line summary of the current patient state.

    The summary covers scenario id and difficulty, heart rate, MAP, SpO2,
    mental status, active alerts, and pending and ready diagnostics.  The
    same state is passed as both the current and the previous state.
    """
    state = self._adapter.get_full_state()

    if state.active_alerts:
        alerts = ", ".join(state.active_alerts)
    else:
        alerts = "no active alerts"

    # Pending diagnostics are listed in sorted order as "name:seconds".
    pending_parts = [
        f"{name}:{seconds}s" for name, seconds in sorted(state.pending_diagnostics.items())
    ]
    pending = ", ".join(pending_parts) or "none"

    if state.ready_diagnostics:
        ready = ", ".join(state.ready_diagnostics)
    else:
        ready = "none"

    heart_rate = self._fmt(state.heart_rate_bpm)
    mean_pressure = self._fmt(state.mean_arterial_pressure_mmhg)
    saturation = self._fmt(state.spo2, precision=3)
    summary = (
        f"{state.scenario_id} ({state.scenario_difficulty}): HR {heart_rate}, "
        f"MAP {mean_pressure}, SpO2 {saturation}, "
        f"mental status {state.mental_status}, alerts {alerts}, "
        f"pending diagnostics {pending}, ready diagnostics {ready}."
    )
    return self._success("summarize_state", state, summary, previous_state=state)
def _handle_recommend_next_step(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report the tool ``_recommend_next_tool`` picks for the current state.

    The same state is passed as both the current and the previous state.
    """
    state = self._adapter.get_full_state()
    suggestion = self._recommend_next_tool(state)
    summary = f"Recommended next step: {suggestion}."
    return self._success("recommend_next_step", state, summary, previous_state=state)
def _handle_get_blood_gas(self, _arguments: dict[str, Any], *, tool_name: str) -> ToolExecution:
    """Order or read back an arterial blood gas via the delayed-diagnostic flow.

    The arguments are ignored.  The result renderer reports pH (3 decimals),
    PaO2, PaCO2 and lactate from ``state.abg_result``.
    """

    def describe(state: PatientState) -> str:
        abg = state.abg_result
        ph = self._fmt(abg.ph, precision=3)
        pao2 = self._fmt(abg.partial_pressure_of_oxygen_mmhg)
        paco2 = self._fmt(abg.partial_pressure_of_carbon_dioxide_mmhg)
        lactate = self._fmt(abg.lactate_mg_per_dl)
        return f"ABG pH {ph}, PaO2 {pao2} mmHg, PaCO2 {paco2} mmHg, lactate {lactate} mg/dL."

    return self._handle_delayed_diagnostic(
        tool_name=tool_name,
        diagnostic_key="order_arterial_blood_gas",
        label="ABG",
        render_result=describe,
    )
def _handle_get_cbc(self, _arguments: dict[str, Any], *, tool_name: str) -> ToolExecution:
    """Order or read back a complete blood count via the delayed-diagnostic flow.

    The arguments are ignored.  The result renderer reports hemoglobin,
    hematocrit (3 decimals) and white cell count from ``state.cbc_result``.
    """

    def describe(state: PatientState) -> str:
        cbc = state.cbc_result
        hemoglobin = self._fmt(cbc.hemoglobin_g_per_dl)
        hematocrit = self._fmt(cbc.hematocrit_fraction, precision=3)
        white_cells = self._fmt(cbc.white_blood_cell_count_per_u_l)
        return f"CBC hemoglobin {hemoglobin} g/dL, hematocrit {hematocrit}, WBC {white_cells} /uL."

    return self._handle_delayed_diagnostic(
        tool_name=tool_name,
        diagnostic_key="order_complete_blood_count",
        label="CBC",
        render_result=describe,
    )
def _handle_get_bmp(self, _arguments: dict[str, Any], *, tool_name: str) -> ToolExecution:
    """Order or read back a basic metabolic panel via the delayed-diagnostic flow.

    The arguments are ignored.  The result renderer reports sodium,
    potassium, creatinine and glucose from ``state.bmp_result``.
    """

    def describe(state: PatientState) -> str:
        bmp = state.bmp_result
        sodium = self._fmt(bmp.sodium_mmol_per_l)
        potassium = self._fmt(bmp.potassium_mmol_per_l)
        creatinine = self._fmt(bmp.creatinine_mg_per_dl)
        glucose = self._fmt(bmp.glucose_mg_per_dl)
        return (
            f"BMP sodium {sodium} mmol/L, potassium {potassium} mmol/L, "
            f"creatinine {creatinine} mg/dL, glucose {glucose} mg/dL."
        )

    return self._handle_delayed_diagnostic(
        tool_name=tool_name,
        diagnostic_key="order_basic_metabolic_panel",
        label="BMP",
        render_result=describe,
    )
def _handle_order_point_of_care_ultrasound(self, arguments: dict[str, Any]) -> ToolExecution:
    """Order or read back a point-of-care ultrasound for one body region.

    ``region`` defaults to ``"cardiac"`` and is lower-cased with hyphens and
    spaces turned into underscores.  Each region gets its own diagnostic key
    (``order_point_of_care_ultrasound:<region>``).  The result text comes
    from the adapter's ``get_ultrasound_summary``.
    """
    raw_region = str(arguments.get("region") or "cardiac")
    region = raw_region.strip().lower().replace("-", "_").replace(" ", "_")

    def describe(_state: PatientState, region: str = region) -> str:
        return self._adapter.get_ultrasound_summary(region)

    return self._handle_delayed_diagnostic(
        tool_name="order_point_of_care_ultrasound",
        diagnostic_key=f"order_point_of_care_ultrasound:{region}",
        label=f"POCUS ({region})",
        delay_seconds=PulseEngineAdapter.DIAGNOSTIC_DELAYS_S["order_point_of_care_ultrasound"],
        render_result=describe,
    )
def _handle_auscultate_chest(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report the breath sounds recorded on the current state.

    ``"present bilateral"`` gets a dedicated sentence; any other value is
    echoed verbatim.  The same state is passed as current and previous.
    """
    state = self._adapter.get_full_state()
    findings = state.breath_sounds
    summary = (
        "Auscultation: bilateral breath sounds present."
        if findings == "present bilateral"
        else f"Auscultation: breath sounds are {findings}."
    )
    return self._success("auscultate_chest", state, summary, previous_state=state)
def _handle_assess_consciousness_level(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report mental status and approximate GCS from the adapter's summary.

    Reads ``mental_status`` and ``gcs_equivalent`` from
    ``get_consciousness_summary()``; the GCS value is converted to ``float``
    before formatting.  The same state is passed as current and previous.
    """
    state = self._adapter.get_full_state()
    summary_data = self._adapter.get_consciousness_summary()
    status = summary_data["mental_status"]
    gcs_text = self._fmt(float(summary_data["gcs_equivalent"]))
    summary = f"Consciousness level {status} with approximate GCS {gcs_text}."
    return self._success("assess_consciousness_level", state, summary, previous_state=state)
def _handle_check_pain_level(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report the adapter's pain estimate with one decimal, out of 10.

    The same state is passed as current and previous.
    """
    state = self._adapter.get_full_state()
    score = self._adapter.get_pain_score_0_to_10()
    summary = f"Estimated pain score {score:.1f}/10."
    return self._success("check_pain_level", state, summary, previous_state=state)
def _handle_measure_core_temperature(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report ``core_temperature_c`` from the current state.

    The same state is passed as current and previous.
    """
    state = self._adapter.get_full_state()
    reading = self._fmt(state.core_temperature_c)
    return self._success(
        "measure_core_temperature",
        state,
        f"Core temperature {reading} C.",
        previous_state=state,
    )
def _handle_check_end_tidal_co2(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report ``etco2_mmhg`` from the current state.

    The same state is passed as current and previous.
    """
    state = self._adapter.get_full_state()
    reading = self._fmt(state.etco2_mmhg)
    return self._success(
        "check_end_tidal_co2",
        state,
        f"End tidal CO2 {reading} mmHg.",
        previous_state=state,
    )
def _handle_calculate_shock_index(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report the shock index (2 decimals) and shock class.

    Both values come from the adapter's ``get_shock_assessment()``.  The
    same state is passed as current and previous.
    """
    state = self._adapter.get_full_state()
    assessment = self._adapter.get_shock_assessment()
    index_text = self._fmt(assessment["shock_index"], precision=2)
    shock_class = assessment["shock_class"]
    summary = f"Shock index {index_text} with class {shock_class}."
    return self._success("calculate_shock_index", state, summary, previous_state=state)
def _handle_assess_urine_output(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report ``urine_output_ml_per_hr`` from the adapter's shock assessment.

    The same state is passed as current and previous.
    """
    state = self._adapter.get_full_state()
    assessment = self._adapter.get_shock_assessment()
    output_text = self._fmt(assessment["urine_output_ml_per_hr"])
    summary = f"Estimated urine output {output_text} mL/hr."
    return self._success("assess_urine_output", state, summary, previous_state=state)
def _recommend_next_tool(self, state: PatientState) -> str:
    """Pick the next tool name from a fixed priority list of findings.

    Checks, in order: chest alerts, tamponade alert, active hemorrhage,
    SpO2 below 0.88 without assisted ventilation, SpO2 below 0.92, MAP below
    65, a ready arterial blood gas, depressed mental status without airway
    support.  The first match wins; ``"get_vitals"`` is the fallback.
    """
    alerts = state.active_alerts
    chest_flags = ("possible_tension_pneumothorax", "unilateral_absent_breath_sounds")
    if any(flag in alerts for flag in chest_flags):
        return "perform_needle_decompression"
    if "possible_cardiac_tamponade" in alerts:
        return "perform_pericardiocentesis"

    bleeds = state.active_hemorrhages
    if bleeds:
        limb_sites = {"left_arm", "right_arm", "left_leg", "right_leg"}
        on_a_limb = any(site in limb_sites for site in bleeds)
        return "apply_tourniquet" if on_a_limb else "apply_direct_pressure"

    assisted_modes = {
        "bag_valve_mask",
        "pressure_control_ventilation",
        "volume_control_ventilation",
    }
    if state.spo2 is not None and state.spo2 < 0.88 and state.airway_support not in assisted_modes:
        return "apply_bag_valve_mask"
    if state.spo2 is not None and state.spo2 < 0.92:
        return "apply_nonrebreather_mask"

    mean_pressure = state.mean_arterial_pressure_mmhg
    if mean_pressure is not None and mean_pressure < 65:
        # A pressor is suggested only when one of these infusions is already active.
        volume_keys = {"blood", "packed_rbc", "saline"}
        if any(key in state.active_infusions for key in volume_keys):
            return "start_norepinephrine_infusion"
        return "administer_crystalloid_bolus"

    for ready_key in state.ready_diagnostics:
        if ready_key.startswith("order_arterial_blood_gas"):
            return "order_arterial_blood_gas"

    if state.mental_status in {"pain", "unresponsive"} and state.airway_support is None:
        return "perform_intubation"
    return "get_vitals"
def _handle_delayed_diagnostic(
    self,
    *,
    tool_name: str,
    diagnostic_key: str,
    label: str,
    render_result: Callable[[PatientState], str],
    delay_seconds: int | None = None,
) -> ToolExecution:
    """Order a diagnostic, or report on one that was ordered earlier.

    Three outcomes, all returned as successes:

    * the key is in ``ready_diagnostics``: the message is
      ``render_result(state)``;
    * the key is in ``pending_diagnostics``: the message states the
      remaining simulated seconds;
    * otherwise the diagnostic is scheduled through the adapter with
      ``delay_seconds``, or the value from ``_default_diagnostic_delay``
      when ``delay_seconds`` is missing or falsy.
    """
    previous = self._adapter.get_full_state()
    if diagnostic_key in previous.ready_diagnostics:
        return self._success(tool_name, previous, render_result(previous), previous_state=previous)
    if diagnostic_key in previous.pending_diagnostics:
        remaining = previous.pending_diagnostics[diagnostic_key]
        return self._success(
            tool_name,
            previous,
            f"{label} is pending. {remaining} simulated seconds remaining before results are ready.",
            previous_state=previous,
        )
    scheduled_delay = delay_seconds or self._default_diagnostic_delay(diagnostic_key)
    state = self._adapter.schedule_diagnostic(diagnostic_key, delay_seconds=scheduled_delay)
    remaining = state.pending_diagnostics.get(diagnostic_key)
    if remaining is None:
        # The returned state has no pending entry for this key; quote the
        # delay that was requested rather than printing "None".
        remaining = scheduled_delay
    return self._success(
        tool_name,
        state,
        f"Ordered {label}. Results will be ready after about {remaining} simulated seconds.",
        previous_state=previous,
    )
def _restricted_tool_failure(
    self,
    tool_name: str,
    state: PatientState,
    arguments: dict[str, Any],
) -> ToolExecution | None:
    """Gate a restricted tool behind an explicit ``admin_override`` flag.

    Returns ``None`` (the call may proceed) only when ``admin_override``
    reads as exactly ``True``; otherwise returns a non-retryable
    ``RESTRICTED_TOOL`` failure carrying ``state``.
    """
    override = self._read_optional_bool(arguments, keys=("admin_override",))
    if override is not True:
        explanation = (
            f"{tool_name} is restricted to scenario-authoring workflows. "
            "Pass admin_override=true to use it deliberately."
        )
        return self._failure(
            tool_name=tool_name,
            state=state,
            code="RESTRICTED_TOOL",
            message=explanation,
            retryable=False,
        )
    return None
def _resolve_bleeding_site(self, state: PatientState, arguments: dict[str, Any]) -> str | None:
    """Work out which active hemorrhage site a bleeding-control call targets.

    Returns ``None`` when nothing is bleeding.  An explicit ``site`` or
    ``compartment`` argument is normalized and must name an active site.
    Without one, the only active site is used.

    Raises:
        ValueError: if the named site is not active, or if several sites are
            active and none was named.
    """
    active = state.active_hemorrhages
    if not active:
        return None

    requested = self._normalize_site(arguments.get("site") or arguments.get("compartment"))
    if not requested:
        if len(active) != 1:
            raise ValueError("Multiple active hemorrhages are present; specify a site.")
        (only_site,) = tuple(active)
        return only_site

    if requested in active:
        return requested
    raise ValueError(f"'{requested}' is not an active hemorrhage site.")
def _normalize_site(value: object) -> str:
    """Turn a site label into a lower-case, underscore-separated key.

    ``None`` becomes ``""``.  Anything else is stringified, stripped,
    lower-cased, and has hyphens and spaces replaced by underscores.
    """
    if value is None:
        return ""
    key = str(value).strip().lower()
    for separator in ("-", " "):
        key = key.replace(separator, "_")
    return key
def _suggest_oxygen_device(state: PatientState) -> str:
    """Suggest an oxygen delivery device from SpO2 alone.

    Below 0.9 returns ``"non_rebreather_mask"``, below 0.95
    ``"simple_mask"``, otherwise (or with no SpO2) ``"nasal_cannula"``.
    """
    saturation = state.spo2
    if saturation is None:
        return "nasal_cannula"
    if saturation < 0.9:
        return "non_rebreather_mask"
    return "simple_mask" if saturation < 0.95 else "nasal_cannula"
def _default_oxygen_flow(device: str) -> float:
    """Return the default flow value for a known oxygen device.

    Raises:
        KeyError: if ``device`` is not one of ``nasal_cannula``,
            ``simple_mask`` or ``non_rebreather_mask``.
    """
    flow_by_device = dict(
        nasal_cannula=4.0,
        simple_mask=8.0,
        non_rebreather_mask=15.0,
    )
    return flow_by_device[device]
def _default_bleeding_method(site: str) -> str:
    """Return ``"tourniquet"`` for the four limb sites, else ``"pressure"``."""
    limb_sites = frozenset(("left_arm", "right_arm", "left_leg", "right_leg"))
    return "tourniquet" if site in limb_sites else "pressure"
def _suggest_position(state: PatientState) -> str:
    """Suggest ``"upright"`` when SpO2 is known and below 0.92, else ``"supine"``."""
    saturation = state.spo2
    low_saturation = saturation is not None and saturation < 0.92
    return "upright" if low_saturation else "supine"
def _suggest_airway_support(state: PatientState) -> str:
    """Suggest an airway/ventilation mode from SpO2 and mental status.

    With SpO2 below 0.85: pressure-control ventilation for ``pain`` /
    ``unresponsive`` patients, otherwise bag-valve-mask.  With SpO2 below
    0.9: CPAP for ``alert`` / ``verbal`` patients, otherwise bag-valve-mask.
    With higher or unknown SpO2 the choice depends on mental status alone:
    tracheal, oropharyngeal (``verbal``) or nasopharyngeal.
    """
    saturation = state.spo2
    status = state.mental_status
    depressed = status in {"pain", "unresponsive"}

    if saturation is not None:
        if saturation < 0.85:
            return "pressure_control_ventilation" if depressed else "bag_valve_mask"
        if saturation < 0.9:
            return "cpap" if status in {"alert", "verbal"} else "bag_valve_mask"

    if depressed:
        return "tracheal"
    return "oropharyngeal" if status == "verbal" else "nasopharyngeal"
def _suggest_needle_side(state: PatientState) -> str:
    """Pick the chest side for needle decompression from the breath sounds.

    Returns ``"right"`` only when the breath-sound text mentions
    ``"absent right"`` and not ``"absent left"``; every other case,
    including a missing breath-sound value, returns ``"left"``.
    """
    # Treat a missing value as "no finding" so the substring test cannot
    # raise TypeError on None.
    sounds = state.breath_sounds or ""
    if "absent left" in sounds:
        return "left"
    if "absent right" in sounds:
        return "right"
    return "left"
def _default_diagnostic_delay(diagnostic_key: str) -> int:
    """Look up the default delay for a diagnostic key.

    Only the part before the first ``":"`` is used for the lookup in
    ``PulseEngineAdapter.DIAGNOSTIC_DELAYS_S``; unknown keys get 120.
    """
    family, _, _ = diagnostic_key.partition(":")
    return PulseEngineAdapter.DIAGNOSTIC_DELAYS_S.get(family, 120)
def _read_positive_float(arguments: dict[str, Any], *, keys: tuple[str, ...]) -> float:
    """Read a required numeric argument from the first populated key.

    Delegates to ``PulseToolExecutor._read_optional_float``.  Despite the
    name, no sign check happens here: the only failure raised in this
    function is for a missing value.

    Raises:
        ValueError: if none of ``keys`` holds a non-``None`` value.
    """
    number = PulseToolExecutor._read_optional_float(arguments, keys=keys)
    if number is not None:
        return number
    options = ", ".join(keys)
    raise ValueError(f"One of {options} is required.")
def _read_positive_fio2(arguments: dict[str, Any], *, keys: tuple[str, ...]) -> float:
    """Read a required FiO2 argument from the first populated key.

    Delegates to ``PulseToolExecutor._read_optional_fio2``.  The only failure
    raised in this function is for a missing value; no range check happens
    here.

    Raises:
        ValueError: if none of ``keys`` holds a non-``None`` value.
    """
    fraction = PulseToolExecutor._read_optional_fio2(arguments, keys=keys)
    if fraction is not None:
        return fraction
    options = ", ".join(keys)
    raise ValueError(f"One of {options} is required.")
def _read_optional_float(arguments: dict[str, Any], *, keys: tuple[str, ...]) -> float | None:
    """Read an optional numeric argument from the first populated key.

    Keys are tried in order.  The first one holding a non-``None`` value is
    passed through ``coerce_numeric_argument`` and returned.  Returns
    ``None`` when no key is populated.
    """
    for candidate in keys:
        raw = arguments.get(candidate)
        if raw is not None:
            return coerce_numeric_argument(raw)
    return None
def _read_optional_fio2(arguments: dict[str, Any], *, keys: tuple[str, ...]) -> float | None:
    """Read an optional FiO2 argument, accepting a fraction or a percentage.

    A value above 1.0 and up to 100.0 is divided by 100.  Any other value,
    including one above 100.0, is returned unchanged.  Returns ``None`` when
    no key is populated.
    """
    raw = PulseToolExecutor._read_optional_float(arguments, keys=keys)
    if raw is None:
        return None
    looks_like_percent = 1.0 < raw <= 100.0
    return raw / 100.0 if looks_like_percent else raw
def _read_optional_bool(arguments: dict[str, Any], *, keys: tuple[str, ...]) -> bool | None:
    """Read an optional boolean argument from the first populated key.

    Keys are tried in order.  The first one holding a non-``None`` value is
    passed through ``coerce_boolean_argument`` and returned.  Returns
    ``None`` when no key is populated.
    """
    for candidate in keys:
        raw = arguments.get(candidate)
        if raw is not None:
            return coerce_boolean_argument(raw)
    return None
def _fmt(value: float | None, *, precision: int = 1) -> str:
    """Format a number with ``precision`` decimals, or ``"n/a"`` for ``None``."""
    return "n/a" if value is None else format(value, f".{precision}f")
def _changed_fields(previous_state: PatientState, current_state: PatientState) -> list[str]:
    """List the field names whose dumped values differ between two states.

    Both states are dumped with ``model_dump()``.  Fields are reported in
    the order of the current dump; a field missing from the previous dump
    is compared against ``None``.
    """
    before = previous_state.model_dump()
    after = current_state.model_dump()
    changed: list[str] = []
    for field_name, new_value in after.items():
        if before.get(field_name) != new_value:
            changed.append(field_name)
    return changed
def _success(
    self,
    tool_name: str,
    state: PatientState,
    message: str,
    *,
    previous_state: PatientState,
) -> ToolExecution:
    """Wrap a successful tool call in a ``ToolExecution``.

    The result lists the fields that differ between ``previous_state`` and
    ``state`` (via ``_changed_fields``) and sets ``state_changed`` when that
    list is non-empty.  ``error`` is always ``None``.
    """
    diff = self._changed_fields(previous_state, state)
    outcome = ToolResult(
        tool_name=tool_name,
        success=True,
        message=message,
        state_changed=bool(diff),
        changed_fields=diff,
    )
    return ToolExecution(state=state, tool_result=outcome, error=None)
def _failure(
    self,
    *,
    tool_name: str,
    state: PatientState,
    code: str,
    message: str,
    retryable: bool,
) -> ToolExecution:
    """Wrap a failed tool call in a ``ToolExecution``.

    The result is marked unsuccessful with no changed fields, and the same
    ``message`` is carried by both the ``ToolResult`` and the ``ToolError``.
    """
    outcome = ToolResult(
        tool_name=tool_name,
        success=False,
        message=message,
        state_changed=False,
        changed_fields=[],
    )
    problem = ToolError(code=code, message=message, retryable=retryable)
    return ToolExecution(state=state, tool_result=outcome, error=problem)