Pulse_ER_env / server /tools.py
KChad's picture
Add all docs_assets image assets to Hugging Face Space snapshot
9b1756a
"""Tool registry and execution logic for the Pulse physiology environment."""
from __future__ import annotations
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable
from pulse_physiology_env.models import PulsePhysiologyAction, ToolError, ToolResult
from pulse_physiology_env.patient_state import PatientState
from pulse_physiology_env.tool_catalog import (
canonicalize_tool_name,
coerce_boolean_argument,
coerce_numeric_argument,
)
from .pulse_engine_adapter import PulseEngineAdapter
def _unique_tool_names(*groups: list[str]) -> list[str]:
ordered: list[str] = []
seen: set[str] = set()
for group in groups:
for name in group:
if name not in seen:
seen.add(name)
ordered.append(name)
return ordered
# Twelve tool names grouped as "spec priority".
# NOTE(review): the name suggests these are prioritized by an external
# environment spec — confirm against that spec.
SPEC_PRIORITY_TOOL_NAMES = [
    "get_vitals",
    "get_hemodynamics",
    "get_blood_chemistry",
    "get_respiratory_state",
    "get_shock_assessment",
    "perform_needle_decompression",
    "administer_crystalloid_bolus",
    "start_norepinephrine_infusion",
    "administer_epinephrine_bolus",
    "perform_intubation",
    "run_triage_assessment",
    "detect_deterioration",
]
# The 45-entry clinical tool set. It is listed first when building
# AVAILABLE_TOOL_NAMES, so its order determines the front of that list.
CLINICAL_45_TOOL_NAMES = [
    "apply_nasal_cannula",
    "apply_simple_mask",
    "apply_nonrebreather_mask",
    "apply_bag_valve_mask",
    "perform_intubation",
    "set_ventilator_tidal_volume",
    "set_ventilator_rate",
    "set_ventilator_fio2",
    "initiate_hemorrhage",
    "apply_tourniquet",
    "apply_wound_packing",
    "apply_direct_pressure",
    "perform_needle_decompression",
    "perform_pericardiocentesis",
    "perform_cpr",
    "induce_cardiac_arrest",
    "apply_pericardial_effusion",
    "administer_epinephrine_bolus",
    "administer_morphine_bolus",
    "administer_ketamine_bolus",
    "administer_midazolam_bolus",
    "administer_lorazepam_bolus",
    "administer_succinylcholine_bolus",
    "administer_phenylephrine_bolus",
    "administer_atropine_bolus",
    "start_norepinephrine_infusion",
    "start_dopamine_infusion",
    "start_phenylephrine_infusion",
    "adjust_infusion_rate",
    "stop_infusion",
    "administer_crystalloid_bolus",
    "administer_blood_transfusion",
    "administer_plasma",
    "activate_massive_transfusion_protocol",
    "auscultate_chest",
    "assess_consciousness_level",
    "check_pain_level",
    "measure_core_temperature",
    "check_end_tidal_co2",
    "calculate_shock_index",
    "order_arterial_blood_gas",
    "order_complete_blood_count",
    "order_basic_metabolic_panel",
    "order_point_of_care_ultrasound",
    "assess_urine_output",
]
# The ten original tool names; also the prefix of LEGACY_EXTENDED_TOOL_NAMES.
INITIAL_TOOL_NAMES = [
    "get_vitals",
    "advance_time",
    "give_oxygen",
    "give_fluids",
    "control_bleeding",
    "position_patient",
    "airway_support",
    "summarize_state",
    "check_deterioration",
    "recommend_next_step",
]
# INITIAL_TOOL_NAMES followed by seven more legacy names. `+` builds a new
# list, so INITIAL_TOOL_NAMES itself is not modified.
LEGACY_EXTENDED_TOOL_NAMES = INITIAL_TOOL_NAMES + [
    "give_pressor",
    "needle_decompression",
    "pericardiocentesis",
    "get_respiratory_status",
    "get_blood_gas",
    "get_cbc",
    "get_bmp",
]
# De-duplicated union of the three groups, in the order clinical -> spec
# priority -> legacy; a name shared by several groups keeps its first position.
AVAILABLE_TOOL_NAMES = _unique_tool_names(
    CLINICAL_45_TOOL_NAMES,
    SPEC_PRIORITY_TOOL_NAMES,
    LEGACY_EXTENDED_TOOL_NAMES,
)
# Tools that induce a pathology rather than treat one.
# NOTE(review): presumably consulted by the restriction check used in the
# handlers for these tools (not visible in this part of the file) — confirm.
RESTRICTED_TOOL_NAMES = {
    "initiate_hemorrhage",
    "induce_cardiac_arrest",
    "apply_pericardial_effusion",
}
@dataclass
class ToolExecution:
    """Record describing the result of one handled tool call.

    Attributes:
        state: Patient state associated with the call.
        tool_result: Structured result of the tool.
        error: Error details, or ``None`` when no error is attached.
    """

    # Patient state snapshot carried with the result.
    state: PatientState
    # Structured outcome reported for the tool.
    tool_result: ToolResult
    # Optional error payload; None is allowed by the annotation.
    error: ToolError | None
class PulseToolExecutor:
"""Executes named tools against a PulseEngineAdapter."""
def __init__(self, adapter: PulseEngineAdapter) -> None:
    """Store the adapter and build the tool-name -> handler dispatch table.

    Args:
        adapter: Engine adapter that every handler operates on.

    Each table value is a callable taking the tool's argument dict and
    returning a ``ToolExecution``. Where several tool names share one
    implementation, ``functools.partial`` pre-binds the keyword arguments
    (such as ``tool_name``, ``device``, ``pressor`` or ``drug_key``) that
    distinguish them.
    """
    self._adapter = adapter
    self._handlers: dict[str, Callable[[dict[str, Any]], ToolExecution]] = {
        # --- Read-only assessments -------------------------------------
        "get_vitals": self._handle_get_vitals,
        "get_hemodynamics": self._handle_get_hemodynamics,
        "get_blood_chemistry": self._handle_get_blood_chemistry,
        # Two names for the same respiratory report; only the reported tool name differs.
        "get_respiratory_state": partial(self._handle_get_respiratory_status, tool_name="get_respiratory_state"),
        "get_respiratory_status": partial(self._handle_get_respiratory_status, tool_name="get_respiratory_status"),
        "get_shock_assessment": self._handle_get_shock_assessment,
        "run_triage_assessment": self._handle_run_triage_assessment,
        # Two names for the same deterioration check.
        "detect_deterioration": partial(self._handle_check_deterioration, tool_name="detect_deterioration"),
        "check_deterioration": partial(self._handle_check_deterioration, tool_name="check_deterioration"),
        # --- Simulation clock ------------------------------------------
        "advance_time": self._handle_advance_time,
        # --- Supplemental oxygen ---------------------------------------
        "give_oxygen": self._handle_give_oxygen,
        "apply_nasal_cannula": partial(self._handle_fixed_oxygen, tool_name="apply_nasal_cannula", device="nasal_cannula"),
        "apply_simple_mask": partial(self._handle_fixed_oxygen, tool_name="apply_simple_mask", device="simple_mask"),
        "apply_nonrebreather_mask": partial(self._handle_fixed_oxygen, tool_name="apply_nonrebreather_mask", device="non_rebreather_mask"),
        # --- Fluids and blood products ---------------------------------
        "give_fluids": self._handle_give_fluids,
        "administer_crystalloid_bolus": self._handle_administer_crystalloid_bolus,
        "administer_blood_transfusion": self._handle_administer_blood_transfusion,
        "administer_plasma": self._handle_administer_plasma,
        "activate_massive_transfusion_protocol": self._handle_activate_massive_transfusion_protocol,
        # --- Hemorrhage ------------------------------------------------
        "control_bleeding": self._handle_control_bleeding,
        "apply_tourniquet": self._handle_apply_tourniquet,
        "apply_wound_packing": self._handle_apply_wound_packing,
        "apply_direct_pressure": self._handle_apply_direct_pressure,
        "initiate_hemorrhage": self._handle_initiate_hemorrhage,
        # --- Positioning, airway and ventilation -----------------------
        "position_patient": self._handle_position_patient,
        "airway_support": self._handle_airway_support,
        "apply_bag_valve_mask": self._handle_apply_bag_valve_mask,
        "perform_intubation": self._handle_perform_intubation,
        "set_ventilator_tidal_volume": self._handle_set_ventilator_tidal_volume,
        "set_ventilator_rate": self._handle_set_ventilator_rate,
        "set_ventilator_fio2": self._handle_set_ventilator_fio2,
        # --- Pressors and infusions ------------------------------------
        "give_pressor": self._handle_give_pressor,
        "start_norepinephrine_infusion": partial(self._handle_start_pressor_infusion, tool_name="start_norepinephrine_infusion", pressor="norepinephrine"),
        "start_dopamine_infusion": partial(self._handle_start_pressor_infusion, tool_name="start_dopamine_infusion", pressor="dopamine"),
        "start_phenylephrine_infusion": partial(self._handle_start_pressor_infusion, tool_name="start_phenylephrine_infusion", pressor="phenylephrine"),
        "adjust_infusion_rate": self._handle_adjust_infusion_rate,
        "stop_infusion": self._handle_stop_infusion,
        # --- Procedures (legacy and "perform_" names map to separate handlers)
        "needle_decompression": self._handle_needle_decompression,
        "perform_needle_decompression": self._handle_perform_needle_decompression,
        "pericardiocentesis": self._handle_pericardiocentesis,
        "perform_pericardiocentesis": self._handle_perform_pericardiocentesis,
        "perform_cpr": self._handle_perform_cpr,
        "induce_cardiac_arrest": self._handle_induce_cardiac_arrest,
        "apply_pericardial_effusion": self._handle_apply_pericardial_effusion,
        # --- Bolus drugs: one shared handler; dose_keys lists the argument
        # names to look for, which differ per drug.
        "administer_epinephrine_bolus": partial(self._handle_bolus_drug, tool_name="administer_epinephrine_bolus", drug_key="epinephrine", dose_keys=("dose_mg", "dose")),
        "administer_morphine_bolus": partial(self._handle_bolus_drug, tool_name="administer_morphine_bolus", drug_key="morphine", dose_keys=("dose_mg", "dose")),
        "administer_ketamine_bolus": partial(self._handle_bolus_drug, tool_name="administer_ketamine_bolus", drug_key="ketamine", dose_keys=("dose_mg_per_kg", "dose")),
        "administer_midazolam_bolus": partial(self._handle_bolus_drug, tool_name="administer_midazolam_bolus", drug_key="midazolam", dose_keys=("dose_mg", "dose")),
        "administer_lorazepam_bolus": partial(self._handle_bolus_drug, tool_name="administer_lorazepam_bolus", drug_key="lorazepam", dose_keys=("dose_mg", "dose")),
        "administer_succinylcholine_bolus": partial(self._handle_bolus_drug, tool_name="administer_succinylcholine_bolus", drug_key="succinylcholine", dose_keys=("dose_mg_per_kg", "dose")),
        "administer_phenylephrine_bolus": partial(self._handle_bolus_drug, tool_name="administer_phenylephrine_bolus", drug_key="phenylephrine", dose_keys=("dose_mcg", "dose")),
        "administer_atropine_bolus": partial(self._handle_bolus_drug, tool_name="administer_atropine_bolus", drug_key="atropine", dose_keys=("dose_mg", "dose")),
        # --- Summaries and recommendations -----------------------------
        "summarize_state": self._handle_summarize_state,
        "recommend_next_step": self._handle_recommend_next_step,
        # --- Labs: legacy "get_" names and "order_" names share handlers
        "get_blood_gas": partial(self._handle_get_blood_gas, tool_name="get_blood_gas"),
        "get_cbc": partial(self._handle_get_cbc, tool_name="get_cbc"),
        "get_bmp": partial(self._handle_get_bmp, tool_name="get_bmp"),
        "order_arterial_blood_gas": partial(self._handle_get_blood_gas, tool_name="order_arterial_blood_gas"),
        "order_complete_blood_count": partial(self._handle_get_cbc, tool_name="order_complete_blood_count"),
        "order_basic_metabolic_panel": partial(self._handle_get_bmp, tool_name="order_basic_metabolic_panel"),
        "order_point_of_care_ultrasound": self._handle_order_point_of_care_ultrasound,
        # --- Bedside assessments ---------------------------------------
        "auscultate_chest": self._handle_auscultate_chest,
        "assess_consciousness_level": self._handle_assess_consciousness_level,
        "check_pain_level": self._handle_check_pain_level,
        "measure_core_temperature": self._handle_measure_core_temperature,
        "check_end_tidal_co2": self._handle_check_end_tidal_co2,
        "calculate_shock_index": self._handle_calculate_shock_index,
        "assess_urine_output": self._handle_assess_urine_output,
    }
@property
def available_tools(self) -> list[str]:
    """Return a new list holding every name in ``AVAILABLE_TOOL_NAMES``.

    A copy is returned so callers can mutate the result without touching
    the module-level list.
    """
    return [*AVAILABLE_TOOL_NAMES]
def execute(self, action: PulsePhysiologyAction) -> ToolExecution:
    """Resolve the requested tool, run its handler, and report failures.

    Args:
        action: Carries ``tool_name`` (stripped, then canonicalized against
            the registered handler names) and ``arguments`` (copied into a
            plain dict before being handed to the handler).

    Returns:
        The handler's ``ToolExecution``, or a failure built by
        ``self._failure`` with code ``UNSUPPORTED_TOOL`` (unknown name),
        ``INVALID_ARGUMENT`` (handler raised ``ValueError``, not retryable)
        or ``ENGINE_ERROR`` (handler raised ``RuntimeError``, retryable).
        Other exception types are not caught here.
    """
    resolved_name = canonicalize_tool_name(
        action.tool_name.strip(),
        allowed_tools=list(self._handlers),
    )
    handler = self._handlers.get(resolved_name)
    if handler is None:
        return self._failure(
            tool_name=resolved_name,
            state=self._adapter.get_full_state(),
            code="UNSUPPORTED_TOOL",
            message=f"Unsupported tool '{resolved_name}'.",
            retryable=False,
        )

    try:
        return handler(dict(action.arguments))
    except (ValueError, RuntimeError) as exc:
        # ValueError is tested first so the mapping matches a
        # ValueError-then-RuntimeError pair of except clauses.
        bad_input = isinstance(exc, ValueError)
        return self._failure(
            tool_name=resolved_name,
            state=self._adapter.get_full_state(),
            code="INVALID_ARGUMENT" if bad_input else "ENGINE_ERROR",
            message=str(exc),
            retryable=not bad_input,
        )
def _handle_get_vitals(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report heart rate, blood pressure, MAP, SpO2, RR, temperature and EtCO2.

    Only reads the adapter's full state. The same snapshot is passed as both
    the current and the previous state of the success result. The argument
    dict is ignored.
    """
    snapshot = self._adapter.get_full_state()
    fmt = self._fmt
    heart_rate = fmt(snapshot.heart_rate_bpm)
    systolic = fmt(snapshot.systolic_bp_mmhg)
    diastolic = fmt(snapshot.diastolic_bp_mmhg)
    mean_pressure = fmt(snapshot.mean_arterial_pressure_mmhg)
    saturation = fmt(snapshot.spo2, precision=3)
    resp_rate = fmt(snapshot.respiration_rate_bpm)
    temperature = fmt(snapshot.core_temperature_c)
    end_tidal = fmt(snapshot.etco2_mmhg)
    summary = (
        f"HR {heart_rate} bpm, BP {systolic}/{diastolic} mmHg, "
        f"MAP {mean_pressure} mmHg, SpO2 {saturation}, "
        f"RR {resp_rate}, Temp {temperature} C, EtCO2 {end_tidal} mmHg."
    )
    return self._success("get_vitals", snapshot, summary, previous_state=snapshot)
def _handle_get_hemodynamics(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report cardiac output, stroke volume, SVR and MAP.

    Values come from the adapter's hemodynamics summary mapping. The full
    state is fetched only to attach to the result (as both current and
    previous state). The argument dict is ignored.
    """
    snapshot = self._adapter.get_full_state()
    summary = self._adapter.get_hemodynamics_summary()
    cardiac_output = self._fmt(summary["cardiac_output_l_per_min"])
    stroke_volume = self._fmt(summary["stroke_volume_ml"])
    resistance = self._fmt(summary["systemic_vascular_resistance_mmhg_min_per_l"])
    mean_pressure = self._fmt(summary["mean_arterial_pressure_mmhg"])
    text = (
        f"CO {cardiac_output} L/min, stroke volume {stroke_volume} mL, "
        f"SVR {resistance} mmHg*min/L, MAP {mean_pressure} mmHg."
    )
    return self._success("get_hemodynamics", snapshot, text, previous_state=snapshot)
def _handle_get_blood_chemistry(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report pH, lactate, hemoglobin and base deficit.

    Values come from the adapter's blood-chemistry summary mapping. The full
    state is attached to the result as both current and previous state. The
    argument dict is ignored.
    """
    snapshot = self._adapter.get_full_state()
    labs = self._adapter.get_blood_chemistry_summary()
    ph_text = self._fmt(labs["ph"], precision=3)
    lactate_text = self._fmt(labs["lactate_mg_per_dl"])
    hemoglobin_text = self._fmt(labs["hemoglobin_g_per_dl"])
    base_deficit_text = self._fmt(labs["base_deficit_meq_per_l"])
    text = (
        f"pH {ph_text}, lactate {lactate_text} mg/dL, "
        f"hemoglobin {hemoglobin_text} g/dL, base deficit {base_deficit_text} mEq/L."
    )
    return self._success("get_blood_chemistry", snapshot, text, previous_state=snapshot)
def _handle_get_respiratory_status(self, _arguments: dict[str, Any], *, tool_name: str) -> ToolExecution:
    """Report breath sounds, SpO2, EtCO2, tidal volume and airway support.

    Args:
        _arguments: Ignored.
        tool_name: Name to report in the result; lets several registered
            tool names share this handler.
    """
    snapshot = self._adapter.get_full_state()
    findings = [
        f"Breath sounds: {snapshot.breath_sounds}",
        f"SpO2 {self._fmt(snapshot.spo2, precision=3)}",
        f"EtCO2 {self._fmt(snapshot.etco2_mmhg)} mmHg",
        f"tidal volume {self._fmt(snapshot.tidal_volume_ml)} mL",
        # A falsy airway_support value is reported as the literal "none".
        f"airway support: {snapshot.airway_support or 'none'}.",
    ]
    return self._success(tool_name, snapshot, "; ".join(findings), previous_state=snapshot)
def _handle_get_shock_assessment(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report shock index, shock class, perfusion status and urine output.

    Values come from the adapter's shock-assessment mapping. The full state
    is attached to the result as both current and previous state. The
    argument dict is ignored.
    """
    snapshot = self._adapter.get_full_state()
    assessment = self._adapter.get_shock_assessment()
    index_text = self._fmt(assessment["shock_index"], precision=2)
    shock_class = assessment["shock_class"]
    perfusion = assessment["perfusion_status"]
    urine_text = self._fmt(assessment["urine_output_ml_per_hr"])
    text = (
        f"Shock index {index_text}, class {shock_class}, "
        f"perfusion {perfusion}, urine output {urine_text} mL/hr."
    )
    return self._success("get_shock_assessment", snapshot, text, previous_state=snapshot)
def _handle_run_triage_assessment(self, _arguments: dict[str, Any]) -> ToolExecution:
state = self._adapter.get_full_state()
shock = self._adapter.get_shock_assessment()
respiratory = "unstable" if state.spo2 is not None and state.spo2 < 0.9 else "acceptable"
if "cardiac_arrest" in state.active_alerts:
triage = "expectant"
elif state.mean_arterial_pressure_mmhg is not None and state.mean_arterial_pressure_mmhg < 55:
triage = "immediate"
elif state.spo2 is not None and state.spo2 < 0.9:
triage = "immediate"
elif state.active_hemorrhages:
triage = "urgent"
else:
triage = "delayed"
alerts = ", ".join(state.active_alerts) if state.active_alerts else "no active alerts"
message = (
f"Triage {triage}. Respiratory status {respiratory}; shock class {shock['shock_class']}; "
f"mental status {state.mental_status}; alerts {alerts}."
)
return self._success("run_triage_assessment", state, message, previous_state=state)
def _handle_check_deterioration(self, _arguments: dict[str, Any], *, tool_name: str) -> ToolExecution:
state = self._adapter.get_full_state()
deterioration_domains: list[str] = []
if state.spo2 is not None and state.spo2 < 0.92:
deterioration_domains.append("respiratory")
if state.shock_index is not None and state.shock_index >= 0.9:
deterioration_domains.append("circulatory")
if state.mental_status in {"pain", "unresponsive"}:
deterioration_domains.append("neurologic")
if state.lactate_trend == "worsening":
deterioration_domains.append("perfusion")
if state.pending_diagnostics:
deterioration_domains.append("diagnostics")
if not deterioration_domains:
message = "No major deterioration flags detected right now."
else:
message = f"Deterioration detected in: {', '.join(dict.fromkeys(deterioration_domains))}."
return self._success(tool_name, state, message, previous_state=state)
def _handle_advance_time(self, arguments: dict[str, Any]) -> ToolExecution:
    """Advance the simulation clock by the requested number of seconds.

    Args:
        arguments: The duration is looked up under ``seconds``,
            ``duration_s`` or ``sim_time_s`` via ``_read_positive_float``.

    Raises:
        ValueError: If the duration is not greater than zero or exceeds the
            1800-second cap for a single call.
    """
    before = self._adapter.get_full_state()
    duration_s = self._read_positive_float(
        arguments,
        keys=("seconds", "duration_s", "sim_time_s"),
    )
    if duration_s <= 0:
        raise ValueError("seconds must be greater than zero.")
    if duration_s > 1800:
        raise ValueError("seconds must be 1800 or less for a single tool call.")

    after = self._adapter.advance_time(duration_s)
    text = f"Advanced simulation by {duration_s:.0f} seconds."
    return self._success("advance_time", after, text, previous_state=before)
def _handle_give_oxygen(self, arguments: dict[str, Any]) -> ToolExecution:
    """Apply supplemental oxygen with a caller-chosen or suggested device.

    Args:
        arguments: Optional ``device`` (a falsy value falls back to
            ``_suggest_oxygen_device``), optional ``flow_lpm`` and optional
            ``monitor_seconds`` (a falsy value, i.e. missing or zero, falls
            back to 60.0).

    The message reports the device and flow from the resulting state when
    they are set, and the requested values otherwise.
    """
    before = self._adapter.get_full_state()
    chosen_device = str(arguments.get("device") or self._suggest_oxygen_device(before))
    requested_flow = self._read_optional_float(arguments, keys=("flow_lpm",))
    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0

    after = self._adapter.apply_supplemental_oxygen(
        device=chosen_device,
        flow_lpm=requested_flow,
        advance_time_seconds=observe_s,
    )

    reported_flow = after.oxygen_flow_lpm
    if reported_flow is None:
        reported_flow = requested_flow
    device_label = after.oxygen_device or chosen_device
    text = f"Supplemental oxygen applied via {device_label} at {self._fmt(reported_flow)} L/min."
    return self._success("give_oxygen", after, text, previous_state=before)
def _handle_fixed_oxygen(self, arguments: dict[str, Any], *, tool_name: str, device: str) -> ToolExecution:
    """Apply supplemental oxygen through a device fixed by the tool name.

    Args:
        arguments: Optional ``flow_rate_lpm`` / ``flow_lpm`` (a falsy value
            falls back to ``_default_oxygen_flow(device)``) and optional
            ``monitor_seconds`` (a falsy value falls back to 60.0).
        tool_name: Name to report in the result.
        device: Device identifier passed to the adapter.
    """
    before = self._adapter.get_full_state()

    flow = self._read_optional_float(arguments, keys=("flow_rate_lpm", "flow_lpm"))
    if not flow:
        flow = self._default_oxygen_flow(device)
    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0

    after = self._adapter.apply_supplemental_oxygen(
        device=device,
        flow_lpm=flow,
        advance_time_seconds=observe_s,
    )
    device_label = device.replace("_", " ")
    text = f"Applied {device_label} at {self._fmt(flow)} L/min."
    return self._success(tool_name, after, text, previous_state=before)
def _handle_give_fluids(self, arguments: dict[str, Any]) -> ToolExecution:
    """Start an infusion of the requested fluid.

    Args:
        arguments: Optional ``fluid_type`` / ``fluid`` (default
            ``"saline"``, stripped and lower-cased, then resolved by the
            adapter), ``volume_ml`` / ``bag_volume_ml`` (default 500.0),
            ``rate_ml_per_min`` (default 100.0) and ``monitor_seconds``
            (default 60.0). Falsy numeric values, including zero, take the
            default.
    """
    before = self._adapter.get_full_state()

    raw_fluid = arguments.get("fluid_type") or arguments.get("fluid") or "saline"
    fluid_key = str(raw_fluid).strip().lower()
    bag_ml = self._read_optional_float(arguments, keys=("volume_ml", "bag_volume_ml")) or 500.0
    rate = self._read_optional_float(arguments, keys=("rate_ml_per_min",)) or 100.0
    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0

    compound_name = self._adapter.resolve_fluid_compound(fluid_key)
    after = self._adapter.infuse_compound(
        compound=compound_name,
        bag_volume_ml=bag_ml,
        rate_ml_per_min=rate,
        advance_time_seconds=observe_s,
    )
    text = f"Started {compound_name} infusion: {bag_ml:.0f} mL at {rate:.0f} mL/min."
    return self._success("give_fluids", after, text, previous_state=before)
def _handle_administer_crystalloid_bolus(self, arguments: dict[str, Any]) -> ToolExecution:
    """Infuse a bolus of the ``"Saline"`` compound.

    Args:
        arguments: The volume may be given as ``volume_ml`` /
            ``bag_volume_ml`` (takes precedence) or as ``volume_l``
            (multiplied by 1000); with neither it is 1000.0 mL. Optional
            ``rate_ml_per_min`` (falsy -> 250.0) and ``monitor_seconds``
            (falsy -> 120.0).
    """
    before = self._adapter.get_full_state()

    liters = self._read_optional_float(arguments, keys=("volume_l",))
    milliliters = self._read_optional_float(arguments, keys=("volume_ml", "bag_volume_ml"))
    if milliliters is not None:
        bolus_ml = milliliters
    elif liters is not None:
        bolus_ml = liters * 1000.0
    else:
        bolus_ml = 1000.0

    rate = self._read_optional_float(arguments, keys=("rate_ml_per_min",)) or 250.0
    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 120.0

    after = self._adapter.infuse_compound(
        compound="Saline",
        bag_volume_ml=bolus_ml,
        rate_ml_per_min=rate,
        advance_time_seconds=observe_s,
    )
    text = f"Administered crystalloid bolus of {bolus_ml:.0f} mL at {rate:.0f} mL/min."
    return self._success("administer_crystalloid_bolus", after, text, previous_state=before)
def _handle_administer_blood_transfusion(self, arguments: dict[str, Any]) -> ToolExecution:
previous = self._adapter.get_full_state()
units = self._read_optional_float(arguments, keys=("units",)) or 1.0
if units <= 0:
raise ValueError("units must be greater than zero.")
bag_volume_ml = units * 300.0
rate_ml_per_min = self._read_optional_float(arguments, keys=("rate_ml_per_min",)) or 150.0
monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 120.0
state = self._adapter.infuse_compound(
compound="PackedRBC",
bag_volume_ml=bag_volume_ml,
rate_ml_per_min=rate_ml_per_min,
advance_time_seconds=monitor_seconds,
)
return self._success(
"administer_blood_transfusion",
state,
f"Started packed red blood cell transfusion for {units:.1f} unit(s).",
previous_state=previous,
)
def _handle_administer_plasma(self, arguments: dict[str, Any]) -> ToolExecution:
previous = self._adapter.get_full_state()
if not self._adapter.supports_compound("Plasma"):
return self._failure(
tool_name="administer_plasma",
state=previous,
code="UNSUPPORTED_BY_ENGINE",
message="The local Pulse build does not expose a plasma compound infusion.",
retryable=False,
)
units = self._read_optional_float(arguments, keys=("units",)) or 1.0
if units <= 0:
raise ValueError("units must be greater than zero.")
bag_volume_ml = units * 250.0
rate_ml_per_min = self._read_optional_float(arguments, keys=("rate_ml_per_min",)) or 125.0
monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 120.0
state = self._adapter.infuse_compound(
compound="Plasma",
bag_volume_ml=bag_volume_ml,
rate_ml_per_min=rate_ml_per_min,
advance_time_seconds=monitor_seconds,
)
return self._success(
"administer_plasma",
state,
f"Started plasma transfusion for {units:.1f} unit(s).",
previous_state=previous,
)
def _handle_activate_massive_transfusion_protocol(self, arguments: dict[str, Any]) -> ToolExecution:
previous = self._adapter.get_full_state()
if not self._adapter.supports_compound("Plasma"):
return self._failure(
tool_name="activate_massive_transfusion_protocol",
state=previous,
code="UNSUPPORTED_BY_ENGINE",
message="Full MTP is unavailable because the local Pulse build does not expose plasma.",
retryable=False,
)
prbc_units = self._read_optional_float(arguments, keys=("prbc_units",)) or 2.0
plasma_units = self._read_optional_float(arguments, keys=("plasma_units",)) or 2.0
rate_ml_per_min = self._read_optional_float(arguments, keys=("rate_ml_per_min",)) or 200.0
monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 180.0
self._adapter.infuse_compound(
compound="PackedRBC",
bag_volume_ml=prbc_units * 300.0,
rate_ml_per_min=rate_ml_per_min,
advance_time_seconds=0.0,
)
state = self._adapter.infuse_compound(
compound="Plasma",
bag_volume_ml=plasma_units * 250.0,
rate_ml_per_min=rate_ml_per_min,
advance_time_seconds=monitor_seconds,
)
return self._success(
"activate_massive_transfusion_protocol",
state,
f"Activated MTP with {prbc_units:.1f} unit(s) PRBC and {plasma_units:.1f} unit(s) plasma.",
previous_state=previous,
)
def _handle_control_bleeding(self, arguments: dict[str, Any]) -> ToolExecution:
previous = self._adapter.get_full_state()
active_site = self._resolve_bleeding_site(previous, arguments)
if active_site is None:
return self._failure(
tool_name="control_bleeding",
state=previous,
code="PRECONDITION_FAILED",
message="No active hemorrhage is currently tracked.",
retryable=False,
)
method = str(arguments.get("method") or self._default_bleeding_method(active_site)).strip().lower().replace("-", "_").replace(" ", "_")
current_flow = previous.active_hemorrhages[active_site]
if method == "tourniquet":
new_flow = 0.0
elif method == "pressure":
new_flow = current_flow * 0.35
elif method in {"hemostatic_dressing", "pressure_dressing", "wound_packing"}:
new_flow = current_flow * 0.2
else:
raise ValueError("method must be one of: tourniquet, pressure, hemostatic_dressing")
monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0
state = self._adapter.set_hemorrhage(
active_site,
flow_rate_ml_per_min=new_flow,
advance_time_seconds=monitor_seconds,
)
return self._success(
"control_bleeding",
state,
f"Applied {method} to {active_site.replace('_', ' ')}.",
previous_state=previous,
)
def _handle_apply_tourniquet(self, arguments: dict[str, Any]) -> ToolExecution:
    """Stop an active limb hemorrhage by setting its flow to 0.0.

    Args:
        arguments: ``limb`` (or ``site``) naming one of ``left_arm``,
            ``right_arm``, ``left_leg``, ``right_leg`` after
            ``_normalize_site``; optional ``monitor_seconds``
            (falsy -> 60.0).

    Returns:
        A non-retryable ``PRECONDITION_FAILED`` failure when the limb has no
        tracked hemorrhage; otherwise the success result.

    Raises:
        ValueError: If the limb is missing or not one of the four limbs.
    """
    before = self._adapter.get_full_state()
    target_limb = self._normalize_site(arguments.get("limb") or arguments.get("site"))

    allowed_limbs = {"left_arm", "right_arm", "left_leg", "right_leg"}
    if not target_limb:
        raise ValueError("limb is required and must be one of: left_arm, right_arm, left_leg, right_leg")
    if target_limb not in allowed_limbs:
        raise ValueError("limb must be one of: left_arm, right_arm, left_leg, right_leg")

    if target_limb not in before.active_hemorrhages:
        return self._failure(
            tool_name="apply_tourniquet",
            state=before,
            code="PRECONDITION_FAILED",
            message=f"No active limb hemorrhage is tracked at '{target_limb}'.",
            retryable=False,
        )

    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0
    after = self._adapter.set_hemorrhage(
        target_limb,
        flow_rate_ml_per_min=0.0,
        advance_time_seconds=observe_s,
    )
    limb_label = target_limb.replace("_", " ")
    return self._success(
        "apply_tourniquet",
        after,
        f"Tourniquet applied to {limb_label}.",
        previous_state=before,
    )
def _handle_apply_wound_packing(self, arguments: dict[str, Any]) -> ToolExecution:
    """Pack a bleeding wound, cutting its flow to 20% of the current value.

    Args:
        arguments: Site selection is delegated to
            ``_resolve_bleeding_site``; optional ``monitor_seconds``
            (falsy -> 60.0).

    Returns:
        A non-retryable ``PRECONDITION_FAILED`` failure when no site is
        resolved; otherwise the success result.
    """
    before = self._adapter.get_full_state()
    wound_site = self._resolve_bleeding_site(before, arguments)
    if wound_site is None:
        return self._failure(
            tool_name="apply_wound_packing",
            state=before,
            code="PRECONDITION_FAILED",
            message="No active hemorrhage is currently tracked.",
            retryable=False,
        )

    reduced_flow = before.active_hemorrhages[wound_site] * 0.2
    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0
    after = self._adapter.set_hemorrhage(
        wound_site,
        flow_rate_ml_per_min=reduced_flow,
        advance_time_seconds=observe_s,
    )
    site_label = wound_site.replace("_", " ")
    return self._success(
        "apply_wound_packing",
        after,
        f"Wound packing applied to {site_label}.",
        previous_state=before,
    )
def _handle_apply_direct_pressure(self, arguments: dict[str, Any]) -> ToolExecution:
    """Apply direct pressure, cutting the bleed to 35% of its current flow.

    Args:
        arguments: Site selection is delegated to
            ``_resolve_bleeding_site``; optional ``monitor_seconds``
            (falsy -> 60.0).

    Returns:
        A non-retryable ``PRECONDITION_FAILED`` failure when no site is
        resolved; otherwise the success result.
    """
    before = self._adapter.get_full_state()
    wound_site = self._resolve_bleeding_site(before, arguments)
    if wound_site is None:
        return self._failure(
            tool_name="apply_direct_pressure",
            state=before,
            code="PRECONDITION_FAILED",
            message="No active hemorrhage is currently tracked.",
            retryable=False,
        )

    reduced_flow = before.active_hemorrhages[wound_site] * 0.35
    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0
    after = self._adapter.set_hemorrhage(
        wound_site,
        flow_rate_ml_per_min=reduced_flow,
        advance_time_seconds=observe_s,
    )
    site_label = wound_site.replace("_", " ")
    return self._success(
        "apply_direct_pressure",
        after,
        f"Direct pressure applied to {site_label}.",
        previous_state=before,
    )
def _handle_initiate_hemorrhage(self, arguments: dict[str, Any]) -> ToolExecution:
    """Start a hemorrhage in the named compartment (restricted tool).

    Args:
        arguments: ``compartment`` (stripped; a missing value becomes an
            empty string and is passed on as-is), ``rate_ml_per_min`` /
            ``rate`` read via ``_read_positive_float``, optional
            ``hemorrhage_type`` (default ``"external"``) and optional
            ``monitor_seconds`` (falsy -> 30.0).

    Returns:
        Whatever ``_restricted_tool_failure`` returns when it is not
        ``None``; otherwise the success result.
    """
    before = self._adapter.get_full_state()

    blocked = self._restricted_tool_failure("initiate_hemorrhage", before, arguments)
    if blocked is not None:
        return blocked

    target_compartment = str(arguments.get("compartment") or "").strip()
    bleed_rate = self._read_positive_float(arguments, keys=("rate_ml_per_min", "rate"))
    bleed_kind = str(arguments.get("hemorrhage_type") or "external")
    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 30.0

    after = self._adapter.set_hemorrhage(
        target_compartment,
        flow_rate_ml_per_min=bleed_rate,
        hemorrhage_type=bleed_kind,
        advance_time_seconds=observe_s,
    )
    text = f"Initiated {bleed_kind} hemorrhage in {target_compartment} at {bleed_rate:.0f} mL/min."
    return self._success("initiate_hemorrhage", after, text, previous_state=before)
def _handle_position_patient(self, arguments: dict[str, Any]) -> ToolExecution:
    """Set the patient's position.

    Args:
        arguments: Optional ``position``; a falsy value falls back to
            ``_suggest_position`` for the current state.

    The message reports the position stored on the resulting state, with
    underscores shown as spaces.
    """
    before = self._adapter.get_full_state()
    requested = arguments.get("position") or self._suggest_position(before)
    after = self._adapter.set_patient_position(str(requested))
    position_label = after.position.replace("_", " ")
    return self._success(
        "position_patient",
        after,
        f"Patient position set to {position_label}.",
        previous_state=before,
    )
def _handle_airway_support(self, arguments: dict[str, Any]) -> ToolExecution:
    """Apply the requested kind of airway / ventilatory support.

    The kind is read from ``support_type`` (or ``mode``) and falls back to
    ``_suggest_airway_support`` when absent. It is matched after
    lower-casing and turning hyphens and spaces into underscores:

      * ``bag_valve_mask`` / ``bvm`` -> ``apply_bag_valve_mask``
      * ``cpap`` -> ``apply_cpap``
      * ``pressure_control_ventilation`` / ``pressure_control`` ->
        ``apply_pressure_control_ventilation``
      * ``volume_control_ventilation`` / ``volume_control`` ->
        ``apply_volume_control_ventilation``
      * anything else -> ``set_intubation`` with the un-normalized value

    Mode-specific settings are read only inside the selected branch, so
    arguments belonging to other modes are never parsed. ``monitor_seconds``
    falls back to 60.0 when missing or zero.
    """
    previous = self._adapter.get_full_state()
    requested_support = arguments.get("support_type") or arguments.get("mode")
    support_type = str(requested_support or self._suggest_airway_support(previous))
    support_key = support_type.strip().lower().replace("-", "_").replace(" ", "_")
    # Generic placeholders carry no real choice: replace them with the
    # suggestion for the current state and normalize again.
    if support_key in {"auto", "basic", "default", "standard", "support", "airway_support"}:
        support_type = self._suggest_airway_support(previous)
        support_key = support_type.strip().lower().replace("-", "_").replace(" ", "_")
    monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0
    if support_key in {"bag_valve_mask", "bvm"}:
        state = self._adapter.apply_bag_valve_mask(
            fio2=self._read_optional_fio2(arguments, keys=("fio2", "fraction_inspired_oxygen")),
            peep_cmh2o=self._read_optional_float(arguments, keys=("peep_cmh2o", "peep")),
            respiration_rate_bpm=self._read_optional_float(arguments, keys=("respiration_rate_bpm", "rate_bpm")),
            inspiratory_expiratory_ratio=self._read_optional_float(arguments, keys=("ie_ratio", "inspiratory_expiratory_ratio")),
            squeeze_pressure_cmh2o=self._read_optional_float(arguments, keys=("squeeze_pressure_cmh2o", "pressure_cmh2o")),
            squeeze_volume_ml=self._read_optional_float(arguments, keys=("squeeze_volume_ml", "tidal_volume_ml")),
            airway_adjunct=arguments.get("airway_adjunct"),
            advance_time_seconds=monitor_seconds,
        )
    elif support_key == "cpap":
        state = self._adapter.apply_cpap(
            fio2=self._read_optional_fio2(arguments, keys=("fio2", "fraction_inspired_oxygen")),
            peep_cmh2o=self._read_optional_float(arguments, keys=("peep_cmh2o", "peep")),
            pressure_support_cmh2o=self._read_optional_float(arguments, keys=("pressure_support_cmh2o", "pressure_support")),
            advance_time_seconds=monitor_seconds,
        )
    elif support_key in {"pressure_control_ventilation", "pressure_control"}:
        state = self._adapter.apply_pressure_control_ventilation(
            fio2=self._read_optional_fio2(arguments, keys=("fio2", "fraction_inspired_oxygen")),
            peep_cmh2o=self._read_optional_float(arguments, keys=("peep_cmh2o", "peep")),
            inspiratory_pressure_cmh2o=self._read_optional_float(arguments, keys=("inspiratory_pressure_cmh2o", "pressure_cmh2o")),
            respiration_rate_bpm=self._read_optional_float(arguments, keys=("respiration_rate_bpm", "rate_bpm")),
            inspiratory_period_s=self._read_optional_float(arguments, keys=("inspiratory_period_s",)),
            advance_time_seconds=monitor_seconds,
        )
    elif support_key in {"volume_control_ventilation", "volume_control"}:
        state = self._adapter.apply_volume_control_ventilation(
            fio2=self._read_optional_fio2(arguments, keys=("fio2", "fraction_inspired_oxygen")),
            peep_cmh2o=self._read_optional_float(arguments, keys=("peep_cmh2o", "peep")),
            tidal_volume_ml=self._read_optional_float(arguments, keys=("tidal_volume_ml", "squeeze_volume_ml")),
            respiration_rate_bpm=self._read_optional_float(arguments, keys=("respiration_rate_bpm", "rate_bpm")),
            inspiratory_period_s=self._read_optional_float(arguments, keys=("inspiratory_period_s",)),
            advance_time_seconds=monitor_seconds,
        )
    else:
        # Unrecognized kinds are forwarded as-is (not the normalized key).
        # NOTE(review): presumably the adapter validates the value — confirm.
        state = self._adapter.set_intubation(support_type, advance_time_seconds=monitor_seconds)
    return self._success(
        "airway_support",
        state,
        f"Airway support set to {state.airway_support or 'off'}.",
        previous_state=previous,
    )
def _handle_apply_bag_valve_mask(self, arguments: dict[str, Any]) -> ToolExecution:
    """Start bag-valve-mask ventilation.

    Args:
        arguments: Optional ``rate_bpm`` / ``respiration_rate_bpm``
            (falsy -> 12.0), ``volume_ml`` / ``tidal_volume_ml``
            (falsy -> 500.0), ``monitor_seconds`` (falsy -> 60.0),
            ``fio2`` / ``fraction_inspired_oxygen``, ``peep_cmh2o`` /
            ``peep`` and ``airway_adjunct`` (passed through unchanged).
    """
    before = self._adapter.get_full_state()

    breaths = self._read_optional_float(arguments, keys=("rate_bpm", "respiration_rate_bpm")) or 12.0
    squeeze_ml = self._read_optional_float(arguments, keys=("volume_ml", "tidal_volume_ml")) or 500.0
    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0
    oxygen_fraction = self._read_optional_fio2(arguments, keys=("fio2", "fraction_inspired_oxygen"))
    peep = self._read_optional_float(arguments, keys=("peep_cmh2o", "peep"))

    after = self._adapter.apply_bag_valve_mask(
        respiration_rate_bpm=breaths,
        squeeze_volume_ml=squeeze_ml,
        fio2=oxygen_fraction,
        peep_cmh2o=peep,
        airway_adjunct=arguments.get("airway_adjunct"),
        advance_time_seconds=observe_s,
    )
    text = f"BVM ventilation started at {self._fmt(breaths)} bpm with {self._fmt(squeeze_ml)} mL squeezes."
    return self._success("apply_bag_valve_mask", after, text, previous_state=before)
def _handle_perform_intubation(self, arguments: dict[str, Any]) -> ToolExecution:
    """Intubate the patient with the requested airway type.

    Args:
        arguments: Optional ``airway_type`` / ``kind`` (default
            ``"tracheal"``) and optional ``monitor_seconds``
            (falsy -> 60.0).
    """
    before = self._adapter.get_full_state()
    requested_type = arguments.get("airway_type") or arguments.get("kind") or "tracheal"
    airway_kind = str(requested_type)
    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0

    after = self._adapter.set_intubation(airway_kind, advance_time_seconds=observe_s)
    kind_label = airway_kind.replace("_", " ")
    return self._success(
        "perform_intubation",
        after,
        f"Performed {kind_label} intubation.",
        previous_state=before,
    )
def _handle_set_ventilator_tidal_volume(self, arguments: dict[str, Any]) -> ToolExecution:
    """Re-apply volume-control ventilation with a new tidal volume.

    Args:
        arguments: ``volume_ml`` / ``tidal_volume_ml`` (read via
            ``_read_positive_float``). Optional ``fio2``, ``peep_cmh2o`` /
            ``peep`` and ``breaths_per_min`` / ``respiration_rate_bpm`` /
            ``rate_bpm``; a falsy value for any of these falls back to the
            value held on the adapter's runtime object. Optional
            ``monitor_seconds`` (falsy -> 60.0).
    """
    before = self._adapter.get_full_state()
    new_tidal_ml = self._read_positive_float(arguments, keys=("volume_ml", "tidal_volume_ml"))

    # Settings the caller did not supply are carried over from the adapter's
    # runtime object (a private attribute of the adapter).
    current = self._adapter._runtime
    oxygen_fraction = self._read_optional_fio2(arguments, keys=("fio2",)) or current.fio2
    peep = self._read_optional_float(arguments, keys=("peep_cmh2o", "peep")) or current.peep_cmh2o
    breaths = (
        self._read_optional_float(arguments, keys=("breaths_per_min", "respiration_rate_bpm", "rate_bpm"))
        or current.ventilator_respiration_rate_bpm
    )
    observe_s = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0

    after = self._adapter.apply_volume_control_ventilation(
        fio2=oxygen_fraction,
        peep_cmh2o=peep,
        tidal_volume_ml=new_tidal_ml,
        respiration_rate_bpm=breaths,
        advance_time_seconds=observe_s,
    )
    text = f"Ventilator tidal volume set to {self._fmt(new_tidal_ml)} mL."
    return self._success("set_ventilator_tidal_volume", after, text, previous_state=before)
def _handle_set_ventilator_rate(self, arguments: dict[str, Any]) -> ToolExecution:
    """Apply volume-control ventilation with a new respiratory rate.

    ``breaths_per_min`` (or ``respiration_rate_bpm`` / ``rate_bpm``) is
    required. FiO2, PEEP and tidal volume are optional; omitted ones fall
    back to the values currently held on the adapter runtime.
    ``monitor_seconds`` defaults to 60.0 when missing or zero.

    Raises:
        ValueError: if no rate argument is supplied.
    """
    previous = self._adapter.get_full_state()
    breaths_per_min = self._read_positive_float(arguments, keys=("breaths_per_min", "respiration_rate_bpm", "rate_bpm"))
    runtime = self._adapter._runtime
    fio2 = self._read_optional_fio2(arguments, keys=("fio2",)) or runtime.fio2
    peep_cmh2o = self._read_optional_float(arguments, keys=("peep_cmh2o", "peep"))
    if peep_cmh2o is None:
        # Fall back only when PEEP was omitted: an explicit 0 cmH2O is a real
        # setting and must not be replaced by the previous runtime value.
        peep_cmh2o = runtime.peep_cmh2o
    tidal_volume_ml = (
        self._read_optional_float(arguments, keys=("volume_ml", "tidal_volume_ml"))
        or runtime.ventilator_tidal_volume_ml
    )
    monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0
    state = self._adapter.apply_volume_control_ventilation(
        fio2=fio2,
        peep_cmh2o=peep_cmh2o,
        tidal_volume_ml=tidal_volume_ml,
        respiration_rate_bpm=breaths_per_min,
        advance_time_seconds=monitor_seconds,
    )
    return self._success(
        "set_ventilator_rate",
        state,
        f"Ventilator rate set to {self._fmt(breaths_per_min)} breaths/min.",
        previous_state=previous,
    )
def _handle_set_ventilator_fio2(self, arguments: dict[str, Any]) -> ToolExecution:
    """Apply volume-control ventilation with a new inspired oxygen fraction.

    ``fraction`` (or ``fio2`` / ``fraction_inspired_oxygen``) is required;
    values in (1, 100] are read as percentages by the FiO2 reader. PEEP,
    tidal volume and respiratory rate are optional and fall back to the
    values currently held on the adapter runtime. ``monitor_seconds``
    defaults to 60.0 when missing or zero.

    Raises:
        ValueError: if no FiO2 argument is supplied, or the resulting
            fraction lies outside 0.21-1.0.
    """
    previous = self._adapter.get_full_state()
    fio2 = self._read_positive_fio2(arguments, keys=("fraction", "fio2", "fraction_inspired_oxygen"))
    # Enforce both ends of the range the error message advertises; the
    # chained comparison also rejects NaN.
    if not 0.21 <= fio2 <= 1.0:
        raise ValueError("fio2 must be between 0.21 and 1.0.")
    runtime = self._adapter._runtime
    peep_cmh2o = self._read_optional_float(arguments, keys=("peep_cmh2o", "peep"))
    if peep_cmh2o is None:
        # Fall back only when PEEP was omitted: an explicit 0 cmH2O is a real
        # setting and must not be replaced by the previous runtime value.
        peep_cmh2o = runtime.peep_cmh2o
    tidal_volume_ml = (
        self._read_optional_float(arguments, keys=("volume_ml", "tidal_volume_ml"))
        or runtime.ventilator_tidal_volume_ml
    )
    respiration_rate_bpm = (
        self._read_optional_float(arguments, keys=("breaths_per_min", "respiration_rate_bpm", "rate_bpm"))
        or runtime.ventilator_respiration_rate_bpm
    )
    monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0
    state = self._adapter.apply_volume_control_ventilation(
        fio2=fio2,
        peep_cmh2o=peep_cmh2o,
        tidal_volume_ml=tidal_volume_ml,
        respiration_rate_bpm=respiration_rate_bpm,
        advance_time_seconds=monitor_seconds,
    )
    return self._success(
        "set_ventilator_fio2",
        state,
        f"Ventilator FiO2 set to {fio2:.2f}.",
        previous_state=previous,
    )
def _handle_give_pressor(self, arguments: dict[str, Any]) -> ToolExecution:
    """Start, change or stop a pressor infusion by volumetric rate.

    The agent is read from ``pressor`` (or ``agent``) and defaults to
    ``"norepinephrine"``. Passing ``stop`` as true forces the rate to 0.0.
    A rate of exactly 0.0 is reported as a stop; anything else (including
    an omitted rate) is reported as a start.
    """
    baseline = self._adapter.get_full_state()
    agent = arguments.get("pressor") or arguments.get("agent") or "norepinephrine"
    pressor = str(agent)
    rate_ml_per_min = self._read_optional_float(arguments, keys=("rate_ml_per_min",))
    concentration_ug_per_ml = self._read_optional_float(arguments, keys=("concentration_ug_per_ml",))
    stop_requested = self._read_optional_bool(arguments, keys=("stop",))
    if stop_requested is True:
        rate_ml_per_min = 0.0
    advance_s = self._read_optional_float(arguments, keys=("monitor_seconds",))
    if not advance_s:
        advance_s = 60.0
    state = self._adapter.set_pressor(
        pressor=pressor,
        concentration_ug_per_ml=concentration_ug_per_ml,
        rate_ml_per_min=rate_ml_per_min,
        advance_time_seconds=advance_s,
    )
    if rate_ml_per_min == 0.0:
        return self._success(
            "give_pressor",
            state,
            f"Stopped {pressor} infusion.",
            previous_state=baseline,
        )
    # Prefer the rate the adapter recorded for this infusion, if any.
    infusion_key = pressor.strip().lower().replace("-", "_").replace(" ", "_")
    recorded_rate = state.active_infusions.get(infusion_key)
    shown_rate = rate_ml_per_min if recorded_rate is None else recorded_rate
    rate_text = self._fmt(shown_rate)
    if concentration_ug_per_ml is None:
        concentration_text = "default"
    else:
        concentration_text = self._fmt(concentration_ug_per_ml)
    message = (
        f"Started {pressor} at {rate_text} mL/min with concentration "
        f"{concentration_text} ug/mL."
    )
    return self._success("give_pressor", state, message, previous_state=baseline)
def _handle_start_pressor_infusion(self, arguments: dict[str, Any], *, tool_name: str, pressor: str) -> ToolExecution:
    """Start a weight-based pressor infusion dosed in mcg/kg/min.

    Returns an ``UNSUPPORTED_BY_ENGINE`` failure when ``pressor`` is not in
    the adapter's pressor configuration. Otherwise the dose is converted to
    mL/min as ``dose * weight / concentration`` and handed to the adapter.

    Raises:
        ValueError: if no dose argument is supplied.
        RuntimeError: if the patient weight is missing or not above zero.
    """
    baseline = self._adapter.get_full_state()
    pressor_table = PulseEngineAdapter._PRESSOR_CONFIG
    if pressor not in pressor_table:
        return self._failure(
            tool_name=tool_name,
            state=baseline,
            code="UNSUPPORTED_BY_ENGINE",
            message=f"The local Pulse build does not expose {pressor}.",
            retryable=False,
        )
    dose = self._read_positive_float(arguments, keys=("dose_mcg_per_kg_per_min", "dose"))
    weight_kg = self._adapter.get_patient_weight_kg()
    if weight_kg is None or weight_kg <= 0:
        raise RuntimeError("Patient weight is unavailable for mcg/kg/min infusion dosing.")
    concentration_ug_per_ml = self._read_optional_float(arguments, keys=("concentration_ug_per_ml",))
    if not concentration_ug_per_ml:
        concentration_ug_per_ml = pressor_table[pressor]["default_concentration_ug_per_ml"]
    rate_ml_per_min = (dose * weight_kg) / concentration_ug_per_ml
    advance_s = self._read_optional_float(arguments, keys=("monitor_seconds",))
    if not advance_s:
        advance_s = 60.0
    state = self._adapter.set_pressor(
        pressor=pressor,
        concentration_ug_per_ml=concentration_ug_per_ml,
        rate_ml_per_min=rate_ml_per_min,
        advance_time_seconds=advance_s,
    )
    summary = f"Started {pressor} infusion at {dose:.2f} mcg/kg/min ({rate_ml_per_min:.2f} mL/min)."
    return self._success(tool_name, state, summary, previous_state=baseline)
def _handle_adjust_infusion_rate(self, arguments: dict[str, Any]) -> ToolExecution:
    """Re-dose a running pressor infusion in mcg/kg/min.

    The drug is read from ``drug_name`` (or ``pressor``) and normalised to
    lower snake case. Drugs missing from the adapter's pressor
    configuration yield an ``UNSUPPORTED_BY_ENGINE`` failure.

    Raises:
        ValueError: if the drug name or the new dose is missing.
        RuntimeError: if the patient weight is missing or not above zero.
    """
    baseline = self._adapter.get_full_state()
    raw_name = arguments.get("drug_name") or arguments.get("pressor") or ""
    drug_name = str(raw_name).strip().lower().replace("-", "_").replace(" ", "_")
    if not drug_name:
        raise ValueError("drug_name is required.")
    pressor_table = PulseEngineAdapter._PRESSOR_CONFIG
    if drug_name not in pressor_table:
        return self._failure(
            tool_name="adjust_infusion_rate",
            state=baseline,
            code="UNSUPPORTED_BY_ENGINE",
            message=f"Only pressor infusions are adjustable right now. Unsupported infusion '{drug_name}'.",
            retryable=False,
        )
    new_dose = self._read_positive_float(arguments, keys=("new_dose", "dose_mcg_per_kg_per_min", "dose"))
    weight_kg = self._adapter.get_patient_weight_kg()
    if weight_kg is None or weight_kg <= 0:
        raise RuntimeError("Patient weight is unavailable for mcg/kg/min infusion dosing.")
    concentration_ug_per_ml = self._read_optional_float(arguments, keys=("concentration_ug_per_ml",))
    if not concentration_ug_per_ml:
        concentration_ug_per_ml = pressor_table[drug_name]["default_concentration_ug_per_ml"]
    rate_ml_per_min = (new_dose * weight_kg) / concentration_ug_per_ml
    advance_s = self._read_optional_float(arguments, keys=("monitor_seconds",))
    if not advance_s:
        advance_s = 60.0
    state = self._adapter.set_pressor(
        pressor=drug_name,
        concentration_ug_per_ml=concentration_ug_per_ml,
        rate_ml_per_min=rate_ml_per_min,
        advance_time_seconds=advance_s,
    )
    summary = f"Adjusted {drug_name} to {new_dose:.2f} mcg/kg/min ({rate_ml_per_min:.2f} mL/min)."
    return self._success("adjust_infusion_rate", state, summary, previous_state=baseline)
def _handle_stop_infusion(self, arguments: dict[str, Any]) -> ToolExecution:
    """Stop a pressor infusion by setting its rate to zero.

    The drug is read from ``drug_name`` (or ``pressor``) and normalised to
    lower snake case. Drugs missing from the adapter's pressor
    configuration yield an ``UNSUPPORTED_BY_ENGINE`` failure.

    Raises:
        ValueError: if no drug name is supplied.
    """
    baseline = self._adapter.get_full_state()
    raw_name = arguments.get("drug_name") or arguments.get("pressor") or ""
    drug_name = str(raw_name).strip().lower().replace("-", "_").replace(" ", "_")
    if not drug_name:
        raise ValueError("drug_name is required.")
    pressor_table = PulseEngineAdapter._PRESSOR_CONFIG
    if drug_name not in pressor_table:
        return self._failure(
            tool_name="stop_infusion",
            state=baseline,
            code="UNSUPPORTED_BY_ENGINE",
            message=f"Only pressor infusions are stoppable right now. Unsupported infusion '{drug_name}'.",
            retryable=False,
        )
    default_concentration = pressor_table[drug_name]["default_concentration_ug_per_ml"]
    advance_s = self._read_optional_float(arguments, keys=("monitor_seconds",))
    if not advance_s:
        advance_s = 60.0
    state = self._adapter.set_pressor(
        pressor=drug_name,
        concentration_ug_per_ml=default_concentration,
        rate_ml_per_min=0.0,
        advance_time_seconds=advance_s,
    )
    return self._success(
        "stop_infusion",
        state,
        f"Stopped {drug_name} infusion.",
        previous_state=baseline,
    )
def _handle_needle_decompression(self, arguments: dict[str, Any]) -> ToolExecution:
    """Perform needle decompression on one side of the chest.

    When ``side`` is absent the side is picked from the current breath
    sounds. ``monitor_seconds`` defaults to 90.0 when missing or zero.
    """
    baseline = self._adapter.get_full_state()
    chosen_side = arguments.get("side") or self._suggest_needle_side(baseline)
    side = str(chosen_side)
    advance_s = self._read_optional_float(arguments, keys=("monitor_seconds",))
    if not advance_s:
        advance_s = 90.0
    updated = self._adapter.apply_needle_decompression(side, advance_time_seconds=advance_s)
    summary = f"Needle decompression performed on the {side.lower()} chest."
    return self._success("needle_decompression", updated, summary, previous_state=baseline)
def _handle_perform_needle_decompression(self, arguments: dict[str, Any]) -> ToolExecution:
    """Run needle decompression and relabel the result under this tool's name."""
    execution = self._handle_needle_decompression(arguments)
    # The shared handler reports itself as "needle_decompression"; overwrite it.
    setattr(execution.tool_result, "tool_name", "perform_needle_decompression")
    return execution
def _handle_pericardiocentesis(self, arguments: dict[str, Any]) -> ToolExecution:
    """Drain the pericardium through the adapter.

    The drain rate is read from ``drain_rate_ml_per_min`` (or
    ``rate_ml_per_min``) and defaults to 150.0; ``monitor_seconds`` defaults
    to 180.0. Zero counts as missing for both.
    """
    baseline = self._adapter.get_full_state()
    drain_rate_ml_per_min = self._read_optional_float(
        arguments, keys=("drain_rate_ml_per_min", "rate_ml_per_min")
    )
    if not drain_rate_ml_per_min:
        drain_rate_ml_per_min = 150.0
    advance_s = self._read_optional_float(arguments, keys=("monitor_seconds",))
    if not advance_s:
        advance_s = 180.0
    updated = self._adapter.perform_pericardiocentesis(
        drain_rate_ml_per_min=drain_rate_ml_per_min,
        advance_time_seconds=advance_s,
    )
    summary = f"Pericardiocentesis performed with drainage at {self._fmt(drain_rate_ml_per_min)} mL/min."
    return self._success("pericardiocentesis", updated, summary, previous_state=baseline)
def _handle_perform_pericardiocentesis(self, arguments: dict[str, Any]) -> ToolExecution:
    """Run pericardiocentesis and relabel the result under this tool's name."""
    execution = self._handle_pericardiocentesis(arguments)
    # The shared handler reports itself as "pericardiocentesis"; overwrite it.
    setattr(execution.tool_result, "tool_name", "perform_pericardiocentesis")
    return execution
def _handle_perform_cpr(self, arguments: dict[str, Any]) -> ToolExecution:
    """Deliver chest compressions through the adapter.

    Rate (``compression_rate_bpm`` or ``rate_bpm``) and ``depth_cm`` fall
    back to the adapter class defaults; ``monitor_seconds`` defaults to
    60.0. Zero counts as missing for all three.
    """
    baseline = self._adapter.get_full_state()
    rate = self._read_optional_float(arguments, keys=("compression_rate_bpm", "rate_bpm"))
    if not rate:
        rate = PulseEngineAdapter.DEFAULT_CPR_RATE_BPM
    depth_cm = self._read_optional_float(arguments, keys=("depth_cm",))
    if not depth_cm:
        depth_cm = PulseEngineAdapter.DEFAULT_CPR_DEPTH_CM
    advance_s = self._read_optional_float(arguments, keys=("monitor_seconds",))
    if not advance_s:
        advance_s = 60.0
    updated = self._adapter.perform_cpr(
        compression_rate_bpm=rate,
        depth_cm=depth_cm,
        advance_time_seconds=advance_s,
    )
    summary = f"Performed CPR at {self._fmt(rate)} compressions/min with {self._fmt(depth_cm)} cm depth."
    return self._success("perform_cpr", updated, summary, previous_state=baseline)
def _handle_induce_cardiac_arrest(self, arguments: dict[str, Any]) -> ToolExecution:
    """Induce cardiac arrest; gated behind the restricted-tool check.

    Without a true ``admin_override`` the restricted-tool failure is
    returned unchanged. ``monitor_seconds`` defaults to 30.0 when missing
    or zero.
    """
    baseline = self._adapter.get_full_state()
    blocked = self._restricted_tool_failure("induce_cardiac_arrest", baseline, arguments)
    if blocked is not None:
        return blocked
    advance_s = self._read_optional_float(arguments, keys=("monitor_seconds",))
    if not advance_s:
        advance_s = 30.0
    updated = self._adapter.induce_cardiac_arrest(advance_time_seconds=advance_s)
    return self._success(
        "induce_cardiac_arrest",
        updated,
        "Induced cardiac arrest for scenario-authoring.",
        previous_state=baseline,
    )
def _handle_apply_pericardial_effusion(self, arguments: dict[str, Any]) -> ToolExecution:
    """Start a pericardial effusion; gated behind the restricted-tool check.

    ``severity`` is required and must lie in 0.0-1.0. It is scaled linearly
    to an effusion rate, with 1.0 mapping to 150 mL/min.
    ``monitor_seconds`` defaults to 60.0 when missing or zero.

    Raises:
        ValueError: if ``severity`` is missing or outside 0.0-1.0.
    """
    previous = self._adapter.get_full_state()
    restriction = self._restricted_tool_failure("apply_pericardial_effusion", previous, arguments)
    if restriction is not None:
        return restriction
    severity = self._read_positive_float(arguments, keys=("severity",))
    # The reader only checks presence, so enforce both bounds here; a
    # negative severity would otherwise become a negative effusion rate.
    # The chained comparison also rejects NaN.
    if not 0.0 <= severity <= 1.0:
        raise ValueError("severity must be between 0.0 and 1.0.")
    effusion_rate_ml_per_min = severity * 150.0
    state = self._adapter.set_pericardial_effusion(
        effusion_rate_ml_per_min=effusion_rate_ml_per_min,
        advance_time_seconds=self._read_optional_float(arguments, keys=("monitor_seconds",)) or 60.0,
    )
    return self._success(
        "apply_pericardial_effusion",
        state,
        f"Applied pericardial effusion with severity {severity:.2f}.",
        previous_state=previous,
    )
def _handle_bolus_drug(
    self,
    arguments: dict[str, Any],
    *,
    tool_name: str,
    drug_key: str,
    dose_keys: tuple[str, ...],
) -> ToolExecution:
    """Administer a configured bolus drug through the adapter.

    Args:
        arguments: Raw tool arguments; the dose is read from ``dose_keys``.
        tool_name: Name reported on the resulting tool result.
        drug_key: Key into the adapter's bolus drug configuration.
        dose_keys: Argument names accepted for the dose, in priority order.

    Returns:
        A success execution, or an ``UNSUPPORTED_BY_ENGINE`` failure when
        ``drug_key`` has no configuration entry.

    Raises:
        ValueError: if none of ``dose_keys`` is supplied.
    """
    previous = self._adapter.get_full_state()
    config = PulseEngineAdapter._BOLUS_DRUG_CONFIG.get(drug_key)
    if config is None:
        return self._failure(
            tool_name=tool_name,
            state=previous,
            code="UNSUPPORTED_BY_ENGINE",
            message=f"The local Pulse build does not expose {drug_key}.",
            retryable=False,
        )
    dose_input = self._read_positive_float(arguments, keys=dose_keys)
    # NOTE(review): the converted dose is passed on together with the same
    # dose unit used for the conversion - confirm the adapter does not
    # convert a second time.
    total_dose = self._adapter.compute_drug_dose(dose=dose_input, dose_unit=config["dose_argument"])
    # The monitoring window defaults to the configured bolus duration.
    monitor_seconds = self._read_optional_float(arguments, keys=("monitor_seconds",)) or config["duration_s"]
    state = self._adapter.administer_substance_bolus(
        drug_name=config["substance"],
        dose=total_dose,
        dose_unit=config["dose_argument"],
        concentration_value=config["concentration_value"],
        concentration_unit=config["concentration_unit"],
        duration_s=config["duration_s"],
        advance_time_seconds=monitor_seconds,
    )
    # Render the unit key with slashes instead of underscores.
    dose_unit_text = config["dose_argument"].replace("_", "/")
    return self._success(
        tool_name,
        state,
        f"Administered {config['substance']} bolus with input dose {dose_input:.2f} {dose_unit_text}.",
        previous_state=previous,
    )
def _handle_summarize_state(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Return a one-line summary of the current patient state.

    The state is passed as both current and previous, so the result
    reports no changed fields.
    """
    state = self._adapter.get_full_state()
    if state.active_alerts:
        alerts = ", ".join(state.active_alerts)
    else:
        alerts = "no active alerts"
    pending_entries = [
        f"{name}:{seconds}s" for name, seconds in sorted(state.pending_diagnostics.items())
    ]
    pending = ", ".join(pending_entries) or "none"
    if state.ready_diagnostics:
        ready = ", ".join(state.ready_diagnostics)
    else:
        ready = "none"
    segments = [
        f"{state.scenario_id} ({state.scenario_difficulty}): HR {self._fmt(state.heart_rate_bpm)}",
        f"MAP {self._fmt(state.mean_arterial_pressure_mmhg)}",
        f"SpO2 {self._fmt(state.spo2, precision=3)}",
        f"mental status {state.mental_status}",
        f"alerts {alerts}",
        f"pending diagnostics {pending}",
        f"ready diagnostics {ready}.",
    ]
    return self._success("summarize_state", state, ", ".join(segments), previous_state=state)
def _handle_recommend_next_step(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report the tool that ``_recommend_next_tool`` picks for the current state."""
    current = self._adapter.get_full_state()
    suggested_tool = self._recommend_next_tool(current)
    summary = f"Recommended next step: {suggested_tool}."
    return self._success("recommend_next_step", current, summary, previous_state=current)
def _handle_get_blood_gas(self, _arguments: dict[str, Any], *, tool_name: str) -> ToolExecution:
    """Order or read back an arterial blood gas via the delayed-diagnostic flow."""

    def render_abg(state: PatientState) -> str:
        # Formats the ABG fields once the diagnostic is ready.
        abg = state.abg_result
        return (
            f"ABG pH {self._fmt(abg.ph, precision=3)}, PaO2 "
            f"{self._fmt(abg.partial_pressure_of_oxygen_mmhg)} mmHg, PaCO2 "
            f"{self._fmt(abg.partial_pressure_of_carbon_dioxide_mmhg)} mmHg, lactate "
            f"{self._fmt(abg.lactate_mg_per_dl)} mg/dL."
        )

    return self._handle_delayed_diagnostic(
        tool_name=tool_name,
        diagnostic_key="order_arterial_blood_gas",
        label="ABG",
        render_result=render_abg,
    )
def _handle_get_cbc(self, _arguments: dict[str, Any], *, tool_name: str) -> ToolExecution:
    """Order or read back a complete blood count via the delayed-diagnostic flow."""

    def render_cbc(state: PatientState) -> str:
        # Formats the CBC fields once the diagnostic is ready.
        cbc = state.cbc_result
        return (
            f"CBC hemoglobin {self._fmt(cbc.hemoglobin_g_per_dl)} g/dL, hematocrit "
            f"{self._fmt(cbc.hematocrit_fraction, precision=3)}, WBC "
            f"{self._fmt(cbc.white_blood_cell_count_per_u_l)} /uL."
        )

    return self._handle_delayed_diagnostic(
        tool_name=tool_name,
        diagnostic_key="order_complete_blood_count",
        label="CBC",
        render_result=render_cbc,
    )
def _handle_get_bmp(self, _arguments: dict[str, Any], *, tool_name: str) -> ToolExecution:
    """Order or read back a basic metabolic panel via the delayed-diagnostic flow."""

    def render_bmp(state: PatientState) -> str:
        # Formats the BMP fields once the diagnostic is ready.
        bmp = state.bmp_result
        return (
            f"BMP sodium {self._fmt(bmp.sodium_mmol_per_l)} mmol/L, potassium "
            f"{self._fmt(bmp.potassium_mmol_per_l)} mmol/L, creatinine "
            f"{self._fmt(bmp.creatinine_mg_per_dl)} mg/dL, glucose "
            f"{self._fmt(bmp.glucose_mg_per_dl)} mg/dL."
        )

    return self._handle_delayed_diagnostic(
        tool_name=tool_name,
        diagnostic_key="order_basic_metabolic_panel",
        label="BMP",
        render_result=render_bmp,
    )
def _handle_order_point_of_care_ultrasound(self, arguments: dict[str, Any]) -> ToolExecution:
    """Order or read back a point-of-care ultrasound for one body region.

    ``region`` defaults to ``"cardiac"`` and is normalised to lower snake
    case. Each region gets its own diagnostic key, so regions are tracked
    independently.
    """
    requested_region = arguments.get("region") or "cardiac"
    region = str(requested_region).strip().lower().replace("-", "_").replace(" ", "_")

    def render_ultrasound(_state: PatientState) -> str:
        # The summary comes from the adapter; the state argument is unused.
        return self._adapter.get_ultrasound_summary(region)

    pocus_delay = PulseEngineAdapter.DIAGNOSTIC_DELAYS_S["order_point_of_care_ultrasound"]
    return self._handle_delayed_diagnostic(
        tool_name="order_point_of_care_ultrasound",
        diagnostic_key=f"order_point_of_care_ultrasound:{region}",
        label=f"POCUS ({region})",
        delay_seconds=pocus_delay,
        render_result=render_ultrasound,
    )
def _handle_auscultate_chest(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report the current breath sounds without changing the state."""
    current = self._adapter.get_full_state()
    sounds = current.breath_sounds
    message = (
        "Auscultation: bilateral breath sounds present."
        if sounds == "present bilateral"
        else f"Auscultation: breath sounds are {sounds}."
    )
    return self._success("auscultate_chest", current, message, previous_state=current)
def _handle_assess_consciousness_level(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report mental status and the adapter's approximate GCS value."""
    current = self._adapter.get_full_state()
    summary = self._adapter.get_consciousness_summary()
    mental_status = summary["mental_status"]
    gcs_text = self._fmt(float(summary["gcs_equivalent"]))
    message = f"Consciousness level {mental_status} with approximate GCS {gcs_text}."
    return self._success("assess_consciousness_level", current, message, previous_state=current)
def _handle_check_pain_level(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report the adapter's pain estimate on a 0-10 scale."""
    current = self._adapter.get_full_state()
    score = self._adapter.get_pain_score_0_to_10()
    return self._success(
        "check_pain_level",
        current,
        f"Estimated pain score {score:.1f}/10.",
        previous_state=current,
    )
def _handle_measure_core_temperature(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report the core temperature from the current state."""
    current = self._adapter.get_full_state()
    temperature_text = self._fmt(current.core_temperature_c)
    return self._success(
        "measure_core_temperature",
        current,
        f"Core temperature {temperature_text} C.",
        previous_state=current,
    )
def _handle_check_end_tidal_co2(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report the end-tidal CO2 from the current state."""
    current = self._adapter.get_full_state()
    etco2_text = self._fmt(current.etco2_mmhg)
    return self._success(
        "check_end_tidal_co2",
        current,
        f"End tidal CO2 {etco2_text} mmHg.",
        previous_state=current,
    )
def _handle_calculate_shock_index(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report shock index and shock class from the adapter's shock assessment."""
    current = self._adapter.get_full_state()
    assessment = self._adapter.get_shock_assessment()
    index_text = self._fmt(assessment["shock_index"], precision=2)
    shock_class = assessment["shock_class"]
    return self._success(
        "calculate_shock_index",
        current,
        f"Shock index {index_text} with class {shock_class}.",
        previous_state=current,
    )
def _handle_assess_urine_output(self, _arguments: dict[str, Any]) -> ToolExecution:
    """Report urine output taken from the adapter's shock assessment."""
    current = self._adapter.get_full_state()
    assessment = self._adapter.get_shock_assessment()
    output_text = self._fmt(assessment["urine_output_ml_per_hr"])
    return self._success(
        "assess_urine_output",
        current,
        f"Estimated urine output {output_text} mL/hr.",
        previous_state=current,
    )
def _recommend_next_tool(self, state: PatientState) -> str:
if "possible_tension_pneumothorax" in state.active_alerts or "unilateral_absent_breath_sounds" in state.active_alerts:
return "perform_needle_decompression"
if "possible_cardiac_tamponade" in state.active_alerts:
return "perform_pericardiocentesis"
if state.active_hemorrhages:
limb_sites = {"left_arm", "right_arm", "left_leg", "right_leg"}
if any(site in limb_sites for site in state.active_hemorrhages):
return "apply_tourniquet"
return "apply_direct_pressure"
if state.spo2 is not None and state.spo2 < 0.88 and state.airway_support not in {
"bag_valve_mask",
"pressure_control_ventilation",
"volume_control_ventilation",
}:
return "apply_bag_valve_mask"
if state.spo2 is not None and state.spo2 < 0.92:
return "apply_nonrebreather_mask"
if state.mean_arterial_pressure_mmhg is not None and state.mean_arterial_pressure_mmhg < 65:
if any(key in state.active_infusions for key in {"blood", "packed_rbc", "saline"}):
return "start_norepinephrine_infusion"
return "administer_crystalloid_bolus"
if any(key.startswith("order_arterial_blood_gas") for key in state.ready_diagnostics):
return "order_arterial_blood_gas"
if state.mental_status in {"pain", "unresponsive"} and state.airway_support is None:
return "perform_intubation"
return "get_vitals"
def _handle_delayed_diagnostic(
    self,
    *,
    tool_name: str,
    diagnostic_key: str,
    label: str,
    render_result: Callable[[PatientState], str],
    delay_seconds: int | None = None,
) -> ToolExecution:
    """Drive the order / pending / ready lifecycle of a delayed diagnostic.

    Args:
        tool_name: Name reported on the resulting tool result.
        diagnostic_key: Key used in the state's pending and ready collections.
        label: Short human-readable name used in messages.
        render_result: Builds the result message once the diagnostic is ready.
        delay_seconds: Delay to schedule; when missing or zero the default
            delay for ``diagnostic_key`` is used.

    Returns:
        The rendered result if ready, a "pending" notice if already
        ordered, otherwise confirmation that the diagnostic was scheduled.
    """
    baseline = self._adapter.get_full_state()

    if diagnostic_key in baseline.ready_diagnostics:
        rendered = render_result(baseline)
        return self._success(tool_name, baseline, rendered, previous_state=baseline)

    if diagnostic_key in baseline.pending_diagnostics:
        remaining = baseline.pending_diagnostics[diagnostic_key]
        notice = f"{label} is pending. {remaining} simulated seconds remaining before results are ready."
        return self._success(tool_name, baseline, notice, previous_state=baseline)

    # Not ordered yet: schedule it and report the wait recorded by the adapter.
    if delay_seconds:
        effective_delay = delay_seconds
    else:
        effective_delay = self._default_diagnostic_delay(diagnostic_key)
    scheduled = self._adapter.schedule_diagnostic(diagnostic_key, delay_seconds=effective_delay)
    remaining = scheduled.pending_diagnostics.get(diagnostic_key)
    confirmation = f"Ordered {label}. Results will be ready after about {remaining} simulated seconds."
    return self._success(tool_name, scheduled, confirmation, previous_state=baseline)
def _restricted_tool_failure(
    self,
    tool_name: str,
    state: PatientState,
    arguments: dict[str, Any],
) -> ToolExecution | None:
    """Return a ``RESTRICTED_TOOL`` failure unless ``admin_override`` is true.

    Returns ``None`` when the override is present, meaning the caller may
    proceed.
    """
    override = self._read_optional_bool(arguments, keys=("admin_override",))
    if override is not True:
        explanation = (
            f"{tool_name} is restricted to scenario-authoring workflows. "
            "Pass admin_override=true to use it deliberately."
        )
        return self._failure(
            tool_name=tool_name,
            state=state,
            code="RESTRICTED_TOOL",
            message=explanation,
            retryable=False,
        )
    return None
def _resolve_bleeding_site(self, state: PatientState, arguments: dict[str, Any]) -> str | None:
if not state.active_hemorrhages:
return None
site = self._normalize_site(arguments.get("site") or arguments.get("compartment"))
if site:
if site not in state.active_hemorrhages:
raise ValueError(f"'{site}' is not an active hemorrhage site.")
return site
if len(state.active_hemorrhages) == 1:
return next(iter(state.active_hemorrhages))
raise ValueError("Multiple active hemorrhages are present; specify a site.")
@staticmethod
def _normalize_site(value: object) -> str:
if value is None:
return ""
return str(value).strip().lower().replace("-", "_").replace(" ", "_")
@staticmethod
def _suggest_oxygen_device(state: PatientState) -> str:
if state.spo2 is not None and state.spo2 < 0.9:
return "non_rebreather_mask"
if state.spo2 is not None and state.spo2 < 0.95:
return "simple_mask"
return "nasal_cannula"
@staticmethod
def _default_oxygen_flow(device: str) -> float:
return {
"nasal_cannula": 4.0,
"simple_mask": 8.0,
"non_rebreather_mask": 15.0,
}[device]
@staticmethod
def _default_bleeding_method(site: str) -> str:
if site in {"left_arm", "right_arm", "left_leg", "right_leg"}:
return "tourniquet"
return "pressure"
@staticmethod
def _suggest_position(state: PatientState) -> str:
if state.spo2 is not None and state.spo2 < 0.92:
return "upright"
return "supine"
@staticmethod
def _suggest_airway_support(state: PatientState) -> str:
if state.spo2 is not None and state.spo2 < 0.85:
return "pressure_control_ventilation" if state.mental_status in {"pain", "unresponsive"} else "bag_valve_mask"
if state.spo2 is not None and state.spo2 < 0.9:
return "cpap" if state.mental_status in {"alert", "verbal"} else "bag_valve_mask"
if state.mental_status in {"pain", "unresponsive"}:
return "tracheal"
if state.mental_status == "verbal":
return "oropharyngeal"
return "nasopharyngeal"
@staticmethod
def _suggest_needle_side(state: PatientState) -> str:
if "absent left" in state.breath_sounds:
return "left"
if "absent right" in state.breath_sounds:
return "right"
return "left"
@staticmethod
def _default_diagnostic_delay(diagnostic_key: str) -> int:
    """Look up the default delay for a diagnostic key.

    Anything after the first ``:`` in the key is ignored for the lookup;
    keys not listed in the adapter's delay table get 120.
    """
    base_key, _, _ = diagnostic_key.partition(":")
    delays = PulseEngineAdapter.DIAGNOSTIC_DELAYS_S
    return delays.get(base_key, 120)
@staticmethod
def _read_positive_float(arguments: dict[str, Any], *, keys: tuple[str, ...]) -> float:
value = PulseToolExecutor._read_optional_float(arguments, keys=keys)
if value is None:
joined = ", ".join(keys)
raise ValueError(f"One of {joined} is required.")
return value
@staticmethod
def _read_positive_fio2(arguments: dict[str, Any], *, keys: tuple[str, ...]) -> float:
value = PulseToolExecutor._read_optional_fio2(arguments, keys=keys)
if value is None:
joined = ", ".join(keys)
raise ValueError(f"One of {joined} is required.")
return value
@staticmethod
def _read_optional_float(arguments: dict[str, Any], *, keys: tuple[str, ...]) -> float | None:
for key in keys:
if key in arguments and arguments[key] is not None:
return coerce_numeric_argument(arguments[key])
return None
@staticmethod
def _read_optional_fio2(arguments: dict[str, Any], *, keys: tuple[str, ...]) -> float | None:
value = PulseToolExecutor._read_optional_float(arguments, keys=keys)
if value is None:
return None
if 1.0 < value <= 100.0:
return value / 100.0
return value
@staticmethod
def _read_optional_bool(arguments: dict[str, Any], *, keys: tuple[str, ...]) -> bool | None:
for key in keys:
if key in arguments and arguments[key] is not None:
return coerce_boolean_argument(arguments[key])
return None
@staticmethod
def _fmt(value: float | None, *, precision: int = 1) -> str:
if value is None:
return "n/a"
return f"{value:.{precision}f}"
@staticmethod
def _changed_fields(previous_state: PatientState, current_state: PatientState) -> list[str]:
previous_dump = previous_state.model_dump()
current_dump = current_state.model_dump()
return [
field_name
for field_name in current_dump
if previous_dump.get(field_name) != current_dump.get(field_name)
]
def _success(
    self,
    tool_name: str,
    state: PatientState,
    message: str,
    *,
    previous_state: PatientState,
) -> ToolExecution:
    """Wrap a successful tool call in a ``ToolExecution``.

    The result records which fields differ between ``previous_state`` and
    ``state``; ``state_changed`` is true when at least one field differs.
    """
    delta = self._changed_fields(previous_state, state)
    outcome = ToolResult(
        tool_name=tool_name,
        success=True,
        message=message,
        state_changed=len(delta) > 0,
        changed_fields=delta,
    )
    return ToolExecution(state=state, tool_result=outcome, error=None)
def _failure(
    self,
    *,
    tool_name: str,
    state: PatientState,
    code: str,
    message: str,
    retryable: bool,
) -> ToolExecution:
    """Wrap a failed tool call in a ``ToolExecution``.

    The same ``message`` is placed on both the ``ToolError`` and the
    ``ToolResult``; the result always reports no state change.
    """
    failure_detail = ToolError(code=code, message=message, retryable=retryable)
    outcome = ToolResult(
        tool_name=tool_name,
        success=False,
        message=message,
        state_changed=False,
        changed_fields=[],
    )
    return ToolExecution(state=state, tool_result=outcome, error=failure_detail)