blanchon's picture
Initial commit
02eac4b
import logging
from enum import Enum
from pydantic import BaseModel, Field
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
# ============= ENUMS =============
class ParticipantRole(str, Enum):
    """Role a participant holds inside a robotics room.

    Members also subclass ``str``, so each one compares equal to its raw
    string value (e.g. ``ParticipantRole.PRODUCER == "producer"``).
    """

    # Controller/master side.
    PRODUCER = "producer"

    # Robot/visualizer side.
    CONSUMER = "consumer"
class MessageType(str, Enum):
    """WebSocket message type tags, grouped by category.

    Members also subclass ``str``, so each one compares equal to its raw
    string value.
    """

    # --- Connection & lifecycle ---
    # Confirmation of a successful room join.
    JOINED = "joined"
    # Error notifications.
    ERROR = "error"

    # --- Connection health ---
    # Client ping for connection health.
    HEARTBEAT = "heartbeat"
    # Server response to a heartbeat.
    HEARTBEAT_ACK = "heartbeat_ack"

    # --- Robot control (core) ---
    # Producer -> Consumers: joint position commands.
    JOINT_UPDATE = "joint_update"
    # Server -> Consumer: initial state synchronization.
    STATE_SYNC = "state_sync"

    # --- Emergency & safety ---
    # Emergency stop command (any direction).
    EMERGENCY_STOP = "emergency_stop"

    # --- Status & monitoring ---
    # General status updates.
    STATUS_UPDATE = "status_update"
    # Specific consumer status reports.
    CONSUMER_STATUS = "consumer_status"
class RobotType(str, Enum):
    """Robot types this module knows about.

    Members also subclass ``str``, so each one compares equal to its raw
    string value.
    """

    SO_ARM_100 = "so-arm100"

    GENERIC = "generic"
# ============= CORE MODELS =============
class CreateRoomRequest(BaseModel):
    """Request body for creating a new robotics room.

    Every field has a default, so an empty request is valid.
    """

    room_id: str | None = None

    # Optional - will be generated if not provided.
    workspace_id: str | None = None

    name: str | None = None

    robot_type: RobotType = RobotType.SO_ARM_100

    # Accepted range is 1..100 inclusive; defaults to 10.
    max_consumers: int = Field(
        default=10,
        ge=1,
        le=100,
    )
class JointUpdate(BaseModel):
    """Position update for a single joint."""

    # Required; between 1 and 50 characters.
    name: str = Field(default=..., min_length=1, max_length=50)

    # Required; degrees, limited to [-360.0, 360.0].
    value: float = Field(default=..., ge=-360.0, le=360.0)

    # Optional; percentage, limited to [0.0, 100.0] when given.
    speed: float | None = Field(default=None, ge=0.0, le=100.0)
class JointCommand(BaseModel):
    """Command that bundles several joint updates together."""

    # Required; must hold between 1 and 20 updates.
    joints: list[JointUpdate] = Field(
        default=...,
        min_length=1,
        max_length=20,
    )
class EmergencyStop(BaseModel):
    """Emergency stop command.

    ``enabled`` defaults to ``True``, so an empty payload is valid.
    """

    enabled: bool = True
# ============= RESPONSE MODELS =============
class ParticipantInfo(BaseModel):
    """Information about room participants.

    No field declares a default, so (under pydantic v2) all three must be
    supplied when the model is built.
    """

    # May be None, but still has to be passed explicitly.
    producer: str | None
    consumers: list[str]
    # NOTE(review): presumably producer + consumers count; nothing here
    # enforces that -- confirm against the code that builds this model.
    total: int
class RoomInfo(BaseModel):
    """Basic room information.

    All fields are required; the counters and flags are plain values that
    this model stores as given (nothing is derived here).
    """

    id: str
    workspace_id: str
    participants: ParticipantInfo
    joints_count: int
    has_producer: bool
    active_consumers: int
class RoomState(BaseModel):
    """Detailed room state.

    All fields are required.
    """

    room_id: str
    workspace_id: str
    # Mapping of string keys to float values.
    # NOTE(review): presumably joint name -> position -- confirm with caller.
    joints: dict[str, float]
    participants: ParticipantInfo
    # Plain string; the format is not constrained by this model.
    timestamp: str
# ============= WEBSOCKET MODELS =============
class WebSocketMessage(BaseModel):
    """Base WebSocket message.

    The concrete message models in this file subclass it and pin ``type``
    to a default ``MessageType`` member.
    """

    # Required on the base class (no default here).
    type: MessageType
    # Optional plain string; the format is not constrained by this model.
    timestamp: str | None = None
class JoinMessage(BaseModel):
    """Message asking to join a room.

    Derives from ``BaseModel`` directly, so it carries no ``type`` or
    ``timestamp`` field.
    """

    # Required; between 1 and 100 characters.
    participant_id: str = Field(
        default=...,
        min_length=1,
        max_length=100,
    )

    role: ParticipantRole
# === CONNECTION & LIFECYCLE MESSAGES ===
class JoinedMessage(WebSocketMessage):
    """Confirmation of successful room join."""

    # Defaults to the JOINED tag.
    type: MessageType = MessageType.JOINED
    # Both fields below are required.
    room_id: str
    role: ParticipantRole
class ErrorMessage(WebSocketMessage):
    """Error notification message."""

    # Defaults to the ERROR tag.
    type: MessageType = MessageType.ERROR
    # Required error text.
    message: str
    # Optional code; free-form string as far as this model is concerned.
    code: str | None = None
# === CONNECTION HEALTH MESSAGES ===
class HeartbeatMessage(WebSocketMessage):
    """Heartbeat ping from client.

    Adds no fields beyond the base message; only the default ``type``
    differs.
    """

    type: MessageType = MessageType.HEARTBEAT
class HeartbeatAckMessage(WebSocketMessage):
    """Heartbeat acknowledgment from server.

    Adds no fields beyond the base message; only the default ``type``
    differs.
    """

    type: MessageType = MessageType.HEARTBEAT_ACK
# === ROBOT CONTROL MESSAGES ===
class JointUpdateMessage(WebSocketMessage):
    """Joint position update command (Producer -> Consumers)."""

    # Defaults to the JOINT_UPDATE tag.
    type: MessageType = MessageType.JOINT_UPDATE
    # Required list of per-joint updates; no length bounds are declared here.
    data: list[JointUpdate]
    # "api" or participant_id
    source: str | None = None
class StateSyncMessage(WebSocketMessage):
    """Initial state synchronization (Server -> Consumer)."""

    # Defaults to the STATE_SYNC tag.
    type: MessageType = MessageType.STATE_SYNC
    # Current joint positions, as a mapping of string keys to floats.
    data: dict[str, float]
# === EMERGENCY & SAFETY MESSAGES ===
class EmergencyStopMessage(WebSocketMessage):
    """Emergency stop command."""

    # Defaults to the EMERGENCY_STOP tag.
    type: MessageType = MessageType.EMERGENCY_STOP
    # Defaults to True.
    enabled: bool = True
    # Optional free-form explanation.
    reason: str | None = None
# === STATUS & MONITORING MESSAGES ===
class StatusUpdateMessage(WebSocketMessage):
    """General status update."""

    # Defaults to the STATUS_UPDATE tag.
    type: MessageType = MessageType.STATUS_UPDATE
    # Required status string; not restricted to a fixed set by this model.
    status: str
    # Optional dict with unconstrained key and value types.
    data: dict | None = None
class ConsumerStatusMessage(WebSocketMessage):
    """Consumer-specific status report."""

    # Defaults to the CONSUMER_STATUS tag.
    type: MessageType = MessageType.CONSUMER_STATUS
    consumer_id: str
    # "ready", "busy", "error", "offline" -- by convention only; the field
    # is a plain str, so other values are not rejected by this model.
    status: str
    # Optional list of dicts; the dict schema is not constrained here.
    joint_states: list[dict] | None = None