Spaces:
Running
Running
| """SkillRegistry β discover, load, match, and inject skills. | |
| Skills follow the official SKILL.md format: | |
| - YAML frontmatter with only ``name`` and ``description`` | |
| - Markdown body with instructions (loaded only after selection) | |
| Skills are discovered from user-configured directories and matched to | |
| tasks via LLM-based selection (with keyword fallback). | |
| Skill identity: | |
| Every skill directory may contain a ``.skill_id`` sidecar file that | |
| stores the persistent unique identifier. On **first discovery** | |
| (no ``.skill_id`` file present), an ID is generated and written to | |
| the file. On subsequent runs the ID is **read** from the file β | |
| this makes the ID portable (survives directory moves, machine changes) | |
| and deterministic (never regenerated). | |
| Imported skills: ``{name}__imp_{uuid_hex[:8]}`` | |
| Evolved skills: ``{name}__v{gen}_{uuid_hex[:8]}`` (written by evolver) | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import re | |
| import uuid | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, TYPE_CHECKING | |
| from openspace.utils.logging import Logger | |
| from .skill_utils import parse_frontmatter, strip_frontmatter, check_skill_safety, is_skill_safe | |
| from .skill_ranker import SkillRanker, SkillCandidate, PREFILTER_THRESHOLD | |
| if TYPE_CHECKING: | |
| from openspace.llm import LLMClient | |
| logger = Logger.get_logger(__name__) | |
| # Sidecar filename that stores the persistent skill_id | |
| SKILL_ID_FILENAME = ".skill_id" | |
| def _read_or_create_skill_id(name: str, skill_dir: Path) -> str: | |
| """Read ``skill_id`` from ``.skill_id`` sidecar, or create one. | |
| The sidecar file is a single-line plain-text file containing only | |
| the ``skill_id`` string. It lives alongside ``SKILL.md`` inside | |
| the skill directory. | |
| First call (no file): generates ``{name}__imp_{uuid8}`` and writes it. | |
| Subsequent calls: reads and returns the existing ID. | |
| """ | |
| id_file = skill_dir / SKILL_ID_FILENAME | |
| if id_file.exists(): | |
| try: | |
| existing = id_file.read_text(encoding="utf-8").strip() | |
| if existing: | |
| return existing | |
| except OSError: | |
| pass # fall through to generate | |
| # Generate a new ID and persist | |
| new_id = f"{name}__imp_{uuid.uuid4().hex[:8]}" | |
| try: | |
| id_file.write_text(new_id + "\n", encoding="utf-8") | |
| logger.debug(f"Created .skill_id for '{name}': {new_id}") | |
| except OSError as e: | |
| logger.warning(f"Cannot write {id_file}: {e} β ID will not persist across restarts") | |
| return new_id | |
| def write_skill_id(skill_dir: Path, skill_id: str) -> None: | |
| """Write (or overwrite) the ``.skill_id`` sidecar in *skill_dir*. | |
| Called by ``SkillEvolver`` after FIX / DERIVED / CAPTURED to stamp | |
| the new ``skill_id`` into the skill directory so that the next | |
| ``discover()`` picks it up correctly. | |
| """ | |
| id_file = skill_dir / SKILL_ID_FILENAME | |
| try: | |
| id_file.write_text(skill_id + "\n", encoding="utf-8") | |
| except OSError as e: | |
| logger.warning(f"Cannot write {id_file}: {e}") | |
| class SkillMeta: | |
| """Metadata for a discovered skill. | |
| ``skill_id`` is the globally unique identifier used throughout the | |
| system β LLM prompts, database, evolution, and selection all | |
| reference this field. | |
| """ | |
| skill_id: str # Unique β persisted in .skill_id sidecar | |
| name: str # Human-readable name (from frontmatter or dirname) | |
| description: str | |
| path: Path # Absolute path to SKILL.md | |
| class SkillRegistry: | |
| """Discover, load, select, and inject skills into agent context. | |
| Args: | |
| skill_dirs: Ordered list of directories to scan. Earlier entries have higher | |
| priority β a skill in the first dir shadows one with the same name | |
| in later dirs. | |
| All internal maps are keyed by ``skill_id``, not ``name``. | |
| """ | |
| def __init__(self, skill_dirs: Optional[List[Path]] = None) -> None: | |
| self._skill_dirs: List[Path] = skill_dirs or [] | |
| self._skills: Dict[str, SkillMeta] = {} # skill_id -> SkillMeta | |
| self._content_cache: Dict[str, str] = {} # skill_id -> raw SKILL.md content | |
| self._discovered = False | |
| self._ranker: Optional[SkillRanker] = None # lazy-init on first use | |
| def discover(self) -> List[SkillMeta]: | |
| """Scan all skill_dirs and populate the registry. | |
| Each skill is a sub-directory containing a ``SKILL.md`` file. | |
| The ``skill_id`` is read from the ``.skill_id`` sidecar (created | |
| automatically on first discovery). Two skills with the same | |
| ``name`` in different directories get different IDs and can | |
| coexist in the registry and database. | |
| """ | |
| self._skills.clear() | |
| self._content_cache.clear() | |
| for skill_dir in self._skill_dirs: | |
| if not skill_dir.exists(): | |
| logger.debug(f"Skill dir does not exist, skipping: {skill_dir}") | |
| continue | |
| for entry in sorted(skill_dir.iterdir()): | |
| if not entry.is_dir(): | |
| continue | |
| skill_file = entry / "SKILL.md" | |
| if not skill_file.exists(): | |
| continue | |
| try: | |
| content = skill_file.read_text(encoding="utf-8") | |
| # Safety check on skill content | |
| safety_flags = check_skill_safety(content) | |
| if not is_skill_safe(safety_flags): | |
| logger.warning( | |
| f"BLOCKED skill {entry.name}: " | |
| f"safety flags {safety_flags}" | |
| ) | |
| continue | |
| meta = self._parse_skill(entry.name, entry, skill_file, content) | |
| sid = meta.skill_id | |
| if sid in self._skills: | |
| logger.debug(f"Skill '{sid}' already discovered, skipping {skill_file}") | |
| continue | |
| self._skills[sid] = meta | |
| self._content_cache[sid] = content | |
| if safety_flags: | |
| logger.debug(f"Discovered skill: {sid} (safety: {safety_flags})") | |
| else: | |
| logger.debug(f"Discovered skill: {sid} β {meta.description[:60]}") | |
| except Exception as e: | |
| logger.warning(f"Failed to parse skill {skill_file}: {e}") | |
| self._discovered = True | |
| logger.info( | |
| f"Skill discovery complete: {len(self._skills)} skill(s) " | |
| f"from {len(self._skill_dirs)} dir(s)" | |
| ) | |
| return list(self._skills.values()) | |
| def list_skills(self) -> List[SkillMeta]: | |
| """List all discovered skills.""" | |
| self._ensure_discovered() | |
| return list(self._skills.values()) | |
| def get_skill(self, skill_id: str) -> Optional[SkillMeta]: | |
| """Get a skill by ``skill_id``.""" | |
| self._ensure_discovered() | |
| return self._skills.get(skill_id) | |
| def get_skill_by_name(self, name: str) -> Optional[SkillMeta]: | |
| """Get a skill by ``name`` (first match). Use ``get_skill`` when possible.""" | |
| self._ensure_discovered() | |
| for meta in self._skills.values(): | |
| if meta.name == name: | |
| return meta | |
| return None | |
| def update_skill(self, old_skill_id: str, new_meta: SkillMeta) -> None: | |
| """Replace a skill entry after FIX evolution. | |
| Removes *old_skill_id* from the registry and inserts *new_meta* | |
| under its (new) ``skill_id``. Content cache is refreshed from | |
| the filesystem. | |
| """ | |
| self._skills.pop(old_skill_id, None) | |
| self._content_cache.pop(old_skill_id, None) | |
| self._skills[new_meta.skill_id] = new_meta | |
| if new_meta.path.exists(): | |
| try: | |
| self._content_cache[new_meta.skill_id] = ( | |
| new_meta.path.read_text(encoding="utf-8") | |
| ) | |
| except Exception: | |
| pass | |
| logger.debug( | |
| f"Registry.update_skill: {old_skill_id} β {new_meta.skill_id}" | |
| ) | |
| def add_skill(self, meta: SkillMeta) -> None: | |
| """Register a newly-created skill (DERIVED / CAPTURED). | |
| Does NOT overwrite an existing entry with the same ``skill_id``. | |
| """ | |
| if meta.skill_id in self._skills: | |
| logger.debug( | |
| f"Registry.add_skill: {meta.skill_id} already exists, skipping" | |
| ) | |
| return | |
| self._skills[meta.skill_id] = meta | |
| if meta.path.exists(): | |
| try: | |
| self._content_cache[meta.skill_id] = ( | |
| meta.path.read_text(encoding="utf-8") | |
| ) | |
| except Exception: | |
| pass | |
| logger.debug(f"Registry.add_skill: {meta.skill_id}") | |
| # Hot-reload API (add external skills at runtime) | |
| def discover_from_dirs(self, extra_dirs: List[Path]) -> List[SkillMeta]: | |
| """Discover skills from additional directories and add to the registry. | |
| Unlike :meth:`discover`, this does **NOT** clear existing skills β it | |
| only adds new ones from the given directories. Useful for hot-loading | |
| external skills (e.g. host-agent skills, newly downloaded cloud skills). | |
| Safety: applies the same ``check_skill_safety`` / ``is_skill_safe`` | |
| filtering as :meth:`discover` to prevent malicious external skills. | |
| Args: | |
| extra_dirs: Additional directories to scan. | |
| """ | |
| added: List[SkillMeta] = [] | |
| for skill_dir in extra_dirs: | |
| if not skill_dir.exists() or not skill_dir.is_dir(): | |
| logger.debug(f"discover_from_dirs: skipping {skill_dir}") | |
| continue | |
| for entry in sorted(skill_dir.iterdir()): | |
| if not entry.is_dir(): | |
| continue | |
| skill_file = entry / "SKILL.md" | |
| if not skill_file.exists(): | |
| continue | |
| try: | |
| content = skill_file.read_text(encoding="utf-8") | |
| # Safety check (same as discover()) | |
| safety_flags = check_skill_safety(content) | |
| if not is_skill_safe(safety_flags): | |
| logger.warning( | |
| f"BLOCKED external skill {entry.name}: " | |
| f"safety flags {safety_flags}" | |
| ) | |
| continue | |
| meta = self._parse_skill(entry.name, entry, skill_file, content) | |
| if meta.skill_id in self._skills: | |
| continue | |
| self._skills[meta.skill_id] = meta | |
| self._content_cache[meta.skill_id] = content | |
| added.append(meta) | |
| logger.debug(f"Hot-registered: {meta.skill_id} β {meta.description[:60]}") | |
| except Exception as e: | |
| logger.warning(f"Failed to parse skill {skill_file}: {e}") | |
| if added: | |
| logger.info( | |
| f"discover_from_dirs: {len(added)} new skill(s) from " | |
| f"{len(extra_dirs)} dir(s)" | |
| ) | |
| return added | |
| def register_skill_dir(self, skill_dir: Path) -> Optional[SkillMeta]: | |
| """Register a single skill directory (hot-reload). | |
| Safety: applies ``check_skill_safety`` / ``is_skill_safe`` filtering. | |
| Args: | |
| skill_dir: Path to a directory containing ``SKILL.md``. | |
| Returns: | |
| :class:`SkillMeta` if newly registered, ``None`` if already | |
| present, the directory is invalid, or the skill fails safety checks. | |
| """ | |
| skill_file = skill_dir / "SKILL.md" | |
| if not skill_file.exists(): | |
| logger.debug(f"register_skill_dir: no SKILL.md in {skill_dir}") | |
| return None | |
| try: | |
| content = skill_file.read_text(encoding="utf-8") | |
| # Safety check (same as discover()) | |
| safety_flags = check_skill_safety(content) | |
| if not is_skill_safe(safety_flags): | |
| logger.warning( | |
| f"BLOCKED skill {skill_dir.name}: " | |
| f"safety flags {safety_flags}" | |
| ) | |
| return None | |
| meta = self._parse_skill(skill_dir.name, skill_dir, skill_file, content) | |
| if meta.skill_id in self._skills: | |
| logger.debug(f"register_skill_dir: {meta.skill_id} already exists") | |
| return None | |
| self._skills[meta.skill_id] = meta | |
| self._content_cache[meta.skill_id] = content | |
| logger.info(f"Hot-registered skill: {meta.skill_id}") | |
| return meta | |
| except Exception as e: | |
| logger.warning(f"Failed to register skill {skill_dir}: {e}") | |
| return None | |
| def ranker(self) -> SkillRanker: | |
| """Lazy-initialised :class:`SkillRanker` for hybrid pre-filtering.""" | |
| if self._ranker is None: | |
| self._ranker = SkillRanker() | |
| return self._ranker | |
| async def select_skills_with_llm( | |
| self, | |
| task_description: str, | |
| llm_client: "LLMClient", | |
| max_skills: int = 2, | |
| model: Optional[str] = None, | |
| skill_quality: Optional[Dict[str, Dict[str, Any]]] = None, | |
| ) -> tuple[List[SkillMeta], Optional[Dict[str, Any]]]: | |
| """Use an LLM to select the most relevant skills. | |
| When the local registry has more than ``PREFILTER_THRESHOLD`` skills, | |
| a **BM25 β embedding** pre-filter narrows the candidate set before | |
| sending to the LLM. This avoids stuffing an overly long catalog | |
| into the prompt. | |
| Progressive disclosure: the LLM only sees skill *headers* | |
| (skill_id + description + quality stats), not the full SKILL.md | |
| content. Full content is loaded only after selection. | |
| Args: | |
| task_description: The user's task instruction. | |
| llm_client: An initialised LLMClient used for the selection call. | |
| max_skills: Maximum number of skills to inject. | |
| model: Override model for this selection call. | |
| If None, falls back to ``llm_client``'s default model. | |
| skill_quality: Optional mapping ``{skill_id: {total_applied, total_completions, total_fallbacks}}`` | |
| from :class:`SkillStore`. When provided, skills with high | |
| fallback rates are filtered out and quality signals are | |
| included in the LLM selection prompt. | |
| Returns: | |
| tuple[list[SkillMeta], dict | None]: (selected_skills, selection_record). | |
| selection_record contains the LLM conversation for logging. | |
| """ | |
| self._ensure_discovered() | |
| if not task_description: | |
| return [], None | |
| available = list(self._skills.values()) | |
| if not available: | |
| return [], None | |
| # Quality-based filtering: remove skills that consistently fail | |
| filtered_out: List[str] = [] | |
| if skill_quality: | |
| kept: List[SkillMeta] = [] | |
| for s in available: | |
| q = skill_quality.get(s.skill_id) | |
| if q: | |
| selections = q.get("total_selections", 0) | |
| applied = q.get("total_applied", 0) | |
| completions = q.get("total_completions", 0) | |
| fallbacks = q.get("total_fallbacks", 0) | |
| # Filter 1: selected multiple times but never completed | |
| if selections >= 2 and completions == 0: | |
| filtered_out.append(s.skill_id) | |
| continue | |
| # Filter 2: high fallback rate when applied | |
| if applied >= 2 and fallbacks / applied > 0.5: | |
| filtered_out.append(s.skill_id) | |
| continue | |
| kept.append(s) | |
| if filtered_out: | |
| logger.info( | |
| f"Skill quality filter: removed {len(filtered_out)} " | |
| f"high-fallback skill(s): {filtered_out}" | |
| ) | |
| available = kept | |
| if not available: | |
| return [], None | |
| # Pre-filter when skill count exceeds threshold | |
| prefilter_used = False | |
| if len(available) > PREFILTER_THRESHOLD: | |
| available = self._prefilter_skills(task_description, available, max_skills) | |
| prefilter_used = True | |
| # Build a concise skills catalogue for the LLM (skill_id + description + quality) | |
| catalog_lines: List[str] = [] | |
| for s in available: | |
| q = skill_quality.get(s.skill_id) if skill_quality else None | |
| if q: | |
| selections = q.get("total_selections", 0) | |
| applied = q.get("total_applied", 0) | |
| completions = q.get("total_completions", 0) | |
| if applied > 0: | |
| rate = completions / applied | |
| catalog_lines.append( | |
| f"- **{s.skill_id}**: {s.description} " | |
| f"(success {completions}/{applied} = {rate:.0%})" | |
| ) | |
| elif selections > 0: | |
| catalog_lines.append( | |
| f"- **{s.skill_id}**: {s.description} " | |
| f"(selected {selections}x, never succeeded)" | |
| ) | |
| else: | |
| catalog_lines.append(f"- **{s.skill_id}**: {s.description} (new)") | |
| else: | |
| catalog_lines.append(f"- **{s.skill_id}**: {s.description}") | |
| skills_catalog = "\n".join(catalog_lines) | |
| prompt = self._build_skill_selection_prompt( | |
| task_description, skills_catalog, max_skills | |
| ) | |
| selection_record: Dict[str, Any] = { | |
| "method": "llm", | |
| "task": task_description[:500], | |
| "available_skills": [s.skill_id for s in available], | |
| "filtered_out": filtered_out, | |
| "prefilter_used": prefilter_used, | |
| "prompt": prompt, | |
| } | |
| try: | |
| from gdpval_bench.token_tracker import set_call_source, reset_call_source | |
| _src_tok = set_call_source("skill_select") | |
| except ImportError: | |
| _src_tok = None | |
| try: | |
| llm_kwargs = {} | |
| if model: | |
| llm_kwargs["model"] = model | |
| resp = await llm_client.complete(prompt, **llm_kwargs) | |
| content = resp["message"]["content"].strip() | |
| selected_ids, brief_plan = self._parse_skill_selection_response(content) | |
| selection_record["llm_response"] = content | |
| selection_record["parsed_ids"] = selected_ids | |
| selection_record["brief_plan"] = brief_plan | |
| # Validate ids against registry & cap | |
| result: List[SkillMeta] = [] | |
| for sid in selected_ids: | |
| if len(result) >= max_skills: | |
| break | |
| meta = self._skills.get(sid) | |
| if meta: | |
| result.append(meta) | |
| else: | |
| logger.debug(f"LLM selected unknown skill_id: {sid}") | |
| selection_record["selected"] = [s.skill_id for s in result] | |
| if result: | |
| ids = ", ".join(s.skill_id for s in result) | |
| logger.info(f"LLM skill selection: [{ids}]") | |
| else: | |
| logger.info("LLM decided no skills are relevant for this task") | |
| return result, selection_record | |
| except Exception as e: | |
| logger.warning(f"LLM skill selection failed: {e} β proceeding without skills") | |
| selection_record["error"] = str(e) | |
| selection_record["method"] = "llm_failed" | |
| selection_record["selected"] = [] | |
| return [], selection_record | |
| finally: | |
| if _src_tok is not None: | |
| reset_call_source(_src_tok) | |
| def _prefilter_skills( | |
| self, | |
| task: str, | |
| available: List[SkillMeta], | |
| max_skills: int, | |
| ) -> List[SkillMeta]: | |
| """Narrow the candidate set using BM25 + embedding hybrid ranking. | |
| Keeps at most ``max(15, max_skills * 5)`` candidates for the LLM | |
| selection prompt. | |
| """ | |
| prefilter_top_k = max(15, max_skills * 5) | |
| # Build SkillCandidate list | |
| candidates: List[SkillCandidate] = [] | |
| for s in available: | |
| body = "" | |
| raw = self._content_cache.get(s.skill_id, "") | |
| if raw: | |
| body = strip_frontmatter(raw) | |
| candidates.append(SkillCandidate( | |
| skill_id=s.skill_id, | |
| name=s.name, | |
| description=s.description, | |
| body=body, | |
| )) | |
| ranked = self.ranker.hybrid_rank(task, candidates, top_k=prefilter_top_k) | |
| # Map back to SkillMeta | |
| ranked_ids = {c.skill_id for c in ranked} | |
| result = [s for s in available if s.skill_id in ranked_ids] | |
| if len(result) < len(available): | |
| logger.info( | |
| f"Skill pre-filter: {len(available)} β {len(result)} candidates " | |
| f"(BM25+embedding, threshold={PREFILTER_THRESHOLD})" | |
| ) | |
| return result | |
| def load_skill_content(self, skill_id: str) -> Optional[str]: | |
| """Return the SKILL.md content (with frontmatter stripped) for *skill_id*.""" | |
| self._ensure_discovered() | |
| raw = self._content_cache.get(skill_id) | |
| if raw is None: | |
| return None | |
| return self._strip_frontmatter(raw) | |
| def build_context_injection( | |
| self, | |
| skills: List[SkillMeta], | |
| backends: Optional[List[str]] = None, | |
| ) -> str: | |
| """Build a prompt fragment with the full content of *skills*. | |
| Injected as a system message into the agent's messages before the | |
| user instruction so the LLM reads skill guidance first. | |
| Args: | |
| skills: Skills to inject. | |
| backends: Active backend names (e.g. ``["shell", "mcp"]``). Used to | |
| tailor the guidance so only actually available backends are | |
| mentioned. ``None`` falls back to mentioning all backends. | |
| Key features: | |
| - Includes the skill directory path so the agent can resolve | |
| relative references to ``scripts/``, ``references/``, ``assets/``. | |
| - Replaces ``{baseDir}`` placeholders with the actual skill | |
| directory path (a convention used in some SKILL.md files). | |
| """ | |
| parts: List[str] = [] | |
| for skill in skills: | |
| content = self.load_skill_content(skill.skill_id) | |
| if content: | |
| # Resolve {baseDir} placeholder to the skill directory | |
| skill_dir = str(skill.path.parent) | |
| content = content.replace("{baseDir}", skill_dir) | |
| part = ( | |
| f"### Skill: {skill.skill_id}\n" | |
| f"**Skill directory**: `{skill_dir}`\n\n" | |
| f"{content}" | |
| ) | |
| parts.append(part) | |
| if not parts: | |
| return "" | |
| # Build a backend hint that only mentions registered backends | |
| scope = set(backends) if backends else {"gui", "shell", "mcp", "web", "system"} | |
| backend_names: List[str] = [] | |
| if "mcp" in scope: | |
| backend_names.append("MCP") | |
| if "shell" in scope: | |
| backend_names.append("shell") | |
| if "gui" in scope: | |
| backend_names.append("GUI") | |
| tool_hint = ", ".join(backend_names) if backend_names else "available" | |
| # Resource access tips β mention shell_agent only when shell is available | |
| has_shell = "shell" in scope | |
| resource_tip = ( | |
| "Use `read_file` / `list_dir` / `write_file` for file operations" | |
| + (" and `shell_agent` for running scripts" if has_shell else "") | |
| + ". Paths in skill instructions are relative to the skill " | |
| "directory listed under each skill heading.\n\n" | |
| ) | |
| header = ( | |
| "# Active Skills\n\n" | |
| "The following skills provide **domain knowledge and tested procedures** " | |
| "relevant to this task.\n\n" | |
| "**How to use skills:**\n" | |
| "- If a skill contains **step-by-step procedures or commands**, follow them β " | |
| "they are verified workflows.\n" | |
| "- If a skill provides **reference information, best practices, or tool guides**, " | |
| "use it as context to inform your decisions.\n" | |
| f"- Skills supplement your available tools β you may use **any** tool " | |
| f"({tool_hint}) alongside skill guidance. " | |
| "Choose the best tool for each sub-step.\n\n" | |
| "**Resource access**: Each skill may include bundled resources " | |
| "(scripts, references, assets) in its skill directory. " | |
| + resource_tip | |
| ) | |
| return header + "\n\n---\n\n".join(parts) | |
| def _ensure_discovered(self) -> None: | |
| if not self._discovered: | |
| self.discover() | |
| def _parse_skill( | |
| dir_name: str, | |
| skill_dir: Path, | |
| skill_file: Path, | |
| content: str, | |
| ) -> SkillMeta: | |
| """Parse a SKILL.md file into a SkillMeta. | |
| Only ``name`` and ``description`` are read from frontmatter | |
| (per the official skill format). ``skill_id`` is read from | |
| the ``.skill_id`` sidecar (created if absent). | |
| """ | |
| frontmatter = parse_frontmatter(content) | |
| name = frontmatter.get("name", dir_name) | |
| description = frontmatter.get("description", name) | |
| skill_id = _read_or_create_skill_id(name, skill_dir) | |
| return SkillMeta( | |
| skill_id=skill_id, | |
| name=name, | |
| description=description, | |
| path=skill_file, | |
| ) | |
| # Frontmatter parsing is delegated to skill_utils (single source of truth). | |
| _extract_frontmatter = staticmethod(parse_frontmatter) | |
| _strip_frontmatter = staticmethod(strip_frontmatter) | |
| def _build_skill_selection_prompt( | |
| task: str, | |
| skills_catalog: str, | |
| max_skills: int, | |
| ) -> str: | |
| """Build the prompt for LLM skill selection. | |
| Uses a plan-then-select pattern: the LLM first writes a brief | |
| execution plan, then selects skills that match the plan. | |
| """ | |
| return f"""You are a skill selector for an autonomous agent. | |
| # Task | |
| {task} | |
| # Available Skills | |
| {skills_catalog} | |
| # Instructions | |
| Follow these steps: | |
| **Step 1 β Plan**: Think about how you would accomplish this task. What are the key deliverables? What file formats are needed (PDF, DOCX, XLSX, etc.)? What tools or libraries would you use? | |
| **Step 2 β Match**: Check which skills directly teach workflows for the deliverables or file formats identified in your plan. A skill is relevant ONLY if it provides a tested procedure for a core part of your plan. Skills that only share vague topical overlap (e.g. a "PDF checklist" skill for a task that just happens to involve PDFs) add noise and should be excluded. | |
| **Step 3 β Quality check**: Among matching skills, prefer ones with higher success rates. Avoid skills marked as "never succeeded" or with very low success rates β they waste iterations and actively hurt performance. | |
| **Step 4 β Decide**: Select at most {max_skills} skill(s). If no skill closely matches your plan, you MUST return an empty list. Selecting an irrelevant or low-quality skill is **worse than selecting none** β it forces the agent down an unproductive path and wastes the entire iteration budget. When in doubt, leave it out. | |
| Return a JSON object: | |
| {{"brief_plan": "1-2 sentence plan for this task", "skills": ["skill_id_1", "skill_id_2"]}} | |
| If no skill applies: | |
| {{"brief_plan": "1-2 sentence plan", "skills": []}} | |
| IMPORTANT: Use the **exact skill_id** from the list above.""" | |
| def _parse_skill_selection_response(content: str) -> tuple[List[str], str]: | |
| """Parse the LLM response and extract selected skill IDs + plan. | |
| Returns: | |
| (skill_ids, brief_plan) | |
| """ | |
| # Handle markdown code blocks | |
| code_block = re.search(r"```(?:json)?\s*\n?(.*?)\n?```", content, re.DOTALL) | |
| if code_block: | |
| content = code_block.group(1).strip() | |
| else: | |
| # Try to find a raw JSON object | |
| json_match = re.search(r"\{.*\}", content, re.DOTALL) | |
| if json_match: | |
| content = json_match.group() | |
| try: | |
| data = json.loads(content) | |
| except json.JSONDecodeError: | |
| logger.warning(f"Failed to parse LLM skill selection JSON: {content[:200]}") | |
| return [], "" | |
| brief_plan = data.get("brief_plan", "") | |
| if brief_plan: | |
| logger.info(f"Skill selection plan: {brief_plan}") | |
| ids = data.get("skills", []) | |
| if not isinstance(ids, list): | |
| return [], brief_plan | |
| return [str(n).strip() for n in ids if n], brief_plan | |