SalexAI commited on
Commit
ece12b4
·
verified ·
1 Parent(s): 2117a18

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +47 -278
app.py CHANGED
@@ -1,31 +1,16 @@
1
- import os
2
  import asyncio
3
  from uuid import uuid4
4
  from datetime import datetime, timezone
5
- from typing import Optional, Dict, Any, List
6
 
7
- from fastapi import FastAPI, HTTPException, Header, Request
8
- from fastapi.responses import JSONResponse
9
  from pydantic import BaseModel, Field
10
- from starlette.routing import Mount
11
 
12
- # MCP (official Python SDK)
13
  from mcp.server.fastmcp import FastMCP, Context
14
 
15
- # ============================================================
16
- # CONFIG
17
- # ============================================================
18
- API_TOKEN = os.getenv("ROBOT_API_TOKEN", "change-me-please")
19
- ALLOWED_ORIGINS = {
20
- # Add your domains here (HF space URL, localhost during testing, etc.)
21
- "https://your-space-name.hf.space",
22
- "http://localhost:7860",
23
- "http://127.0.0.1:7860",
24
- }
25
 
26
  # ============================================================
27
- # IN-MEMORY COMMAND STORE
28
- # (Good for MVP / HF Space demos. Use Redis later if you want persistence.)
29
  # ============================================================
30
  commands: Dict[str, Dict[str, Any]] = {}
31
  queue: List[str] = []
@@ -36,287 +21,107 @@ def now_iso() -> str:
36
  return datetime.now(timezone.utc).isoformat()
37
 
38
 
39
- def check_bearer(auth_header: Optional[str]):
40
- if not API_TOKEN:
41
- return
42
- if not auth_header or not auth_header.startswith("Bearer "):
43
- raise HTTPException(status_code=401, detail="Missing bearer token")
44
- token = auth_header.split(" ", 1)[1].strip()
45
- if token != API_TOKEN:
46
- raise HTTPException(status_code=403, detail="Invalid token")
47
-
48
-
49
- def command_expired(cmd: Dict[str, Any]) -> bool:
50
- ttl = cmd.get("ttl_seconds")
51
- if not ttl:
52
- return False
53
- created = datetime.fromisoformat(cmd["created_at"])
54
- age = (datetime.now(timezone.utc) - created).total_seconds()
55
- return age > ttl
56
-
57
-
58
- async def enqueue_command(
59
- command: str,
60
- args: Optional[Dict[str, Any]] = None,
61
- source: str = "mcp",
62
- priority: int = 0,
63
- ttl_seconds: int = 120,
64
- meta: Optional[Dict[str, Any]] = None,
65
- ) -> Dict[str, Any]:
66
  cmd_id = str(uuid4())
67
  record = {
68
  "id": cmd_id,
69
  "command": command,
70
  "args": args or {},
71
  "source": source,
72
- "priority": priority,
73
- "ttl_seconds": ttl_seconds,
74
- "status": "queued",
75
  "result": None,
76
  "error": None,
77
  "created_at": now_iso(),
78
  "updated_at": now_iso(),
79
  "claimed_by": None,
80
- "meta": meta or {},
81
  }
82
-
83
  async with queue_lock:
84
  commands[cmd_id] = record
85
-
86
- # priority insert (higher first)
87
- inserted = False
88
- for i, queued_id in enumerate(queue):
89
- q = commands.get(queued_id)
90
- if not q:
91
- continue
92
- if record["priority"] > q.get("priority", 0):
93
- queue.insert(i, cmd_id)
94
- inserted = True
95
- break
96
- if not inserted:
97
- queue.append(cmd_id)
98
-
99
  return record
100
 
101
 
102
  # ============================================================
103
- # FASTAPI MODELS (robot runner REST side)
104
- # ============================================================
105
- class CommandStatusUpdate(BaseModel):
106
- status: str = Field(..., description="queued | running | done | failed | expired")
107
- result: Optional[Dict[str, Any]] = None
108
- error: Optional[str] = None
109
- robot_id: Optional[str] = None
110
-
111
-
112
- class CommandCreateREST(BaseModel):
113
- command: str
114
- args: Dict[str, Any] = Field(default_factory=dict)
115
- source: str = "api"
116
- priority: int = 0
117
- ttl_seconds: int = 120
118
-
119
-
120
- # ============================================================
121
- # MCP SERVER (tools for Claude/ChatGPT/etc)
122
  # ============================================================
123
  mcp = FastMCP(
124
- "Create3 Robot Bridge",
125
  instructions=(
126
- "Send safe, short Create 3 robot commands by enqueueing them for a local robot runner. "
127
- "This server does not execute commands directly; it queues them for a trusted client."
128
  ),
129
- stateless_http=True,
130
- json_response=True,
131
  )
132
 
133
- # Optional: simple helper tool
134
- @mcp.tool()
135
- async def robot_status_summary(ctx: Context) -> dict:
136
- """Get queue and command status summary."""
137
- queued = sum(1 for c in commands.values() if c["status"] == "queued")
138
- running = sum(1 for c in commands.values() if c["status"] == "running")
139
- done = sum(1 for c in commands.values() if c["status"] == "done")
140
- failed = sum(1 for c in commands.values() if c["status"] == "failed")
141
- expired = sum(1 for c in commands.values() if c["status"] == "expired")
142
- return {
143
- "ok": True,
144
- "server": "Create3 Robot Bridge",
145
- "queued": queued,
146
- "running": running,
147
- "done": done,
148
- "failed": failed,
149
- "expired": expired,
150
- "total": len(commands),
151
- "time": now_iso(),
152
- }
153
-
154
-
155
- @mcp.tool()
156
- async def list_recent_commands(limit: int = 20, ctx: Optional[Context] = None) -> dict:
157
- """List recent robot commands and their status."""
158
- items = sorted(commands.values(), key=lambda x: x["updated_at"], reverse=True)[: max(1, min(limit, 100))]
159
- return {"ok": True, "items": items}
160
-
161
-
162
- @mcp.tool()
163
- async def get_command(command_id: str, ctx: Optional[Context] = None) -> dict:
164
- """Get the status of one command by ID."""
165
- cmd = commands.get(command_id)
166
- if not cmd:
167
- return {"ok": False, "error": "Command not found", "command_id": command_id}
168
- return {"ok": True, "item": cmd}
169
-
170
-
171
- # ---- Robot command tools (enqueue only) ----
172
- @mcp.tool()
173
- async def undock(priority: int = 0, ttl_seconds: int = 120, ctx: Optional[Context] = None) -> dict:
174
- """Undock the Create 3 from its dock."""
175
- rec = await enqueue_command("undock", {}, source="mcp", priority=priority, ttl_seconds=ttl_seconds)
176
- return {"ok": True, "queued": rec}
177
-
178
-
179
- @mcp.tool()
180
- async def dock(priority: int = 0, ttl_seconds: int = 120, ctx: Optional[Context] = None) -> dict:
181
- """Dock the Create 3 to its dock."""
182
- rec = await enqueue_command("dock", {}, source="mcp", priority=priority, ttl_seconds=ttl_seconds)
183
- return {"ok": True, "queued": rec}
184
-
185
-
186
- @mcp.tool()
187
- async def move_cm(cm: float, priority: int = 0, ttl_seconds: int = 120, ctx: Optional[Context] = None) -> dict:
188
- """Move forward (or backward if negative) in centimeters."""
189
- rec = await enqueue_command("move_cm", {"cm": cm}, source="mcp", priority=priority, ttl_seconds=ttl_seconds)
190
- return {"ok": True, "queued": rec}
191
-
192
-
193
- @mcp.tool()
194
- async def turn_left(deg: float = 90, priority: int = 0, ttl_seconds: int = 120, ctx: Optional[Context] = None) -> dict:
195
- """Turn the robot left by degrees."""
196
- rec = await enqueue_command("turn_left", {"deg": deg}, source="mcp", priority=priority, ttl_seconds=ttl_seconds)
197
- return {"ok": True, "queued": rec}
198
-
199
 
200
  @mcp.tool()
201
- async def turn_right(deg: float = 90, priority: int = 0, ttl_seconds: int = 120, ctx: Optional[Context] = None) -> dict:
202
- """Turn the robot right by degrees."""
203
- rec = await enqueue_command("turn_right", {"deg": deg}, source="mcp", priority=priority, ttl_seconds=ttl_seconds)
204
  return {"ok": True, "queued": rec}
205
 
206
 
207
  @mcp.tool()
208
- async def drive(left: float, right: float, seconds: float = 1.0, priority: int = 0, ttl_seconds: int = 120, ctx: Optional[Context] = None) -> dict:
209
- """Direct wheel drive command. Use small values for safety."""
210
- rec = await enqueue_command(
211
- "drive",
212
- {"left": left, "right": right, "seconds": seconds},
213
- source="mcp",
214
- priority=priority,
215
- ttl_seconds=ttl_seconds,
216
- )
217
  return {"ok": True, "queued": rec}
218
 
219
 
220
  @mcp.tool()
221
- async def stop(priority: int = 100, ttl_seconds: int = 30, ctx: Optional[Context] = None) -> dict:
222
- """Emergency stop. High priority."""
223
- rec = await enqueue_command("stop", {}, source="mcp", priority=priority, ttl_seconds=ttl_seconds)
224
- return {"ok": True, "queued": rec}
225
-
226
-
227
- @mcp.tool()
228
- async def say(text: str, priority: int = 0, ttl_seconds: int = 60, ctx: Optional[Context] = None) -> dict:
229
- """Queue a local console 'say' message (placeholder for TTS if you add it client-side)."""
230
- rec = await enqueue_command("say", {"text": text}, source="mcp", priority=priority, ttl_seconds=ttl_seconds)
231
  return {"ok": True, "queued": rec}
232
 
233
 
234
  # ============================================================
235
- # FASTAPI APP (REST endpoints for robot client + MCP mount)
236
  # ============================================================
237
- app = FastAPI(title="Create3 MCP + Queue Bridge", version="2.0.0")
238
-
239
 
240
- # --- Security middleware for MCP HTTP endpoint (Origin + Bearer) ---
241
- @app.middleware("http")
242
- async def mcp_security_middleware(request: Request, call_next):
243
- # MCP is mounted under /mcp (via Starlette mount below)
244
- if request.url.path.startswith("/mcp"):
245
- # Origin validation for browser-based calls (recommended by MCP transport spec)
246
- origin = request.headers.get("origin")
247
- if origin:
248
- if origin not in ALLOWED_ORIGINS:
249
- return JSONResponse(status_code=403, content={"detail": "Origin not allowed"})
250
 
251
- # Bearer token auth
252
- try:
253
- check_bearer(request.headers.get("authorization"))
254
- except HTTPException as e:
255
- return JSONResponse(status_code=e.status_code, content={"detail": e.detail})
256
-
257
- # Also protect REST command endpoints
258
- if request.url.path.startswith("/commands"):
259
- try:
260
- check_bearer(request.headers.get("authorization"))
261
- except HTTPException as e:
262
- return JSONResponse(status_code=e.status_code, content={"detail": e.detail})
263
-
264
- return await call_next(request)
265
 
266
 
267
  @app.get("/")
268
  async def root():
269
  return {
270
  "ok": True,
271
- "service": "Create3 MCP + Queue Bridge",
272
- "mcp_endpoint": "/mcp",
273
  "time": now_iso(),
274
  }
275
 
276
 
277
  @app.get("/health")
278
  async def health():
279
- queued = sum(1 for c in commands.values() if c["status"] == "queued")
280
- running = sum(1 for c in commands.values() if c["status"] == "running")
281
- return {"ok": True, "queued": queued, "running": running, "total": len(commands), "time": now_iso()}
282
-
283
-
284
- # ---- REST enqueue endpoint (optional, for testing / non-MCP senders) ----
285
- @app.post("/commands")
286
- async def create_command(cmd: CommandCreateREST, authorization: Optional[str] = Header(default=None)):
287
- check_bearer(authorization)
288
- rec = await enqueue_command(
289
- cmd.command,
290
- args=cmd.args,
291
- source=cmd.source,
292
- priority=cmd.priority,
293
- ttl_seconds=cmd.ttl_seconds,
294
- )
295
- return {"ok": True, "queued": rec}
296
 
297
 
298
  @app.get("/commands")
299
- async def list_commands(limit: int = 20, authorization: Optional[str] = Header(default=None)):
300
- check_bearer(authorization)
301
- items = sorted(commands.values(), key=lambda x: x["updated_at"], reverse=True)[: max(1, min(limit, 100))]
 
 
 
302
  return {"ok": True, "items": items}
303
 
304
 
305
- @app.get("/commands/{command_id}")
306
- async def get_command_rest(command_id: str, authorization: Optional[str] = Header(default=None)):
307
- check_bearer(authorization)
308
- if command_id not in commands:
309
- raise HTTPException(status_code=404, detail="Command not found")
310
- return {"ok": True, "item": commands[command_id]}
311
-
312
-
313
  @app.post("/commands/next")
314
- async def claim_next_command(robot_id: str, authorization: Optional[str] = Header(default=None)):
315
- """
316
- Robot client polls this endpoint to claim one command.
317
- """
318
- check_bearer(authorization)
319
-
320
  async with queue_lock:
321
  while queue:
322
  cmd_id = queue.pop(0)
@@ -326,12 +131,6 @@ async def claim_next_command(robot_id: str, authorization: Optional[str] = Heade
326
  if cmd["status"] != "queued":
327
  continue
328
 
329
- if command_expired(cmd):
330
- cmd["status"] = "expired"
331
- cmd["error"] = "TTL expired before execution"
332
- cmd["updated_at"] = now_iso()
333
- continue
334
-
335
  cmd["status"] = "running"
336
  cmd["claimed_by"] = robot_id
337
  cmd["updated_at"] = now_iso()
@@ -341,13 +140,7 @@ async def claim_next_command(robot_id: str, authorization: Optional[str] = Heade
341
 
342
 
343
  @app.post("/commands/{command_id}/status")
344
- async def update_command_status(
345
- command_id: str,
346
- update: CommandStatusUpdate,
347
- authorization: Optional[str] = Header(default=None),
348
- ):
349
- check_bearer(authorization)
350
-
351
  cmd = commands.get(command_id)
352
  if not cmd:
353
  raise HTTPException(status_code=404, detail="Command not found")
@@ -361,31 +154,7 @@ async def update_command_status(
361
  return {"ok": True}
362
 
363
 
364
- @app.post("/commands/{command_id}/cancel")
365
- async def cancel_command(command_id: str, authorization: Optional[str] = Header(default=None)):
366
- check_bearer(authorization)
367
-
368
- cmd = commands.get(command_id)
369
- if not cmd:
370
- raise HTTPException(status_code=404, detail="Command not found")
371
-
372
- if cmd["status"] in ("done", "failed", "expired"):
373
- return {"ok": True, "status": cmd["status"]}
374
-
375
- cmd["status"] = "failed"
376
- cmd["error"] = "Cancelled by operator"
377
- cmd["updated_at"] = now_iso()
378
-
379
- async with queue_lock:
380
- if command_id in queue:
381
- queue.remove(command_id)
382
-
383
- return {"ok": True, "status": "failed"}
384
-
385
-
386
  # ============================================================
387
- # MOUNT MCP ASGI APP AT /mcp (streamable HTTP transport)
388
  # ============================================================
389
- # FastMCP's streamable_http_app() returns an ASGI app implementing MCP over HTTP.
390
- # This is what Claude/ChatGPT-style MCP clients connect to.
391
- app.router.routes.append(Mount("/mcp", app=mcp.streamable_http_app()))
 
 
1
  import asyncio
2
  from uuid import uuid4
3
  from datetime import datetime, timezone
4
+ from typing import Dict, Any, Optional, List
5
 
6
+ from fastapi import FastAPI, HTTPException
 
7
  from pydantic import BaseModel, Field
 
8
 
 
9
  from mcp.server.fastmcp import FastMCP, Context
10
 
 
 
 
 
 
 
 
 
 
 
11
 
12
  # ============================================================
13
+ # In-memory command queue (local dev)
 
14
  # ============================================================
15
  commands: Dict[str, Dict[str, Any]] = {}
16
  queue: List[str] = []
 
21
  return datetime.now(timezone.utc).isoformat()
22
 
23
 
24
+ async def enqueue(command: str, args: Optional[Dict[str, Any]] = None, source: str = "mcp") -> Dict[str, Any]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
  cmd_id = str(uuid4())
26
  record = {
27
  "id": cmd_id,
28
  "command": command,
29
  "args": args or {},
30
  "source": source,
31
+ "status": "queued", # queued | running | done | failed
 
 
32
  "result": None,
33
  "error": None,
34
  "created_at": now_iso(),
35
  "updated_at": now_iso(),
36
  "claimed_by": None,
 
37
  }
 
38
  async with queue_lock:
39
  commands[cmd_id] = record
40
+ queue.append(cmd_id)
 
 
 
 
 
 
 
 
 
 
 
 
 
41
  return record
42
 
43
 
44
  # ============================================================
45
+ # MCP server (SSE transport)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46
  # ============================================================
47
  mcp = FastMCP(
48
+ "Create3 Local Bridge (SSE)",
49
  instructions=(
50
+ "Tools queue robot commands for a local Create 3 runner. "
51
+ "This MCP server does not control the robot directly."
52
  ),
 
 
53
  )
54
 
55
+ # --- Exactly 3 tools ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
 
57
  @mcp.tool()
58
+ async def dock(ctx: Context) -> dict:
59
+ """Dock the Create 3."""
60
+ rec = await enqueue("dock", {}, source="mcp")
61
  return {"ok": True, "queued": rec}
62
 
63
 
64
  @mcp.tool()
65
+ async def undock(ctx: Context) -> dict:
66
+ """Undock the Create 3."""
67
+ rec = await enqueue("undock", {}, source="mcp")
 
 
 
 
 
 
68
  return {"ok": True, "queued": rec}
69
 
70
 
71
  @mcp.tool()
72
+ async def move_cm(cm: float, ctx: Context) -> dict:
73
+ """Move forward (negative cm moves backward)."""
74
+ rec = await enqueue("move_cm", {"cm": cm}, source="mcp")
 
 
 
 
 
 
 
75
  return {"ok": True, "queued": rec}
76
 
77
 
78
  # ============================================================
79
+ # FastAPI app + queue endpoints for local robot runner
80
  # ============================================================
81
+ app = FastAPI(title="Create3 MCP SSE + Queue (Local)", version="1.0.0")
 
82
 
 
 
 
 
 
 
 
 
 
 
83
 
84
+ class StatusUpdate(BaseModel):
85
+ status: str = Field(..., description="running | done | failed")
86
+ result: Optional[Dict[str, Any]] = None
87
+ error: Optional[str] = None
88
+ robot_id: Optional[str] = None
 
 
 
 
 
 
 
 
 
89
 
90
 
91
  @app.get("/")
92
  async def root():
93
  return {
94
  "ok": True,
95
+ "mcp_sse_endpoint": "/sse",
96
+ "note": "Point your MCP connector to http://localhost:8000/sse",
97
  "time": now_iso(),
98
  }
99
 
100
 
101
  @app.get("/health")
102
  async def health():
103
+ return {
104
+ "ok": True,
105
+ "queued": sum(1 for c in commands.values() if c["status"] == "queued"),
106
+ "running": sum(1 for c in commands.values() if c["status"] == "running"),
107
+ "total": len(commands),
108
+ "time": now_iso(),
109
+ }
 
 
 
 
 
 
 
 
 
 
110
 
111
 
112
  @app.get("/commands")
113
+ async def list_commands(limit: int = 50):
114
+ items = sorted(
115
+ commands.values(),
116
+ key=lambda x: x["updated_at"],
117
+ reverse=True
118
+ )[: max(1, min(limit, 200))]
119
  return {"ok": True, "items": items}
120
 
121
 
 
 
 
 
 
 
 
 
122
  @app.post("/commands/next")
123
+ async def claim_next_command(robot_id: str):
124
+ """Robot runner polls this to claim the next queued command."""
 
 
 
 
125
  async with queue_lock:
126
  while queue:
127
  cmd_id = queue.pop(0)
 
131
  if cmd["status"] != "queued":
132
  continue
133
 
 
 
 
 
 
 
134
  cmd["status"] = "running"
135
  cmd["claimed_by"] = robot_id
136
  cmd["updated_at"] = now_iso()
 
140
 
141
 
142
  @app.post("/commands/{command_id}/status")
143
+ async def update_status(command_id: str, update: StatusUpdate):
 
 
 
 
 
 
144
  cmd = commands.get(command_id)
145
  if not cmd:
146
  raise HTTPException(status_code=404, detail="Command not found")
 
154
  return {"ok": True}
155
 
156
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
157
  # ============================================================
158
+ # Mount MCP SSE app at /sse
159
  # ============================================================
160
+ app.mount("/sse", mcp.sse_app())