# app.py — "Dummy Python AI" service
# (repo: Percy3822/Python_ai, commit 4dfde07, verified)
import asyncio
import contextlib
import os
import time

import orjson
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse
# ----------------------------
# App
# ----------------------------
app = FastAPI(title="Dummy Python AI", version="1.0.0")
# Process start timestamp; /health reports uptime relative to this value.
START_TS = time.time()
def j(obj) -> str:
    """Serialize *obj* to a JSON string using orjson (fast JSON dumps)."""
    encoded = orjson.dumps(obj)
    return encoded.decode("utf-8")
@app.get("/health")
def health():
    """Liveness probe: report service identity and uptime in seconds."""
    payload = {
        "ok": True,
        "service": "dummy-ai",
        "uptime_sec": round(time.time() - START_TS, 2),
    }
    return JSONResponse(payload)
# ----------------------------
# WebSocket Protocol
# Client -> Server:
#   {"type":"task", "text":"..."}            # start a task
#   {"type":"telemetry", "cpu":.., "mem":..} # periodic telemetry
#   {"type":"cancel"}                        # cancel current stream
#
# Server -> Client:
#   {"type":"ready"}                         # once on connect
#   {"type":"log","msg":"..."}               # log line
#   {"type":"token","text":"..." }           # streaming token
#   {"type":"say","text":"..."}              # client should speak this ASAP
#   {"type":"done","result":"..."}           # task completed
#   {"type":"error","msg":"..."}             # error
# ----------------------------
@app.websocket("/ws/ai")
async def ws_ai(ws: WebSocket):
    """Dummy AI websocket endpoint.

    Accepts the connection, announces readiness, then loops on client
    messages ("task" / "telemetry" / "cancel" — see protocol above).
    Each "task" spawns a background coroutine that streams a canned
    answer back as log / say / token / done messages.
    """
    await ws.accept()
    await ws.send_text(j({"type": "ready", "msg": "Dummy AI online"}))

    current_task = None            # asyncio.Task for the stream in flight, if any
    current_cancel = asyncio.Event()

    async def stream_dummy_answer(prompt: str, cancel: asyncio.Event):
        """Stream a staged, convincing dummy answer with tokens and say-cues.

        `cancel` is passed explicitly rather than read from the enclosing
        scope: the handler rebinds `current_cancel` for each new task, and a
        closure over that name would make an old, still-running stream start
        honoring the *new* task's event instead of its own.
        """
        try:
            # 1) acknowledge
            await ws.send_text(j({"type":"log","msg":f"Received task: {prompt[:120]}"}))
            await asyncio.sleep(0.2)

            # 2) "thinking…" (simulate tool use / chain-of-thought without revealing it)
            phases = [
                "Analyzing your request",
                "Planning steps",
                "Executing subtask 1",
                "Executing subtask 2",
                "Compiling results",
            ]
            for ph in phases:
                if cancel.is_set():
                    return
                await ws.send_text(j({"type":"log","msg":ph}))
                await asyncio.sleep(0.35)

            # 3) start streaming an answer token-by-token
            answer = (
                "Sure — here’s a dummy streamed response to verify your end-to-end pipeline. "
                "I’m emitting short tokens so your client UI can show them live, "
                "and your TTS can speak them as they arrive."
            )
            # also ask client to speak a "lead in" immediately
            await ws.send_text(j({"type":"say","text":"Starting response."}))
            for token in answer.split(" "):
                if cancel.is_set():
                    return
                await ws.send_text(j({"type":"token","text":token + " "}))
                await asyncio.sleep(0.06)  # controls stream cadence

            if cancel.is_set():
                return
            await asyncio.sleep(0.15)
            await ws.send_text(j({"type":"say","text":"Response complete."}))
            await ws.send_text(j({"type":"done","result":"OK"}))
        except Exception as e:
            # Best-effort report; if the socket is already gone this send
            # fails too, and the outer handler's disconnect path takes over.
            await ws.send_text(j({"type":"error","msg":str(e)}))

    try:
        while True:
            raw = await ws.receive_text()
            try:
                msg = orjson.loads(raw)
            except Exception:
                await ws.send_text(j({"type":"error","msg":"Invalid JSON"}))
                continue

            mtype = msg.get("type")

            if mtype == "telemetry":
                # best-effort log
                await ws.send_text(j({
                    "type":"log",
                    "msg": f"Telemetry cpu={msg.get('cpu')} mem={msg.get('mem')} active={msg.get('active_window')}"
                }))
                continue

            if mtype == "cancel":
                current_cancel.set()
                await ws.send_text(j({"type":"log","msg":"Cancel requested"}))
                continue

            if mtype == "task":
                # cancel any ongoing stream before starting a new one
                if current_task and not current_task.done():
                    current_cancel.set()
                    current_task.cancel()
                    # Wait for the old stream to actually finish; this is
                    # where CancelledError surfaces (task.cancel() itself
                    # never raises it). Requires the top-level contextlib
                    # import — the original only imported it inside the
                    # __main__ guard, a NameError under an external runner.
                    with contextlib.suppress(asyncio.CancelledError):
                        await current_task
                    current_task = None
                # fresh cancel flag for the new stream
                current_cancel = asyncio.Event()
                prompt = str(msg.get("text", "")).strip() or "(empty)"
                current_task = asyncio.create_task(
                    stream_dummy_answer(prompt, current_cancel)
                )
                continue

            await ws.send_text(j({"type":"error","msg":f"Unknown message type '{mtype}'"}))
    except WebSocketDisconnect:
        # client left — stop any stream still writing to the dead socket
        if current_task and not current_task.done():
            current_task.cancel()
        return
    except Exception as e:
        try:
            await ws.send_text(j({"type":"error","msg":str(e)}))
        finally:
            return
# ------------- Local run -------------
# Dev entry point; in deployment the app object is served by an external
# ASGI runner instead. PORT defaults to 7860.
# Original had `if _name_ == "_main_":` — `_name_` is an undefined name,
# so merely importing the module raised NameError.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "7860")))