Spaces:
Running
Running
| """ | |
| Shared helpers for non-streaming LLM calls and JSON parsing. | |
| """ | |
| from __future__ import annotations | |
| import ast | |
| import json | |
| import logging | |
| from typing import Any | |
| from ..models.stream_chat import StreamChatRequest | |
| from ..providers import ExecutionContext, get_provider_adapter | |
| from ..services.stream_chat import get_stream_chat_service | |
| logger = logging.getLogger(__name__) | |
def normalize_text_content(content: Any) -> str:
    """Normalize mixed LLM message content into a plain string.

    Handles the common message-content shapes:
      * ``None``  -> ``""``
      * ``str``   -> returned unchanged
      * ``list``  -> string items, plus dict items carrying a text payload
        (a ``"text"`` key or ``type == "text"``), joined with single spaces;
        empty parts are dropped
      * ``dict``  -> JSON-serialized (falls back to ``str()`` on failure)
      * anything else -> ``str(content)``
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts: list[str] = []
        for item in content:
            if isinstance(item, str):
                parts.append(item)
            elif isinstance(item, dict) and ("text" in item or item.get("type") == "text"):
                # The original's separate `type == "text"` branch could only run
                # when the "text" key was absent, so it always appended "" (which
                # the filter below drops). Merging the branches is behavior-safe.
                parts.append(str(item.get("text", "")))
        return " ".join(p for p in parts if p).strip()
    if isinstance(content, dict):
        try:
            return json.dumps(content, ensure_ascii=True)
        except Exception:
            return str(content)
    return str(content)
def safe_json_parse(text: str | None) -> Any | None:
    """Parse *text* as JSON, tolerating surrounding noise.

    Attempts, in order:
      1. a direct ``json.loads`` of the stripped text;
      2. the substring between the first ``{`` and the last ``}``;
      3. the substring between the first ``[`` and the last ``]``;
      4. ``ast.literal_eval`` for Python-style literals (e.g. single-quoted
         keys), accepted only when it yields a dict or list.

    Returns the parsed value, or ``None`` when nothing parses.
    """
    if not isinstance(text, str):
        return None
    candidate = text.strip()
    if not candidate:
        return None

    try:
        return json.loads(candidate)
    except Exception:
        pass

    # Salvage an embedded object or array substring (e.g. prose- or
    # fence-wrapped JSON).
    for opener, closer in (("{", "}"), ("[", "]")):
        start = candidate.find(opener)
        end = candidate.rfind(closer)
        if start != -1 and end > start:
            try:
                return json.loads(candidate[start : end + 1])
            except Exception:
                pass

    # Last resort: Python literal syntax (handles single quotes).
    if candidate[0] in "{[":
        try:
            value = ast.literal_eval(candidate)
        except Exception:
            return None
        if isinstance(value, (dict, list)):
            return value
    return None
async def run_chat_completion(
    *,
    provider: str,
    api_key: str,
    messages: list[dict[str, Any]],
    base_url: str | None = None,
    model: str | None = None,
    response_format: dict[str, Any] | None = None,
    thinking: dict[str, Any] | bool | None = None,
    temperature: float | None = None,
    top_k: int | None = None,
    top_p: float | None = None,
    frequency_penalty: float | None = None,
    presence_penalty: float | None = None,
    context_message_limit: int | None = None,
    tools: list[dict[str, Any]] | None = None,
    tool_choice: Any = None,
) -> dict[str, str]:
    """Execute a streaming LLM call and collect it into a single result.

    The provider adapter streams chunks; text and thought chunks are
    accumulated, an error chunk raises ``ValueError``, and a done chunk
    ends the stream.

    Returns a dict with keys ``"content"`` (full text) and ``"thought"``
    (full reasoning trace, possibly empty).
    """
    adapter = get_provider_adapter(provider)
    context = ExecutionContext(
        messages=adapter.apply_context_limit(messages, context_message_limit),
        tools=tools,
        tool_choice=tool_choice,
        temperature=temperature,
        top_p=top_p,
        top_k=top_k,
        frequency_penalty=frequency_penalty,
        presence_penalty=presence_penalty,
        response_format=response_format,
        thinking=thinking,
        stream=True,
    )

    text_parts: list[str] = []
    thought_parts: list[str] = []
    stream = adapter.execute(
        context=context,
        api_key=api_key,
        model=model,
        base_url=base_url,
    )
    async for chunk in stream:
        kind = chunk.type
        if kind == "done":
            break
        if kind == "error":
            raise ValueError(chunk.error or "Unknown error")
        if kind == "text":
            text_parts.append(chunk.content or "")
        elif kind == "thought":
            thought_parts.append(chunk.thought or "")
    return {"content": "".join(text_parts), "thought": "".join(thought_parts)}
async def run_agent_completion(request: StreamChatRequest) -> dict[str, Any]:
    """Run the Agno Agent via the stream_chat service and collect the result.

    Returns a dict with keys ``"content"``, ``"thought"``, ``"sources"``,
    and ``"output"``.

    For structured-output requests (``output_schema`` or ``response_format``
    set): if the first pass yields neither a structured ``output`` nor
    JSON-parseable content/thought, the call is retried once with a strict
    JSON-only system instruction (structured-output fields cleared so the
    model emits raw JSON). If the retry itself raises, the first-pass result
    is returned instead of propagating the error.

    Raises:
        ValueError: when the first pass emits an ``error`` event.
    """
    service = get_stream_chat_service()

    # Single source of truth for the retry instruction (was duplicated).
    json_only_instruction = (
        "CRITICAL: Return ONLY a valid JSON object/array. "
        "Do not include markdown code fences or extra explanation."
    )

    async def _collect(req: StreamChatRequest) -> dict[str, Any]:
        # Drain the event stream, accumulating text/thought and capturing
        # the terminal "done" payload.
        text_parts: list[str] = []
        thought_parts: list[str] = []
        sources: list[dict[str, Any]] = []
        output: Any = None
        async for event in service.stream_chat(req):
            event_type = event.get("type")
            if event_type == "text":
                # `or ""` guards against an explicit None content value
                # (get's default only applies when the key is absent);
                # mirrors run_chat_completion's `chunk.content or ""`.
                text_parts.append(event.get("content") or "")
            elif event_type == "thought":
                thought_parts.append(event.get("content") or "")
            elif event_type == "done":
                sources = event.get("sources") or []
                output = event.get("output")
            elif event_type == "error":
                raise ValueError(event.get("error") or "Unknown error")
        return {
            "content": "".join(text_parts),
            "thought": "".join(thought_parts),
            "sources": sources,
            "output": output,
        }

    result = await _collect(request)

    # Plain (unstructured) requests never need the JSON retry.
    if request.output_schema is None and request.response_format is None:
        return result

    # Retry only when no usable structured data came back in any channel.
    if (
        result.get("output") is not None
        or safe_json_parse(result.get("content", "")) is not None
        or safe_json_parse(result.get("thought", "")) is not None
    ):
        return result

    retry_request = request.model_copy(deep=True)
    # Clear structured-output fields so the retry relies purely on the prompt.
    retry_request.response_format = None
    retry_request.output_schema = None

    retry_messages = list(retry_request.messages or [])
    first = retry_messages[0] if retry_messages else None
    if isinstance(first, dict) and first.get("role") == "system":
        # Append the instruction to the existing system prompt.
        retry_messages[0] = {
            **first,
            "content": f"{first.get('content', '')}\n\n{json_only_instruction}",
        }
    else:
        # Also covers an empty message list — the original skipped the
        # instruction entirely in that case, making the retry pointless.
        retry_messages.insert(
            0,
            {"role": "system", "content": json_only_instruction},
        )
    retry_request.messages = retry_messages

    logger.warning("Structured output parse failed; retrying once with strict JSON-only prompt.")
    try:
        return await _collect(retry_request)
    except Exception as exc:
        logger.warning("Structured retry failed; returning first-pass result. error=%s", exc)
        return result