govon-runtime / src /inference /api_server.py
github-actions
sync: e3a248f
7d267a5
import asyncio
import json
import os
import re
import time
import uuid
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, List, Optional
from fastapi import Depends, FastAPI, HTTPException, Request, Security
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.security import APIKeyHeader
from loguru import logger
try:
import httpx as _httpx
except ImportError:
_httpx = None
from .adapter_registry import AdapterRegistry
from .agent_manager import AgentManager
from .feature_flags import FeatureFlags
from .runtime_config import RuntimeConfig
from .schemas import (
AgentRunRequest,
GenerateCivilResponseRequest,
GenerateCivilResponseResponse,
)
from .session_context import SessionContext, SessionStore
# When true, skip vLLM connectivity checks and graph initialization (CI/test envs).
SKIP_MODEL_LOAD = os.getenv("SKIP_MODEL_LOAD", "false").lower() in ("true", "1", "yes")
# Optional rate limiting via slowapi; degrades to a no-op when not installed.
try:
    from slowapi import Limiter
    from slowapi.middleware import SlowAPIMiddleware
    from slowapi.util import get_remote_address
    limiter = Limiter(key_func=get_remote_address)
    _RATE_LIMIT_AVAILABLE = True
except ImportError:
    limiter = None
    _RATE_LIMIT_AVAILABLE = False
# Shared-secret auth: clients must send this value in the X-API-Key header.
# auto_error=False means a missing header arrives as None (checked in verify_api_key).
_API_KEY = os.getenv("API_KEY")
_api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
_ALLOW_NO_AUTH = os.getenv("ALLOW_NO_AUTH", "false").lower() in ("true", "1")
async def verify_api_key(api_key: Optional[str] = Security(_api_key_header)):
    """Validate the ``X-API-Key`` request header.

    Behavior:
    - Server has no API_KEY configured: allow only when ALLOW_NO_AUTH is set,
      otherwise reject with 401.
    - Header missing or mismatching: reject with 401.

    Uses ``secrets.compare_digest`` so the comparison is constant-time and
    does not leak key prefixes through response timing.

    Raises
    ------
    HTTPException
        With status 401 on any authentication failure.
    """
    import secrets  # stdlib; local import keeps this dependency self-contained

    if _API_KEY is None:
        if _ALLOW_NO_AUTH:
            return
        raise HTTPException(
            status_code=401, detail="API_KEY๊ฐ€ ์„ค์ •๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค. ์„œ๋ฒ„ ๊ด€๋ฆฌ์ž์—๊ฒŒ ๋ฌธ์˜ํ•˜์„ธ์š”."
        )
    # api_key is None when the header is absent (APIKeyHeader(auto_error=False)).
    if api_key is None or not secrets.compare_digest(api_key, _API_KEY):
        raise HTTPException(status_code=401, detail="์œ ํšจํ•˜์ง€ ์•Š์€ API ํ‚ค์ž…๋‹ˆ๋‹ค.")
# Process-wide runtime configuration resolved once from environment variables.
runtime_config = RuntimeConfig.from_env()
runtime_config.log_summary()
# Convenience module-level aliases used by the engine manager and endpoints.
MODEL_PATH = runtime_config.model.model_path
DATA_PATH = runtime_config.paths.data_path
INDEX_PATH = runtime_config.paths.index_path
GPU_UTILIZATION = runtime_config.gpu_utilization
MAX_MODEL_LEN = runtime_config.max_model_len
TRUST_REMOTE_CODE = runtime_config.model.trust_remote_code
# Repository root: three levels above this file (src/inference/api_server.py).
_PROJECT_ROOT = str(Path(__file__).resolve().parent.parent.parent)
AGENTS_DIR = runtime_config.paths.agents_dir
@dataclass
class SamplingParams:
    """Sampling parameters for the vLLM HTTP API (works without importing vLLM)."""
    max_tokens: int = 512
    temperature: float = 0.7
    top_p: float = 1.0
    stop: Optional[list] = None  # stop sequences forwarded verbatim to vLLM
    repetition_penalty: float = 1.0  # 1.0 means "omit from the request body"
@dataclass
class PreparedGeneration:
    """A fully built prompt plus its sampling parameters, ready for _run_engine."""
    prompt: str
    sampling_params: SamplingParams
class _VLLMOutputItem:
"""vLLM HTTP ์‘๋‹ต์˜ ๋‹จ์ผ choice๋ฅผ ๊ธฐ์กด ์ธํ„ฐํŽ˜์ด์Šค๋กœ ๋ž˜ํ•‘."""
def __init__(self, text: str, finish_reason: str, token_ids: list):
self.text = text
self.finish_reason = finish_reason
self.token_ids = token_ids
class _VLLMHttpResult:
    """Wrap a vLLM ``/v1/chat/completions`` JSON payload in the legacy AsyncLLM shape.

    Existing code reads ``output.outputs[0].text`` and
    ``output.prompt_token_ids``, so both attributes are reproduced here.
    Token-id lists are synthesized from the usage counts — callers only
    consume their *length*, not the actual ids.
    """

    def __init__(self, data: dict):
        self._data = data
        usage = data.get("usage", {})
        completion_ids = list(range(usage.get("completion_tokens", 0)))
        self.outputs = [
            _VLLMOutputItem(
                text=choice.get("message", {}).get("content", ""),
                finish_reason=choice.get("finish_reason", "stop"),
                token_ids=completion_ids,
            )
            for choice in data.get("choices", [])
        ]
        self.prompt_token_ids = list(range(usage.get("prompt_tokens", 0)))
def _extract_approval_request(graph_state: Any) -> Any:
"""LangGraph interrupt state์—์„œ approval payload๋ฅผ ์ถ”์ถœํ•œ๋‹ค."""
if not graph_state or not getattr(graph_state, "tasks", None):
return None
task = graph_state.tasks[0]
if not getattr(task, "interrupts", None):
return None
return task.interrupts[0].value
class vLLMEngineManager:
"""GovOn Shell MVP์šฉ ๋กœ์ปฌ ๋Ÿฐํƒ€์ž„ ๋งค๋‹ˆ์ €.
vLLM์€ ๋ณ„๋„ ํ”„๋กœ์„ธ์Šค(entrypoint.sh)์—์„œ OpenAI-compatible ์„œ๋ฒ„๋กœ ์‹คํ–‰๋œ๋‹ค.
์ด ํด๋ž˜์Šค๋Š” httpx๋กœ vLLM HTTP API๋ฅผ ํ˜ธ์ถœํ•œ๋‹ค.
"""
    def __init__(self):
        """Set up lightweight state only; heavy work is deferred.

        The vLLM connection check happens in initialize() and graph
        construction in _init_graph(), both driven by the FastAPI lifespan,
        so importing this module stays side-effect free.
        """
        self._vllm_base_url = f"http://localhost:{os.getenv('VLLM_PORT', '8000')}"
        self._http_client: Optional[Any] = None
        self.feature_flags = FeatureFlags.from_env()
        self.session_store = SessionStore()
        self.agent_manager = AgentManager(AGENTS_DIR)
        self._api_lookup_action = None  # MinwonAnalysisAction (lazy init)
        self._draft_response_fn = None  # draft-generation closure (lazy init)
        self.graph = None  # LangGraph CompiledGraph (for v2 endpoints)
        self.graph_v3 = None  # v3 ReAct graph (for v3 endpoints)
        self._checkpointer_ctx = None  # AsyncSqliteSaver context (managed by lifespan)
        self._sync_checkpointer_conn = None  # sqlite3 connection for SqliteSaver (leak prevention)
        # Per-session async locks: guard concurrent requests against race conditions.
        self._session_locks: dict[str, asyncio.Lock] = {}
        self._init_tools()
        # _init_graph() is invoked from lifespan() — avoids running it at module load.
def get_session_lock(self, session_id: str) -> asyncio.Lock:
"""session_id ๋‹จ์œ„ ๋น„๋™๊ธฐ ๋ฝ์„ ๋ฐ˜ํ™˜ํ•œ๋‹ค. ๋™์‹œ ์š”์ฒญ race condition ๋ฐฉ์ง€."""
if session_id not in self._session_locks:
self._session_locks[session_id] = asyncio.Lock()
return self._session_locks[session_id]
    async def initialize(self):
        """Create the shared httpx client and verify the external vLLM server.

        The vLLM process itself is launched separately (entrypoint.sh); this
        method only polls its /health endpoint, retrying up to 10 times with
        a 3-second pause between attempts.

        Raises
        ------
        RuntimeError
            If httpx is not installed, or the server never answers 200.
        """
        if SKIP_MODEL_LOAD:
            logger.info("SKIP_MODEL_LOAD=true: ๋ชจ๋ธ ๋ฐ ์ธ๋ฑ์Šค ๋กœ๋”ฉ์„ ๊ฑด๋„ˆ๋œ๋‹ˆ๋‹ค.")
            return
        # vLLM server is already started by entrypoint.sh — only health-check here.
        logger.info(f"vLLM ์„œ๋ฒ„ ์—ฐ๊ฒฐ ํ™•์ธ: {self._vllm_base_url}")
        if _httpx is None:
            raise RuntimeError("httpx๊ฐ€ ์„ค์น˜๋˜์–ด ์žˆ์ง€ ์•Š์Šต๋‹ˆ๋‹ค. pip install httpx")
        self._http_client = _httpx.AsyncClient(
            base_url=self._vllm_base_url,
            timeout=_httpx.Timeout(300.0, connect=30.0),
        )
        # Double-check health (entrypoint.sh already waited, but verify again).
        for attempt in range(10):
            try:
                resp = await self._http_client.get("/health")
                if resp.status_code == 200:
                    logger.info("vLLM ์„œ๋ฒ„ ์—ฐ๊ฒฐ ์„ฑ๊ณต")
                    return
            except Exception:
                # Connection errors are expected while the server is still booting.
                pass
            logger.debug(f"vLLM ์„œ๋ฒ„ ๋Œ€๊ธฐ ์ค‘... ({attempt + 1}/10)")
            await asyncio.sleep(3)
        raise RuntimeError(f"vLLM ์„œ๋ฒ„์— ์—ฐ๊ฒฐํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค: {self._vllm_base_url}")
def _escape_special_tokens(self, text: str) -> str:
tokens = [
"[|user|]",
"[|assistant|]",
"[|system|]",
"[|endofturn|]",
"<thought>",
"</thought>",
]
for token in tokens:
text = text.replace(
token,
token.replace("[", "\\[")
.replace("]", "\\]")
.replace("<", "\\<")
.replace(">", "\\>"),
)
return text
@staticmethod
def _strip_thought_blocks(text: str) -> str:
# <thought>...</thought> (๊ตฌํ˜•) ๋ฐ <think>...</think> (EXAONE-4.0 ์ถ”๋ก  ๋ชจ๋“œ) ๋ชจ๋‘ ์ œ๊ฑฐ
text = re.sub(r"<thought>.*?</thought>\s*", "", text, flags=re.DOTALL)
text = re.sub(r"<think>.*?</think>\s*", "", text, flags=re.DOTALL)
return text.strip()
def _build_persona_prompt(self, agent_name: str, user_message: str) -> str:
if self.agent_manager and self.agent_manager.get_agent(agent_name):
return self.agent_manager.build_prompt(agent_name, user_message)
return user_message
def _extract_query(self, prompt: str) -> str:
user_match = re.search(r"\[\|user\|\](.*?)\[\|endofturn\|\]", prompt, re.DOTALL)
if user_match:
user_block = user_match.group(1)
complaint_match = re.search(r"๋ฏผ์›\s*๋‚ด์šฉ\s*:\s*(.+)", user_block, re.DOTALL)
if complaint_match:
return complaint_match.group(1).strip()
return user_block.strip()
return prompt
@staticmethod
def _is_evidence_request(query: str) -> bool:
return any(token in query for token in ("๊ทผ๊ฑฐ", "์ถœ์ฒ˜", "์™œ", "์ด์œ ", "๋งํฌ"))
@staticmethod
def _is_revision_request(query: str) -> bool:
return any(token in query for token in ("๋‹ค์‹œ", "์ˆ˜์ •", "๊ณ ์ณ", "์ •์ค‘", "๊ณต์†", "๋ณด๊ฐ•"))
def _latest_prior_turns(
self,
session: SessionContext,
current_query: str,
) -> tuple[Optional[str], Optional[str]]:
turns = list(session.recent_history)
if turns and turns[-1].role == "user" and turns[-1].content == current_query:
turns = turns[:-1]
previous_user = next(
(turn.content for turn in reversed(turns) if turn.role == "user"), None
)
previous_assistant = next(
(turn.content for turn in reversed(turns) if turn.role == "assistant"),
None,
)
return previous_user, previous_assistant
def _build_working_query(self, query: str, session: SessionContext) -> str:
query = query.strip()
if not query:
return query
if not (self._is_evidence_request(query) or self._is_revision_request(query)):
return query
previous_user, previous_assistant = self._latest_prior_turns(session, query)
parts: List[str] = []
if previous_user:
parts.append(f"์›๋ž˜ ์š”์ฒญ: {previous_user}")
if previous_assistant:
parts.append(f"์ด์ „ ๋‹ต๋ณ€: {previous_assistant[:600]}")
if self._is_revision_request(query):
parts.append(f"์ˆ˜์ • ์š”์ฒญ: {query}")
return "\n\n".join(parts) if parts else query
@staticmethod
def _format_evidence_items(evidence_dict: Dict[str, Any]) -> str:
"""EvidenceEnvelope dict๋ฅผ ์†Œ๋น„ํ•˜์—ฌ ์ถœ์ฒ˜ ๋ชฉ๋ก ํ…์ŠคํŠธ๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.
EvidenceItem์ด ์žˆ์œผ๋ฉด source-specific branching ์—†์ด ๋‹จ์ผ ํฌ๋งคํ„ฐ๋กœ ์ฒ˜๋ฆฌํ•œ๋‹ค.
"""
items = evidence_dict.get("items", [])
if not items:
return ""
lines: list[str] = []
for idx, item in enumerate(items[:10], start=1):
source_type = item.get("source_type", "")
title = item.get("title", "")
link = item.get("link_or_path", "")
# source_type์— ๋”ฐ๋ผ ๊ธฐ๋ณธ label๋งŒ ๋‹ค๋ฅด๊ณ  ํฌ๋งท์€ ๋™์ผ
label = (title or "์™ธ๋ถ€ API ๊ฒฐ๊ณผ") if source_type == "api" else (title or "์ƒ์„ฑ ์ฐธ์กฐ")
lines.append(f"[{idx}] {label} - {link}" if link else f"[{idx}] {label}")
return "\n".join(lines)
    def _summarize_evidence(
        self,
        api_lookup_data: Dict[str, Any],
    ) -> str:
        """Build the Korean "๊ทผ๊ฑฐ ์š”์•ฝ" (evidence summary) block.

        Prefers the structured EvidenceEnvelope (``evidence.items``) and
        falls back to the legacy ``results`` list. Either way, when no
        summary line could be produced, a generic "insufficient internal
        search results" sentence is appended so the block is never empty.
        """
        # Prefer the EvidenceEnvelope when present.
        evidence = api_lookup_data.get("evidence")
        if isinstance(evidence, dict) and evidence.get("items"):
            lines = ["๊ทผ๊ฑฐ ์š”์•ฝ"]
            api_items = [i for i in evidence["items"] if i.get("source_type") == "api"]
            if api_items:
                # Up to 3 representative case titles (empty titles skipped).
                titles = ", ".join(i["title"] for i in api_items[:3] if i.get("title"))
                lines.append(
                    f"- ์™ธ๋ถ€ ๋ฏผ์›๋ถ„์„ API์—์„œ ์œ ์‚ฌ ์‚ฌ๋ก€ {len(api_items)}๊ฑด์„ ํ™•์ธํ–ˆ์Šต๋‹ˆ๋‹ค."
                    + (f" ๋Œ€ํ‘œ ์‚ฌ๋ก€: {titles}" if titles else "")
                )
            if len(lines) == 1:
                lines.append(
                    "- ๋‚ด๋ถ€ ๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ๋ฅผ ์ถฉ๋ถ„ํžˆ ํ™•๋ณดํ•˜์ง€ ๋ชปํ•ด ์ผ๋ฐ˜ ํ–‰์ • ์‘๋Œ€ ์›์น™ ๊ธฐ์ค€์œผ๋กœ ์ž‘์„ฑํ–ˆ์Šต๋‹ˆ๋‹ค."
                )
            return "\n".join(lines)
        # Legacy formatter (used when no EvidenceItem list is available).
        lines = ["๊ทผ๊ฑฐ ์š”์•ฝ"]
        api_results = api_lookup_data.get("results", [])
        if api_results:
            titles = []
            for item in api_results[:3]:
                # Legacy records name their title field inconsistently.
                title = item.get("title") or item.get("qnaTitle") or item.get("question")
                if title:
                    titles.append(title)
            lines.append(
                f"- ์™ธ๋ถ€ ๋ฏผ์›๋ถ„์„ API์—์„œ ์œ ์‚ฌ ์‚ฌ๋ก€ {len(api_results)}๊ฑด์„ ํ™•์ธํ–ˆ์Šต๋‹ˆ๋‹ค."
                + (f" ๋Œ€ํ‘œ ์‚ฌ๋ก€: {', '.join(titles)}" if titles else "")
            )
        if len(lines) == 1:
            lines.append(
                "- ๋‚ด๋ถ€ ๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ๋ฅผ ์ถฉ๋ถ„ํžˆ ํ™•๋ณดํ•˜์ง€ ๋ชปํ•ด ์ผ๋ฐ˜ ํ–‰์ • ์‘๋Œ€ ์›์น™ ๊ธฐ์ค€์œผ๋กœ ์ž‘์„ฑํ–ˆ์Šต๋‹ˆ๋‹ค."
            )
        return "\n".join(lines)
@staticmethod
def _api_source_line(index: int, item: Dict[str, Any]) -> str:
title = item.get("title") or item.get("qnaTitle") or item.get("question") or "์™ธ๋ถ€ API ๊ฒฐ๊ณผ"
url = item.get("url") or item.get("detailUrl") or ""
if url:
return f"[{index}] {title} - {url}"
return f"[{index}] {title}"
    def _build_evidence_section(
        self,
        session: SessionContext,
        current_query: str,
        api_data: Dict[str, Any],
    ) -> str:
        """Render the "๊ทผ๊ฑฐ/์ถœ์ฒ˜" (sources) section as numbered entries.

        Up to 5 sources are listed. When a previous assistant answer exists
        in the session, it is prepended so the sources read as a follow-up
        to that answer.
        """
        _, previous_answer = self._latest_prior_turns(session, current_query)
        lines = ["๊ทผ๊ฑฐ/์ถœ์ฒ˜"]
        cursor = 1
        # Prefer the structured EvidenceEnvelope with a single formatter.
        api_evidence = api_data.get("evidence")
        if api_evidence and isinstance(api_evidence, dict) and api_evidence.get("items"):
            for item in api_evidence["items"][:5]:
                title = item.get("title", "") or "์™ธ๋ถ€ API ๊ฒฐ๊ณผ"
                link = item.get("link_or_path", "")
                if link:
                    lines.append(f"[{cursor}] {title} - {link}")
                else:
                    lines.append(f"[{cursor}] {title}")
                cursor += 1
        else:
            # Legacy API formatter.
            api_items = api_data.get("citations") or api_data.get("results") or []
            for item in api_items[:5]:
                lines.append(self._api_source_line(cursor, item))
                cursor += 1
        if cursor == 1:
            # Neither branch emitted a source line.
            lines.append("- ๊ฒ€์ƒ‰ ๊ฐ€๋Šฅํ•œ ๊ทผ๊ฑฐ๋ฅผ ์ฐพ์ง€ ๋ชปํ–ˆ์Šต๋‹ˆ๋‹ค.")
        section = "\n".join(lines)
        if previous_answer:
            return f"{previous_answer}\n\n{section}"
        return section
async def _prepare_civil_response_generation(
self,
request: GenerateCivilResponseRequest,
flags: Optional[FeatureFlags] = None,
external_cases: Optional[List[dict]] = None,
) -> PreparedGeneration:
gen_defaults = runtime_config.generation
safe_message = self._escape_special_tokens(self._extract_query(request.prompt))
user_content = f"๋‹ค์Œ ๋ฏผ์›์— ๋Œ€ํ•œ ๋‹ต๋ณ€์„ ์ž‘์„ฑํ•ด ์ฃผ์„ธ์š”.\n\n{safe_message}"
prompt = self._build_persona_prompt("draft_response", user_content)
sampling_params = SamplingParams(
temperature=request.temperature,
top_p=request.top_p,
max_tokens=request.max_tokens,
stop=request.stop or gen_defaults.stop_sequences,
repetition_penalty=gen_defaults.repetition_penalty,
)
return PreparedGeneration(
prompt=prompt,
sampling_params=sampling_params,
)
async def _prepare_draft_only(
self,
request: GenerateCivilResponseRequest,
flags: Optional[FeatureFlags] = None,
) -> PreparedGeneration:
"""LoRA ์ดˆ์•ˆ ์ƒ์„ฑ์šฉ: ์ฟผ๋ฆฌ๋งŒ์œผ๋กœ ํ”„๋กฌํ”„ํŠธ ์ƒ์„ฑ.
์‚ฌ์šฉ์ž ์ฟผ๋ฆฌ๋ฅผ persona ํ”„๋กฌํ”„ํŠธ๋กœ ๊ฐ์‹ธ์„œ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
"""
gen_defaults = runtime_config.generation
safe_message = self._escape_special_tokens(self._extract_query(request.prompt))
# ํ•™์Šต ๋ฐ์ดํ„ฐ ํ˜•์‹: user = instruction + "\n\n" + input
user_content = f"๋‹ค์Œ ๋ฏผ์›์— ๋Œ€ํ•œ ๋‹ต๋ณ€์„ ์ž‘์„ฑํ•ด ์ฃผ์„ธ์š”.\n\n{safe_message}"
prompt = self._build_persona_prompt("draft_response", user_content)
sampling_params = SamplingParams(
temperature=(
request.temperature if request.temperature is not None else gen_defaults.temperature
),
top_p=request.top_p if request.top_p is not None else gen_defaults.top_p,
max_tokens=request.max_tokens or gen_defaults.max_tokens,
stop=request.stop or gen_defaults.stop_sequences,
repetition_penalty=gen_defaults.repetition_penalty,
)
return PreparedGeneration(
prompt=prompt,
sampling_params=sampling_params,
)
    async def _run_engine(
        self,
        prompt: str,
        sampling_params: SamplingParams,
        request_id: str,
        lora_request=None,
    ):
        """Generate text via the vLLM OpenAI-compatible HTTP API.

        Parameters
        ----------
        prompt : str
            EXAONE chat-template prompt; converted to chat messages here.
        sampling_params : SamplingParams
            Decoding parameters. Values matching the server defaults
            (top_p >= 1.0, repetition_penalty == 1.0) are omitted from
            the request body.
        request_id : str
            Caller's correlation id; currently unused on the HTTP path.
        lora_request : optional
            When set, its ``lora_name`` replaces the model name so vLLM
            routes the request to that LoRA adapter.

        Returns
        -------
        _VLLMHttpResult or None
            None when no client exists or the HTTP call fails — callers
            treat None as generation failure.
        """
        if self._http_client is None:
            return None
        # Convert the EXAONE chat-template prompt into OpenAI messages.
        messages = self._prompt_to_messages(prompt)
        body: Dict[str, Any] = {
            "model": MODEL_PATH,
            "messages": messages,
            "max_tokens": sampling_params.max_tokens,
            "temperature": sampling_params.temperature,
            "stream": False,
        }
        if sampling_params.top_p is not None and sampling_params.top_p < 1.0:
            body["top_p"] = sampling_params.top_p
        if sampling_params.stop:
            body["stop"] = list(sampling_params.stop)
        if sampling_params.repetition_penalty and sampling_params.repetition_penalty != 1.0:
            body["repetition_penalty"] = sampling_params.repetition_penalty
        # Select the LoRA adapter by overriding the model name.
        if lora_request is not None:
            body["model"] = lora_request.lora_name
        try:
            resp = await self._http_client.post("/v1/chat/completions", json=body)
            resp.raise_for_status()
            data = resp.json()
        except _httpx.TimeoutException as exc:
            logger.error(f"vLLM HTTP ํƒ€์ž„์•„์›ƒ: {exc}")
            return None
        except _httpx.HTTPStatusError as exc:
            logger.error(f"vLLM HTTP {exc.response.status_code}: {exc}")
            return None
        except Exception as exc:
            logger.error(f"vLLM HTTP ํ˜ธ์ถœ ์‹คํŒจ: {exc}")
            return None
        # Wrap the OpenAI-style response in the legacy result interface.
        return _VLLMHttpResult(data)
@staticmethod
def _prompt_to_messages(prompt: str) -> list:
"""EXAONE chat template ํ˜•์‹ ํ”„๋กฌํ”„ํŠธ๋ฅผ OpenAI messages๋กœ ๋ณ€ํ™˜."""
messages = []
# [|system|]...[|endofturn|], [|user|]...[|endofturn|], [|assistant|]... ํŒŒ์‹ฑ
import re as _re
# ์ด์Šค์ผ€์ดํ”„๋œ ํ† ํฐ์€ _escape_special_tokens()์—์„œ ์ด๋ฏธ ์ฒ˜๋ฆฌ๋˜์–ด
# ์ด ์‹œ์ ์—๋Š” ์›๋ณธ ํ˜•ํƒœ [|role|]๋กœ ์ „๋‹ฌ๋œ๋‹ค.
parts = _re.split(r"\[\|(\w+)\|\]", prompt)
role = None
for part in parts:
if part in ("system", "user", "assistant"):
role = part
elif role and part.strip():
content = part.replace("[|endofturn|]", "").strip()
if content:
messages.append({"role": role, "content": content})
role = None
if not messages:
messages = [{"role": "user", "content": prompt}]
return messages
    def _init_tools(self) -> None:
        """Initialize the action and closure handed to the LangGraph tool factory."""
        try:
            from src.inference.actions.data_go_kr import MinwonAnalysisAction
            self._api_lookup_action = MinwonAnalysisAction()
        except Exception as exc:  # noqa: BLE001 - graceful degradation when the dependency fails to load
            logger.warning(f"MinwonAnalysisAction ์ดˆ๊ธฐํ™” ์‹คํŒจ (๋„๊ตฌ ์—†์ด ์ง„ํ–‰): {exc}")
            self._api_lookup_action = None
        # Capture the manager for the closure below.
        engine_ref = self

        async def _draft_response_tool(
            query: str,
            context: dict,
            session: SessionContext,
        ) -> dict:
            # Fold prior-turn context into follow-up (evidence/revision) queries.
            working_query = engine_ref._build_working_query(query, session)
            # LoRA-first: generate the draft from the query alone.
            adapter_name = context.get("adapter") if context else None
            if not adapter_name:
                adapter_name = "public_admin"
            _adapter_reg = AdapterRegistry.get_instance()
            lora_req = _adapter_reg.get_lora_request(adapter_name)
            gen_request = GenerateCivilResponseRequest(
                prompt=working_query,
                max_tokens=2048,
                temperature=0.7,
            )
            request_id = str(uuid.uuid4())
            prepared = await engine_ref._prepare_draft_only(gen_request)
            final_output = await engine_ref._run_engine(
                prepared.prompt, prepared.sampling_params, request_id, lora_request=lora_req
            )
            # _run_engine returns None on any HTTP failure.
            if final_output is None or not final_output.outputs:
                return {
                    "text": "",
                    "draft_text": "",
                    "success": False,
                    "error": "๋ฏผ์› ๋‹ต๋ณ€ ์ดˆ์•ˆ ์ƒ์„ฑ ์‹คํŒจ",
                    "results": [],
                    "context_text": "",
                }
            draft_text = engine_ref._strip_thought_blocks(final_output.outputs[0].text)
            return {
                "text": draft_text,
                "draft_text": draft_text,
                "success": True,
                "results": [],
                "context_text": draft_text,
                "prompt_tokens": len(final_output.prompt_token_ids),
                "completion_tokens": len(final_output.outputs[0].token_ids),
            }

        self._draft_response_fn = _draft_response_tool
def _build_langgraph_tools(self) -> list:
"""LangGraph ToolNode์šฉ ๋„๊ตฌ ๋ชฉ๋ก์„ ์ƒ์„ฑํ•œ๋‹ค.
build_all_tools()๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ StructuredTool ๋ชฉ๋ก์„ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
"""
from src.inference.graph.tools import build_all_tools
return build_all_tools(
api_lookup_action=self._api_lookup_action,
draft_response_fn=self._draft_response_fn,
)
def _init_graph_with_async_checkpointer(self, checkpointer: object) -> None:
"""lifespan์—์„œ AsyncSqliteSaver๊ฐ€ ์ค€๋น„๋œ ํ›„ graph๋ฅผ ์žฌ๊ตฌ์„ฑํ•œ๋‹ค."""
self._init_graph(checkpointer=checkpointer)
    def _init_graph(self, checkpointer: Optional[object] = None) -> None:
        """Initialize the LangGraph StateGraph(s).

        v4 architecture: ReAct + ToolNode — the LLM autonomously decides
        which tools to call.

        Parameters
        ----------
        checkpointer : optional
            Externally injected LangGraph checkpointer. When None, a
            SqliteSaver (sync sqlite3) is attempted, falling back to
            MemorySaver on import failure. The SqliteSaver DB is created
            next to the SessionStore DB as ``langgraph_checkpoints.db``
            (separation of concerns).
        """
        try:
            from src.inference.graph.builder import build_govon_graph
        except ImportError as exc:
            logger.warning(f"LangGraph graph ์ดˆ๊ธฐํ™” ์‹คํŒจ (import ์˜ค๋ฅ˜): {exc}")
            return
        tools = self._build_langgraph_tools()
        # Dynamic max_tokens: subtract the estimated input overhead
        # (~500 system prompt + ~1000 tool schema + ~500 safety margin = 2000)
        # from max_model_len, clamped to [256, 1024].
        _max_model_len = runtime_config.max_model_len
        _llm_max_tokens = max(256, min(1024, _max_model_len - 2000))
        if _max_model_len < 2500:
            logger.warning(
                f"[_init_graph] max_model_len={_max_model_len}์ด ๋งค์šฐ ์ž‘์Šต๋‹ˆ๋‹ค. "
                f"llm_max_tokens={_llm_max_tokens}๋กœ ์ œํ•œ๋ฉ๋‹ˆ๋‹ค."
            )
        logger.info(
            f"[_init_graph] max_model_len={_max_model_len}, llm_max_tokens={_llm_max_tokens}"
        )
        # Build the LLM instance.
        if SKIP_MODEL_LOAD:
            # CI/test environment: no LLM available, so skip graph init entirely.
            logger.info("SKIP_MODEL_LOAD=true: LangGraph graph ์ดˆ๊ธฐํ™” ์Šคํ‚ต")
            return
        elif os.getenv("LANGGRAPH_MODEL_BASE_URL"):
            # Explicit external endpoint override (e.g. a separate planner model).
            from langchain_openai import ChatOpenAI
            llm = ChatOpenAI(
                base_url=os.environ["LANGGRAPH_MODEL_BASE_URL"],
                api_key=os.getenv("LANGGRAPH_MODEL_API_KEY", "EMPTY"),
                model=os.getenv("LANGGRAPH_PLANNER_MODEL", runtime_config.model.model_path),
                temperature=0.0,
                max_tokens=_llm_max_tokens,
            )
        else:
            # Production: use the local vLLM OpenAI-compatible endpoint.
            from langchain_openai import ChatOpenAI
            vllm_port = os.getenv("VLLM_PORT", "8000")
            llm = ChatOpenAI(
                base_url=f"http://localhost:{vllm_port}/v1",
                api_key="EMPTY",
                model=runtime_config.model.model_path,
                temperature=0.0,
                max_tokens=_llm_max_tokens,
            )
        # Without an injected checkpointer, fall back to a sync SqliteSaver.
        if checkpointer is None:
            checkpointer, conn = _build_sync_sqlite_checkpointer(self.session_store.db_path)
            # Close any previously opened sync connection to avoid leaks.
            if self._sync_checkpointer_conn is not None:
                try:
                    self._sync_checkpointer_conn.close()
                except Exception:
                    pass
            self._sync_checkpointer_conn = conn
        self.graph = build_govon_graph(
            llm=llm,
            tools=tools,
            session_store=self.session_store,
            checkpointer=checkpointer,
        )
        logger.info("LangGraph v2 graph ์ดˆ๊ธฐํ™” ์™„๋ฃŒ")
        # v3 ReAct graph shares the same checkpointer as v2
        # (thread_id is always a fresh UUID, so checkpoints cannot collide).
        try:
            from src.inference.graph.builder import build_govon_graph_v3
            self.graph_v3 = build_govon_graph_v3(
                llm=llm,
                tools=tools,
                session_store=self.session_store,
                checkpointer=checkpointer,
            )
            logger.info("LangGraph v3 ReAct graph ์ดˆ๊ธฐํ™” ์™„๋ฃŒ")
        except (ImportError, AttributeError) as exc:
            # v3 is optional; v2 keeps working without it.
            logger.warning(f"v3 graph ์ดˆ๊ธฐํ™” ์‹คํŒจ (v2๋Š” ์ •์ƒ): {exc}")
            self.graph_v3 = None
        except Exception as exc:
            logger.exception(f"v3 graph ์ดˆ๊ธฐํ™” ์ค‘ ์˜ˆ์ƒ์น˜ ๋ชปํ•œ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {exc}")
            self.graph_v3 = None
def _build_sync_sqlite_checkpointer(
    session_db_path: str,
) -> tuple:
    """Return a SqliteSaver (sync) or a MemorySaver fallback.

    The LangGraph checkpoint DB is created as ``langgraph_checkpoints.db``
    in the same directory as the SessionStore's sessions.sqlite3, keeping
    session metadata and graph checkpoints in separate files. SqliteSaver
    survives process restarts (interrupt state is restored from SQLite),
    unlike MemorySaver.

    Parameters
    ----------
    session_db_path : str
        Path of the sessions.sqlite3 file currently used by SessionStore;
        the checkpoint DB is created in its parent directory.

    Returns
    -------
    tuple[SqliteSaver | MemorySaver, sqlite3.Connection | None]
        (checkpointer, conn). With SqliteSaver, conn is the open
        sqlite3.Connection and the caller must close it at the right time.
        With the MemorySaver fallback, conn is None.
    """
    import sqlite3  # stdlib; plain import instead of the previous __import__ call

    cp_db_path = str(Path(session_db_path).parent / "langgraph_checkpoints.db")
    try:
        from langgraph.checkpoint.sqlite import SqliteSaver
    except ImportError:
        logger.warning(
            "langgraph-checkpoint-sqlite ๋ฏธ์„ค์น˜ โ€” MemorySaver๋กœ fallbackํ•ฉ๋‹ˆ๋‹ค. "
            "ํ”„๋กœ์„ธ์Šค ์žฌ์‹œ์ž‘ ์‹œ interrupt ์ƒํƒœ๊ฐ€ ์†Œ๋ฉธ๋ฉ๋‹ˆ๋‹ค."
        )
        from langgraph.checkpoint.memory import MemorySaver
        return MemorySaver(), None
    # check_same_thread=False: the saver may be used from worker threads.
    conn = sqlite3.connect(cp_db_path, check_same_thread=False)
    saver = SqliteSaver(conn)
    logger.info(f"LangGraph checkpointer: SqliteSaver ({cp_db_path})")
    return saver, conn
# Module-level singleton; the endpoints below close over this instance.
manager = vLLMEngineManager()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: initialize the model/index and build the graph once.

    Prefers AsyncSqliteSaver when importable and initializes the graph with
    it exactly once; on ImportError falls back to SqliteSaver (sync) or
    MemorySaver via _init_graph(). _init_graph() is called a single time to
    avoid double initialization. On shutdown the httpx AsyncClient is always
    closed to prevent leaks.
    """
    await manager.initialize()
    # Warn loudly when API_KEY is unset outside the local profile.
    if _API_KEY is None and runtime_config.profile.value not in ("local",):
        logger.warning("API_KEY ๋ฏธ์„ค์ •: ์šด์˜ ํ™˜๊ฒฝ์—์„œ๋Š” ๋ฐ˜๋“œ์‹œ API_KEY ํ™˜๊ฒฝ๋ณ€์ˆ˜๋ฅผ ์„ค์ •ํ•˜์„ธ์š”.")
    # Decide on a checkpointer first, then initialize the graph exactly once.
    async_cp_db = str(Path(manager.session_store.db_path).parent / "langgraph_checkpoints.db")
    try:
        from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
        async with AsyncSqliteSaver.from_conn_string(async_cp_db) as async_saver:
            manager._init_graph(checkpointer=async_saver)
            manager._checkpointer_ctx = async_saver
            logger.info(f"LangGraph: AsyncSqliteSaver ({async_cp_db})")
            try:
                yield
            finally:
                # The saver's context exits right after this block; drop the reference.
                manager._checkpointer_ctx = None
                if manager._http_client:
                    await manager._http_client.aclose()
                    logger.info("httpx AsyncClient ์ข…๋ฃŒ ์™„๋ฃŒ")
            return
    except ImportError:
        # AsyncSqliteSaver unavailable; fall through to the sync path below.
        pass
    # Fallback: SqliteSaver (sync) or MemorySaver, chosen inside _init_graph().
    manager._init_graph()
    logger.info("LangGraph: SqliteSaver(๋™๊ธฐ) ๋˜๋Š” MemorySaver๋กœ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.")
    try:
        yield
    finally:
        if manager._http_client:
            await manager._http_client.aclose()
            logger.info("httpx AsyncClient ์ข…๋ฃŒ ์™„๋ฃŒ")
app = FastAPI(
    title="GovOn Local Runtime",
    description="Local FastAPI daemon for the GovOn Agentic Shell MVP.",
    lifespan=lifespan,
)
# Comma-separated allow-list; unset/empty CORS_ORIGINS disables CORS entirely.
ALLOWED_ORIGINS = os.getenv("CORS_ORIGINS", "").split(",")
if ALLOWED_ORIGINS and ALLOWED_ORIGINS[0]:
    # The CORS spec forbids wildcard (*) together with allow_credentials=True,
    # so credentials are disabled whenever a wildcard origin is present.
    # NOTE(review): origins are not stripped of surrounding whitespace —
    # "a, b" yields " b". Confirm CORS_ORIGINS is written without spaces.
    allow_creds = "*" not in ALLOWED_ORIGINS
    app.add_middleware(
        CORSMiddleware,
        allow_origins=ALLOWED_ORIGINS,
        allow_credentials=allow_creds,
        allow_methods=["GET", "POST", "OPTIONS"],
        allow_headers=["X-API-Key", "Content-Type", "Accept"],
    )
# Attach slowapi rate limiting only when the optional dependency is installed.
if _RATE_LIMIT_AVAILABLE and limiter is not None:
    app.state.limiter = limiter
    app.add_middleware(SlowAPIMiddleware)
@app.get("/health")
async def health():
    """Health check including vLLM connectivity.

    Internal paths (session_store.path etc.) are deliberately not exposed.
    """
    vllm_connected = False
    client = manager._http_client
    if client:
        try:
            resp = await client.get("/health", timeout=3.0)
            vllm_connected = resp.status_code == 200
        except Exception as exc:
            logger.debug(f"vLLM health check ์‹คํŒจ: {exc}")
    # With SKIP_MODEL_LOAD the service is healthy even without vLLM.
    overall = "healthy" if (vllm_connected or SKIP_MODEL_LOAD) else "degraded"
    agents = manager.agent_manager.list_agents() if manager.agent_manager else []
    return {
        "status": overall,
        "profile": runtime_config.profile.value,
        "model": runtime_config.model.model_path,
        "vllm_connected": vllm_connected,
        "agents_loaded": agents,
        "feature_flags": {
            "model_version": manager.feature_flags.model_version,
        },
        "session_store": {
            "driver": "sqlite",
        },
    }
def _rate_limit(limit_string: str):
    """Return the slowapi limit decorator when available, else a no-op decorator."""
    if _RATE_LIMIT_AVAILABLE and limiter is not None:
        return limiter.limit(limit_string)
    return lambda func: func
def get_feature_flags(request: Request) -> FeatureFlags:
    """Resolve effective feature flags, applying any X-Feature-Flag header override."""
    override_header = request.headers.get("X-Feature-Flag")
    return manager.feature_flags.override_from_header(override_header)
# ---------------------------------------------------------------------------
# v2 ์—”๋“œํฌ์ธํŠธ: LangGraph ๊ธฐ๋ฐ˜ agent ์‹คํ–‰ (interrupt/approve ํŒจํ„ด)
# ---------------------------------------------------------------------------
@app.post("/v2/agent/stream")
@_rate_limit("30/minute")
async def v2_agent_stream(
    request: AgentRunRequest,
    _http_request: Request,
    _: None = Depends(verify_api_key),
):
    """LangGraph-based agent execution streamed over SSE.

    Uses graph.astream() to emit one SSE event per completed node.

    Event format (each line: ``data: <JSON>\\n\\n``):
    - node progress: ``{"node": "<name>", "status": "completed", ...}``
    - approval_wait reached:
      ``{"node": "approval_wait", "status": "awaiting_approval",
      "approval_request": {...}, "thread_id": "..."}``
    - error: ``{"node": "error", "status": "error", "error": "..."}``

    Approval flow: after receiving ``awaiting_approval`` the client must
    recognize that the stream has ended and submit approve/reject via
    ``/v2/agent/approve``.
    """
    if not manager.graph:
        async def _no_graph():
            yield 'data: {"node": "error", "status": "error", "error": "LangGraph graph๊ฐ€ ์ดˆ๊ธฐํ™”๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค."}\n\n'
        return StreamingResponse(_no_graph(), media_type="text/event-stream")
    from langchain_core.messages import HumanMessage
    # session_id == thread_id invariant (see v2_agent_run's contract).
    thread_id = request.session_id or str(uuid.uuid4())
    session_id = thread_id
    request_id = str(uuid.uuid4())
    config = {"configurable": {"thread_id": thread_id}}
    initial_state = {
        "session_id": session_id,
        "request_id": request_id,
        "messages": [HumanMessage(content=request.query)],
    }
    # Clear any leftover interrupt state by resuming it as a rejection (cancel).
    try:
        from langgraph.types import Command
        existing_state = await manager.graph.aget_state(config)
        if existing_state and existing_state.next:
            await manager.graph.ainvoke(
                Command(resume={"approved": False, "cancel": True}),
                config,
            )
    except Exception as clear_exc:
        logger.warning(f"[v2] interrupt ์ƒํƒœ ํ™•์ธ/ํ•ด์†Œ ์‹คํŒจ (๋ฌด์‹œ): {type(clear_exc).__name__}")
    async def _generate() -> AsyncGenerator[str, None]:
        try:
            async for chunk in manager.graph.astream(initial_state, config, stream_mode="updates"):
                # chunk: {node_name: state_delta}
                for node_name, state_delta in chunk.items():
                    event: dict = {
                        "node": node_name,
                        "status": "completed",
                    }
                    # When the agent node completes, include its planned tool calls.
                    if node_name == "agent" and isinstance(state_delta, dict):
                        msgs = state_delta.get("messages", [])
                        if msgs:
                            last_msg = msgs[-1] if isinstance(msgs, list) else msgs
                            tc = getattr(last_msg, "tool_calls", None)
                            if tc:
                                event["planned_tools"] = [t["name"] for t in tc]
                    # When persist completes, forward final_text / evidence_items.
                    # Premise: with stream_mode="updates", state_delta is the node's raw return dict.
                    # evidence_items schema follows EvidenceItem.to_dict():
                    #   source_type: "api" | "llm_generated"
                    #   title, excerpt, link_or_path, page, score, provider_meta
                    if node_name == "persist" and isinstance(state_delta, dict):
                        if state_delta.get("final_text"):
                            event["final_text"] = state_delta["final_text"]
                        if state_delta.get("evidence_items"):
                            event["evidence_items"] = state_delta["evidence_items"]
                    # approval_wait: handle both the explicit node name and the
                    # "__interrupt__" chunk emitted when LangGraph interrupt() fires.
                    if node_name in ("approval_wait", "__interrupt__"):
                        try:
                            graph_state = await manager.graph.aget_state(config)
                            if graph_state.next:
                                event = {
                                    "node": "approval_wait",
                                    "status": "awaiting_approval",
                                    "approval_request": _extract_approval_request(graph_state),
                                    "thread_id": thread_id,
                                    "session_id": session_id,
                                }
                        except Exception as exc:
                            # State lookup failed: still tell the client approval is pending.
                            logger.warning(f"[v2/agent/stream] aget_state ์‹คํŒจ: {exc}")
                            event["node"] = "approval_wait"
                            event["status"] = "awaiting_approval"
                            event["thread_id"] = thread_id
                            event["session_id"] = session_id
                            event["approval_request"] = {
                                "prompt": "์Šน์ธ ์ •๋ณด๋ฅผ ๋ถˆ๋Ÿฌ์˜ฌ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค. /v2/agent/approve๋กœ ์ง„ํ–‰ํ•˜์„ธ์š”."
                            }
                    yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
                    # Stop streaming after awaiting_approval (client must call /v2/agent/approve)
                    if event.get("status") == "awaiting_approval":
                        return
        except Exception as exc:
            logger.error(f"[v2/agent/stream] ์ŠคํŠธ๋ฆผ ์˜ˆ์™ธ: {exc}")
            error_event = {"node": "error", "status": "error", "error": str(exc)}
            yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
    return StreamingResponse(_generate(), media_type="text/event-stream")
@app.post("/v2/agent/run")
@_rate_limit("30/minute")
async def v2_agent_run(
    request: AgentRunRequest,
    _http_request: Request,
    _: None = Depends(verify_api_key),
):
    """Run the LangGraph agent (phase 1: execute until the interrupt).

    Executes the graph; when it interrupts at the `approval_wait` node the
    response carries `status: awaiting_approval` together with the approval
    request payload. The client must keep the returned `thread_id` and send
    the approve/reject decision through `/v2/agent/approve`.

    Session Resume Contract
    -----------------------
    When the same session_id is sent again:

    1. **Interrupt pending**: if the graph is interrupted at approval_wait,
       the current checkpoint is not resumed; the new message is *appended*
       and execution continues (the re-request is handled as a new
       graph_run). Approval/rejection must always go through
       `/v2/agent/approve`.
    2. **Finished graph**: if the graph reached END (state.next == []), a new
       graph_run starts on the same thread_id. The LangGraph checkpointer
       accumulates prior state per thread_id, so conversation history is
       preserved.
    3. **After a process restart**: with SqliteSaver the checkpoint is
       restored from the DB, so the interrupt state survives. The client can
       call `/v2/agent/approve` again with the existing thread_id to resume
       from where it stopped.

    Note: session_id == thread_id. The two values are always kept identical.
    """
    if not manager.graph:
        raise HTTPException(status_code=503, detail="LangGraph graph๊ฐ€ ์ดˆ๊ธฐํ™”๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.")
    from langchain_core.messages import HumanMessage

    thread_id = request.session_id or str(uuid.uuid4())
    session_id = thread_id  # invariant: session_id == thread_id
    request_id = str(uuid.uuid4())
    config = {"configurable": {"thread_id": thread_id}}
    initial_state = {
        "session_id": session_id,
        "request_id": request_id,
        "messages": [HumanMessage(content=request.query)],
    }
    # If a stale interrupt is still pending on this thread, resolve it as a
    # rejection (cancel) so the new run starts from a clean state.
    try:
        existing_state = await manager.graph.aget_state(config)
        if existing_state and existing_state.next:
            from langgraph.types import Command

            await manager.graph.ainvoke(
                Command(resume={"approved": False, "cancel": True}),
                config,
            )
    except Exception as clear_exc:
        logger.warning(f"[v2] interrupt ์ƒํƒœ ํ™•์ธ/ํ•ด์†Œ ์‹คํŒจ (๋ฌด์‹œ): {type(clear_exc).__name__}")
    try:
        await manager.graph.ainvoke(initial_state, config)
        # A non-empty `next` after the run means the graph is parked at an
        # interrupt and is awaiting approval.
        graph_state = await manager.graph.aget_state(config)
        if graph_state.next:
            # Interrupt pending: hand the approval request back to the client.
            return {
                "status": "awaiting_approval",
                "thread_id": thread_id,
                "session_id": session_id,
                "graph_run_id": request_id,
                "approval_request": _extract_approval_request(graph_state),
            }
        # Finished without an interrupt (rejected, or an error inside the graph).
        final_state = graph_state.values
        return {
            "status": "completed",
            "thread_id": thread_id,
            "session_id": session_id,
            "graph_run_id": request_id,
            "text": final_state.get("final_text", ""),
            "evidence_items": final_state.get("evidence_items", []),
        }
    except Exception as exc:
        # Log exactly once, with traceback. (Previously the same exception was
        # logged twice: logger.error at the top and logger.exception later.)
        logger.exception(f"[v2/agent/run] ์š”์ฒญ ์ฒ˜๋ฆฌ ์‹คํŒจ: {exc}")
        # Best effort: record the failed graph_run with status "error".
        try:
            if manager.session_store:
                session = manager.session_store.get_or_create(session_id)
                session.add_graph_run(
                    request_id=request_id,
                    plan_summary=f"[error] {exc}",
                    approval_status="",
                    executed_capabilities=[],
                    status="error",
                    total_latency_ms=0.0,
                )
        except Exception as persist_exc:
            logger.warning(f"[v2/agent/run] error persist ์‹คํŒจ: {persist_exc}")
        return JSONResponse(
            status_code=500,
            content={
                "status": "error",
                "thread_id": thread_id,
                "session_id": session_id,
                "graph_run_id": request_id,
                "error": "์š”์ฒญ ์ฒ˜๋ฆฌ ์ค‘ ๋‚ด๋ถ€ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค.",
            },
        )
@app.post("/v2/agent/approve")
@_rate_limit("30/minute")
async def v2_agent_approve(
    thread_id: str,
    approved: bool,
    _http_request: Request,
    _: None = Depends(verify_api_key),
):
    """Resume an interrupted graph (phase 2: approve/reject).

    Parameters
    ----------
    thread_id : str
        The thread_id returned by `/v2/agent/run`.
    approved : bool
        True proceeds to tool_execute; False makes the graph terminate at END.

    TODO(M7): moving thread_id/approved from the query string into the request
    body would better match REST conventions, but the current shape is kept for
    compatibility with existing clients. Switch to a body payload once clients
    have migrated.
    """
    if not manager.graph:
        raise HTTPException(status_code=503, detail="LangGraph graph๊ฐ€ ์ดˆ๊ธฐํ™”๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.")
    from langgraph.types import Command

    config = {"configurable": {"thread_id": thread_id}}
    try:
        result = await manager.graph.ainvoke(
            Command(resume={"approved": approved}),
            config,
        )
        # "rejected" when the caller declined, "completed" when approved.
        approval_status = result.get("approval_status", "")
        response_status = "completed" if approved else "rejected"
        return {
            "status": response_status,
            "thread_id": thread_id,
            "session_id": result.get("session_id", ""),
            "graph_run_id": result.get("request_id", ""),
            "text": result.get("final_text", ""),
            "evidence_items": result.get("evidence_items", []),
            "approval_status": approval_status,
        }
    except Exception as exc:
        # Log exactly once, with traceback. (Previously the same exception was
        # logged twice: logger.error at the top and logger.exception later.)
        logger.exception(f"[v2/agent/approve] ์Šน์ธ ์ฒ˜๋ฆฌ ์‹คํŒจ: {exc}")
        # Best effort: record the failed graph_run with status "error".
        session_id = ""
        request_id = ""
        try:
            if manager.session_store:
                graph_state = await manager.graph.aget_state(config)
                state_values = graph_state.values if graph_state else {}
                session_id = state_values.get("session_id", "")
                request_id = state_values.get("request_id", "")
                if session_id:
                    session = manager.session_store.get_or_create(session_id)
                    session.add_graph_run(
                        request_id=request_id,
                        plan_summary=f"[error] {exc}",
                        approval_status="",
                        executed_capabilities=[],
                        status="error",
                        total_latency_ms=0.0,
                    )
        except Exception as persist_exc:
            logger.warning(f"[v2/agent/approve] error persist ์‹คํŒจ: {persist_exc}")
        return JSONResponse(
            status_code=500,
            content={
                "status": "error",
                "thread_id": thread_id,
                "session_id": session_id,
                "graph_run_id": request_id,
                "error": "์Šน์ธ ์ฒ˜๋ฆฌ ์ค‘ ๋‚ด๋ถ€ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค.",
            },
        )
@app.post("/v2/agent/cancel")
@_rate_limit("30/minute")
async def v2_agent_cancel(
    thread_id: str,
    _http_request: Request,
    _: None = Depends(verify_api_key),
):
    """Force-cancel a graph that is parked at an interrupt.

    Runs the rejection path (Command(resume={"approved": False})) while also
    passing a cancel flag; the approval_wait node translates it into
    interrupt_reason="user_cancel" so the persist node records the graph_run
    status as "interrupted".

    Parameters
    ----------
    thread_id : str
        The thread_id returned by `/v2/agent/run`.
    """
    if not manager.graph:
        raise HTTPException(status_code=503, detail="LangGraph graph๊ฐ€ ์ดˆ๊ธฐํ™”๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.")
    from langgraph.types import Command

    config = {"configurable": {"thread_id": thread_id}}
    try:
        # Only a thread currently waiting at an interrupt may be cancelled.
        current_state = await manager.graph.aget_state(config)
        awaiting = bool(current_state) and bool(current_state.next)
        if not awaiting:
            raise HTTPException(
                status_code=409,
                detail="ํ•ด๋‹น thread๋Š” ํ˜„์žฌ interrupt ๋Œ€๊ธฐ ์ƒํƒœ๊ฐ€ ์•„๋‹™๋‹ˆ๋‹ค.",
            )
        session_id = current_state.values.get("session_id", "")
        # Resume with a forced rejection plus the cancel signal.
        resume_result = await manager.graph.ainvoke(
            Command(resume={"approved": False, "cancel": True}),
            config,
        )
        return {
            "status": "cancelled",
            "thread_id": thread_id,
            "session_id": session_id,
            "graph_run_id": resume_result.get("request_id", ""),
        }
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception(f"[v2/agent/cancel] ์ทจ์†Œ ์ฒ˜๋ฆฌ ์‹คํŒจ: {exc}")
        return JSONResponse(
            status_code=500,
            content={
                "status": "error",
                "thread_id": thread_id,
                "error": "์ทจ์†Œ ์ฒ˜๋ฆฌ ์ค‘ ๋‚ด๋ถ€ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค.",
            },
        )
# ---------------------------------------------------------------------------
# v3 endpoints: ReAct autonomous loop + fine-grained SSE streaming
# ---------------------------------------------------------------------------
@app.post("/v3/agent/stream", response_model=None)
@_rate_limit("30/minute")
async def v3_agent_stream(
    request: AgentRunRequest,
    _http_request: Request,
    _: None = Depends(verify_api_key),
) -> StreamingResponse:
    """v3 ReAct agent — fine-grained SSE streaming via astream_events (multi-turn).

    Re-requesting with the same session_id lets the LangGraph checkpointer
    restore the previous conversation automatically.

    Event types:
    - thinking_start: LLM reasoning started
    - thinking_delta: LLM token streaming
    - thinking_end: LLM reasoning finished (includes tool_calls)
    - tool_start: tool execution started
    - tool_end: tool execution finished
    - run_complete: whole run finished (includes metadata)
    """
    if not manager.graph_v3:
        raise HTTPException(status_code=503, detail="v3 graph๊ฐ€ ์ดˆ๊ธฐํ™”๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.")
    from langchain_core.messages import HumanMessage

    # session_id == thread_id convention: the same session_id restores the prior
    # conversation. The "v3:" prefix keeps these checkpoints apart from v2's.
    session_id = request.session_id or str(uuid.uuid4())
    thread_id = f"v3:{session_id}"
    request_id = str(uuid.uuid4())
    config = {"configurable": {"thread_id": thread_id}}
    # Probe the checkpointer for an existing conversation (C2: graceful degradation).
    # NOTE(review): unlike /v3/agent/run, this endpoint does not take
    # manager.get_session_lock(thread_id) — confirm whether concurrent streams
    # on the same session are safe or the lock was omitted intentionally.
    try:
        existing_state = await manager.graph_v3.aget_state(config)
        has_history = (
            existing_state and existing_state.values and existing_state.values.get("messages")
        )
    except (KeyError, ValueError):
        # Missing or empty checkpoint — proceed as a fresh session.
        has_history = False
    except Exception as exc:
        logger.error(f"[v3/agent/stream] checkpointer ์ €์žฅ์†Œ ์˜ค๋ฅ˜: {exc}")
        raise HTTPException(status_code=500, detail="์„ธ์…˜ ์ €์žฅ์†Œ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค.")
    if has_history:
        # Existing session: send only per-request fields; the add_messages
        # reducer appends the new message onto the restored history.
        invoke_input = {
            "messages": [HumanMessage(content=request.query)],
            "request_id": request_id,
            "max_iterations": request.max_iterations,
            "iteration_count": 0,
            "pending_tool_calls": [],
        }
    else:
        # Fresh session: also seed the session-scoped fields.
        invoke_input = {
            "session_id": session_id,
            "request_id": request_id,
            "messages": [HumanMessage(content=request.query)],
            "max_iterations": request.max_iterations,
            "iteration_count": 0,
            "tool_call_history": [],
            "pending_tool_calls": [],
        }

    async def _generate_v3() -> AsyncGenerator[str, None]:
        # Translates LangGraph astream_events chunks into the SSE events above.
        run_t0 = time.monotonic()
        iteration = 0
        try:
            async for event in manager.graph_v3.astream_events(invoke_input, config, version="v2"):
                # Stop early if the client has gone away.
                if await _http_request.is_disconnected():
                    logger.info("[v3/agent/stream] ํด๋ผ์ด์–ธํŠธ ์—ฐ๊ฒฐ ๋Š๊น€ ๊ฐ์ง€ — ์ŠคํŠธ๋ฆผ ์กฐ๊ธฐ ์ข…๋ฃŒ")
                    return
                kind = event["event"]
                if kind == "on_chat_model_start":
                    sse_event = {"type": "thinking_start", "iteration": iteration}
                    yield f"data: {json.dumps(sse_event, ensure_ascii=False)}\n\n"
                elif kind == "on_chat_model_stream":
                    chunk = event.get("data", {}).get("chunk")
                    if chunk:
                        content = getattr(chunk, "content", "")
                        if content:
                            sse_event = {
                                "type": "thinking_delta",
                                "content": content,
                            }
                            yield f"data: {json.dumps(sse_event, ensure_ascii=False)}\n\n"
                elif kind == "on_chat_model_end":
                    output = event.get("data", {}).get("output")
                    tool_calls = []
                    if output:
                        raw_calls = getattr(output, "tool_calls", None) or []
                        tool_calls = [
                            {
                                "name": tc.get("name", ""),
                                "args": tc.get("args", {}),
                            }
                            for tc in raw_calls
                        ]
                    sse_event = {
                        "type": "thinking_end",
                        "tool_calls": tool_calls,
                        "iteration": iteration,
                    }
                    yield f"data: {json.dumps(sse_event, ensure_ascii=False)}\n\n"
                    # A reasoning turn that requested tools starts a new iteration.
                    if tool_calls:
                        iteration += 1
                elif kind == "on_tool_start":
                    tool_name = event.get("name", "")
                    sse_event = {"type": "tool_start", "tool": tool_name}
                    yield f"data: {json.dumps(sse_event, ensure_ascii=False)}\n\n"
                elif kind == "on_tool_end":
                    tool_name = event.get("name", "")
                    tool_output = event.get("data", {}).get("output")
                    sse_event: dict = {"type": "tool_end", "tool": tool_name}
                    if tool_output is not None:
                        # success=False when the tool raised, or when the output
                        # object carries a status attribute equal to "error".
                        output_status = getattr(tool_output, "status", None)
                        if isinstance(tool_output, Exception):
                            sse_event["success"] = False
                        elif output_status == "error":
                            sse_event["success"] = False
                        else:
                            sse_event["success"] = True
                    yield f"data: {json.dumps(sse_event, ensure_ascii=False)}\n\n"
            # Run finished — pull metadata from the final state.
            total_latency = round((time.monotonic() - run_t0) * 1000, 2)
            try:
                final_state = await manager.graph_v3.aget_state(config)
                state_values = final_state.values if final_state else {}
            except Exception:
                state_values = {}
            complete_event = {
                "type": "run_complete",
                "thread_id": thread_id,
                "session_id": session_id,
                "text": state_values.get("final_text", ""),
                "evidence_items": state_values.get("evidence_items", []),
                "metadata": {
                    "total_iterations": state_values.get("iteration_count", 0),
                    "total_tool_calls": len(state_values.get("tool_call_history", [])),
                    "total_latency_ms": total_latency,
                    "node_latencies": state_values.get("node_latencies", {}),
                },
            }
            yield f"data: {json.dumps(complete_event, ensure_ascii=False)}\n\n"
        except Exception as exc:
            logger.exception(f"[v3/agent/stream] ์ŠคํŠธ๋ฆผ ์˜ˆ์™ธ: {exc}")
            error_event = {
                "type": "error",
                "error": "์š”์ฒญ ์ฒ˜๋ฆฌ ์ค‘ ๋‚ด๋ถ€ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค.",
            }
            yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"

    return StreamingResponse(_generate_v3(), media_type="text/event-stream")
@app.post("/v3/agent/run", response_model=None)
@_rate_limit("30/minute")
async def v3_agent_run(
    request: AgentRunRequest,
    _http_request: Request,
    _: None = Depends(verify_api_key),
):
    """v3 ReAct agent, blocking execution (conversational multi-turn).

    Re-requesting with the same session_id makes the LangGraph checkpointer
    restore the previous conversation; the add_messages reducer appends the
    new HumanMessage to the existing history. Every tool runs automatically,
    so there is no approval_wait phase.
    """
    if not manager.graph_v3:
        raise HTTPException(status_code=503, detail="v3 graph๊ฐ€ ์ดˆ๊ธฐํ™”๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.")
    from langchain_core.messages import HumanMessage

    # session_id == thread_id convention; the "v3:" prefix isolates these
    # checkpoints from the v2 graph's.
    session_id = request.session_id or str(uuid.uuid4())
    thread_id = f"v3:{session_id}"
    request_id = str(uuid.uuid4())
    config = {"configurable": {"thread_id": thread_id}}

    def _error_payload(message: str) -> JSONResponse:
        # Uniform 500 body for this endpoint.
        return JSONResponse(
            status_code=500,
            content={
                "status": "error",
                "thread_id": thread_id,
                "session_id": session_id,
                "graph_run_id": request_id,
                "error": message,
            },
        )

    # Per-session lock: aget_state through ainvoke must run atomically.
    async with manager.get_session_lock(thread_id):
        # Ask the checkpointer whether this session already has messages.
        # An absent/empty checkpoint (KeyError, ValueError) means a fresh
        # session; any other storage failure yields a 500.
        try:
            prior = await manager.graph_v3.aget_state(config)
            has_history = prior and prior.values and prior.values.get("messages")
        except (KeyError, ValueError):
            has_history = False
        except Exception as exc:
            logger.error(f"[v3/agent/run] checkpointer ์ €์žฅ์†Œ ์˜ค๋ฅ˜: {exc}")
            return _error_payload("์„ธ์…˜ ์ €์žฅ์†Œ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค.")
        # Per-request fields are always sent; session-scoped fields are seeded
        # only for a fresh session (resume relies on the restored checkpoint).
        invoke_input = {
            "messages": [HumanMessage(content=request.query)],
            "request_id": request_id,
            "max_iterations": request.max_iterations,
            "iteration_count": 0,
            "pending_tool_calls": [],
        }
        if not has_history:
            invoke_input["session_id"] = session_id
            invoke_input["tool_call_history"] = []
        try:
            started = time.monotonic()
            result = await manager.graph_v3.ainvoke(invoke_input, config)
            elapsed_ms = round((time.monotonic() - started) * 1000, 2)
            return {
                "status": "completed",
                "thread_id": thread_id,
                "session_id": session_id,
                "graph_run_id": request_id,
                "text": result.get("final_text", ""),
                "evidence_items": result.get("evidence_items", []),
                "metadata": {
                    "total_iterations": result.get("iteration_count", 0),
                    "total_tool_calls": len(result.get("tool_call_history", [])),
                    "total_messages": len(result.get("messages", [])),
                    "total_latency_ms": elapsed_ms,
                    "node_latencies": result.get("node_latencies", {}),
                },
            }
        except Exception as exc:
            logger.exception(f"[v3/agent/run] ์š”์ฒญ ์ฒ˜๋ฆฌ ์‹คํŒจ: {exc}")
            return _error_payload("์š”์ฒญ ์ฒ˜๋ฆฌ ์ค‘ ๋‚ด๋ถ€ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค.")
if __name__ == "__main__":
    import uvicorn

    # Dev entry point — uvicorn options (host/port/...) come from the
    # env-driven RuntimeConfig loaded at module import time.
    uvicorn.run(app, **runtime_config.to_uvicorn_kwargs())