Spaces:
Running
Running
from fastapi import APIRouter, HTTPException, WebSocket | |
from .core import RoboticsCore | |
from .models import ( | |
CreateRoomRequest, | |
JointCommand, | |
ParticipantRole, | |
) | |
# ============= SIMPLIFIED API ============= | |
robotics_router = APIRouter(prefix="/robotics", tags=["robotics"]) | |
robotics_core = RoboticsCore() | |
# ============= CONVENIENT ROOM ENDPOINTS ============= | |
# Legacy endpoint for backward compatibility - creates random workspace | |
async def list_rooms_legacy(): | |
"""List all robotics rooms (legacy - requires workspace)""" | |
raise HTTPException( | |
status_code=400, | |
detail="Workspace ID required. Use /workspaces/{workspace_id}/rooms instead", | |
) | |
async def list_rooms(workspace_id: str): | |
"""List all robotics rooms in a workspace""" | |
return { | |
"success": True, | |
"workspace_id": workspace_id, | |
"rooms": robotics_core.list_rooms(workspace_id), | |
"total": len(robotics_core.list_rooms(workspace_id)), | |
} | |
# Legacy endpoint for backward compatibility - creates random workspace | |
async def create_room_legacy(request: CreateRoomRequest | None = None): | |
"""Create a new robotics room (legacy - creates random workspace)""" | |
workspace_id = None | |
room_id = None | |
if request: | |
workspace_id = request.workspace_id | |
room_id = request.room_id | |
workspace_id, room_id = robotics_core.create_room(workspace_id, room_id) | |
return { | |
"success": True, | |
"workspace_id": workspace_id, | |
"room_id": room_id, | |
"message": f"Room {room_id} created successfully in workspace {workspace_id}", | |
} | |
async def create_room(workspace_id: str, request: CreateRoomRequest | None = None): | |
"""Create a new robotics room in a workspace""" | |
room_id = None | |
if request and hasattr(request, "room_id"): | |
room_id = request.room_id | |
actual_workspace_id, room_id = robotics_core.create_room(workspace_id, room_id) | |
return { | |
"success": True, | |
"workspace_id": actual_workspace_id, | |
"room_id": room_id, | |
"message": f"Room {room_id} created successfully in workspace {actual_workspace_id}", | |
} | |
async def get_room(workspace_id: str, room_id: str): | |
"""Get room details""" | |
room_info = robotics_core.get_room_info(workspace_id, room_id) | |
if "error" in room_info: | |
raise HTTPException(status_code=404, detail="Room not found") | |
return {"success": True, "room": room_info} | |
async def delete_room(workspace_id: str, room_id: str): | |
"""Delete a robotics room""" | |
if robotics_core.delete_room(workspace_id, room_id): | |
return { | |
"success": True, | |
"message": f"Room {room_id} deleted successfully from workspace {workspace_id}", | |
} | |
raise HTTPException(status_code=404, detail="Room not found") | |
async def get_room_state(workspace_id: str, room_id: str): | |
"""Get current room state with joints and participants""" | |
state = robotics_core.get_room_state(workspace_id, room_id) | |
if "error" in state: | |
raise HTTPException(status_code=404, detail="Room not found") | |
return {"success": True, "state": state} | |
# ============= CONTROL ENDPOINTS ============= | |
async def send_command(workspace_id: str, room_id: str, command: JointCommand): | |
"""Send joint commands to a room""" | |
room_info = robotics_core.get_room_info(workspace_id, room_id) | |
if "error" in room_info: | |
raise HTTPException(status_code=404, detail="Room not found") | |
try: | |
joints_data = [ | |
{"name": j.name, "value": j.value, "speed": j.speed} for j in command.joints | |
] | |
changed = await robotics_core.send_command_to_room( | |
workspace_id, room_id, joints_data | |
) | |
except ValueError as e: | |
raise HTTPException(status_code=404) from e | |
else: | |
return { | |
"success": True, | |
"workspace_id": workspace_id, | |
"room_id": room_id, | |
"joints_updated": changed, | |
"message": "Commands sent successfully", | |
} | |
# ============= UTILITY ENDPOINTS ============= | |
async def get_status(): | |
"""Get system status""" | |
stats = robotics_core.get_connection_stats() | |
return { | |
"service": "robotics", | |
"status": "active", | |
"workspaces_count": stats["total_workspaces"], | |
"rooms_count": stats["total_rooms"], | |
"connections_count": stats["total_connections"], | |
"version": "2.0.0", | |
"supported_roles": [role.value for role in ParticipantRole], | |
"supported_robot_types": ["so-arm100", "generic"], | |
} | |
async def health_check(): | |
"""Health check endpoint""" | |
return {"status": "healthy", "service": "robotics"} | |
# ============= WEBSOCKET ENDPOINT ============= | |
async def websocket_endpoint(websocket: WebSocket, workspace_id: str, room_id: str): | |
"""WebSocket connection for real-time room communication""" | |
await robotics_core.handle_websocket(websocket, workspace_id, room_id) | |