# Source artifact: commit 4ef118d — "Add backend-python and Dockerfile"
"""
Shared helpers for non-streaming LLM calls and JSON parsing.
"""
from __future__ import annotations
import ast
import json
import logging
from typing import Any
from ..models.stream_chat import StreamChatRequest
from ..providers import ExecutionContext, get_provider_adapter
from ..services.stream_chat import get_stream_chat_service
logger = logging.getLogger(__name__)
def normalize_text_content(content: Any) -> str:
    """Normalize mixed LLM message content into a plain string.

    Accepts None, plain strings, lists of string/dict content parts
    (OpenAI-style ``{"type": "text", "text": ...}`` items), or arbitrary
    dicts, and flattens them to a single text value.

    Args:
        content: The raw message content in any of the supported shapes.

    Returns:
        A plain string; empty string for None or lists with no text parts.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts: list[str] = []
        for item in content:
            if isinstance(item, str):
                parts.append(item)
            elif isinstance(item, dict) and "text" in item:
                # Covers both {"text": ...} and {"type": "text", "text": ...}.
                # (A separate type=="text" branch without a "text" key could
                # only ever produce an empty string, which gets filtered.)
                parts.append(str(item.get("text", "")))
        return " ".join(p for p in parts if p).strip()
    if isinstance(content, dict):
        try:
            return json.dumps(content, ensure_ascii=True)
        except Exception:
            # json.dumps can fail on non-serializable values; degrade to repr.
            return str(content)
    return str(content)
def safe_json_parse(text: str | None) -> Any | None:
    """Best-effort JSON parse with cleanup fallbacks.

    Tries, in order:
      1. Direct ``json.loads`` of the stripped text.
      2. Extraction of the embedded JSON object/array substring — useful
         when the model wraps JSON in prose or code fences. The delimiter
         that appears FIRST is tried first, so e.g. ``[{...}]`` embedded in
         prose parses as the array, not as its inner object (the previous
         object-first order returned the wrong value here).
      3. ``ast.literal_eval`` for Python-literal lookalikes (single quotes).

    Args:
        text: Raw model output, possibly None or non-string.

    Returns:
        The parsed dict/list/scalar, or None when nothing parseable is found.
    """
    if not text or not isinstance(text, str):
        return None
    stripped = text.strip()
    if not stripped:
        return None
    try:
        return json.loads(stripped)
    except Exception:
        pass
    # Fallback: extract candidate object/array spans and try the one whose
    # opening delimiter occurs earliest in the text first.
    spans: list[tuple[int, int]] = []
    obj_start, obj_end = stripped.find("{"), stripped.rfind("}")
    if obj_start != -1 and obj_end > obj_start:
        spans.append((obj_start, obj_end))
    arr_start, arr_end = stripped.find("["), stripped.rfind("]")
    if arr_start != -1 and arr_end > arr_start:
        spans.append((arr_start, arr_end))
    for start, end in sorted(spans):
        try:
            return json.loads(stripped[start : end + 1])
        except Exception:
            pass
    # Final fallback: attempt Python literal (handles single quotes).
    try:
        if stripped[0] in "{[":
            value = ast.literal_eval(stripped)
            if isinstance(value, (dict, list)):
                return value
    except Exception:
        pass
    return None
async def run_chat_completion(
    *,
    provider: str,
    api_key: str,
    messages: list[dict[str, Any]],
    base_url: str | None = None,
    model: str | None = None,
    response_format: dict[str, Any] | None = None,
    thinking: dict[str, Any] | bool | None = None,
    temperature: float | None = None,
    top_k: int | None = None,
    top_p: float | None = None,
    frequency_penalty: float | None = None,
    presence_penalty: float | None = None,
    context_message_limit: int | None = None,
    tools: list[dict[str, Any]] | None = None,
    tool_choice: Any = None,
) -> dict[str, str]:
    """Drive a streaming LLM call to completion and return the collected text.

    Streams chunks from the provider adapter, accumulating "text" and
    "thought" chunks until a "done" chunk arrives.

    Returns:
        Dict with "content" (full response text) and "thought" (full
        reasoning text), either of which may be empty.

    Raises:
        ValueError: When the adapter emits an "error" chunk.
    """
    adapter = get_provider_adapter(provider)
    limited_messages = adapter.apply_context_limit(messages, context_message_limit)
    exec_context = ExecutionContext(
        messages=limited_messages,
        tools=tools,
        tool_choice=tool_choice,
        temperature=temperature,
        top_p=top_p,
        top_k=top_k,
        frequency_penalty=frequency_penalty,
        presence_penalty=presence_penalty,
        response_format=response_format,
        thinking=thinking,
        stream=True,
    )
    # Accumulate pieces in lists and join once at the end.
    text_parts: list[str] = []
    thought_parts: list[str] = []
    stream = adapter.execute(
        context=exec_context,
        api_key=api_key,
        model=model,
        base_url=base_url,
    )
    async for chunk in stream:
        kind = chunk.type
        if kind == "done":
            break
        if kind == "error":
            raise ValueError(chunk.error or "Unknown error")
        if kind == "text":
            text_parts.append(chunk.content or "")
        elif kind == "thought":
            thought_parts.append(chunk.thought or "")
    return {"content": "".join(text_parts), "thought": "".join(thought_parts)}
async def run_agent_completion(request: StreamChatRequest) -> dict[str, Any]:
    """Run the Agno Agent via the stream_chat service and collect the result.

    Drains the event stream into a single dict. When the request asks for
    structured output (``output_schema`` or ``response_format`` set) and the
    first pass yields neither a structured ``output`` nor JSON-parseable
    content/thought text, the call is retried once with the structured-output
    machinery disabled and a strict JSON-only system instruction. If the
    retry itself fails, the first-pass result is returned.

    Returns:
        Dict with "content", "thought", "sources", and "output" keys.

    Raises:
        ValueError: When the service emits an "error" event on the first
            pass (retry errors are logged and swallowed).
    """
    service = get_stream_chat_service()

    async def _collect(req: StreamChatRequest) -> dict[str, Any]:
        # Drain the event stream for `req` into a single result dict.
        full_text = ""
        full_thought = ""
        sources: list[dict[str, Any]] = []
        output: Any = None
        async for event in service.stream_chat(req):
            event_type = event.get("type")
            if event_type == "text":
                # `or ""` guards against an explicit None content value,
                # which `.get("content", "")` would not catch.
                full_text += event.get("content") or ""
            elif event_type == "thought":
                full_thought += event.get("content") or ""
            elif event_type == "done":
                sources = event.get("sources") or []
                output = event.get("output")
            elif event_type == "error":
                raise ValueError(event.get("error") or "Unknown error")
        return {
            "content": full_text,
            "thought": full_thought,
            "sources": sources,
            "output": output,
        }

    result = await _collect(request)
    is_structured_request = bool(request.output_schema is not None or request.response_format is not None)
    if not is_structured_request:
        return result
    # Accept the first pass if any channel produced something JSON-like.
    has_structured_output = result.get("output") is not None
    has_json_like_content = safe_json_parse(result.get("content", "")) is not None
    has_json_like_thought = safe_json_parse(result.get("thought", "")) is not None
    if has_structured_output or has_json_like_content or has_json_like_thought:
        return result
    # Retry once: drop structured-output settings and force plain JSON via
    # the system prompt instead.
    retry_request = request.model_copy(deep=True)
    retry_request.response_format = None
    retry_request.output_schema = None
    json_only_instruction = (
        "CRITICAL: Return ONLY a valid JSON object/array. "
        "Do not include markdown code fences or extra explanation."
    )
    retry_messages = list(retry_request.messages or [])
    first = retry_messages[0] if retry_messages else None
    if isinstance(first, dict) and first.get("role") == "system":
        # Append the instruction to the existing system message.
        retry_messages[0] = {
            **first,
            "content": f"{first.get('content', '')}\n\n{json_only_instruction}",
        }
    else:
        # Insert a fresh system message. Note this now also covers the
        # empty-messages case, which previously retried with NO instruction
        # at all, defeating the purpose of the retry.
        retry_messages.insert(0, {"role": "system", "content": json_only_instruction})
    retry_request.messages = retry_messages
    logger.warning("Structured output parse failed; retrying once with strict JSON-only prompt.")
    try:
        return await _collect(retry_request)
    except Exception as exc:
        logger.warning("Structured retry failed; returning first-pass result. error=%s", exc)
        return result