# Sprint 5: routing.py — LLMCallRouter, ModelSelector, cost homeostasis
# (uploaded by Rohan03, commit 80a4e8f, verified)
"""
routing.py β€” SLM-native LLM call router with cost homeostasis.
Routes tasks to the smallest capable model. Local-first by default.
Enforces cost, latency, and token budgets as hard constraints.
Complexity classification:
simple β†’ single SLM call (summarize, answer simple Q)
moderate β†’ sequential chain (plan β†’ execute)
complex β†’ parallel specialists (research + code + review)
critical β†’ specialists + critic ensemble + optional HITL
Router decisions are logged and reproducible.
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
from purpose_agent.llm_backend import LLMBackend
logger = logging.getLogger(__name__)
class TaskComplexity(str, Enum):
    """Task complexity tiers that drive routing strategy.

    Inherits from ``str`` so members compare equal to their plain string
    values (e.g. ``TaskComplexity.SIMPLE == "simple"``).
    """

    SIMPLE = "simple"      # single SLM call (summarize, answer simple Q)
    MODERATE = "moderate"  # sequential chain (plan -> execute)
    COMPLEX = "complex"    # parallel specialists (research + code + review)
    CRITICAL = "critical"  # specialists + critic ensemble + optional HITL
@dataclass
class RoutingPolicy:
    """Policy governing model selection and cost control."""

    # Prefer local models whenever a local candidate exists.
    prefer_local: bool = True
    # Hard USD budget; once cumulative recorded cost reaches it, the
    # router forces the local model (see LLMCallRouter.route).
    max_cost_per_task_usd: float = 0.10
    # Latency ceiling per call, in seconds (declared; not enforced in this module).
    max_latency_per_call_s: float = 30.0
    # Token budget per task (declared; not enforced in this module).
    max_tokens_per_task: int = 10000
    # Whether cloud models may be selected at all.
    allow_cloud_fallback: bool = True
    # Explicit fallback model spec; empty string means none configured.
    fallback_model: str = ""
    # Default local model spec, "backend:model" form.
    local_model: str = "ollama:qwen3:1.7b"
    # Cloud model spec used for complex/critical tasks when allowed.
    cloud_model: str = "openrouter:meta-llama/llama-3.3-70b-instruct"
@dataclass
class ModelOption:
    """A model available for routing."""

    # Model spec string, e.g. "ollama:qwen3:1.7b".
    spec: str
    # Whether the model runs locally (local models are preferred by policy).
    is_local: bool = True
    # Cost per 1k tokens in USD; $0 for local models.
    cost_per_1k_tokens: float = 0.0
    # Typical latency per call, in seconds.
    avg_latency_s: float = 1.0
    # Maximum context window, in tokens.
    max_context: int = 32768
    # Capability tags, e.g. ["code", "reasoning", "general"].
    capabilities: list[str] = field(default_factory=list)
@dataclass
class RoutingDecision:
    """Recorded decision from the router."""

    # Truncated task text (router stores the first 80 characters).
    task_summary: str
    # Classified complexity tier for the task.
    complexity: TaskComplexity
    # Model spec string that was chosen.
    selected_model: str
    # Human-readable rationale ("complexity=..." or a budget-override note).
    reason: str
    # Wall-clock time of the decision (time.time()).
    timestamp: float = field(default_factory=time.time)
    # Estimated USD cost. NOTE(review): never populated by LLMCallRouter —
    # it always stays at the 0.0 default; confirm whether it is set elsewhere.
    estimated_cost: float = 0.0
# Keyword-based complexity heuristics, consumed by TaskComplexityClassifier
# in priority order: critical beats complex beats simple.
_COMPLEX_KEYWORDS = {"research", "analyze", "compare", "design", "architect", "security", "audit"}
# High-stakes operations that force the CRITICAL tier.
_CRITICAL_KEYWORDS = {"deploy", "production", "delete", "admin", "payment", "credential", "secret"}
# NOTE(review): "what is" is a two-word phrase; a whitespace-split word set
# (as built in TaskComplexityClassifier.classify) can never contain it, so
# this entry needs phrase/substring matching to ever fire — confirm intent.
_SIMPLE_KEYWORDS = {"summarize", "translate", "hello", "what is", "define", "explain"}
class TaskComplexityClassifier:
    """Classifies task complexity from the purpose description.

    Matching is keyword-driven and checked in priority order
    (critical > complex > simple); when no keyword matches, a
    length/code heuristic decides between moderate and simple.
    """

    @staticmethod
    def _matches(text: str, words: set[str], keywords: set[str]) -> bool:
        """True when any keyword hits *text*.

        Single-word keywords are matched against the whitespace-split
        word set (exact word match); multi-word phrases such as
        "what is" are matched as substrings of the lowercased text
        (may over-match across word boundaries — acceptable for a heuristic).
        """
        return any(
            kw in text if " " in kw else kw in words
            for kw in keywords
        )

    def classify(self, purpose: str) -> TaskComplexity:
        """Map a free-text purpose string to a TaskComplexity tier.

        Bug fix: multi-word keywords (e.g. "what is" in _SIMPLE_KEYWORDS)
        previously could never match because only a whitespace-split word
        set was consulted; they are now matched as phrases.
        """
        text = purpose.lower()
        words = set(text.split())
        if self._matches(text, words, _CRITICAL_KEYWORDS):
            return TaskComplexity.CRITICAL
        if self._matches(text, words, _COMPLEX_KEYWORDS):
            return TaskComplexity.COMPLEX
        if self._matches(text, words, _SIMPLE_KEYWORDS):
            return TaskComplexity.SIMPLE
        # Default: moderate for long or code-related purposes, else simple.
        if len(purpose) > 100 or "code" in text or "function" in text:
            return TaskComplexity.MODERATE
        return TaskComplexity.SIMPLE
class ModelSelector:
    """Chooses the best model spec for a task, given complexity and policy.

    Selection rules:
    1. Local-first (when policy.prefer_local and any local model exists)
    2. Smallest capable model (don't use 70B for "say hello")
    3. Respect cost/latency budgets
    4. Fallback to cloud only when policy allows and local fails
    """

    def __init__(self, models: list[ModelOption] | None = None, policy: RoutingPolicy | None = None):
        self.models = models or []
        self.policy = policy or RoutingPolicy()

    def select(self, complexity: TaskComplexity) -> str:
        """Return the model spec string to use for *complexity*."""
        pool = list(self.models)
        # Local-first: narrow the pool when local options are available.
        if self.policy.prefer_local:
            local_pool = [opt for opt in pool if opt.is_local]
            if local_pool:
                pool = local_pool
        # Simple tasks: the cheapest candidate wins (first one on ties).
        if complexity == TaskComplexity.SIMPLE and pool:
            cheapest = min(pool, key=lambda opt: opt.cost_per_1k_tokens)
            return cheapest.spec
        # Complex/critical tasks: prefer the configured cloud model when
        # policy permits, else the first reasoning/code-capable candidate.
        if complexity in (TaskComplexity.COMPLEX, TaskComplexity.CRITICAL):
            if self.policy.allow_cloud_fallback:
                return self.policy.cloud_model
            strong = [
                opt for opt in pool
                if "reasoning" in opt.capabilities or "code" in opt.capabilities
            ]
            if strong:
                return strong[0].spec
        # Everything else falls back to the policy's local default.
        return self.policy.local_model
class LLMCallRouter:
    """
    Main router: classifies task -> selects model -> logs decision.

    Usage:
        router = LLMCallRouter(policy=RoutingPolicy(prefer_local=True))
        model_spec = router.route("Write a fibonacci function")
        # -> "ollama:qwen3:1.7b" (local, code task, moderate complexity)
        model_spec = router.route("Audit production deployment for security vulnerabilities")
        # -> cloud model (critical task, needs strong reasoning)
    """

    def __init__(self, policy: RoutingPolicy | None = None, models: list[ModelOption] | None = None):
        self.policy = policy or RoutingPolicy()
        self.classifier = TaskComplexityClassifier()
        self.selector = ModelSelector(models or [], self.policy)
        self._decisions: list[RoutingDecision] = []
        self._total_cost = 0.0

    def route(self, task: str) -> str:
        """Route a task to the best model. Returns a model spec string.

        NOTE: the budget check compares *cumulative* recorded cost
        against ``policy.max_cost_per_task_usd``; once the limit is
        reached, every subsequent task is forced local until
        ``reset_budget()`` is called.
        """
        complexity = self.classifier.classify(task)
        selected = self.selector.select(complexity)
        # Hard cost constraint: over budget -> force the local model.
        if self._total_cost >= self.policy.max_cost_per_task_usd:
            selected = self.policy.local_model
            reason = "budget_exceeded: forced local"
        else:
            reason = f"complexity={complexity.value}"
        decision = RoutingDecision(
            task_summary=task[:80],  # truncate for compact audit records
            complexity=complexity,
            selected_model=selected,
            reason=reason,
        )
        self._decisions.append(decision)
        # Lazy %-style args: formatting is skipped when INFO is disabled.
        logger.info("Router: %s → %s (%s)", complexity.value, selected, reason)
        return selected

    def record_cost(self, cost_usd: float) -> None:
        """Record cost of a completed call for budget tracking."""
        self._total_cost += cost_usd

    @property
    def total_cost(self) -> float:
        """Cumulative recorded cost in USD since the last budget reset."""
        return self._total_cost

    @property
    def decisions(self) -> list[RoutingDecision]:
        """All routing decisions made so far (the live internal list)."""
        return self._decisions

    def reset_budget(self) -> None:
        """Reset the cumulative cost counter to zero."""
        self._total_cost = 0.0