# Janus-backend/backend/app/services/learning/learning_engine.py
"""
Core learning engine that coordinates all learning activities.
Orchestrates knowledge ingestion, experience learning, prompt evolution,
skill distillation, trust management, and freshness management.
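
Typical wiring (illustrative sketch; `store`, `ingestor`, and `optimizer` are
assumed to be constructed at application startup):

    engine = LearningEngine(store, ingestor, prompt_optimizer=optimizer)
    await engine.run_knowledge_ingestion(["vector databases"])
    insights = engine.get_insights()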
"""
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime
from collections import Counter
from .knowledge_ingestor import KnowledgeIngestor
from .knowledge_store import KnowledgeStore
from .prompt_optimizer import PromptOptimizer
from .skill_distiller import SkillDistiller
from .trust_manager import TrustManager
logger = logging.getLogger(__name__)
class LearningEngine:
"""Coordinates all learning activities."""
def __init__(
self,
knowledge_store: KnowledgeStore,
knowledge_ingestor: KnowledgeIngestor,
prompt_optimizer: Optional[PromptOptimizer] = None,
skill_distiller: Optional[SkillDistiller] = None,
trust_manager: Optional[TrustManager] = None,
):
self.knowledge_store = knowledge_store
self.knowledge_ingestor = knowledge_ingestor
self.prompt_optimizer = prompt_optimizer
self.skill_distiller = skill_distiller
self.trust_manager = trust_manager
self.last_run: Dict[str, str] = {}
# In-memory learning metadata (flushed periodically)
self._case_learnings: List[Dict[str, Any]] = []
self._route_stats: Counter = Counter()
self._provider_stats: Dict[str, Dict[str, int]] = {}
self._prompt_performance: Dict[str, Dict[str, Any]] = {}
# ── Knowledge Ingestion ──────────────────────────────────────────────────
    async def run_knowledge_ingestion(self, topics: List[str]) -> Dict[str, Any]:
"""
Run knowledge ingestion for specified topics.
Args:
topics: List of topics to ingest knowledge about
Returns:
Ingestion results
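            Illustrative shape (mirrors the dict built below):
                {"total_items": 7,
                 "results": [{"topic": "...", "search_items": 4, "news_items": 3}],
                 "timestamp": "<UTC ISO timestamp>"}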
"""
logger.info(f"Running knowledge ingestion for topics: {topics}")
total_items = 0
results = []
for topic in topics:
search_items = await self.knowledge_ingestor.ingest_from_search(topic)
for item in search_items:
self.knowledge_store.save_knowledge(item)
total_items += 1
news_items = await self.knowledge_ingestor.ingest_from_news(topic)
for item in news_items:
self.knowledge_store.save_knowledge(item)
total_items += 1
results.append({
"topic": topic,
"search_items": len(search_items),
"news_items": len(news_items),
})
self.last_run["knowledge_ingestion"] = datetime.utcnow().isoformat()
logger.info(f"Knowledge ingestion complete: {total_items} items ingested")
return {
"total_items": total_items,
"results": results,
"timestamp": self.last_run["knowledge_ingestion"],
}
async def run_cleanup(self, expiration_days: int = 30) -> Dict[str, Any]:
"""Run cleanup of expired knowledge."""
logger.info("Running knowledge cleanup")
deleted_count = self.knowledge_store.delete_expired_knowledge(expiration_days)
self.last_run["cleanup"] = datetime.utcnow().isoformat()
return {
"deleted_count": deleted_count,
"timestamp": self.last_run["cleanup"],
}
# ── Experience Learning (Task 34) ────────────────────────────────────────
def learn_from_case(self, case_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Extract learning metadata from a completed case execution.
Args:
case_data: Complete case payload
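                Expected shape (inferred from the fields read below; illustrative):
                    {"case_id": ..., "outputs": [...],
                     "route": {"domain_pack": ..., "execution_mode": ..., "complexity": ...}}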
Returns:
Learning metadata extracted
"""
case_id = case_data.get("case_id", "unknown")
logger.info(f"Learning from case {case_id}")
route = case_data.get("route", {})
outputs = case_data.get("outputs") or self._derive_outputs(case_data)
# 1. Track route effectiveness
route_key = f"{route.get('domain_pack', 'general')}:{route.get('execution_mode', 'standard')}"
self._route_stats[route_key] += 1
# 2. Track which agents produced useful output
agents_used = []
agent_quality = {}
for output in outputs:
if isinstance(output, dict):
agent_name = output.get("agent", "unknown")
summary = output.get("summary", "")
confidence = output.get("confidence", 0.0)
agents_used.append(agent_name)
agent_quality[agent_name] = {
"output_length": len(summary),
"confidence": confidence,
"produced_output": len(summary) > 10,
}
# 3. Build learning record
learning = {
"case_id": case_id,
"route": route,
"agents_used": agents_used,
"agent_quality": agent_quality,
"domain": route.get("domain_pack", "general"),
"complexity": route.get("complexity", "medium"),
"execution_mode": route.get("execution_mode", "standard"),
"learned_at": datetime.utcnow().isoformat(),
}
self._case_learnings.append(learning)
# Keep only last 500 learnings in memory
if len(self._case_learnings) > 500:
self._case_learnings = self._case_learnings[-500:]
# 4. Update trust scores for sources mentioned in research
if self.trust_manager:
for output in outputs:
if isinstance(output, dict) and output.get("agent") == "research":
sources = output.get("details", {}).get("sources", [])
for source in sources:
if isinstance(source, str):
self.trust_manager.update_trust(source, True, weight=0.5)
logger.info(f"Learned from case {case_id}: route={route_key}, agents={agents_used}")
return learning
def _derive_outputs(self, case_data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Backfill agent outputs from the current case shape."""
outputs: List[Dict[str, Any]] = []
def _append(agent: str, details: Dict[str, Any]) -> None:
if not isinstance(details, dict) or not details:
return
summary = (
details.get("summary")
or details.get("response")
or details.get("estimated_output")
or ""
)
outputs.append(
{
"agent": agent,
"summary": str(summary),
"confidence": float(details.get("confidence", 0.0) or 0.0),
"details": details,
}
)
_append("research", case_data.get("research", {}))
_append("planner", case_data.get("planner", {}))
_append("verifier", case_data.get("verifier", {}))
_append("synthesizer", case_data.get("final", {}))
return outputs
def detect_patterns(self, min_frequency: int = 3) -> List[Dict[str, Any]]:
"""
Detect patterns in recent case executions.
Args:
min_frequency: Minimum occurrences to count as a pattern
Returns:
List of detected patterns
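            Illustrative entries (as built below):
                {"type": "domain_frequency", "domain": "legal", "count": 5, "percentage": 41.7}
                {"type": "high_confidence_agents", "agents": ["research", "verifier"], "count": 3}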
"""
if len(self._case_learnings) < min_frequency:
return []
patterns = []
# Pattern 1: Domain frequency
        domain_counts = Counter(entry["domain"] for entry in self._case_learnings)
for domain, count in domain_counts.items():
if count >= min_frequency:
patterns.append({
"type": "domain_frequency",
"domain": domain,
"count": count,
"percentage": count / len(self._case_learnings) * 100,
})
# Pattern 2: Execution mode frequency
        mode_counts = Counter(entry["execution_mode"] for entry in self._case_learnings)
for mode, count in mode_counts.items():
if count >= min_frequency:
patterns.append({
"type": "execution_mode_frequency",
"mode": mode,
"count": count,
"percentage": count / len(self._case_learnings) * 100,
})
# Pattern 3: Agent combinations that produce high confidence
high_confidence_combos = Counter()
for learning in self._case_learnings:
quality = learning.get("agent_quality", {})
high_conf_agents = [
a for a, q in quality.items()
if q.get("confidence", 0) > 0.7
]
if high_conf_agents:
high_confidence_combos[tuple(sorted(high_conf_agents))] += 1
for combo, count in high_confidence_combos.items():
if count >= min_frequency:
patterns.append({
"type": "high_confidence_agents",
"agents": list(combo),
"count": count,
})
self.last_run["pattern_detection"] = datetime.utcnow().isoformat()
logger.info(f"Detected {len(patterns)} patterns from {len(self._case_learnings)} cases")
return patterns
def get_route_effectiveness(self) -> Dict[str, Any]:
"""Get route effectiveness insights."""
total = sum(self._route_stats.values())
if total == 0:
return {"total_cases": 0, "routes": {}}
return {
"total_cases": total,
"routes": {
route: {
"count": count,
"percentage": round(count / total * 100, 1),
}
for route, count in self._route_stats.most_common()
},
}
def get_prompt_performance(self) -> Dict[str, Any]:
"""Get prompt performance insights from case learnings."""
if not self._case_learnings:
return {"total_cases": 0, "agents": {}}
agent_stats: Dict[str, Dict[str, Any]] = {}
for learning in self._case_learnings:
for agent, quality in learning.get("agent_quality", {}).items():
if agent not in agent_stats:
agent_stats[agent] = {
"total_runs": 0,
"total_confidence": 0.0,
"produced_output_count": 0,
}
agent_stats[agent]["total_runs"] += 1
agent_stats[agent]["total_confidence"] += quality.get("confidence", 0)
if quality.get("produced_output", False):
agent_stats[agent]["produced_output_count"] += 1
# Calculate averages
for agent, stats in agent_stats.items():
runs = stats["total_runs"]
stats["avg_confidence"] = round(stats["total_confidence"] / runs, 3) if runs > 0 else 0
stats["output_rate"] = round(stats["produced_output_count"] / runs, 3) if runs > 0 else 0
return {
"total_cases": len(self._case_learnings),
"agents": agent_stats,
}
# ── Prompt Evolution (Task 35) ───────────────────────────────────────────
async def run_prompt_optimization(self, prompt_names: List[str]) -> Dict[str, Any]:
"""
Run prompt optimization for specified prompts.
Uses prompt_optimizer to create improved variants based on
prompt performance data.
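        Each result entry (as built below) is one of:
            {"prompt": <name>, "status": "skipped", "reason": ...}
            {"prompt": <name>, "status": "variant_created", "variant_id": ...}
            {"prompt": <name>, "status": "error", "error": ...}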
"""
if not self.prompt_optimizer:
return {"status": "skipped", "reason": "prompt_optimizer not configured"}
results = []
performance = self.get_prompt_performance()
for name in prompt_names:
agent_perf = performance.get("agents", {}).get(name, {})
avg_conf = agent_perf.get("avg_confidence", 0.5)
# Only optimize prompts with low average confidence
if avg_conf > 0.8:
results.append({"prompt": name, "status": "skipped", "reason": "already high performance"})
continue
try:
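                # Local import, presumably deferred to avoid import-time coupling
                # with the prompt store.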
from app.services.prompt_store import get_prompt
prompt_data = get_prompt(name)
                if not prompt_data:
                    results.append({"prompt": name, "status": "skipped", "reason": "prompt not found"})
                    continue
goal = f"Improve output quality (current avg confidence: {avg_conf:.2f})"
variant = await self.prompt_optimizer.create_prompt_variant(
name, prompt_data["content"], goal
)
results.append({"prompt": name, "status": "variant_created", "variant_id": variant["id"]})
except Exception as e:
logger.error(f"Failed to optimize prompt {name}: {e}")
results.append({"prompt": name, "status": "error", "error": str(e)})
self.last_run["prompt_optimization"] = datetime.utcnow().isoformat()
return {"results": results, "timestamp": self.last_run["prompt_optimization"]}
def get_active_prompt(self, prompt_name: str) -> Optional[str]:
"""
Get the active production prompt text, if one has been promoted.
Args:
prompt_name: Prompt name (e.g., "research", "verifier")
Returns:
Production prompt text, or None if no production version exists
"""
if not self.prompt_optimizer:
return None
production = self.prompt_optimizer._get_production_variant(prompt_name)
if production:
return production.get("prompt_text")
return None
# ── Skill Distillation (Task 36) ─────────────────────────────────────────
async def run_skill_distillation(self, min_frequency: int = 3) -> Dict[str, Any]:
"""
Run skill distillation from recent case patterns.
"""
if not self.skill_distiller:
return {"status": "skipped", "reason": "skill_distiller not configured"}
# Use in-memory case learnings as source data
candidates = self.skill_distiller.detect_skill_candidates(
self._case_learnings, min_frequency=min_frequency
)
skills_created = []
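        # Distill at most five candidates per run; the cap keeps a single run's
        # LLM work bounded (assumed rationale).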
for candidate in candidates[:5]:
example_cases = [
                entry for entry in self._case_learnings
                if entry.get("domain") == candidate.get("domain")
][:3]
try:
skill = await self.skill_distiller.distill_skill(candidate, example_cases)
skills_created.append(skill)
except Exception as e:
logger.error(f"Failed to distill skill: {e}")
self.last_run["skill_distillation"] = datetime.utcnow().isoformat()
return {
"candidates_found": len(candidates),
"skills_created": len(skills_created),
"skills": skills_created,
"timestamp": self.last_run["skill_distillation"],
}
# ── Trust & Freshness (Task 37) ──────────────────────────────────────────
async def run_freshness_refresh(self) -> Dict[str, Any]:
"""
Check freshness of all knowledge items and flag stale ones.
"""
if not self.trust_manager:
return {"status": "skipped", "reason": "trust_manager not configured"}
all_items = self.knowledge_store.list_all()
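        # threshold=0.3 marks the stale-freshness cutoff; the scoring itself is
        # delegated to TrustManager.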
stale = self.trust_manager.get_stale_items(all_items, threshold=0.3)
recommendations = self.trust_manager.recommend_refresh(stale, max_recommendations=10)
# Update freshness scores
for item in all_items:
freshness = self.trust_manager.calculate_freshness(item)
item_id = item.get("id")
if item_id:
self.trust_manager.update_freshness(item_id, freshness)
self.last_run["freshness_refresh"] = datetime.utcnow().isoformat()
return {
"total_items": len(all_items),
"stale_items": len(stale),
"refresh_recommendations": len(recommendations),
"recommendations": recommendations,
"timestamp": self.last_run["freshness_refresh"],
}
# ── Status & Insights ────────────────────────────────────────────────────
def get_status(self) -> Dict[str, Any]:
"""Get learning engine status."""
storage_stats = self.knowledge_store.get_storage_stats()
return {
"storage": storage_stats,
"last_run": self.last_run,
"enabled": True,
"cases_learned": len(self._case_learnings),
"components": {
"knowledge_store": True,
"knowledge_ingestor": True,
"prompt_optimizer": self.prompt_optimizer is not None,
"skill_distiller": self.skill_distiller is not None,
"trust_manager": self.trust_manager is not None,
},
}
def get_insights(self) -> Dict[str, Any]:
"""Get comprehensive learning insights."""
recent_items = self.knowledge_store.list_all(limit=10)
return {
"recent_knowledge": [
{
"id": item.get("id"),
"title": item.get("title"),
"source": item.get("source"),
"saved_at": item.get("saved_at"),
}
for item in recent_items
],
"storage_stats": self.knowledge_store.get_storage_stats(),
"route_effectiveness": self.get_route_effectiveness(),
"prompt_performance": self.get_prompt_performance(),
"patterns": self.detect_patterns(),
}