queryforge / server /app.py
Prithvigg's picture
Upload folder using huggingface_hub
039839b verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
FastAPI application for the Queryforge Environment.
This module creates an HTTP server that exposes the QueryforgeEnvironment
over HTTP and WebSocket endpoints, compatible with EnvClient.
Endpoints:
- POST /reset: Reset the environment
- POST /step: Execute an action
- GET /state: Get current environment state
- GET /schema: Get action/observation schemas
- WS /ws: WebSocket endpoint for persistent sessions
Usage:
# Development (with auto-reload):
uvicorn server.app:app --reload --host 0.0.0.0 --port 8000
# Production:
uvicorn server.app:app --host 0.0.0.0 --port 8000 --workers 4
# Or run directly:
python -m server.app
"""
try:
from openenv.core.env_server.http_server import create_app
except Exception as e: # pragma: no cover
raise ImportError(
"openenv is required for the web interface. Install dependencies with '\n uv sync\n'"
) from e
try:
from ..models import SQLAction, SQLObservation, TaskSpec
from ..tasks import REGISTRY, task_from_dict
from .queryforge_environment import QueryforgeEnvironment
except ImportError:
from models import SQLAction, SQLObservation, TaskSpec
from tasks import REGISTRY, task_from_dict
from server.queryforge_environment import QueryforgeEnvironment
import gradio as gr
from fastapi import HTTPException
# Create the app with web interface and README integration
app = create_app(
QueryforgeEnvironment,
SQLAction,
SQLObservation,
env_name="queryforge",
max_concurrent_envs=1, # increase this number to allow more concurrent WebSocket sessions
)
# ── Task Registry REST endpoints ──────────────────────────────────────────────
@app.post("/tasks", tags=["Task Registry"], status_code=201)
async def register_task(spec: TaskSpec):
"""Register a custom SQL task. Replaces silently if the ID already exists."""
task = task_from_dict(spec.model_dump())
REGISTRY.register(task)
return {"ok": True, "task_id": task.id, "total_tasks": len(REGISTRY)}
@app.get("/tasks", tags=["Task Registry"])
async def list_tasks():
"""List all registered tasks (built-in + custom)."""
return [
{"id": t.id, "level": t.level, "title": t.title}
for t in REGISTRY.list_all()
]
@app.delete("/tasks/{task_id}", tags=["Task Registry"])
async def delete_task(task_id: str):
"""Remove a custom task. Returns 403 for built-in tasks, 404 if not found."""
try:
REGISTRY.unregister(task_id)
return {"ok": True, "task_id": task_id}
except ValueError as exc:
raise HTTPException(status_code=403, detail=str(exc))
except KeyError:
raise HTTPException(status_code=404, detail=f"Task '{task_id}' not found.")
# ── Gradio demo β€” mounted at /demo so the HF Space App tab shows it ───────────
try:
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from demo import demo as gradio_demo
app = gr.mount_gradio_app(app, gradio_demo, path="/demo")
except Exception as _e:
import warnings
warnings.warn(f"Gradio demo not mounted: {_e}")
def main(host: str = "0.0.0.0", port: int = 8000):
"""
Entry point for direct execution via uv run or python -m.
This function enables running the server without Docker:
uv run --project . server
uv run --project . server --port 8001
python -m queryforge.server.app
Args:
host: Host address to bind to (default: "0.0.0.0")
port: Port number to listen on (default: 8000)
For production deployments, consider using uvicorn directly with
multiple workers:
uvicorn queryforge.server.app:app --workers 4
"""
import uvicorn
uvicorn.run(app, host=host, port=port)
if __name__ == "__main__":
main()