openclaw-poc / app.py
techsd66's picture
Update app.py
25a1c18 verified
import os, json, time
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import HTMLResponse
app = FastAPI(title="OpenClaw PoC")
API_KEY = os.environ.get("API_KEY")
DATA_DIR = os.environ.get("DATA_DIR", "/app/data")
try:
os.makedirs(DATA_DIR, exist_ok=True)
except PermissionError:
DATA_DIR = "/tmp/openclaw"
os.makedirs(DATA_DIR, exist_ok=True)
LOG_PATH = os.path.join(DATA_DIR, "log.jsonl")
WH_PATH = os.path.join(DATA_DIR, "webhook.jsonl")
def require_key(req: Request):
if not API_KEY:
raise HTTPException(status_code=500, detail="API_KEY is not set")
if req.headers.get("authorization", "") != f"Bearer {API_KEY}":
raise HTTPException(status_code=401, detail="Unauthorized")
def append_jsonl(path: str, obj: dict):
with open(path, "a", encoding="utf-8") as f:
f.write(json.dumps(obj, ensure_ascii=False) + "\n")
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
base = str(request.base_url).rstrip("/")
return f"""
<html><body style="font-family:sans-serif;max-width:760px;margin:40px auto;">
<h1>OpenClaw PoC 🧫</h1>
<p><b>Base URL:</b> {base}</p>
<p><b>API_KEY configured:</b> {bool(API_KEY)}</p>
<ul>
<li>GET <code>/health</code></li>
<li>POST <code>/api/agent</code> (Bearer API_KEY)</li>
<li>POST <code>/webhook</code> (Bearer API_KEY)</li>
</ul>
<p>Agent payload supports <code>{{"prompt":"..."}}</code> or <code>{{"input":"..."}}</code>.</p>
</body></html>
"""
@app.get("/health")
async def health():
return {"ok": True, "ts": int(time.time())}
@app.post("/api/agent")
async def agent(req: Request):
require_key(req)
body = await req.json()
prompt = body.get("prompt") or body.get("input") or ""
reply = f"🧫 heard: {prompt}" if prompt else "missing prompt"
append_jsonl(LOG_PATH, {"t": int(time.time()), "prompt": prompt, "reply": reply})
return {"reply": reply, "mode": "echo"}
@app.post("/webhook")
async def webhook(req: Request):
require_key(req)
body = await req.json()
append_jsonl(WH_PATH, {"t": int(time.time()), "body": body})
return {"ok": True}
@app.get("/api/health")
async def api_health():
return {"ok": True, "ts": int(time.time())}
@app.get("/healthz")
async def healthz():
return {"ok": True, "ts": int(time.time())}