"""
routing.py — SLM-native LLM call router with cost homeostasis.

Routes tasks to the smallest capable model. Local-first by default.
Enforces cost, latency, and token budgets as hard constraints.

Complexity classification:
    simple   -> single SLM call (summarize, answer simple Q)
    moderate -> sequential chain (plan -> execute)
    complex  -> parallel specialists (research + code + review)
    critical -> specialists + critic ensemble + optional HITL

Router decisions are logged and reproducible.
"""
| from __future__ import annotations |
|
|
| import logging |
| import time |
| from dataclasses import dataclass, field |
| from enum import Enum |
| from typing import Any |
|
|
| from purpose_agent.llm_backend import LLMBackend |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class TaskComplexity(str, Enum):
    """Coarse difficulty tier of a task; drives model/strategy selection."""

    SIMPLE = "simple"      # single SLM call
    MODERATE = "moderate"  # sequential chain
    COMPLEX = "complex"    # parallel specialists
    CRITICAL = "critical"  # specialists + critic ensemble
|
|
|
|
@dataclass
class RoutingPolicy:
    """Policy governing model selection and cost control.

    Budgets are hard constraints: once spending reaches
    ``max_cost_per_task_usd`` the router is forced back to the local model.
    """

    prefer_local: bool = True            # try local models before cloud
    max_cost_per_task_usd: float = 0.10  # hard spend ceiling (USD)
    max_latency_per_call_s: float = 30.0  # per-call latency budget
    max_tokens_per_task: int = 10000     # token budget per task
    allow_cloud_fallback: bool = True    # permit escalation to a cloud model
    fallback_model: str = ""             # optional explicit fallback spec
    local_model: str = "ollama:qwen3:1.7b"
    cloud_model: str = "openrouter:meta-llama/llama-3.3-70b-instruct"
|
|
|
|
@dataclass
class ModelOption:
    """A model available for routing.

    ``spec`` is the backend-qualified model string (e.g. "ollama:...").
    """

    spec: str
    is_local: bool = True
    cost_per_1k_tokens: float = 0.0   # USD; 0.0 for local models
    avg_latency_s: float = 1.0
    max_context: int = 32768
    # default_factory so instances never share one capabilities list
    capabilities: list[str] = field(default_factory=list)
|
|
|
|
@dataclass
class RoutingDecision:
    """Recorded, reproducible decision emitted by the router."""

    task_summary: str                 # truncated task text for logs
    complexity: TaskComplexity
    selected_model: str               # model spec chosen for the task
    reason: str                       # human-readable routing rationale
    # default_factory keeps the timestamp per-instance, not import-time
    timestamp: float = field(default_factory=time.time)
    estimated_cost: float = 0.0       # USD, if known at decision time
|
|
|
|
| |
| _COMPLEX_KEYWORDS = {"research", "analyze", "compare", "design", "architect", "security", "audit"} |
| _CRITICAL_KEYWORDS = {"deploy", "production", "delete", "admin", "payment", "credential", "secret"} |
| _SIMPLE_KEYWORDS = {"summarize", "translate", "hello", "what is", "define", "explain"} |
|
|
|
|
class TaskComplexityClassifier:
    """Classifies task complexity from the purpose description.

    Keyword tiers are checked in priority order (critical > complex >
    simple). Multi-word keywords such as "what is" are matched as phrases
    against the lowercased text — the original word-set intersection could
    never match them, since splitting on whitespace yields single words.
    Leading/trailing punctuation is stripped so "summarize:" still matches.
    """

    @staticmethod
    def _matches(keywords: set[str], words: set[str], lowered: str) -> bool:
        """True if any keyword hits: phrases as substrings, words exactly."""
        return any(
            (kw in lowered) if " " in kw else (kw in words)
            for kw in keywords
        )

    def classify(self, purpose: str) -> TaskComplexity:
        """Map a free-text purpose to a TaskComplexity tier."""
        lowered = purpose.lower()
        # Strip common punctuation per token so "explain," matches "explain".
        words = {w.strip(".,:;!?\"'()[]") for w in lowered.split()}

        if self._matches(_CRITICAL_KEYWORDS, words, lowered):
            return TaskComplexity.CRITICAL
        if self._matches(_COMPLEX_KEYWORDS, words, lowered):
            return TaskComplexity.COMPLEX
        if self._matches(_SIMPLE_KEYWORDS, words, lowered):
            return TaskComplexity.SIMPLE

        # Heuristic fallback: long prompts or code-related wording imply
        # a multi-step (plan -> execute) chain.
        if len(purpose) > 100 or "code" in lowered or "function" in lowered:
            return TaskComplexity.MODERATE
        return TaskComplexity.SIMPLE
|
|
|
|
| class ModelSelector: |
| """ |
| Selects the best model for a task given complexity and policy. |
| |
| Rules: |
| 1. Local-first (if policy.prefer_local and local model available) |
| 2. Smallest capable model (don't use 70B for "say hello") |
| 3. Respect cost/latency budgets |
| 4. Fallback to cloud only when policy allows and local fails |
| """ |
|
|
| def __init__(self, models: list[ModelOption] | None = None, policy: RoutingPolicy | None = None): |
| self.models = models or [] |
| self.policy = policy or RoutingPolicy() |
|
|
| def select(self, complexity: TaskComplexity) -> str: |
| """Select the best model spec for given complexity.""" |
| |
| candidates = list(self.models) |
|
|
| if self.policy.prefer_local: |
| local = [m for m in candidates if m.is_local] |
| if local: |
| candidates = local |
|
|
| |
| if complexity == TaskComplexity.SIMPLE: |
| candidates.sort(key=lambda m: m.cost_per_1k_tokens) |
| if candidates: |
| return candidates[0].spec |
|
|
| |
| if complexity in (TaskComplexity.COMPLEX, TaskComplexity.CRITICAL): |
| |
| if self.policy.allow_cloud_fallback: |
| return self.policy.cloud_model |
| capable = [m for m in candidates if "reasoning" in m.capabilities or "code" in m.capabilities] |
| if capable: |
| return capable[0].spec |
|
|
| |
| return self.policy.local_model |
|
|
|
|
| class LLMCallRouter: |
| """ |
| Main router: classifies task β selects model β logs decision. |
| |
| Usage: |
| router = LLMCallRouter(policy=RoutingPolicy(prefer_local=True)) |
| model_spec = router.route("Write a fibonacci function") |
| # β "ollama:qwen3:1.7b" (local, code task, moderate complexity) |
| |
| model_spec = router.route("Audit production deployment for security vulnerabilities") |
| # β cloud model (critical task, needs strong reasoning) |
| """ |
|
|
| def __init__(self, policy: RoutingPolicy | None = None, models: list[ModelOption] | None = None): |
| self.policy = policy or RoutingPolicy() |
| self.classifier = TaskComplexityClassifier() |
| self.selector = ModelSelector(models or [], self.policy) |
| self._decisions: list[RoutingDecision] = [] |
| self._total_cost = 0.0 |
|
|
| def route(self, task: str) -> str: |
| """Route a task to the best model. Returns model spec string.""" |
| complexity = self.classifier.classify(task) |
| selected = self.selector.select(complexity) |
|
|
| |
| if self._total_cost >= self.policy.max_cost_per_task_usd: |
| |
| selected = self.policy.local_model |
| reason = "budget_exceeded: forced local" |
| else: |
| reason = f"complexity={complexity.value}" |
|
|
| decision = RoutingDecision( |
| task_summary=task[:80], |
| complexity=complexity, |
| selected_model=selected, |
| reason=reason, |
| ) |
| self._decisions.append(decision) |
| logger.info(f"Router: {complexity.value} β {selected} ({reason})") |
| return selected |
|
|
| def record_cost(self, cost_usd: float) -> None: |
| """Record cost of a completed call for budget tracking.""" |
| self._total_cost += cost_usd |
|
|
| @property |
| def total_cost(self) -> float: |
| return self._total_cost |
|
|
| @property |
| def decisions(self) -> list[RoutingDecision]: |
| return self._decisions |
|
|
| def reset_budget(self) -> None: |
| self._total_cost = 0.0 |
|
|