|
|
|
|
|
|
|
|
from typing import List, Dict, Any, Tuple, Optional |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
from ankigen_core.logging import logger |
|
|
from ankigen_core.models import Card |
|
|
from ankigen_core.llm_interface import OpenAIClientManager |
|
|
from ankigen_core.context7 import Context7Client |
|
|
|
|
|
from .generators import SubjectExpertAgent, QualityReviewAgent |
|
|
from ankigen_core.agents.config import get_config_manager |
|
|
|
|
|
|
|
|
class AgentOrchestrator:
    """Main orchestrator for the AnkiGen agent system.

    Runs a two-step pipeline: a ``SubjectExpertAgent`` generates cards and,
    optionally, a ``QualityReviewAgent`` reviews each card.  Both agents are
    created lazily and cached on the instance.
    """

    def __init__(self, client_manager: OpenAIClientManager):
        """Store the client manager; the client and agents are created later.

        Args:
            client_manager: Manager that ``initialize`` uses to build the
                OpenAI client.
        """
        self.client_manager = client_manager
        # Populated by initialize(); generation refuses to run while falsy.
        self.openai_client = None

        # Agents are built on first use by the respective pipeline phase.
        self.subject_expert = None
        self.quality_reviewer = None

    async def initialize(
        self, api_key: str, model_overrides: Optional[Dict[str, str]] = None
    ):
        """Initialize the agent system.

        Args:
            api_key: Key passed to ``client_manager.initialize_client``.
            model_overrides: Optional mapping forwarded to the config
                manager's ``update_models``; skipped when empty or ``None``.

        Raises:
            Exception: Any error raised while initializing is logged and
                re-raised unchanged.
        """
        try:
            await self.client_manager.initialize_client(api_key)
            self.openai_client = self.client_manager.get_client()

            if model_overrides:
                get_config_manager().update_models(model_overrides)
                logger.info(f"Applied model overrides: {model_overrides}")

            logger.info("Agent system initialized successfully (simplified pipeline)")
        except Exception as e:
            logger.error(f"Failed to initialize agent system: {e}")
            raise

    @staticmethod
    def _documentation_token_limit(
        num_cards: int, library_topic: Optional[str] = None
    ) -> int:
        """Return the token budget to request for library documentation.

        The budget grows with the number of cards (8000 up to 20 cards,
        10000 up to 40, 12000 above) and is cut to 80% when a specific
        ``library_topic`` is given.
        """
        if num_cards > 40:
            token_limit = 12000
        elif num_cards > 20:
            token_limit = 10000
        else:
            token_limit = 8000

        if library_topic:
            # NOTE(review): presumably a topic-focused fetch needs less
            # material than a whole-library fetch -- confirm the 0.8 factor.
            token_limit = int(token_limit * 0.8)
        return token_limit

    async def generate_cards_with_agents(
        self,
        topic: str,
        subject: str = "general",
        num_cards: int = 5,
        difficulty: str = "intermediate",
        enable_quality_pipeline: bool = True,
        context: Optional[Dict[str, Any]] = None,
        library_name: Optional[str] = None,
        library_topic: Optional[str] = None,
    ) -> Tuple[List[Card], Dict[str, Any]]:
        """Generate cards using the agent system.

        Args:
            topic: Topic passed to the subject expert agent.
            subject: Subject used to select/create the subject expert agent.
            num_cards: Number of cards requested; also scales the library
                documentation token budget.
            difficulty: Stored under ``"difficulty"`` in the generation context.
            enable_quality_pipeline: When true, run the quality review phase
                and keep only approved cards.
            context: Optional extra context.  It is copied, never modified.
            library_name: When set, documentation for this library is fetched
                through ``Context7Client`` and added to the context.
            library_topic: Optional topic used to narrow the documentation fetch.

        Returns:
            ``(cards, metadata)`` where ``metadata`` holds the keys
            ``generation_method``, ``generation_time`` (seconds),
            ``cards_generated``, ``review_results``, ``topic``, ``subject``,
            ``difficulty``, ``library_name`` and ``library_docs_used``.

        Raises:
            ValueError: If ``initialize`` has not produced a client yet.
            Exception: Any generation/review error is logged and re-raised.
        """
        start_time = datetime.now()

        try:
            if not self.openai_client:
                raise ValueError("Agent system not initialized")

            logger.info(f"Starting agent-based card generation: {topic} ({subject})")

            # Shallow copy: the keys added below must not leak into the
            # caller's dict.
            enhanced_context = dict(context) if context else {}
            library_docs = None

            if library_name:
                logger.info(f"Fetching library documentation for: {library_name}")
                # Documentation is best-effort: a failure here is logged and
                # generation continues without it.
                try:
                    context7_client = Context7Client()
                    token_limit = self._documentation_token_limit(
                        num_cards, library_topic
                    )

                    logger.info(
                        f"Fetching {token_limit} tokens of documentation"
                        + (f" for topic: {library_topic}" if library_topic else "")
                    )

                    library_docs = await context7_client.fetch_library_documentation(
                        library_name, topic=library_topic, tokens=token_limit
                    )

                    if library_docs:
                        enhanced_context["library_documentation"] = library_docs
                        enhanced_context["library_name"] = library_name
                        logger.info(
                            f"Added {len(library_docs)} chars of {library_name} documentation to context"
                        )
                    else:
                        logger.warning(
                            f"Could not fetch documentation for library: {library_name}"
                        )
                except Exception as e:
                    logger.error(f"Error fetching library documentation: {e}")

            cards = await self._generation_phase(
                topic=topic,
                subject=subject,
                num_cards=num_cards,
                difficulty=difficulty,
                context=enhanced_context,
            )

            review_results = {}
            if enable_quality_pipeline:
                cards, review_results = await self._quality_review_phase(cards)

            metadata = {
                "generation_method": "agent_system",
                "generation_time": (datetime.now() - start_time).total_seconds(),
                "cards_generated": len(cards),
                "review_results": review_results,
                "topic": topic,
                "subject": subject,
                "difficulty": difficulty,
                # Empty strings are normalised to None.
                "library_name": library_name or None,
                "library_docs_used": bool(library_docs),
            }

            logger.info(
                f"Agent-based generation complete: {len(cards)} cards generated"
            )
            return cards, metadata

        except Exception as e:
            logger.error(f"Agent-based generation failed: {e}")
            raise

    async def _generation_phase(
        self,
        topic: str,
        subject: str,
        num_cards: int,
        difficulty: str,
        context: Optional[Dict[str, Any]] = None,
    ) -> List[Card]:
        """Execute the card generation phase.

        Reuses the cached subject expert unless its ``subject`` differs from
        the requested one, in which case a new agent replaces it.

        Args:
            topic: Topic to generate cards for.
            subject: Subject the expert agent must be configured for.
            num_cards: Number of cards to request.
            difficulty: Added to the context under ``"difficulty"``
                (overrides any existing value).
            context: Optional base context; it is copied, never modified.

        Returns:
            The cards returned by the subject expert agent.
        """
        if not self.subject_expert or self.subject_expert.subject != subject:
            self.subject_expert = SubjectExpertAgent(self.openai_client, subject)

        # Build a new dict so the caller's context is left untouched.
        generation_context = {**(context or {}), "difficulty": difficulty}

        cards = await self.subject_expert.generate_cards(
            topic=topic, num_cards=num_cards, context=generation_context
        )

        logger.info(f"Generation phase complete: {len(cards)} cards generated")
        return cards

    async def _quality_review_phase(
        self, cards: List[Card]
    ) -> Tuple[List[Card], Dict[str, Any]]:
        """Perform a single quality-review pass with optional fixes.

        Each card is reviewed sequentially.  Approved cards are kept in the
        form returned by the reviewer; rejected cards are dropped and recorded
        with their question text and rejection reason.

        Args:
            cards: Cards to review.

        Returns:
            ``(approved_cards, review_results)``.  For empty input the input
            is returned with ``{"message": "No cards to review"}``; otherwise
            ``review_results`` has ``total_cards_reviewed``,
            ``approved_cards`` (a count) and ``rejected_cards`` (a list of
            ``{"question", "reason"}`` dicts).
        """
        if not cards:
            return cards, {"message": "No cards to review"}

        logger.info(f"Performing quality review for {len(cards)} cards")

        if not self.quality_reviewer:
            # NOTE(review): the reviewer takes its model from the
            # "subject_expert" agent config -- confirm this is intentional.
            subject_config = get_config_manager().get_agent_config("subject_expert")
            reviewer_model = subject_config.model if subject_config else "gpt-4.1"
            self.quality_reviewer = QualityReviewAgent(
                self.openai_client, reviewer_model
            )

        reviewed_cards: List[Card] = []
        rejections: List[Dict[str, Any]] = []

        for card in cards:
            reviewed_card, approved, reason = await self.quality_reviewer.review_card(
                card
            )
            if approved:
                reviewed_cards.append(reviewed_card)
            else:
                rejections.append(
                    {
                        "question": card.front.question if card.front else "",
                        "reason": reason,
                    }
                )

        review_results = {
            "total_cards_reviewed": len(cards),
            "approved_cards": len(reviewed_cards),
            "rejected_cards": rejections,
        }

        if rejections:
            logger.warning(
                "Quality review rejected cards: %s",
                "; ".join(
                    f"{entry['question'][:50]}… ({entry['reason']})"
                    for entry in rejections
                ),
            )

        return reviewed_cards, review_results

    def get_performance_metrics(self) -> Dict[str, Any]:
        """Get performance metrics for the agent system.

        Returns:
            A dict that currently contains only ``{"agents_enabled": True}``.
        """
        return {
            "agents_enabled": True,
        }
|
|
|
|
|
|
|
|
async def integrate_with_existing_workflow(
    client_manager: OpenAIClientManager, api_key: str, **generation_params
) -> Tuple[List[Card], Dict[str, Any]]:
    """Integration point for existing AnkiGen workflow.

    Builds a fresh ``AgentOrchestrator`` for this call, initializes it with
    ``api_key`` (no model overrides) and runs one generation.

    Args:
        client_manager: Client manager handed to the orchestrator.
        api_key: API key used to initialize the orchestrator.
        **generation_params: Forwarded unchanged as keyword arguments to
            ``AgentOrchestrator.generate_cards_with_agents``.

    Returns:
        The ``(cards, metadata)`` pair produced by the orchestrator.
    """
    agent_system = AgentOrchestrator(client_manager)
    await agent_system.initialize(api_key)

    generated_cards, generation_metadata = (
        await agent_system.generate_cards_with_agents(**generation_params)
    )
    return generated_cards, generation_metadata
|
|
|