veeiiinnnnn's picture
new
592cb1d
"""
Local tool execution helpers (legacy support for non-Agno adapters).
"""
from __future__ import annotations
import ast
import asyncio
import json
import os
import re
from datetime import datetime
from typing import Any
import httpx
from .academic_domains import ACADEMIC_DOMAINS
from .skill_runtime import (
execute_skill_script as execute_skill_script_runtime,
install_skill_dependency as install_skill_dependency_runtime,
)
from .tool_registry import (
AGENT_TOOLS as REGISTRY_AGENT_TOOLS,
)
from .tool_registry import (
ALL_TOOLS as REGISTRY_ALL_TOOLS,
)
from .tool_registry import (
GLOBAL_TOOLS as REGISTRY_GLOBAL_TOOLS,
)
from .tool_registry import (
LOCAL_TOOLS as REGISTRY_LOCAL_TOOLS,
)
from .tool_registry import (
get_tool_definitions_by_ids as list_tool_definitions_by_ids,
)
from .tool_registry import (
list_tools as list_tool_registry,
)
from .tool_registry import (
resolve_tool_name,
)
# Alias of the registry's LOCAL_TOOLS collection; is_local_tool_name() scans
# it and compares each entry's "id" with the resolved tool name.
CUSTOM_TOOLS = REGISTRY_LOCAL_TOOLS
# Tool names that execute_tool_by_name() dispatches to execute_local_tool()
# even when they are not listed in CUSTOM_TOOLS.
EXTERNAL_SEARCH_TOOL_NAMES = {
"web_search_using_tavily",
"web_search_with_tavily",
"extract_url_content",
"web_search",
"search_news",
"search_arxiv_and_return_articles",
"search_wikipedia",
}
# Value sent as "max_results" in every Tavily search request built in this
# module; it is not overridable through tool arguments.
FIXED_SEARCH_MAX_RESULTS = 5
def _tool_timeout_seconds(default: float = 20.0) -> float:
    """Return the tool execution timeout in seconds.

    The value is read from the ``QURIO_TOOL_TIMEOUT_SECONDS`` environment
    variable.

    Args:
        default: Value used when the variable is unset or unusable.

    Returns:
        The configured timeout, or ``default`` when the variable is unset,
        is not a number, or is not a positive finite number.
    """
    raw = os.getenv("QURIO_TOOL_TIMEOUT_SECONDS", str(default))
    try:
        value = float(raw)
    except (TypeError, ValueError):
        return default
    # float() happily parses "nan" and "inf". A chained comparison is False
    # for NaN, so this single test rejects NaN, infinities, zero and
    # negatives, none of which is a usable timeout.
    if not (0 < value < float("inf")):
        return default
    return value
def _parse_loose_object(value: Any) -> dict[str, Any] | None:
if isinstance(value, dict):
return value
if not isinstance(value, str):
return None
text = value.strip()
if not text:
return None
for parser in (json.loads, ast.literal_eval):
try:
parsed = parser(text)
if isinstance(parsed, dict):
return parsed
except Exception:
continue
return None
def _coerce_agent_memory_args(script_path: str, raw_args: Any) -> list[str] | None:
normalized_script = str(script_path or "").strip().replace("\\", "/").lower()
if normalized_script not in {"memory_store.py", "scripts/memory_store.py"}:
return None
payload = _parse_loose_object(raw_args)
if not payload:
return None
command = str(
payload.get("command")
or payload.get("action")
or payload.get("operation")
or ""
).strip().lower()
if command in {"categories", "folders", "list-categories", "list-folders", "inspect"}:
return ["categories"]
if command in {"recall", "search", "find", "lookup"}:
keyword = str(
payload.get("keyword")
or payload.get("query")
or payload.get("text")
or payload.get("term")
or ""
).strip()
category = str(payload.get("category") or "").strip()
if not keyword:
return None
args = ["search", "--keyword", keyword]
if category:
args.extend(["--category", category])
return args
if command in {"list", "ls"}:
category = str(payload.get("category") or "").strip()
args = ["list"]
if category:
args.extend(["--category", category])
return args
if command in {"delete", "remove"}:
category = str(payload.get("category") or "").strip()
slug = str(payload.get("slug") or payload.get("name") or "").strip()
if not category or not slug:
return None
return ["delete", "--category", category, "--slug", slug]
if command in {"save", "remember", "store"}:
category = str(payload.get("category") or "").strip()
slug = str(payload.get("slug") or payload.get("name") or "").strip()
summary = str(payload.get("summary") or "").strip()
content = str(payload.get("content") or payload.get("text") or "").strip()
if not category or not slug or not summary or not content:
return None
args = [
"save",
"--category",
category,
"--slug",
slug,
"--summary",
summary,
"--content",
content,
]
title = str(payload.get("title") or "").strip()
status = str(payload.get("status") or "").strip()
tags = payload.get("tags")
related = payload.get("related")
overwrite = bool(payload.get("overwrite"))
if title:
args.extend(["--title", title])
if status:
args.extend(["--status", status])
if isinstance(tags, list) and tags:
args.extend(["--tags", ",".join(str(item).strip() for item in tags if str(item).strip())])
if isinstance(related, list) and related:
args.extend(["--related", ",".join(str(item).strip() for item in related if str(item).strip())])
if overwrite:
args.append("--overwrite")
return args
return None
def list_tools() -> list[dict[str, Any]]:
    """Return the tool listing provided by the tool registry's ``list_tools``."""
    registry_listing = list_tool_registry()
    return registry_listing
def get_tool_definitions_by_ids(tool_ids: list[str]) -> list[dict[str, Any]]:
    """Return the registry's tool definitions for the given ``tool_ids``.

    Delegates to the tool registry's ``get_tool_definitions_by_ids``.
    """
    definitions = list_tool_definitions_by_ids(tool_ids)
    return definitions
async def execute_local_tool(
tool_name: str,
args: dict[str, Any],
tool_config: dict[str, Any] | None = None,
) -> dict[str, Any]:
resolved_name = resolve_tool_name(tool_name)
match resolved_name:
case "Tavily_web_search":
return await _execute_tavily_web_search(args, tool_config)
case "web_search_using_tavily":
return await _execute_tavily_web_search(args, tool_config)
case "web_search_with_tavily":
return await _execute_tavily_web_search(args, tool_config, search_depth="advanced")
case "search_arxiv_and_return_articles":
return await _execute_tavily_academic_search(args, tool_config)
case "web_search":
return await _execute_tavily_web_search(args, tool_config)
case "search_news":
return await _execute_tavily_web_search(args, tool_config)
case "search_wikipedia":
return await _execute_tavily_web_search(args, tool_config)
case "local_time":
return await _execute_local_time(args)
case "summarize_text":
return await _execute_summarize_text(args)
case "extract_text":
return await _execute_extract_text(args)
case "json_repair":
return await _execute_json_repair(args)
case "interactive_form":
return await _execute_interactive_form(args)
case "install_skill_dependency":
return await _execute_install_skill_dependency(args)
case "execute_skill_script":
return await _execute_execute_skill_script(args)
case "webpage_reader":
return await _execute_webpage_reader(args)
case "Tavily_academic_search":
return await _execute_tavily_academic_search(args, tool_config)
case "extract_url_content":
return await _execute_url_extract(args)
case _:
raise ValueError(f"Unknown local tool: {resolved_name}")
async def _execute_local_time(args: dict[str, Any]) -> dict[str, Any]:
    """Return the current time in the requested timezone.

    Args:
        args: May contain ``timezone`` (an IANA zone name, default ``"UTC"``)
            and ``locale`` (default ``"en-US"``). The locale is echoed back
            but does not affect formatting.

    Returns:
        A dict with ``timezone``, ``locale``, ``formatted``
        (``YYYY-MM-DD HH:MM:SS`` wall-clock time in that zone) and ``iso``
        (ISO 8601 including the UTC offset).

    Raises:
        ValueError: If the timezone name cannot be resolved.
    """
    # Function-scope imports keep this block self-contained.
    from datetime import timezone
    from zoneinfo import ZoneInfo

    tz_name = str(args.get("timezone") or "UTC").strip() or "UTC"
    locale = args.get("locale") or "en-US"
    try:
        if tz_name.upper() == "UTC":
            # Works even on hosts that ship no timezone database.
            tzinfo = timezone.utc
        else:
            tzinfo = ZoneInfo(tz_name)
    except (KeyError, ValueError, OSError) as exc:
        # ZoneInfoNotFoundError is a KeyError; malformed keys raise ValueError.
        raise ValueError(f"Time error: {exc}") from exc

    # Previously a naive server-local time was returned while being labelled
    # with the requested zone; the timestamp now really is in that zone.
    now = datetime.now(tzinfo)
    return {
        "timezone": tz_name,
        "locale": locale,
        "formatted": now.strftime("%Y-%m-%d %H:%M:%S"),
        "iso": now.isoformat(),
    }
def _split_sentences(text: str) -> list[str]:
    """Split ``text`` into trimmed, non-empty sentences.

    Runs of ``.``, ``!``, ``?`` and their full-width CJK counterparts act as
    separators and are dropped. A falsy ``text`` yields an empty list.
    """
    fragments = re.split(r"[.!?\u3002\uff01\uff1f]+", text or "")
    trimmed = (fragment.strip() for fragment in fragments)
    return [sentence for sentence in trimmed if sentence]
async def _execute_summarize_text(args: dict[str, Any]) -> dict[str, Any]:
    """Build a naive extractive summary from the leading sentences of a text.

    Args:
        args: ``text`` (the input), ``max_sentences`` (default 3) and
            ``max_chars`` (default 600). Limits that are missing, ``None``,
            non-numeric or negative fall back to their default.

    Returns:
        ``{"summary": ...}`` with the first sentences joined by single
        spaces and truncated to ``max_chars`` characters.
    """

    def limit(key: str, default: int) -> int:
        # Tool arguments may arrive as null or as strings; normalize them so
        # slicing and length comparison cannot raise or be bypassed.
        try:
            number = int(args.get(key, default))
        except (TypeError, ValueError, OverflowError):
            return default
        return number if number >= 0 else default

    text = args.get("text") or ""
    sentences = _split_sentences(str(text))[: limit("max_sentences", 3)]
    summary = " ".join(sentences)
    max_chars = limit("max_chars", 600)
    if len(summary) > max_chars:
        summary = summary[:max_chars].strip()
    return {"summary": summary}
async def _execute_extract_text(args: dict[str, Any]) -> dict[str, Any]:
    """Return sentences of a text that contain a query string.

    Args:
        args: ``text`` (the input), ``query`` (case-insensitive substring;
            when empty or ``None`` every sentence matches) and
            ``max_sentences`` (default 5; missing, ``None``, non-numeric or
            negative values fall back to the default).

    Returns:
        ``{"extracted": [...]}`` with at most ``max_sentences`` sentences in
        their original order.
    """
    text = args.get("text") or ""
    # `or ""` guards against an explicit null query, which would otherwise
    # fail on `.lower()`.
    query = str(args.get("query") or "").lower()

    try:
        max_sentences = int(args.get("max_sentences", 5))
    except (TypeError, ValueError, OverflowError):
        max_sentences = 5
    if max_sentences < 0:
        max_sentences = 5

    sentences = _split_sentences(str(text))
    if query:
        matches = [sentence for sentence in sentences if query in sentence.lower()]
    else:
        matches = sentences
    return {"extracted": matches[:max_sentences]}
async def _execute_json_repair(args: dict[str, Any]) -> dict[str, Any]:
    """Validate JSON text and, if invalid, try to fix trailing commas.

    Args:
        args: ``text`` holds the candidate JSON.

    Returns:
        * ``{"valid": True, "repaired": text, "data": ...}`` when the text
          already parses.
        * ``{"valid": False, "repaired": ..., "data": ...}`` when it parses
          after stripping whitespace and removing commas that directly
          precede ``}`` or ``]``.
        * ``{"valid": False, "error": ...}`` when it cannot be repaired or
          is not text at all.
    """
    text = args.get("text", "")
    if not isinstance(text, (str, bytes, bytearray)):
        # json.loads() raises TypeError for e.g. None; report it in the same
        # shape as any other unrepairable input instead of crashing.
        return {
            "valid": False,
            "error": f"Unable to repair JSON: expected text, got {type(text).__name__}",
        }

    try:
        data = json.loads(text)
    except ValueError:
        # JSONDecodeError (and decode errors for bytes) fall through to repair.
        pass
    else:
        return {"valid": True, "repaired": text, "data": data}

    try:
        repaired = text.strip()
        repaired = re.sub(r",\s*}", "}", repaired)
        repaired = re.sub(r",\s*]", "]", repaired)
        data = json.loads(repaired)
    except (TypeError, ValueError) as exc:
        return {"valid": False, "error": f"Unable to repair JSON: {exc}"}
    return {"valid": False, "repaired": repaired, "data": data}
async def _execute_interactive_form(args: dict[str, Any]) -> dict[str, Any]:
    """Describe a form that is waiting for user input.

    Copies ``id``, ``title`` and ``fields`` (default: empty list) from
    ``args`` and marks the form with status ``pending_user_input``.
    """
    form_id = args.get("id")
    title = args.get("title")
    fields = args.get("fields", [])
    return dict(
        form_id=form_id,
        title=title,
        fields=fields,
        status="pending_user_input",
    )
async def _execute_install_skill_dependency(args: dict[str, Any]) -> dict[str, Any]:
    """Install a package for a skill through the skill runtime.

    Args:
        args: ``skill_id`` and ``package_name``; both are required and are
            stripped of surrounding whitespace.

    Returns:
        The runtime's result on success. If an argument is missing, or the
        runtime raises ``FileNotFoundError``, ``ValueError`` or
        ``RuntimeError``, a dict with ``success=False`` and an ``error``
        message is returned instead.
    """
    skill_id = str(args.get("skill_id") or "").strip()
    package_name = str(args.get("package_name") or "").strip()

    def failure(message: str) -> dict[str, Any]:
        # Blank identifiers are reported as None.
        return {
            "success": False,
            "error": message,
            "skill_id": skill_id or None,
            "package_name": package_name or None,
        }

    if not (skill_id and package_name):
        return failure("skill_id and package_name are required")

    try:
        return await install_skill_dependency_runtime(skill_id, package_name)
    except (FileNotFoundError, ValueError, RuntimeError) as exc:
        return failure(str(exc))
async def _execute_execute_skill_script(args: dict[str, Any]) -> dict[str, Any]:
    """Run a skill's script through the skill runtime.

    Args:
        args: ``skill_id`` and ``script_path`` (both required), ``args``
            (script arguments, default empty) and ``timeout_seconds``
            (default 60.0).

    Returns:
        The runtime's result on success. If a required argument is missing,
        or ``FileNotFoundError``, ``ValueError`` or ``RuntimeError`` is
        raised, a dict with ``success=False`` and an ``error`` message is
        returned instead.
    """
    skill_id = str(args.get("skill_id") or "").strip()
    script_path = str(args.get("script_path") or "").strip()
    script_args = args.get("args") or []
    timeout_seconds = args.get("timeout_seconds") or 60.0

    def failure(message: str) -> dict[str, Any]:
        # Blank identifiers are reported as None.
        return {
            "success": False,
            "error": message,
            "skill_id": skill_id or None,
            "script_path": script_path or None,
        }

    if not (skill_id and script_path):
        return failure("skill_id and script_path are required")

    if skill_id == "agent-memory":
        # Structured requests to the memory skill are translated into CLI
        # arguments; when translation yields nothing the input is kept.
        script_args = _coerce_agent_memory_args(script_path, script_args) or script_args

    if not isinstance(script_args, list):
        script_args = [str(script_args)]

    try:
        # Argument stringification and float() stay inside the try so their
        # ValueError is reported through the same failure dict.
        return await execute_skill_script_runtime(
            skill_id=skill_id,
            script_path=script_path,
            args=list(map(str, script_args)),
            timeout_seconds=float(timeout_seconds),
        )
    except (FileNotFoundError, ValueError, RuntimeError) as exc:
        return failure(str(exc))
async def _execute_webpage_reader(args: dict[str, Any]) -> dict[str, Any]:
    """Fetch a web page as plain text through the ``r.jina.ai`` reader.

    Args:
        args: ``url`` is the page to read. A URL that already carries the
            ``r.jina.ai`` prefix is accepted and the prefix is removed.

    Returns:
        ``{"url", "content", "source"}`` on success, or ``{"error": ...}``
        when the HTTP request fails.

    Raises:
        ValueError: If no URL is supplied.
    """
    url = str(args.get("url") or "").strip()
    normalized = re.sub(r"^https?://r\.jina\.ai/", "", url)
    if not normalized:
        # Without this guard a null URL crashed on `.strip()` and an empty
        # one sent a request to the bare reader endpoint.
        raise ValueError("Missing required field: url")

    request_url = f"https://r.jina.ai/{normalized}"
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(request_url, headers={"Accept": "text/plain"})
            response.raise_for_status()
            content = response.text
            return {
                "url": normalized,
                "content": content,
                "source": "jina.ai",
            }
    except httpx.HTTPError as exc:
        return {"error": f"Webpage read failed: {str(exc)}"}
def _resolve_tavily_api_key(tool_config: dict[str, Any] | None) -> str:
if tool_config and tool_config.get("tavilyApiKey"):
return str(tool_config["tavilyApiKey"])
if tool_config and tool_config.get("searchProvider") == "tavily":
if tool_config.get("searchApiKey"):
return str(tool_config["searchApiKey"])
env_key = os.getenv("TAVILY_API_KEY") or os.getenv("PUBLIC_TAVILY_API_KEY")
return env_key or ""
async def _execute_tavily_web_search(
args: dict[str, Any],
tool_config: dict[str, Any] | None = None,
search_depth: str = "basic",
) -> dict[str, Any]:
query = str(args.get("query", "")).strip()
max_results = FIXED_SEARCH_MAX_RESULTS
if not query:
raise ValueError("Missing required field: query")
api_key = _resolve_tavily_api_key(tool_config)
if not api_key:
raise ValueError("Tavily API key not configured.")
payload = {
"api_key": api_key,
"query": query,
"search_depth": search_depth,
"include_answer": True,
"max_results": max_results,
}
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post("https://api.tavily.com/search", json=payload)
response.raise_for_status()
data = response.json()
return {
"answer": data.get("answer"),
"results": [
{
"title": item.get("title"),
"url": item.get("url"),
"content": item.get("content"),
}
for item in data.get("results", []) or []
],
}
async def _execute_tavily_academic_search(
args: dict[str, Any],
tool_config: dict[str, Any] | None = None,
) -> dict[str, Any]:
query = str(args.get("query", "")).strip()
max_results = FIXED_SEARCH_MAX_RESULTS
try:
min_score = float(args.get("min_score", 0.9))
except Exception:
min_score = 0.9
if not query:
raise ValueError("Missing required field: query")
api_key = _resolve_tavily_api_key(tool_config)
if not api_key:
raise ValueError("Tavily API key not configured.")
payload = {
"api_key": api_key,
"query": query,
"search_depth": "advanced",
"include_domains": ACADEMIC_DOMAINS,
"include_answer": True,
"max_results": max_results,
}
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post("https://api.tavily.com/search", json=payload)
response.raise_for_status()
data = response.json()
return {
"answer": data.get("answer"),
"results": [
{
"title": item.get("title"),
"url": item.get("url"),
"content": item.get("content"),
"score": item.get("score"),
}
for item in data.get("results", []) or []
if float(item.get("score") or 0.0) > min_score
],
"query_type": "academic",
"min_score": min_score,
}
async def _execute_url_extract(args: dict[str, Any]) -> dict[str, Any]:
    """Read one or more URLs and return their content.

    Args:
        args: ``urls`` (or ``url``) is either a comma-separated string or a
            list of URLs; blank entries are ignored.

    Returns:
        ``{"results": [...]}`` with one entry per URL, in input order. Each
        entry is ``{"url", "content", "title"}`` on success or
        ``{"url", "error"}`` on failure.

    Raises:
        ValueError: If no URL is supplied.
    """
    urls = args.get("urls") or args.get("url") or ""
    if isinstance(urls, str):
        url_list = [part.strip() for part in urls.split(",") if part.strip()]
    elif isinstance(urls, list):
        url_list = [str(entry).strip() for entry in urls if str(entry).strip()]
    else:
        url_list = []
    if not url_list:
        raise ValueError("Missing required field: urls")

    results = []
    for url in url_list:
        try:
            page = await _execute_webpage_reader({"url": url})
        except Exception as exc:
            # Best effort per URL: one failing page must not abort the rest.
            results.append({"url": url, "error": str(exc)})
            continue

        # The reader reports HTTP failures as an {"error": ...} dict rather
        # than raising; surface that instead of recording an empty page.
        reader_error = page.get("error")
        if reader_error:
            results.append({"url": url, "error": str(reader_error)})
            continue

        results.append(
            {
                "url": page.get("url") or url,
                "content": page.get("content") or "",
                "title": page.get("title") or "",
            }
        )
    return {"results": results}
def is_local_tool_name(tool_name: str) -> bool:
    """Report whether ``tool_name`` resolves to the ``id`` of a ``CUSTOM_TOOLS`` entry."""
    resolved = resolve_tool_name(tool_name)
    for tool in CUSTOM_TOOLS:
        if tool["id"] == resolved:
            return True
    return False
async def execute_tool_by_name(
    tool_name: str,
    args: dict[str, Any],
    tool_config: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Execute a tool by name under a timeout, reporting failures as data.

    A tool is runnable when its resolved name is a local tool or one of
    ``EXTERNAL_SEARCH_TOOL_NAMES``. The timeout comes from
    ``_tool_timeout_seconds()``.

    Returns:
        The tool's result. On timeout or any exception raised by the tool,
        a dict with ``error``, ``timed_out``, ``tool`` and ``args`` is
        returned instead of raising.

    Raises:
        ValueError: If the resolved name is not a runnable tool.
    """
    resolved_name = resolve_tool_name(tool_name)
    runnable = is_local_tool_name(resolved_name) or resolved_name in EXTERNAL_SEARCH_TOOL_NAMES
    if not runnable:
        raise ValueError(f"Tool {resolved_name} not found")

    timeout_sec = _tool_timeout_seconds()

    def failure(message: str, *, timed_out: bool) -> dict[str, Any]:
        return {
            "error": message,
            "timed_out": timed_out,
            "tool": resolved_name,
            "args": args or {},
        }

    try:
        return await asyncio.wait_for(
            execute_local_tool(resolved_name, args, tool_config),
            timeout=timeout_sec,
        )
    except asyncio.TimeoutError:
        return failure(
            f"Tool '{resolved_name}' timed out after {timeout_sec:.1f}s",
            timed_out=True,
        )
    except Exception as exc:
        # Tool failures are handed back to the caller as a result dict.
        return failure(str(exc), timed_out=False)
# Re-export the registry's tool collections under their unprefixed names
# (presumably for modules that import them from here — TODO confirm).
GLOBAL_TOOLS = REGISTRY_GLOBAL_TOOLS
AGENT_TOOLS = REGISTRY_AGENT_TOOLS
ALL_TOOLS = REGISTRY_ALL_TOOLS