Spaces:
Running
Running
| """ | |
| Agent registry built with Agno SDK (Agent + AgentOS). | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import json | |
| from types import SimpleNamespace | |
| from typing import Any | |
| from agno.agent import Agent | |
| from agno.skills import Skills, LocalSkills | |
| # from agno.db.postgres import PostgresDb | |
| # from agno.memory import MemoryManager | |
| from agno.models.google import Gemini | |
| from agno.models.openai import OpenAILike | |
| # from agno.session.summary import SessionSummaryManager | |
| from agno.utils.log import logger | |
| from ..config import get_settings | |
| from ..models.db import DbFilter, DbQueryRequest | |
| from .db_service import get_db_adapter | |
| from .custom_tools import ( | |
| DuckDuckGoImageTools, | |
| DuckDuckGoVideoTools, | |
| DuckDuckGoWebSearchTools, | |
| QurioLocalTools, | |
| SerpApiImageTools, | |
| ) | |
| from .tool_registry import AGNO_TOOLS, IMAGE_SEARCH_TOOLS, LOCAL_TOOLS, VIDEO_SEARCH_TOOLS, resolve_tool_name | |
| from .user_tools import build_user_tools_toolkit | |
| try: | |
| from agno.tools.exa import ExaTools | |
| except Exception: | |
| ExaTools = None | |
# Tool names that are served by the Exa toolkit.
EXA_SEARCH_TOOL_SET = {"search_exa"}

# Category values accepted for Exa searches. _normalize_exa_category maps
# anything outside this set to "no category".
EXA_ALLOWED_CATEGORIES = {
    "company",
    "research paper",
    "news",
    "pdf",
    "github",
    "tweet",
    "personal site",
    "linkedin profile",
    "financial report",
}

_EXA_MIN_TIMEOUT_SECONDS = 15
_EXA_DEFAULT_TIMEOUT_SECONDS = 45


def _read_exa_timeout_seconds() -> int:
    """Return the Exa request timeout in seconds, never below 15.

    The value is read from ``EXA_TOOLS_TIMEOUT_SECONDS``; when that variable
    is unset, ``EXA_MCP_TIMEOUT_SECONDS`` is consulted, and when neither is
    set the default of 45 seconds applies.  A value that is not an integer
    (for example an empty string) falls back to the default instead of
    raising at import time.
    """
    raw_value = os.getenv(
        "EXA_TOOLS_TIMEOUT_SECONDS",
        os.getenv("EXA_MCP_TIMEOUT_SECONDS", str(_EXA_DEFAULT_TIMEOUT_SECONDS)),
    )
    try:
        seconds = int(raw_value)
    except (TypeError, ValueError):
        # A bad environment value must not make the whole module unimportable.
        import warnings

        warnings.warn(
            f"Ignoring non-integer Exa timeout {raw_value!r}; "
            f"using {_EXA_DEFAULT_TIMEOUT_SECONDS}s.",
            RuntimeWarning,
            stacklevel=2,
        )
        seconds = _EXA_DEFAULT_TIMEOUT_SECONDS
    return max(_EXA_MIN_TIMEOUT_SECONDS, seconds)


EXA_TIMEOUT_SECONDS = _read_exa_timeout_seconds()
# One row per provider: (provider key, environment variable that overrides the
# model id, model id used when that variable is unset).
_DEFAULT_MODEL_ROWS = (
    ("openai", "OPENAI_MODEL", "gpt-4o-mini"),
    ("openai_compatibility", "OPENAI_COMPAT_MODEL", "gpt-4o-mini"),
    ("siliconflow", "SILICONFLOW_MODEL", "Qwen/Qwen2.5-7B-Instruct"),
    ("glm", "GLM_MODEL", "glm-4-flash"),
    ("deepseek", "DEEPSEEK_MODEL", "deepseek-chat"),
    ("volcengine", "VOLCENGINE_MODEL", "doubao-seed-1-6-thinking-250615"),
    ("modelscope", "MODELSCOPE_MODEL", "AI-ModelScope/glm-4-9b-chat"),
    ("kimi", "KIMI_MODEL", "moonshot-v1-8k"),
    ("gemini", "GEMINI_MODEL", "gemini-2.0-flash-exp"),
    ("nvidia", "NVIDIA_MODEL", "deepseek-ai/deepseek-r1"),
    ("minimax", "MINIMAX_MODEL", "minimax-m2"),
)

# Default model id per provider, resolved once at import time.
DEFAULT_MODELS: dict[str, str] = {
    provider: os.getenv(env_var, fallback)
    for provider, env_var, fallback in _DEFAULT_MODEL_ROWS
}
# One row per OpenAI-compatible provider: (provider key, environment variable
# that overrides the endpoint, endpoint used when that variable is unset).
# There is no "gemini" row: _build_model does not pass a base URL to Gemini.
_DEFAULT_BASE_URL_ROWS = (
    ("openai", "OPENAI_BASE_URL", "https://api.openai.com/v1"),
    ("openai_compatibility", "OPENAI_COMPAT_BASE_URL", "https://api.openai.com/v1"),
    ("siliconflow", "SILICONFLOW_BASE_URL", "https://api.siliconflow.cn/v1"),
    ("glm", "GLM_BASE_URL", "https://open.bigmodel.cn/api/paas/v4"),
    ("deepseek", "DEEPSEEK_BASE_URL", "https://api.deepseek.com/v1"),
    ("volcengine", "VOLCENGINE_BASE_URL", "https://ark.cn-beijing.volces.com/api/v3"),
    ("modelscope", "MODELSCOPE_BASE_URL", "https://api-inference.modelscope.cn/v1"),
    ("kimi", "KIMI_BASE_URL", "https://api.moonshot.cn/v1"),
    ("nvidia", "NVIDIA_BASE_URL", "https://integrate.api.nvidia.com/v1"),
    ("minimax", "MINIMAX_BASE_URL", "https://api.minimax.io/v1"),
)

# Default API endpoint per provider, resolved once at import time.
DEFAULT_BASE_URLS: dict[str, str] = {
    provider: os.getenv(env_var, fallback)
    for provider, env_var, fallback in _DEFAULT_BASE_URL_ROWS
}
| # These will be initialized within functions using get_settings() to ensure .env is loaded | |
| # MEMORY_LITE_PROVIDER = ... | |
| # MEMORY_LITE_MODEL = ... | |
| # MEMORY_LITE_BASE_URL = ... | |
| # MEMORY_AGENT_API_KEY = ... | |
| # NOTE: the module-level `_agent_db` instance (formerly kept global to avoid multiple table definitions in SQLAlchemy) | |
| # was removed; database access now goes through the DbAdapter pattern. | |
def _build_model(provider: str, api_key: str | None, base_url: str | None, model: str | None):
    """Create the Agno model object for *provider*.

    An empty provider is treated as "openai".  A missing model id falls back
    to the provider's entry in DEFAULT_MODELS and then to the OpenAI default.
    "gemini" yields a ``Gemini`` model (no base URL is passed); every other
    provider yields an ``OpenAILike`` model pointed at *base_url*, the
    provider's default endpoint, or the OpenAI endpoint, in that order.
    """
    provider_name = provider if provider else "openai"
    chosen_model = model or DEFAULT_MODELS.get(provider_name) or DEFAULT_MODELS["openai"]

    if provider_name == "gemini":
        return Gemini(id=chosen_model, api_key=api_key)

    endpoint = base_url or DEFAULT_BASE_URLS.get(provider_name) or DEFAULT_BASE_URLS["openai"]
    return OpenAILike(id=chosen_model, api_key=api_key, base_url=endpoint)
def _merge_model_dict_attr(model: Any, attr: str, payload: dict[str, Any]) -> None:
    """Merge *payload* into the dict stored at ``model.<attr>``.

    Nothing happens for an empty payload.  When the attribute is missing or
    ``None`` it is set to a copy of *payload*.  When it already holds a dict,
    a new dict is stored in which *payload* keys override existing ones.  Any
    other existing value is left untouched.
    """
    if not payload:
        return

    existing = getattr(model, attr, None)
    if existing is None:
        setattr(model, attr, dict(payload))
        return

    if isinstance(existing, dict):
        combined = dict(existing)
        combined.update(payload)
        setattr(model, attr, combined)
def _apply_common_params(model: Any, request: Any) -> None:
    """Copy the sampling parameters set on *request* onto *model*.

    temperature, top_p, frequency_penalty and presence_penalty are copied
    only when the request value is not ``None`` and the model already exposes
    an attribute of that name.  top_k is assigned directly when the model has
    a ``top_k`` attribute; otherwise it is merged into ``extra_body``.
    """
    for field in ("temperature", "top_p", "frequency_penalty", "presence_penalty"):
        value = getattr(request, field)
        if value is not None and hasattr(model, field):
            setattr(model, field, value)

    top_k = request.top_k
    if top_k is None:
        return
    if hasattr(model, "top_k"):
        model.top_k = top_k
    else:
        _merge_model_dict_attr(model, "extra_body", {"top_k": top_k})
def _first_not_none(mapping: dict[str, Any], *keys: str) -> Any:
    """Return the value of the first key in *keys* whose value is not ``None``."""
    for key in keys:
        value = mapping.get(key)
        if value is not None:
            return value
    return None


def _apply_thinking_params(model: Any, provider: str, thinking: dict[str, Any] | bool | None) -> None:
    """Translate the request's ``thinking`` option into provider-specific model settings.

    Args:
        model: Model object to configure in place.
        provider: Provider key; providers without a branch below are left untouched.
        thinking: Falsy to do nothing, ``True`` to enable provider defaults, or
            a dict carrying provider-specific options.
    """
    if not thinking:
        return

    if provider == "gemini":
        if isinstance(thinking, dict):
            config = thinking.get("thinkingConfig") or thinking.get("thinking_config") or {}
            if not isinstance(config, dict):
                # A malformed config is ignored rather than crashing on ``.get``.
                config = {}
            # Explicit falsy values are meaningful here (a budget of 0, or
            # include-thoughts False), so pick the first key that is present
            # rather than chaining with ``or``, which would discard them.
            include = _first_not_none(config, "includeThoughts", "include_thoughts")
            budget = _first_not_none(config, "thinkingBudget", "thinking_budget")
            level = _first_not_none(config, "thinkingLevel", "thinking_level")
            if include is not None and hasattr(model, "include_thoughts"):
                model.include_thoughts = include
            if budget is not None and hasattr(model, "thinking_budget"):
                model.thinking_budget = budget
            if level is not None and hasattr(model, "thinking_level"):
                model.thinking_level = level
        elif hasattr(model, "include_thoughts"):
            model.include_thoughts = True
        return

    if provider in {"siliconflow", "modelscope"}:
        budget = None
        if isinstance(thinking, dict):
            budget = thinking.get("budget_tokens") or thinking.get("budgetTokens")
        if budget is None:
            budget = 1024
        model_id_lower = str(getattr(model, "id", "") or "").lower()
        is_siliconflow_kimi_thinking = (
            provider == "siliconflow"
            and "kimi" in model_id_lower
            and "thinking" in model_id_lower
        )
        # SiliconFlow Kimi-thinking models may reject `enable_thinking`.
        if is_siliconflow_kimi_thinking:
            _merge_model_dict_attr(model, "extra_body", {"thinking_budget": budget})
            current_extra = getattr(model, "extra_body", None)
            if isinstance(current_extra, dict) and "enable_thinking" in current_extra:
                # Drop a flag that an earlier merge may have left behind.
                cleaned = dict(current_extra)
                cleaned.pop("enable_thinking", None)
                setattr(model, "extra_body", cleaned)
        else:
            _merge_model_dict_attr(
                model,
                "extra_body",
                {"enable_thinking": True, "thinking_budget": budget},
            )
        return

    if provider == "nvidia":
        _merge_model_dict_attr(model, "extra_body", {"chat_template_kwargs": {"thinking": True}})
        return

    if provider == "minimax":
        if isinstance(thinking, dict) and isinstance(thinking.get("extra_body"), dict):
            _merge_model_dict_attr(model, "extra_body", thinking.get("extra_body"))
        else:
            _merge_model_dict_attr(model, "extra_body", {"reasoning_split": True})
        return

    if provider in {"glm", "deepseek", "volcengine"}:
        if isinstance(thinking, dict) and thinking.get("type"):
            payload = {"thinking": {"type": thinking.get("type")}}
            _merge_model_dict_attr(model, "extra_body", payload)
        return

    if provider == "kimi":
        if isinstance(thinking, dict):
            max_tokens = thinking.get("max_tokens")
            temperature = thinking.get("temperature")
            if max_tokens is not None and hasattr(model, "max_tokens"):
                model.max_tokens = max_tokens
            if temperature is not None and hasattr(model, "temperature"):
                model.temperature = temperature
        return

    if provider == "openai_compatibility":
        if isinstance(thinking, dict):
            extra_body = thinking.get("extra_body")
            if isinstance(extra_body, dict):
                _merge_model_dict_attr(model, "extra_body", extra_body)
        return
def _apply_model_settings(model: Any, request: Any) -> None:
    """Configure *model* from *request*: sampling parameters first, then thinking options."""
    _apply_common_params(model, request)
    provider, thinking = request.provider, request.thinking
    _apply_thinking_params(model, provider, thinking)
def _collect_enabled_tool_names(request: Any) -> list[str]:
    """List the tool names the request enables, in encounter order.

    Names come from three places: ``tool_ids`` (skipped when the provider is
    "gemini"), the ``function.name`` of each entry in ``tools``, and the
    ``name`` of each entry in ``user_tools``.  The first two are passed
    through ``resolve_tool_name``.  Duplicates are not removed.  When the
    request is in expert/team mode, "interactive_form" is filtered out.
    """
    collected: list[str] = []

    if request.provider != "gemini":
        collected.extend(resolve_tool_name(str(tool_id)) for tool_id in request.tool_ids or [])

    for entry in request.tools or []:
        # Pydantic-style objects are converted to plain dicts first.
        spec = entry.model_dump() if hasattr(entry, "model_dump") else entry
        if not isinstance(spec, dict):
            continue
        function_name = spec.get("function", {}).get("name")
        if function_name:
            collected.append(resolve_tool_name(function_name))

    for user_tool in request.user_tools or []:
        if getattr(user_tool, "name", None):
            collected.append(str(user_tool.name))
        elif isinstance(user_tool, dict) and user_tool.get("name"):
            collected.append(str(user_tool["name"]))

    # Disable interactive forms in expert mode (team mode)
    expert_mode = getattr(request, "expert_mode", False) or bool(getattr(request, "team_agent_ids", []))
    if expert_mode:
        return [tool_name for tool_name in collected if tool_name != "interactive_form"]
    return collected
def _has_selected_skills(request: Any) -> bool:
    """Check if any manual/external skills are selected in the request.

    ``request.skill_ids`` may be a list or a JSON-encoded string.  Strings are
    decoded the same way build_agent decodes them, so a payload such as
    ``"[]"`` or an undecodable string counts as "nothing selected" instead of
    being reported as a selection merely because the string is non-empty.

    Returns:
        True when at least one non-blank skill id is present.
    """
    raw_skill_ids = getattr(request, "skill_ids", None)
    if isinstance(raw_skill_ids, str):
        try:
            raw_skill_ids = json.loads(raw_skill_ids)
        except (json.JSONDecodeError, TypeError):
            # build_agent treats an undecodable string as "no skills".
            return False
    if isinstance(raw_skill_ids, list):
        return any(str(item or "").strip() for item in raw_skill_ids)
    return False
def _has_skills(request: Any) -> bool:
    """Check if the agent will have any active skills (internal or external).

    Always False when ``enable_skills`` is off.  Otherwise True when skills
    were selected manually, when long-term memory is enabled, or when the
    bundled ``_internal_skills`` directory contains at least one sub-directory
    other than the four that are never auto-loaded.
    """
    if not getattr(request, "enable_skills", False):
        return False

    # Manual/external selection, or the memory skill, is enough on its own.
    if _has_selected_skills(request) or getattr(request, "enable_long_term_memory", False):
        return True

    # Otherwise look for an internal skill that is loaded by default.
    bundled_root = os.path.join(os.path.dirname(__file__), '..', '_internal_skills')
    if not os.path.isdir(bundled_root):
        return False

    not_autoloaded = ("agent-memory", "skill-creator", "academic-research", "deep-research")
    return any(
        entry not in not_autoloaded and os.path.isdir(os.path.join(bundled_root, entry))
        for entry in os.listdir(bundled_root)
    )
def _build_tools(request: Any) -> list[Any]:
    """Assemble the toolkit list handed to the agent.

    Returns an empty list when no tool is enabled, there are no user tools
    and skills are off.  Otherwise toolkits are appended in this order: local
    Qurio tools, Agno toolkits, the user-defined toolkit, and an MCP toolkit
    when ``MCP_SERVER_URL`` is set and ``MCPTools`` can be imported.
    """
    enabled = set(_collect_enabled_tool_names(request))
    skills_enabled = getattr(request, "enable_skills", False)
    if not (enabled or request.user_tools or skills_enabled):
        return []

    serp_key = getattr(request, "serpapi_api_key", None)

    # --- local tools -------------------------------------------------------
    local_selection = enabled & {tool["name"] for tool in LOCAL_TOOLS}
    # Inject skill execution tools if ANY skill (internal or external) is present
    if _has_skills(request):
        local_selection |= {"execute_skill_script", "install_skill_dependency"}

    assembled: list[Any] = []
    if local_selection:
        assembled.append(
            QurioLocalTools(
                tavily_api_key=request.tavily_api_key,
                include_tools=sorted(local_selection),
            )
        )

    # --- Agno toolkits -----------------------------------------------------
    agno_selection = enabled & {tool["name"] for tool in AGNO_TOOLS}
    if not getattr(request, "skip_default_tools", False):
        # Zero-config DuckDuckGo image/video search is included by default.
        agno_selection |= {"duckduckgo_image_search", "duckduckgo_video_search"}
        # SerpApi-backed search is added only when an API key is configured.
        if serp_key:
            agno_selection |= {
                "google_image_search",
                "serpapi_image_search",
                "bing_image_search",
                "search_youtube",
            }
    if agno_selection:
        assembled.extend(_build_agno_toolkits(request, sorted(agno_selection)))

    # --- user-defined tools ------------------------------------------------
    user_tool_defs = [
        tool.model_dump() if hasattr(tool, "model_dump") else tool
        for tool in request.user_tools or []
    ]
    user_toolkit = build_user_tools_toolkit(user_tool_defs)
    if user_toolkit:
        assembled.append(user_toolkit)

    # --- optional MCP server -----------------------------------------------
    mcp_url = os.getenv("MCP_SERVER_URL")
    if mcp_url:
        try:
            from agno.tools.mcp import MCPTools
        except Exception:
            MCPTools = None
        if MCPTools:
            transport = os.getenv("MCP_TRANSPORT", "streamable-http")
            assembled.append(MCPTools(url=mcp_url, transport=transport))

    return assembled
def _build_agno_toolkits(request: Any, include_agno: list[str]) -> list[Any]:
    """Instantiate one toolkit per tool family named in *include_agno*.

    Families are handled in a fixed order: Tavily, web search (DuckDuckGo or
    Exa, depending on ``request.search_backend``), explicit Exa search, arXiv,
    Wikipedia, YFinance, image search, video search.  Optional Agno toolkits
    that fail to import are skipped silently; SerpApi-backed toolkits are
    skipped when the request carries no ``serpapi_api_key``.
    """
    requested = set(include_agno)
    serp_key = getattr(request, "serpapi_api_key", None)
    built: list[Any] = []

    def _pick(group: set[str]) -> list[str]:
        """Return the members of *group* that were requested, in include_agno order."""
        return [name for name in include_agno if name in group]

    def _make_exa() -> Any | None:
        """Build the Exa toolkit from the request's key and category."""
        return _build_exa_toolkit(
            getattr(request, "exa_api_key", None),
            getattr(request, "exa_search_category", None),
        )

    # Tavily
    tavily_names = {"web_search_using_tavily", "web_search_with_tavily", "extract_url_content"}
    if requested & tavily_names:
        try:
            from agno.tools.tavily import TavilyTools
        except Exception:
            TavilyTools = None
        if TavilyTools:
            built.append(TavilyTools(api_key=request.tavily_api_key, include_tools=_pick(tavily_names)))

    # Generic web search: served by Exa when that backend is selected,
    # otherwise by DuckDuckGo with the chosen backend.
    web_names = {"web_search", "search_news"}
    wants_web = bool(requested & web_names)
    backend = getattr(request, "search_backend", None) or "auto"
    exa_serves_web = wants_web and backend == "exa"
    if exa_serves_web:
        exa_toolkit = _make_exa()
        if exa_toolkit:
            built.append(exa_toolkit)
        else:
            logger.warning("Exa backend requested but ExaTools is unavailable or API key is missing.")
    elif wants_web:
        built.append(
            DuckDuckGoWebSearchTools(
                include_tools=_pick(web_names),
                backend=backend,
            )
        )

    # Explicit Exa tool, unless the web-search branch above already handled Exa.
    if requested & EXA_SEARCH_TOOL_SET and not exa_serves_web:
        exa_toolkit = _make_exa()
        if exa_toolkit:
            built.append(exa_toolkit)
        else:
            logger.warning("Exa tool requested but ExaTools is unavailable or API key is missing.")

    # arXiv
    arxiv_names = {"search_arxiv_and_return_articles", "read_arxiv_papers"}
    if requested & arxiv_names:
        try:
            from agno.tools.arxiv import ArxivTools
        except Exception:
            ArxivTools = None
        if ArxivTools:
            built.append(ArxivTools(include_tools=_pick(arxiv_names)))

    # Wikipedia
    if "search_wikipedia" in requested:
        try:
            from agno.tools.wikipedia import WikipediaTools
        except Exception:
            WikipediaTools = None
        if WikipediaTools:
            built.append(WikipediaTools(include_tools=["search_wikipedia"]))

    # YFinance
    yfinance_names = {
        "yfinance_tools",
        "get_current_stock_price",
        "get_company_info",
        "get_stock_fundamentals",
        "get_income_statements",
        "get_key_financial_ratios",
        "get_analyst_recommendations",
        "get_company_news",
        "get_technical_indicators",
        "get_historical_stock_prices",
    }
    if requested & yfinance_names:
        try:
            from agno.tools.yfinance import YFinanceTools
        except Exception:
            YFinanceTools = None
        if YFinanceTools:
            # Keep YFinance toolkit initialization aligned with Agno's default usage.
            # Some SDK versions expose different subsets/names, which can make
            # include_tools fail with "tool not present in toolkit".
            built.append(YFinanceTools())

    # Image search: DuckDuckGo needs no configuration; SerpApi needs a key.
    if "duckduckgo_image_search" in requested:
        built.append(DuckDuckGoImageTools(include_tools=["duckduckgo_image_search"]))
    serpapi_image_names = {"google_image_search", "serpapi_image_search", "bing_image_search"}
    serpapi_selected = sorted(requested & serpapi_image_names)
    if serpapi_selected and serp_key:
        built.append(SerpApiImageTools(api_key=serp_key, include_tools=serpapi_selected))

    # Video search: DuckDuckGo needs no configuration; YouTube goes through SerpApi.
    if "duckduckgo_video_search" in requested:
        built.append(DuckDuckGoVideoTools(include_tools=["duckduckgo_video_search"]))
    if "search_youtube" in requested and serp_key:
        try:
            from agno.tools.serpapi import SerpApiTools as AgnoSerpApiTools
        except Exception:
            AgnoSerpApiTools = None
        if AgnoSerpApiTools:
            built.append(
                AgnoSerpApiTools(
                    api_key=serp_key,
                    enable_search_google=False,
                    enable_search_youtube=True,
                )
            )

    return built
def _normalize_exa_category(category: str | None) -> str | None:
    """Return the trimmed, lower-cased category if Exa accepts it, else ``None``.

    Empty input and the literal "auto" mean "no category".  Any value not
    listed in EXA_ALLOWED_CATEGORIES is also mapped to ``None``.
    """
    cleaned = str(category or "").strip().lower()
    if cleaned in ("", "auto"):
        return None
    if cleaned in EXA_ALLOWED_CATEGORIES:
        return cleaned
    return None
def _build_exa_toolkit(exa_api_key: str | None, category: str | None = None) -> Any | None:
    """Create a search-only ExaTools instance, or ``None`` when that is not possible.

    ``None`` is returned when ExaTools could not be imported, or when neither
    *exa_api_key* nor the ``EXA_API_KEY`` environment variable provides a
    non-blank key.  Only the search capability is enabled; the category is
    normalized through ``_normalize_exa_category``.
    """
    if ExaTools is None:
        return None

    api_key = str(exa_api_key or os.getenv("EXA_API_KEY") or "").strip()
    if not api_key:
        return None

    options = {
        "api_key": api_key,
        "enable_search": True,
        "enable_get_contents": False,
        "enable_find_similar": False,
        "enable_answer": False,
        "timeout": EXA_TIMEOUT_SECONDS,
        "category": _normalize_exa_category(category),
    }
    return ExaTools(**options)
def get_summary_model(request: Any) -> Any | None:
    """
    Build the lite model used for session-summary generation.

    Each of provider, model, API key and base URL is taken from the request's
    ``summary_*`` attribute when set, otherwise from the matching global
    setting returned by ``get_settings()``.

    Returns:
        Agno model instance for summary generation, or None when no model or
        API key is configured or when building the model fails.
    """
    settings = get_settings()
    try:
        def _resolve(request_field: str, settings_field: str) -> Any:
            """Prefer the request value; fall back to the global setting."""
            return getattr(request, request_field, None) or getattr(settings, settings_field)

        provider = _resolve("summary_provider", "summary_lite_provider")
        model_id = _resolve("summary_model", "summary_lite_model")
        api_key = _resolve("summary_api_key", "summary_agent_api_key")
        base_url = _resolve("summary_base_url", "summary_lite_base_url")

        if not (model_id and api_key):
            logger.warning("Lite Model not configured (checked request summary_* params and SUMMARY_LITE_MODEL env var)")
            return None

        origin = "Request-Specific" if getattr(request, "summary_model", None) else "Global-Default"
        logger.info(f"[{origin}] Selected Lite Model for Session Summary: {provider}/{model_id}")

        # Without an explicit base URL, use the provider's default endpoint.
        endpoint = base_url or DEFAULT_BASE_URLS.get(provider) or DEFAULT_BASE_URLS["openai"]
        built = _build_model(provider, api_key, endpoint, model_id)

        # Native structured outputs are switched off on this instance only, so
        # summary parsing stays robust with non-OpenAI providers (like GLM).
        if hasattr(built, "supports_native_structured_outputs"):
            built.supports_native_structured_outputs = False
        return built
    except Exception as exc:
        logger.warning(f"Failed to build lite_model for session summary: {exc}")
        return None
def _legacy_request_from_kwargs(request: Any, kwargs: dict[str, Any]) -> SimpleNamespace:
    """Build a request-like namespace for legacy ``build_agent(provider, api_key=...)`` calls."""
    provider = request if isinstance(request, str) else kwargs.get("provider")
    return SimpleNamespace(
        provider=provider or "openai",
        api_key=kwargs.get("api_key"),
        base_url=kwargs.get("base_url"),
        model=kwargs.get("model"),
        tavily_api_key=kwargs.get("tavily_api_key"),
        temperature=kwargs.get("temperature"),
        top_p=kwargs.get("top_p"),
        top_k=kwargs.get("top_k"),
        frequency_penalty=kwargs.get("frequency_penalty"),
        presence_penalty=kwargs.get("presence_penalty"),
        thinking=kwargs.get("thinking"),
        tool_ids=kwargs.get("tool_ids"),
        tools=kwargs.get("tools"),
        user_tools=kwargs.get("user_tools"),
        tool_choice=kwargs.get("tool_choice"),
    )


def _compose_tool_instructions(request: Any) -> list[str]:
    """Return the instruction paragraphs implied by the request's tools and skills."""
    enabled_names = set(_collect_enabled_tool_names(request))
    # Image and video search names are always treated as enabled here.
    # NOTE(review): _build_tools can return without these tools (nothing
    # enabled, or skip_default_tools set) -- confirm the guidance is still
    # wanted in those cases.
    enabled_names.update(tool["name"] for tool in IMAGE_SEARCH_TOOLS)
    enabled_names.update(tool["name"] for tool in VIDEO_SEARCH_TOOLS)

    parts: list[str] = []
    if "interactive_form" in enabled_names:
        parts.append(
            "When using the interactive_form tool to collect user information: "
            "If the user's initial responses lack critical details needed to fulfill their request, "
            "you MUST call interactive_form again to gather the missing specific information. "
            "Do not proceed with incomplete information. "
            "However, limit to 2-3 forms maximum per conversation to respect user time."
        )
    if getattr(request, "enable_skills", False) and _has_selected_skills(request):
        parts.append(
            "When a skill tells you to run a bundled script, do not merely summarize or restate the script. "
            "Use execute_skill_script to actually run the file and rely on its stdout/stderr."
        )
        parts.append(
            "When a skill script fails because of a missing Python dependency "
            "(for example ModuleNotFoundError or ImportError), do not pretend it succeeded. "
            "First explain the missing package briefly, then use interactive_form to ask for explicit approval "
            "before installing anything. Do NOT ask for approval in plain text if interactive_form is available. "
            "For dependency approval, keep the form minimal: include the package name in the form title or description, "
            "and ask only for an approval choice when skill_id and package_name are already known from context. "
            "Do NOT ask the user to retype known values like skill_id or package_name. "
            "Only after the user approves may you call install_skill_dependency. "
            "If the user declines, stop and report that installation was skipped."
        )
    if "duckduckgo_image_search" in enabled_names or "google_image_search" in enabled_names:
        parts.append(
            "When explaining concepts that can benefit from visual aids (like Logo, diagrams, or photos), "
            "you should use the image search tools to find relevant images. "
            # The example syntax was missing from this sentence ("format: . ").
            "ALWAYS render images in your response using markdown format: ![description](image_url). "
            "Place images appropriately within your explanation to enhance user understanding."
        )
    if "duckduckgo_video_search" in enabled_names or "search_youtube" in enabled_names:
        parts.append(
            "When users ask about tutorials, demonstrations, or topics that benefit from video content, "
            "you should use the video search tools to find relevant videos. "
            "ALWAYS include video links in your response using markdown format with descriptive text. "
            "Provide context about why each video is relevant to the user's query."
        )
    if getattr(request, "search_backend", None) == "exa" and (
        "web_search" in enabled_names or bool(enabled_names.intersection(EXA_SEARCH_TOOL_SET))
    ):
        exa_category = _normalize_exa_category(getattr(request, "exa_search_category", None))
        if exa_category:
            parts.append(
                f"Exa search is available via search_exa and is constrained to the '{exa_category}' category."
            )
        else:
            parts.append(
                "Exa search is available via search_exa. Use it for current web information, and choose a category when it helps."
            )
    return parts


def _parse_requested_skill_ids(raw: Any) -> list[str]:
    """Normalize ``skill_ids`` (a list or a JSON-encoded list) into non-blank id strings."""
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return []
    if not isinstance(raw, (list, tuple)):
        # e.g. JSON that decodes to a string or number: nothing usable.
        return []
    cleaned = (str(item or "").strip() for item in raw)
    return [skill_id for skill_id in cleaned if skill_id]


def _is_strictly_inside(root: str, candidate: str) -> bool:
    """Return True when *candidate* resolves to a path below *root* (and is not *root* itself)."""
    root_abs = os.path.abspath(root)
    candidate_abs = os.path.abspath(candidate)
    if candidate_abs == root_abs:
        return False
    try:
        return os.path.commonpath([root_abs, candidate_abs]) == root_abs
    except ValueError:
        # Raised e.g. for paths on different drives.
        return False


def _resolve_skill_paths(request: Any) -> list[str]:
    """Collect the skill directories to load for *request*, without duplicates."""
    here = os.path.dirname(__file__)
    skills_dir = os.path.join(here, '..', '..', '.skills')
    internal_skills_dir = os.path.join(here, '..', '_internal_skills')
    not_autoloaded = ("agent-memory", "skill-creator", "academic-research", "deep-research")

    paths: list[str] = []

    # Inject built-in agent-memory skill if long term memory is enabled
    if getattr(request, "enable_long_term_memory", False):
        am_path = os.path.join(internal_skills_dir, "agent-memory")
        if os.path.isdir(am_path):
            paths.append(am_path)

    # Inject any other internal skills by default (except for specific non-autoloading skills)
    if os.path.isdir(internal_skills_dir):
        for item in os.listdir(internal_skills_dir):
            if item in not_autoloaded:
                continue
            item_path = os.path.join(internal_skills_dir, item)
            if os.path.isdir(item_path):
                paths.append(item_path)

    # Explicitly requested skills: the internal directory wins over the external one.
    for skill_id in _parse_requested_skill_ids(getattr(request, "skill_ids", [])):
        for root in (internal_skills_dir, skills_dir):
            candidate = os.path.join(root, skill_id)
            # skill_ids come from the request: never follow absolute or
            # "../" ids out of the skill directories.
            if not _is_strictly_inside(root, candidate):
                logger.warning(f"Ignoring skill id outside the skills directory: {skill_id!r}")
                break
            if os.path.isdir(candidate):
                paths.append(candidate)
                break

    # A requested skill may also be auto-loaded above; keep the first occurrence.
    return list(dict.fromkeys(paths))


def build_agent(request: Any = None, **kwargs: Any) -> Agent:
    """Build a stateless Agno ``Agent`` for a chat request.

    Args:
        request: Request object carrying provider, credentials, sampling
            parameters, tool and skill selections.  For backward
            compatibility it may also be a provider name (``str``) or
            ``None``.
        **kwargs: Legacy keyword form (``provider=..., api_key=...``).  When
            any keyword is given, the agent is configured from the keywords
            only and a non-string *request* is ignored.

    Returns:
        The configured Agent.  No db or memory is attached; per the original
        note, session context is injected manually in stream_chat.py.
    """
    # Backward-compatible shim for legacy build_agent(provider=..., api_key=...)
    # calls.  A bare provider string without keywords is handled here too;
    # it used to fail on ``request.provider``.
    if request is None or isinstance(request, str) or kwargs:
        request = _legacy_request_from_kwargs(request, kwargs)

    model = _build_model(request.provider, request.api_key, request.base_url, request.model)
    _apply_model_settings(model, request)
    tools = _build_tools(request)

    tool_choice = request.tool_choice
    if tool_choice is None and tools:
        tool_choice = "auto"

    # 1. Conditional instructions: multi-form guidance, skills, image/video rendering, Exa.
    instruction_parts = _compose_tool_instructions(request)
    instructions = "\n\n".join(instruction_parts) if instruction_parts else None

    # 2. Skills
    skills = None
    if getattr(request, "enable_skills", False):
        skill_paths = _resolve_skill_paths(request)
        if skill_paths:
            skills = Skills(loaders=[LocalSkills(path) for path in skill_paths])

    # Merge personalized prompt with tool-derived instructions
    personalized = getattr(request, "personalized_prompt", None)
    if personalized:
        instructions = f"{personalized}\n\n{instructions}" if instructions else personalized

    # Use resolved agent name/description if available (for Team member identification)
    agent_id = getattr(request, "agent_id", None) or f"qurio-{request.provider}"
    agent_name = getattr(request, "agent_name", None) or f"Qurio {request.provider} Agent"
    agent_description = getattr(request, "agent_description", None)

    return Agent(
        id=agent_id,
        name=agent_name,
        description=agent_description,
        role=agent_description,
        model=model,
        tools=tools or None,
        markdown=True,
        tool_choice=tool_choice,
        instructions=instructions,
        skills=skills,
    )
# Provider key -> value of the ``key`` column in the ``user_settings`` table
# under which that provider's API key is looked up.
# "openai" and "openai_compatibility" share the same settings entry.
_PROVIDER_KEY_MAP: dict[str, str] = dict(
    gemini="googleApiKey",
    openai="OpenAICompatibilityKey",
    openai_compatibility="OpenAICompatibilityKey",
    siliconflow="SiliconFlowKey",
    glm="GlmKey",
    deepseek="DeepSeekKey",
    volcengine="VolcengineKey",
    modelscope="ModelScopeKey",
    kimi="KimiKey",
    nvidia="NvidiaKey",
    minimax="MinimaxKey",
)
def _get_provider_credentials(provider: str) -> tuple[str | None, str | None]:
    """
    Look up the API key and default base URL for a provider.

    The API key is read from the ``user_settings`` table first (using the
    provider's entry in _PROVIDER_KEY_MAP); if that yields nothing, the
    ``<PROVIDER>_API_KEY`` environment variable is used.  The base URL always
    comes from DEFAULT_BASE_URLS and may be None.  Database failures are
    logged at debug level and treated as "no key stored".
    """
    base_url = DEFAULT_BASE_URLS.get(provider)
    api_key: str | None = None

    # 1. user_settings table
    settings_key = _PROVIDER_KEY_MAP.get(provider)
    if settings_key:
        try:
            adapter = get_db_adapter()
            if adapter:
                lookup = DbQueryRequest(
                    action="select",
                    table="user_settings",
                    filters=[DbFilter(op="eq", column="key", value=settings_key)],
                    maybe_single=True,
                )
                row = adapter.execute(lookup).data
                if row:
                    # The adapter may hand back a list of rows or a single row.
                    if isinstance(row, list):
                        row = row[0]
                    if isinstance(row, dict):
                        api_key = row.get("value")
        except Exception as e:
            logger.debug(f"Failed to get API key from user_settings for {provider}: {e}")

    # 2. environment variable fallback
    if not api_key:
        env_name = provider.upper().replace("-", "_") + "_API_KEY"
        api_key = os.getenv(env_name)

    return api_key, base_url
def resolve_agent_config(agent_id: str, base_request: Any) -> Any:
    """Fetch agent configuration from database and merge with base request secrets.

    Looks up the row with ``id == agent_id`` in the ``agents`` table and
    returns a deep copy of ``base_request`` with provider, credentials, model,
    tool/skill ids, prompt, identity fields and (optionally) generation
    parameters overridden from that row.

    Args:
        agent_id: Primary key of the agent row to load.
        base_request: Request object used as the template; it is never mutated.

    Returns:
        The merged copy, or ``base_request`` itself when no DB adapter is
        available, the query reports an error, no usable row is found, or
        any exception occurs while merging.
    """
    import copy

    adapter = get_db_adapter()
    if not adapter:
        return base_request

    try:
        response = adapter.execute(
            DbQueryRequest(
                action="select",
                table="agents",
                filters=[DbFilter(op="eq", column="id", value=agent_id)],
                maybe_single=True,
            )
        )
        if response.error:
            logger.warning(f"[resolve_agent_config] DB error for agent_id={agent_id}: {response.error}")
            return base_request
        if not response.data:
            logger.warning(f"[resolve_agent_config] No agent found with id={agent_id}")
            return base_request

        # Handle case where data might be a list or a single dict
        agent_data = response.data
        if isinstance(agent_data, list):
            if not agent_data:
                return base_request
            agent_data = agent_data[0]
        if not isinstance(agent_data, dict):
            # Fail explicitly instead of relying on an AttributeError from .get().
            logger.warning(
                f"[resolve_agent_config] Unexpected row type for agent_id={agent_id}: "
                f"{type(agent_data).__name__}"
            )
            return base_request

        new_req = copy.deepcopy(base_request)

        # Override provider and pull matching credentials
        # (user_settings table first, then environment variables).
        agent_provider = agent_data.get("provider")
        if agent_provider:
            new_req.provider = agent_provider
            api_key, base_url = _get_provider_credentials(agent_provider)
            # Missing values leave whatever base_request carried.
            # NOTE(review): if the agent's provider differs from the request's,
            # the request's original api_key is kept when no key is found here
            # -- confirm that is intended.
            if api_key:
                new_req.api_key = api_key
            if base_url:
                new_req.base_url = base_url

        new_req.model = agent_data.get("default_model") or base_request.model
        new_req.tool_ids = agent_data.get("tool_ids") or []
        new_req.skill_ids = agent_data.get("skill_ids") or []

        # Personalized prompt is stored on a custom attribute for build_agent to pick up.
        new_req.personalized_prompt = agent_data.get("prompt")

        # Store agent name and description for proper identification in Teams
        new_req.agent_id = agent_id  # Store the original agent_id
        new_req.agent_name = agent_data.get("name")
        new_req.agent_emoji = agent_data.get("emoji")
        new_req.agent_description = agent_data.get("description")

        logger.info(f"[resolve_agent_config] Resolved agent: id={agent_id}, name={new_req.agent_name}, provider={agent_provider}")

        # Override generation params if configured; None means "not set on the agent".
        if not agent_data.get("use_global_model_settings"):
            for param in ("temperature", "top_p", "frequency_penalty", "presence_penalty"):
                value = agent_data.get(param)
                if value is not None:
                    setattr(new_req, param, value)

        return new_req
    except Exception as e:
        # Best effort: any failure falls back to the caller's original request.
        logger.error(f"Failed to resolve agent config for {agent_id}: {e}")
        return base_request
def build_memory_agent(
    user_id: str | None = None,
    provider: str | None = None,
    model: str | None = None,
    base_url: str | None = None,
    api_key: str | None = None,
) -> Agent:
    """Build the helper agent used for memory work.

    Explicit arguments win; anything left unset falls back to application
    settings (``memory_lite_provider``, ``memory_lite_model``,
    ``memory_agent_api_key``), then to ``OPENAI_API_KEY`` for the key and to
    ``DEFAULT_BASE_URLS`` for the endpoint. The agent is built through
    ``build_agent`` with long-term memory enabled and no tools or skills.
    """
    cfg = get_settings()

    chosen_provider = provider or cfg.memory_lite_provider
    chosen_model = model or cfg.memory_lite_model
    chosen_key = api_key or cfg.memory_agent_api_key or os.getenv("OPENAI_API_KEY")

    # Endpoint: explicit argument, then the provider default, then OpenAI's default.
    endpoint = base_url
    if not endpoint:
        endpoint = DEFAULT_BASE_URLS.get(chosen_provider)
    if not endpoint:
        endpoint = DEFAULT_BASE_URLS["openai"]

    request_fields = {
        "provider": chosen_provider,
        "api_key": chosen_key,
        "base_url": endpoint,
        "model": chosen_model,
        "tavily_api_key": os.getenv("TAVILY_API_KEY"),
        # Generation parameters are left unset.
        "temperature": None,
        "top_p": None,
        "top_k": None,
        "frequency_penalty": None,
        "presence_penalty": None,
        "thinking": None,
        # No tools of any kind for this helper.
        "tool_ids": [],
        "tools": None,
        "user_tools": None,
        "tool_choice": None,
        "enable_long_term_memory": True,
        "database_provider": os.getenv("DATABASE_PROVIDER") or "default",
        "user_id": user_id,
        "enable_skills": False,  # Helper agent: keep skills disabled
    }
    return build_agent(SimpleNamespace(**request_fields))
def get_agent_for_provider(request: Any) -> Agent:
    """Return the agent for ``request``; a thin alias that delegates to ``build_agent``."""
    agent = build_agent(request)
    return agent
def build_team(request: Any, members: list[Agent]) -> Any:
    """Build an Agno Team from a list of member agents.

    A leader agent is built from ``request`` via ``build_agent`` so the team
    reuses its model, tools and instructions. The leader's instructions are
    placed before the generic team-leader prompt.

    Args:
        request: Request object; ``team_mode``, ``agent_name`` and ``agent_id``
            are read from it when present.
        members: Member agents handed to the team as-is.

    Returns:
        The constructed ``Team`` with ``agent_id`` set from the request.
    """
    from agno.team import Team
    from agno.team.mode import TeamMode

    # Map requested team mode string to TeamMode enum; unknown or missing
    # values fall back to routing.
    try:
        team_mode = TeamMode(getattr(request, "team_mode", "route"))
    except Exception:
        team_mode = TeamMode.route

    # Build a leader agent to inherit tools, skills, and personalised prompt
    leader_agent = build_agent(request)

    instructions = (
        "You are the Team Leader coordinating a group of expert agents. "
        "Analyze the user's request. Based on the expertise of your team members, "
        "delegate sub-tasks or questions to them. Finally, synthesize and summarize their findings into a comprehensive response."
    )

    # Agent instructions may be a string, a list of strings, or a callable;
    # only text can be prepended meaningfully.
    leader_instructions = leader_agent.instructions
    if isinstance(leader_instructions, (list, tuple)):
        # Join the entries instead of embedding the list's Python repr.
        leader_text = "\n".join(str(part) for part in leader_instructions if part)
    elif callable(leader_instructions):
        logger.warning(
            "[build_team] Leader instructions are callable; not prepending them to the team instructions"
        )
        leader_text = ""
    elif leader_instructions:
        leader_text = str(leader_instructions)
    else:
        leader_text = ""

    if leader_text:
        instructions = f"{leader_text}\n\n{instructions}"

    # agent_name may exist on the request but be None (e.g. an agent row
    # without a name), so default on falsy values, not only on a missing attribute.
    team_name = getattr(request, "agent_name", None) or "Expert Team"

    team = Team(
        name=team_name,
        members=members,
        model=leader_agent.model,
        tools=leader_agent.tools,
        mode=team_mode,
        instructions=instructions,
        markdown=True,
        stream_member_events=True,  # Ensure member events are streamed
    )
    # Set agent_id manually as Team constructor might not support it directly
    team.agent_id = getattr(request, "agent_id", None)
    return team