govon-runtime / src /cli /http_client.py
umyunsang's picture
Upload folder using huggingface_hub
9e65b56 verified
"""GovOn ๋กœ์ปฌ daemon API HTTP ํด๋ผ์ด์–ธํŠธ.
Issue #144: CLI-daemon/LangGraph runtime ์—ฐ๋™ ๋ฐ session resume.
Issue #140: CLI ์Šน์ธ UI ๋ฐ ์ตœ์†Œ ๋ช…๋ น ์ฒด๊ณ„ (๋ฐฑ์—”๋“œ ๋ถ€๋ถ„).
๋กœ์ปฌ daemon(uvicorn)์˜ REST API๋ฅผ ๋ž˜ํ•‘ํ•˜๋Š” ํด๋ผ์ด์–ธํŠธ.
run / approve / cancel ๋“ฑ ํ•ต์‹ฌ ์—”๋“œํฌ์ธํŠธ์— ์ ‘๊ทผํ•œ๋‹ค.
"""
from __future__ import annotations
import json
from typing import Any, Dict, Generator, Iterator, Optional
import httpx
from loguru import logger
class GovOnClient:
"""GovOn ๋กœ์ปฌ daemon HTTP ํด๋ผ์ด์–ธํŠธ.
Parameters
----------
base_url : str
daemon base URL (์˜ˆ: "http://127.0.0.1:8000").
"""
_RUN_TIMEOUT = 120.0
_DEFAULT_TIMEOUT = 30.0
def __init__(self, base_url: str) -> None:
self._base_url = base_url.rstrip("/")
# ------------------------------------------------------------------
# ๊ณต๊ฐœ API
# ------------------------------------------------------------------
def health(self) -> Dict[str, Any]:
"""GET /health โ€” daemon ์ƒํƒœ๋ฅผ ํ™•์ธํ•œ๋‹ค.
Returns
-------
dict
์„œ๋ฒ„๊ฐ€ ๋ฐ˜ํ™˜ํ•˜๋Š” health ์‘๋‹ต.
Raises
------
ConnectionError
daemon์— ์—ฐ๊ฒฐํ•  ์ˆ˜ ์—†์„ ๋•Œ.
"""
return self._get("/health", timeout=self._DEFAULT_TIMEOUT)
def run(
self,
query: str,
session_id: Optional[str] = None,
) -> Dict[str, Any]:
"""POST /v2/agent/run โ€” ์—์ด์ „ํŠธ ์‹คํ–‰ ์š”์ฒญ.
Parameters
----------
query : str
์‚ฌ์šฉ์ž ์ž…๋ ฅ ์ฟผ๋ฆฌ.
session_id : str | None
๊ธฐ์กด ์„ธ์…˜์„ ์ด์–ด๋ฐ›์„ ๊ฒฝ์šฐ session ID.
Returns
-------
dict
์„œ๋ฒ„ ์‘๋‹ต (thread_id, status ๋“ฑ ํฌํ•จ).
"""
body: Dict[str, Any] = {"query": query}
if session_id is not None:
body["session_id"] = session_id
logger.debug(f"[http_client] run: session_id={session_id} query_len={len(query)}")
return self._post("/v2/agent/run", body=body, timeout=self._RUN_TIMEOUT)
def approve(self, thread_id: str, approved: bool) -> Dict[str, Any]:
"""POST /v2/agent/approve โ€” ์Šน์ธ ๋˜๋Š” ๊ฑฐ์ ˆ.
Parameters
----------
thread_id : str
์Šน์ธ/๊ฑฐ์ ˆํ•  graph thread ID.
approved : bool
True์ด๋ฉด ์Šน์ธ, False์ด๋ฉด ๊ฑฐ์ ˆ.
Returns
-------
dict
์„œ๋ฒ„ ์‘๋‹ต.
"""
logger.debug(f"[http_client] approve: thread_id={thread_id} approved={approved}")
return self._post_params(
"/v2/agent/approve",
params={"thread_id": thread_id, "approved": str(approved).lower()},
timeout=self._DEFAULT_TIMEOUT,
)
def stream(
self,
query: str,
session_id: Optional[str] = None,
) -> Generator[Dict[str, Any], None, None]:
"""POST /v2/agent/stream โ€” SSE ์ŠคํŠธ๋ฆฌ๋ฐ์œผ๋กœ ๋…ธ๋“œ๋ณ„ ์ด๋ฒคํŠธ๋ฅผ ์ˆ˜์‹ ํ•œ๋‹ค.
Parameters
----------
query : str
์‚ฌ์šฉ์ž ์ž…๋ ฅ ์ฟผ๋ฆฌ.
session_id : str | None
๊ธฐ์กด ์„ธ์…˜์„ ์ด์–ด๋ฐ›์„ ๊ฒฝ์šฐ session ID.
Yields
------
dict
ํŒŒ์‹ฑ๋œ SSE ์ด๋ฒคํŠธ dict. ์ตœ์†Œ ``node``์™€ ``status`` ํ‚ค๋ฅผ ํฌํ•จํ•œ๋‹ค.
Raises
------
ConnectionError
daemon์— ์—ฐ๊ฒฐํ•  ์ˆ˜ ์—†์„ ๋•Œ.
httpx.HTTPStatusError
HTTP ์˜ค๋ฅ˜ ์‘๋‹ต ์‹œ.
"""
body: Dict[str, Any] = {"query": query}
if session_id is not None:
body["session_id"] = session_id
url = f"{self._base_url}/v2/agent/stream"
logger.debug(f"[http_client] stream: session_id={session_id} query_len={len(query)}")
try:
timeout = httpx.Timeout(connect=10.0, read=300.0, write=10.0, pool=10.0)
with httpx.Client(timeout=timeout) as client:
with client.stream("POST", url, json=body) as resp:
resp.raise_for_status()
for line in resp.iter_lines():
line = line.strip()
if not line:
continue
if line.startswith("data:"):
data_str = line[len("data:") :].strip()
if not data_str:
continue
try:
event = json.loads(data_str)
yield event
except json.JSONDecodeError:
logger.warning(f"[http_client] SSE JSON ํŒŒ์‹ฑ ์‹คํŒจ: {data_str!r}")
continue
except httpx.ConnectError as exc:
raise ConnectionError(f"daemon์ด ์‹คํ–‰ ์ค‘์ด ์•„๋‹™๋‹ˆ๋‹ค. ({self._base_url})") from exc
except httpx.HTTPStatusError as exc:
logger.error(f"[http_client] HTTP {exc.response.status_code}: {url}")
raise
def cancel(self, thread_id: str) -> Dict[str, Any]:
"""POST /v2/agent/cancel โ€” ์‹คํ–‰ ์ค‘์ธ ์„ธ์…˜ ์ทจ์†Œ.
Parameters
----------
thread_id : str
์ทจ์†Œํ•  graph thread ID.
Returns
-------
dict
์„œ๋ฒ„ ์‘๋‹ต.
"""
logger.debug(f"[http_client] cancel: thread_id={thread_id}")
return self._post_params(
"/v2/agent/cancel",
params={"thread_id": thread_id},
timeout=self._DEFAULT_TIMEOUT,
)
# ------------------------------------------------------------------
# ๋‚ด๋ถ€ ํ—ฌํผ
# ------------------------------------------------------------------
def _get(self, path: str, *, timeout: float) -> Dict[str, Any]:
url = f"{self._base_url}{path}"
try:
with httpx.Client(timeout=timeout) as client:
resp = client.get(url)
resp.raise_for_status()
return resp.json()
except httpx.ConnectError as exc:
raise ConnectionError(f"daemon์ด ์‹คํ–‰ ์ค‘์ด ์•„๋‹™๋‹ˆ๋‹ค. ({self._base_url})") from exc
except httpx.HTTPStatusError as exc:
logger.error(f"[http_client] HTTP {exc.response.status_code}: {url}")
raise
def _post(
self,
path: str,
*,
body: Dict[str, Any],
timeout: float,
) -> Dict[str, Any]:
url = f"{self._base_url}{path}"
try:
with httpx.Client(timeout=timeout) as client:
resp = client.post(url, json=body)
resp.raise_for_status()
return resp.json()
except httpx.ConnectError as exc:
raise ConnectionError(f"daemon์ด ์‹คํ–‰ ์ค‘์ด ์•„๋‹™๋‹ˆ๋‹ค. ({self._base_url})") from exc
except httpx.HTTPStatusError as exc:
logger.error(f"[http_client] HTTP {exc.response.status_code}: {url}")
raise
def _post_params(
self,
path: str,
*,
params: Dict[str, Any],
timeout: float,
) -> Dict[str, Any]:
"""์ฟผ๋ฆฌ ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” POST ์š”์ฒญ ํ—ฌํผ.
`/v2/agent/approve`, `/v2/agent/cancel` ๋“ฑ FastAPI ์—”๋“œํฌ์ธํŠธ๊ฐ€
์ฟผ๋ฆฌ ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ๊ธฐ๋Œ€ํ•  ๋•Œ ์‚ฌ์šฉํ•œ๋‹ค.
"""
url = f"{self._base_url}{path}"
try:
with httpx.Client(timeout=timeout) as client:
resp = client.post(url, params=params)
resp.raise_for_status()
return resp.json()
except httpx.ConnectError as exc:
raise ConnectionError(f"daemon์ด ์‹คํ–‰ ์ค‘์ด ์•„๋‹™๋‹ˆ๋‹ค. ({self._base_url})") from exc
except httpx.HTTPStatusError as exc:
logger.error(f"[http_client] HTTP {exc.response.status_code}: {url}")
raise