|
|
""" |
|
|
Enhanced Synthesis Agent with Expert Consultant Assignment.

Blends expert consultant personas according to the skill probability scores
produced by the Skills Identification Agent.
|
|
""" |
|
|
|
|
|
import logging |
|
|
import json |
|
|
from typing import Dict, List, Any, Optional, Tuple |
|
|
from datetime import datetime |
|
|
import re |
|
|
|
|
|
# Module-level logger; handlers and levels are left to the host application.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class ExpertConsultantAssigner:
    """
    Assigns expert consultant profiles based on skill probabilities
    and generates weighted expertise for response synthesis.

    Every skill whose probability reaches ``MIN_SKILL_PROBABILITY`` and that
    has an entry in ``EXPERT_PROFILES`` is blended into a single
    "ultra-expert" persona. When nothing qualifies, a generalist default
    consultant is returned instead.
    """

    # Skills scoring below this probability are not blended into the persona.
    MIN_SKILL_PROBABILITY = 0.2

    # Years assumed for a background string that has no "<N> years" phrase.
    DEFAULT_EXPERIENCE_YEARS = 10

    # Applied to the lower-cased background text, e.g. "15+ years in ...".
    _YEARS_PATTERN = re.compile(r'(\d+)\+?\s*years?')

    EXPERT_PROFILES = {
        "data_analysis": {
            "title": "Senior Data Analytics Consultant",
            "expertise": ["Statistical Analysis", "Data Visualization", "Business Intelligence", "Predictive Modeling"],
            "background": "15+ years in data science across finance, healthcare, and tech sectors",
            "style": "methodical, evidence-based, quantitative reasoning"
        },
        "technical_programming": {
            "title": "Principal Software Engineering Consultant",
            "expertise": ["Full-Stack Development", "System Architecture", "DevOps", "Code Optimization"],
            "background": "20+ years leading technical teams at Fortune 500 companies",
            "style": "practical, solution-oriented, best practices focused"
        },
        "project_management": {
            "title": "Strategic Project Management Consultant",
            "expertise": ["Agile/Scrum", "Risk Management", "Stakeholder Communication", "Resource Optimization"],
            "background": "12+ years managing complex enterprise projects across industries",
            "style": "structured, process-driven, outcome-focused"
        },
        "financial_analysis": {
            "title": "Executive Financial Strategy Consultant",
            "expertise": ["Financial Modeling", "Investment Analysis", "Risk Assessment", "Corporate Finance"],
            "background": "18+ years in investment banking and corporate finance advisory",
            "style": "analytical, risk-aware, ROI-focused"
        },
        "digital_marketing": {
            "title": "Chief Marketing Strategy Consultant",
            "expertise": ["Digital Campaign Strategy", "Customer Analytics", "Brand Development", "Growth Hacking"],
            "background": "14+ years scaling marketing for startups to enterprise clients",
            "style": "creative, data-driven, customer-centric"
        },
        "business_consulting": {
            "title": "Senior Management Consultant",
            "expertise": ["Strategic Planning", "Organizational Development", "Process Improvement", "Change Management"],
            "background": "16+ years at top-tier consulting firms (McKinsey, BCG equivalent)",
            "style": "strategic, framework-driven, holistic thinking"
        },
        "cybersecurity": {
            "title": "Chief Information Security Consultant",
            "expertise": ["Threat Assessment", "Security Architecture", "Compliance", "Incident Response"],
            "background": "12+ years protecting critical infrastructure across government and private sectors",
            "style": "security-first, compliance-aware, risk mitigation focused"
        },
        "healthcare_technology": {
            "title": "Healthcare Innovation Consultant",
            "expertise": ["Health Informatics", "Telemedicine", "Medical Device Integration", "HIPAA Compliance"],
            "background": "10+ years implementing healthcare technology solutions",
            "style": "patient-centric, regulation-compliant, evidence-based"
        },
        "educational_technology": {
            "title": "Learning Technology Strategy Consultant",
            "expertise": ["Instructional Design", "EdTech Implementation", "Learning Analytics", "Curriculum Development"],
            "background": "13+ years transforming educational experiences through technology",
            "style": "learner-focused, pedagogy-driven, accessibility-minded"
        },
        "environmental_science": {
            "title": "Sustainability Strategy Consultant",
            "expertise": ["Environmental Impact Assessment", "Carbon Footprint Analysis", "Green Technology", "ESG Reporting"],
            "background": "11+ years driving environmental initiatives for corporations",
            "style": "sustainability-focused, data-driven, long-term thinking"
        }
    }

    def assign_expert_consultant(self, skill_probabilities: Dict[str, float]) -> Dict[str, Any]:
        """
        Create ultra-expert profile combining all relevant consultants.

        Args:
            skill_probabilities: Dict mapping skill categories to probability
                scores (0.0-1.0). Entries for unknown skills, values below
                ``MIN_SKILL_PROBABILITY`` and values that cannot be read as a
                number are ignored.

        Returns:
            Dict with keys ``assigned_consultant``, ``expertise_weights``,
            ``total_weight`` and ``assignment_rationale``. The default
            consultant is returned when the input is empty, is not a dict,
            or contains no qualifying skill.
        """
        if not skill_probabilities or not isinstance(skill_probabilities, dict):
            return self._get_default_consultant()

        expert_scores: Dict[str, float] = {}
        for skill, raw_probability in skill_probabilities.items():
            if skill not in self.EXPERT_PROFILES:
                continue
            probability = self._coerce_probability(raw_probability)
            # NaN fails this comparison and is therefore dropped as well.
            if probability is not None and probability >= self.MIN_SKILL_PROBABILITY:
                expert_scores[skill] = probability

        if not expert_scores:
            return self._get_default_consultant()

        total_weight = sum(expert_scores.values())
        return {
            "assigned_consultant": self._create_ultra_expert(expert_scores, total_weight),
            "expertise_weights": expert_scores,
            "total_weight": total_weight,
            "assignment_rationale": self._generate_ultra_expert_rationale(expert_scores, total_weight)
        }

    @staticmethod
    def _coerce_probability(value: Any) -> Optional[float]:
        """Return ``value`` as a float, or None when it is not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    def _get_default_consultant(self) -> Dict[str, Any]:
        """Return the generalist assignment used when no skill qualifies."""
        return {
            "assigned_consultant": {
                "primary_expertise": "business_consulting",
                "title": "Senior Management Consultant",
                "expertise": ["Strategic Planning", "Problem Solving", "Analysis", "Communication"],
                "background": "Generalist consultant with broad industry experience",
                "style": "balanced, analytical, comprehensive",
                "secondary_expertise": [],
                "confidence_score": 0.7
            },
            "expertise_weights": {"business_consulting": 0.7},
            "total_weight": 0.7,
            "assignment_rationale": "Default consultant assigned for general business inquiry"
        }

    def _create_ultra_expert(self, expert_scores: Dict[str, float], total_weight: float) -> Dict[str, Any]:
        """
        Create ultra-expert profile combining all relevant consultants.

        Args:
            expert_scores: Qualifying skills mapped to their probabilities.
            total_weight: Sum of the probabilities in ``expert_scores``; used
                to express each skill as a share of the whole.

        Returns:
            Consultant dict (title, weighted expertise list, combined
            background and style, per-domain weights, confidence score).
        """
        # Highest-probability skills first: they drive the title and the
        # order of every combined list below.
        sorted_skills = sorted(expert_scores.items(), key=lambda item: item[1], reverse=True)

        combined_expertise: List[str] = []
        background_elements: List[str] = []
        style_elements: List[str] = []
        total_years = 0

        for skill, weight in sorted_skills:
            profile = self.EXPERT_PROFILES.get(skill)
            if profile is None:
                continue

            contribution_ratio = weight / total_weight if total_weight else 0.0
            combined_expertise.extend(
                f"{expertise} (Weight: {contribution_ratio:.1%})"
                for expertise in profile["expertise"]
            )
            background_elements.append(f"{profile['background']} [{skill}]")
            total_years += self._extract_years_from_background(profile["background"])
            style_elements.extend(part.strip() for part in profile["style"].split(","))

        top_skills = [skill.replace("_", " ").title() for skill, _ in sorted_skills[:3]]
        ultra_title = f"Visionary Ultra-Expert: {' + '.join(top_skills)} Integration Specialist"

        ultra_background = (
            f"{total_years}+ years combined experience across {len(sorted_skills)} domains: "
            + "; ".join(background_elements[:3])
        )

        # Ordered de-duplication: a set would make both the selection and the
        # order of the six styles vary between interpreter runs.
        unique_styles = list(dict.fromkeys(style_elements))
        ultra_style = ", ".join(unique_styles[:6])

        return {
            "primary_expertise": "ultra_expert_integration",
            "title": ultra_title,
            "expertise": combined_expertise,
            "background": ultra_background,
            "style": ultra_style,
            "domain_integration": sorted_skills,
            "confidence_score": total_weight / len(sorted_skills) if sorted_skills else 0.0,
            "ultra_expert": True,
            "expertise_count": len(sorted_skills),
            "total_experience_years": total_years
        }

    def _extract_years_from_background(self, background: str) -> int:
        """
        Extract years of experience from a background string.

        Returns the first "<N> years" / "<N>+ years" number found, or
        ``DEFAULT_EXPERIENCE_YEARS`` when the text contains none.
        """
        years_match = self._YEARS_PATTERN.search(background.lower())
        return int(years_match.group(1)) if years_match else self.DEFAULT_EXPERIENCE_YEARS

    def _generate_ultra_expert_rationale(self, expert_scores: Dict[str, float], total_weight: float) -> str:
        """
        Generate a one-line explanation for the ultra-expert assignment.

        The line lists the number of domains, the total weight, the three
        strongest domains with their share of the total, and how many further
        domains were blended in. Parts are separated by " | ".
        """
        sorted_skills = sorted(expert_scores.items(), key=lambda item: item[1], reverse=True)

        rationale_parts = [
            f"Ultra-Expert Profile combining {len(sorted_skills)} specialized domains",
            f"Total expertise weight: {total_weight:.2f} across integrated skill areas"
        ]

        top_contributions = []
        for skill, weight in sorted_skills[:3]:
            contribution = (weight / total_weight) * 100 if total_weight else 0.0
            top_contributions.append(f"{skill} ({weight:.1%}, {contribution:.0f}% contribution)")
        rationale_parts.append(f"Primary domains: {'; '.join(top_contributions)}")

        additional_count = len(sorted_skills) - 3
        if additional_count > 0:
            rationale_parts.append(f"Plus {additional_count} additional specialized areas")

        return " | ".join(rationale_parts)
|
|
|
|
|
|
|
|
class EnhancedSynthesisAgent:
    """
    Enhanced synthesis agent with expert consultant assignment.

    Compatible with the existing ResponseSynthesisAgent interface: it merges
    the outputs of other agents and the conversation context into one LLM
    prompt, prefixed with an expert persona chosen from the skill
    probabilities, and returns the synthesized response with metadata.
    """

    # Matches a bullet glyph anywhere, or a "-", "*", "1." / "1)" list marker
    # at the start of a line.
    _STRUCTURE_PATTERN = re.compile(r'•|^\s*(?:[-*]|\d+[.)])\s+', re.MULTILINE)

    # Number of most recent interaction summaries quoted verbatim; anything
    # older is condensed by _summarize_interaction_contexts.
    _RECENT_INTERACTION_LIMIT = 8

    def __init__(self, llm_router, agent_id: str = "RESP_SYNTH_001"):
        """
        Args:
            llm_router: Object exposing an awaitable ``route_inference``
                accepting ``task_type``, ``prompt``, ``max_tokens`` and
                ``temperature`` keyword arguments.
            agent_id: Identifier reported in logs and in every result dict.
        """
        self.llm_router = llm_router
        self.agent_id = agent_id
        self.specialization = "Multi-source information integration and coherent response generation"
        self.expert_assigner = ExpertConsultantAssigner()
        self._current_user_input = None

    async def execute(self, user_input: str = None, agent_outputs: List[Dict[str, Any]] = None,
                      context: Dict[str, Any] = None, skills_result: Dict[str, Any] = None,
                      **kwargs) -> Dict[str, Any]:
        """
        Execute synthesis with expert consultant assignment.

        Compatible with both the old interface (agent outputs as the first
        positional argument) and the new interface (user_input first).

        Args:
            user_input: Original user question. A list or dict passed here
                while ``agent_outputs`` is omitted is treated as the agent
                outputs (legacy call style) and the question becomes "".
            agent_outputs: Results from other agents; a list, a dict keyed by
                task name, or a single value.
            context: Conversation context.
            skills_result: Output from the skills identification agent.
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            Dict containing the synthesized response and metadata. When the
            LLM call fails or returns nothing usable, the fallback result
            from ``_get_fallback_response`` is returned instead.
        """
        # Legacy positional call: execute(agent_outputs). The named parameters
        # can never show up in **kwargs, so there is nothing to recover there.
        if isinstance(user_input, (list, dict)) and agent_outputs is None:
            agent_outputs = user_input
            user_input = ''

        if not isinstance(user_input, str):
            user_input = str(user_input) if user_input else ''

        agent_outputs = self._normalize_agent_outputs(agent_outputs)
        context = context if isinstance(context, dict) else {}
        interaction_count = len(context.get('interaction_contexts') or [])

        logger.info("%s synthesizing %d agent outputs", self.agent_id, len(agent_outputs))
        if context:
            logger.info("%s context has %d interaction contexts", self.agent_id, interaction_count)

        skill_probabilities = self._extract_skill_probabilities(skills_result)
        logger.info("Extracted skill probabilities: %s", skill_probabilities)

        consultant_assignment = self.expert_assigner.assign_expert_consultant(skill_probabilities)
        assigned_consultant = consultant_assignment["assigned_consultant"]
        logger.info(
            "Assigned consultant: %s (%s)",
            assigned_consultant['title'],
            assigned_consultant.get('primary_expertise', 'N/A'),
        )

        expert_preamble = self._generate_expert_preamble(assigned_consultant, consultant_assignment)
        synthesis_prompt = self._build_synthesis_prompt_with_expert(
            user_input=user_input,
            context=context,
            agent_outputs=agent_outputs,
            expert_preamble=expert_preamble,
            assigned_consultant=assigned_consultant
        )

        logger.info("%s calling LLM for response synthesis", self.agent_id)

        # Boundary handler: any failure while calling the LLM or assembling
        # the result degrades to the fallback response instead of raising.
        try:
            response = await self.llm_router.route_inference(
                task_type="response_synthesis",
                prompt=synthesis_prompt,
                max_tokens=2000,
                temperature=0.7
            )

            if not isinstance(response, str) or not response.strip():
                logger.warning("%s LLM returned empty/invalid response, using fallback", self.agent_id)
                return self._get_fallback_response(user_input, agent_outputs, assigned_consultant)

            clean_response = response.strip()
            logger.info("%s received LLM response (length: %d)", self.agent_id, len(clean_response))

            coherence_score = 0.90
            source_references = self._extract_source_references(agent_outputs)

            result = {
                "synthesized_response": clean_response,
                "draft_response": clean_response,
                "final_response": clean_response,
                "assigned_consultant": assigned_consultant,
                "expertise_weights": consultant_assignment["expertise_weights"],
                "assignment_rationale": consultant_assignment["assignment_rationale"],
                "source_references": source_references,
                "coherence_score": coherence_score,
                "improvement_opportunities": self._identify_improvements(clean_response),
                "synthesis_method": "expert_enhanced_llm",
                "agent_id": self.agent_id,
                # Same coherence and sources as the result itself, so the
                # metrics cannot disagree with the fields reported above.
                "synthesis_quality_metrics": self._calculate_quality_metrics({
                    "final_response": clean_response,
                    "coherence_score": coherence_score,
                    "source_references": source_references,
                }),
                "synthesis_metadata": {
                    "agent_outputs_count": len(agent_outputs),
                    "context_interactions": interaction_count,
                    "user_context_available": bool(context.get('user_context', '')),
                    "expert_enhanced": True,
                    "processing_timestamp": datetime.now().isoformat()
                }
            }

            intent_info = self._extract_intent_info(agent_outputs)
            if intent_info:
                result["intent_alignment"] = self._check_intent_alignment(result, intent_info)

            return result

        except Exception as e:
            logger.error("%s synthesis failed: %s", self.agent_id, e, exc_info=True)
            return self._get_fallback_response(user_input, agent_outputs, assigned_consultant)

    @staticmethod
    def _normalize_agent_outputs(agent_outputs: Any) -> List[Any]:
        """
        Coerce agent outputs into a list.

        A dict keyed by task name becomes one entry per task: dict results are
        copied and tagged with ``task_name``; other results are stringified
        into ``content`` / ``result``. ``None`` and other falsy values become
        an empty list; any other single value is wrapped in a list.
        """
        if agent_outputs is None:
            return []
        if isinstance(agent_outputs, dict):
            normalized = []
            for task_name, result in agent_outputs.items():
                if isinstance(result, dict):
                    entry = dict(result)
                    entry['task_name'] = task_name
                else:
                    text = str(result)
                    entry = {'task_name': task_name, 'content': text, 'result': text}
                normalized.append(entry)
            return normalized
        if isinstance(agent_outputs, list):
            return agent_outputs
        return [agent_outputs] if agent_outputs else []

    def _extract_skill_probabilities(self, skills_result: Dict[str, Any]) -> Dict[str, float]:
        """
        Extract skill probabilities from a skills identification result.

        Looked up in order:
          1. ``skills_result['skill_classification']['skill_probabilities']``
          2. ``skills_result['skill_probabilities']``
          3. ``skills_result['identified_skills']``, a list of dicts carrying
             either ``skill`` + ``probability`` or ``category`` with
             ``probability`` / ``confidence`` (0.5 when neither is present).

        Returns:
            Dict mapping skill name to probability; empty when none is found.
        """
        if not skills_result or not isinstance(skills_result, dict):
            return {}

        skill_classification = skills_result.get('skill_classification')
        if isinstance(skill_classification, dict):
            nested = skill_classification.get('skill_probabilities')
            if isinstance(nested, dict):
                return nested

        top_level = skills_result.get('skill_probabilities')
        if isinstance(top_level, dict):
            return top_level

        identified_skills = skills_result.get('identified_skills', [])
        if not isinstance(identified_skills, list):
            return {}

        probabilities = {}
        for skill in identified_skills:
            if not isinstance(skill, dict):
                continue
            if 'skill' in skill and 'probability' in skill:
                probabilities[skill['skill']] = skill['probability']
            elif 'category' in skill:
                probabilities[skill['category']] = skill.get('probability', skill.get('confidence', 0.5))
        return probabilities

    def _generate_expert_preamble(self, assigned_consultant: Dict[str, Any],
                                  consultant_assignment: Dict[str, Any]) -> str:
        """
        Generate the expert consultant preamble for the LLM prompt.

        Args:
            assigned_consultant: Consultant dict; one with a truthy
                ``ultra_expert`` flag gets the multi-domain preamble,
                anything else the single-expert preamble.
            consultant_assignment: Full assignment dict; its
                ``assignment_rationale`` and ``expertise_weights`` are quoted.

        Returns:
            Preamble text ending in a newline.
        """
        title = assigned_consultant['title']
        style = assigned_consultant['style']
        rationale = consultant_assignment['assignment_rationale']

        if assigned_consultant.get('ultra_expert'):
            domain_count = assigned_consultant['expertise_count']
            years = assigned_consultant['total_experience_years']
            domains = ', '.join(
                f"{skill} ({weight:.1%})"
                for skill, weight in assigned_consultant['domain_integration']
            )
            # Only the first eight expertise areas are listed in the prompt.
            expertise_bullets = '\n'.join(
                f"• {expertise}" for expertise in assigned_consultant['expertise'][:8]
            )
            lines = [
                f"You are responding as a {title} - an unprecedented combination of industry-leading experts.",
                "",
                "ULTRA-EXPERT PROFILE:",
                f"- Integrated Expertise: {domain_count} specialized domains",
                f"- Combined Experience: {years}+ years across multiple industries",
                "- Integration Approach: Cross-domain synthesis with deep specialization",
                f"- Response Style: {style}",
                "",
                f"DOMAIN INTEGRATION: {domains}",
                "",
                "SPECIALIZED EXPERTISE AREAS:",
                expertise_bullets,
                "",
                f"ASSIGNMENT RATIONALE: {rationale}",
                "",
                "KNOWLEDGE DEPTH REQUIREMENT:",
                "- Provide insights equivalent to a visionary thought leader combining expertise from multiple domains",
                f"- Synthesize knowledge across {domain_count} specialization areas",
                "- Apply interdisciplinary thinking and cross-domain innovation",
                f"- Leverage combined {years}+ years of integrated experience",
                "",
                "ULTRA-EXPERT RESPONSE GUIDELINES:",
                "- Draw from extensive cross-domain experience and pattern recognition",
                "- Provide multi-perspective analysis combining different expert viewpoints",
                "- Include interdisciplinary frameworks and innovative approaches",
                "- Acknowledge complexity while providing actionable, synthesized recommendations",
                "- Balance broad visionary thinking with deep domain-specific insights",
                "- Use integrative problem-solving that spans multiple expertise areas",
                "",
            ]
            return "\n".join(lines)

        weights = ', '.join(
            f"{skill}: {weight:.1%}"
            for skill, weight in consultant_assignment['expertise_weights'].items()
        )
        header_lines = [
            f"You are responding as a {title} with the following profile:",
            "",
            "EXPERTISE PROFILE:",
            f"- Primary Expertise: {assigned_consultant['primary_expertise']}",
            f"- Core Skills: {', '.join(assigned_consultant['expertise'])}",
            f"- Background: {assigned_consultant['background']}",
            f"- Response Style: {style}",
            "",
            f"ASSIGNMENT RATIONALE: {rationale}",
            "",
            f"EXPERTISE WEIGHTS: {weights}",
            "",
            "",
        ]
        preamble = "\n".join(header_lines)

        secondary = assigned_consultant.get('secondary_expertise')
        if secondary:
            preamble += f"SECONDARY EXPERTISE: {', '.join(secondary)}\n"

        guideline_lines = [
            "",
            "KNOWLEDGE DEPTH REQUIREMENT: Provide insights equivalent to a highly experienced, "
            f"industry-leading {title} with deep domain expertise and practical experience.",
            "",
            "RESPONSE GUIDELINES:",
            "- Draw from extensive practical experience in your field",
            "- Provide industry-specific insights and best practices",
            "- Include relevant frameworks, methodologies, or tools",
            "- Acknowledge complexity while remaining actionable",
            "- Balance theoretical knowledge with real-world application",
            "",
        ]
        return preamble + "\n".join(guideline_lines)

    def _build_synthesis_prompt_with_expert(self, user_input: str, context: Dict[str, Any],
                                            agent_outputs: List[Dict[str, Any]],
                                            expert_preamble: str,
                                            assigned_consultant: Dict[str, Any]) -> str:
        """
        Build the synthesis prompt with expert consultant context.

        Layout: expert preamble, user question, conversation context, agent
        analysis results, instructions, and a trailing "Response:" cue.

        Args:
            user_input: The user's question.
            context: Conversation context (see ``_build_context_section``).
            agent_outputs: List of agent results, a dict keyed by task name,
                or any other value (rendered with ``str``).
            expert_preamble: Text from ``_generate_expert_preamble``.
            assigned_consultant: Consultant dict; its ``primary_expertise``
                is named in the instructions.
        """
        context_section = self._build_context_section(context)

        agent_outputs_section = ""
        if agent_outputs:
            if isinstance(agent_outputs, dict):
                agent_outputs = self._normalize_agent_outputs(agent_outputs)

            if isinstance(agent_outputs, list):
                rendered = ["\n\nAgent Analysis Results:\n"]
                for index, output in enumerate(agent_outputs, 1):
                    if isinstance(output, dict):
                        # First non-empty text field wins; the whole dict is
                        # dumped when it carries none of them.
                        output_text = (
                            output.get('content')
                            or output.get('result')
                            or output.get('final_response')
                            or str(output)
                        )
                    else:
                        output_text = str(output)
                    rendered.append(f"Agent {index}: {output_text}\n")
                agent_outputs_section = "".join(rendered)
            else:
                agent_outputs_section = f"\n\nAgent Analysis Results:\n{str(agent_outputs)}\n"

        primary_expertise = assigned_consultant.get('primary_expertise', 'general consulting')
        return (
            f"{expert_preamble}\n\n"
            f"User Question: {user_input}\n\n"
            f"{context_section}{agent_outputs_section}\n\n"
            "Instructions: Provide a comprehensive, helpful response that directly addresses the "
            "question from your expert perspective. If there's conversation context, use it to "
            "answer the current question appropriately. Be detailed, informative, and leverage "
            f"your specialized expertise in {primary_expertise}.\n\n"
            "Response:"
        )

    @staticmethod
    def _interaction_summary(entry: Any) -> str:
        """Return the ``summary`` of one interaction context ('' for non-dicts)."""
        return entry.get('summary', '') if isinstance(entry, dict) else ''

    def _numbered_summaries(self, interaction_contexts: List[Dict[str, Any]]) -> List[str]:
        """Render non-empty summaries as numbered lines; numbers follow list position."""
        numbered = []
        for position, entry in enumerate(interaction_contexts, 1):
            summary = self._interaction_summary(entry)
            if summary:
                numbered.append(f" {position}. {summary}\n")
        return numbered

    def _build_context_section(self, context: Dict[str, Any]) -> str:
        """
        Build the context section, summarizing long conversations.

        Uses the Context Manager structure:
        - combined_context: Pre-formatted context string (preferred; returned
          alone when present)
        - session_context: Dict whose 'summary' is the session summary
        - interaction_contexts: List of interaction summaries with 'summary'
          and 'timestamp'
        - user_context: User persona summary string

        Returns:
            Prompt text, or "" when there is no usable context.
        """
        if not context or not isinstance(context, dict):
            return ""

        combined_context = context.get('combined_context', '')
        if combined_context:
            return f"\n\nConversation Context:\n{combined_context}"

        session_context = context.get('session_context', {})
        session_summary = session_context.get('summary', '') if isinstance(session_context, dict) else ""
        interaction_contexts = context.get('interaction_contexts', [])
        user_context = context.get('user_context', '')

        parts = []
        if session_summary:
            parts.append(f"\n\nSession Context (Session Summary):\n{session_summary[:500]}...\n")
        if user_context:
            parts.append(f"\n\nUser Context (Persona Summary):\n{user_context[:500]}...\n")

        if interaction_contexts:
            limit = self._RECENT_INTERACTION_LIMIT
            if len(interaction_contexts) <= limit:
                parts.append("\n\nPrevious Conversation Summary:\n")
                parts.extend(self._numbered_summaries(interaction_contexts))
            else:
                # Long conversation: condense everything but the latest turns.
                older_summary = self._summarize_interaction_contexts(interaction_contexts[:-limit])
                parts.append(
                    f"\n\nConversation Summary (earlier context):\n{older_summary}\n\nRecent Conversation:\n"
                )
                parts.extend(self._numbered_summaries(interaction_contexts[-limit:]))

        return "".join(parts)

    def _summarize_interaction_contexts(self, interaction_contexts: List[Dict[str, Any]]) -> str:
        """
        Summarize older interaction contexts to preserve key context.

        Uses the Context Manager structure where each entry contains:
        - summary: short interaction summary string
        - timestamp: Interaction timestamp (not read here)

        Returns:
            Two lines: up to five topic words (the first three words longer
            than four characters from each summary, de-duplicated in order of
            first appearance) and the last five summaries, each cut to 150
            characters. "No prior context." for empty input.
        """
        if not interaction_contexts:
            return "No prior context."

        topics: List[str] = []
        key_points: List[str] = []
        for entry in interaction_contexts:
            summary = self._interaction_summary(entry)
            if not summary:
                continue
            long_words = [word for word in summary.lower().split() if len(word) > 4]
            topics.extend(long_words[:3])
            key_points.append(summary[:150])

        # Ordered de-duplication keeps the output stable between runs.
        unique_topics = list(dict.fromkeys(topics))[:5]
        recent_points = key_points[-5:]

        topics_text = ', '.join(unique_topics) if unique_topics else 'General discussion'
        points_text = ' | '.join(recent_points) if recent_points else 'No specific points'
        return f"Topics discussed: {topics_text}\nKey points: {points_text}"

    def _summarize_interactions(self, interactions: List[Dict[str, Any]]) -> str:
        """
        Legacy method for backward compatibility - delegates to
        _summarize_interaction_contexts.

        Entries that already carry a 'summary' (judged by the first entry)
        are passed through; otherwise each entry's 'user_input' is turned
        into a "User asked: ..." summary first.
        """
        interactions = interactions or []
        if interactions and isinstance(interactions[0], dict) and 'summary' in interactions[0]:
            return self._summarize_interaction_contexts(interactions)

        interaction_contexts = []
        for interaction in interactions:
            user_input = interaction.get('user_input', '') if isinstance(interaction, dict) else ''
            if user_input:
                interaction_contexts.append({'summary': f"User asked: {user_input[:100]}..."})
        return self._summarize_interaction_contexts(interaction_contexts)

    def _extract_intent_info(self, agent_outputs: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """
        Extract intent information from agent outputs.

        Returns:
            Dict with ``primary_intent``, ``confidence`` (0.5 when the output
            has no score for that intent) and ``source_agent`` taken from the
            first dict output that has a ``primary_intent``; None when there
            is none. Non-dict outputs are skipped.
        """
        for output in agent_outputs or []:
            if not isinstance(output, dict) or 'primary_intent' not in output:
                continue
            primary_intent = output['primary_intent']
            confidence_scores = output.get('confidence_scores')
            if not isinstance(confidence_scores, dict):
                confidence_scores = {}
            return {
                'primary_intent': primary_intent,
                'confidence': confidence_scores.get(primary_intent, 0.5),
                'source_agent': output.get('agent_id', 'unknown')
            }
        return None

    def _extract_source_references(self, agent_outputs: List[Dict[str, Any]]) -> List[str]:
        """
        Extract source references from agent outputs.

        Returns:
            Distinct ``agent_id`` values in order of first appearance.
            Outputs without an ``agent_id`` (including non-dict outputs)
            count as 'unknown'.
        """
        sources = [
            output.get('agent_id', 'unknown') if isinstance(output, dict) else 'unknown'
            for output in agent_outputs or []
        ]
        return list(dict.fromkeys(sources))

    def _calculate_quality_metrics(self, synthesis_result: Dict[str, Any]) -> Dict[str, Any]:
        """
        Calculate quality metrics for a synthesis result.

        Args:
            synthesis_result: Dict read for ``final_response``,
                ``coherence_score`` (default 0.7) and ``source_references``.

        Returns:
            Dict with ``length``, ``word_count``, ``coherence_score``,
            ``source_count`` and ``has_structured_elements`` (True when the
            response contains a bullet or a line-leading list marker).
        """
        response = synthesis_result.get('final_response') or ''
        return {
            "length": len(response),
            "word_count": len(response.split()) if response else 0,
            "coherence_score": synthesis_result.get('coherence_score', 0.7),
            "source_count": len(synthesis_result.get('source_references') or []),
            "has_structured_elements": bool(self._STRUCTURE_PATTERN.search(response)) if response else False
        }

    def _check_intent_alignment(self, synthesis_result: Dict[str, Any], intent_info: Dict[str, Any]) -> Dict[str, Any]:
        """
        Check if synthesis aligns with detected intent.

        The alignment score is the mean of the intent confidence (default
        0.5) and the result's coherence score (default 0.7); it counts as
        verified when above 0.7.
        """
        intent_confidence = intent_info.get('confidence', 0.5)
        coherence_score = synthesis_result.get('coherence_score', 0.7)
        alignment_score = (intent_confidence + coherence_score) / 2.0

        return {
            "intent_detected": intent_info.get('primary_intent'),
            "alignment_score": alignment_score,
            "alignment_verified": alignment_score > 0.7
        }

    def _identify_improvements(self, response: str) -> List[str]:
        """
        Identify opportunities to improve the response.

        Flags responses shorter than 50 characters, and responses under 100
        words that contain no question mark.
        """
        improvements = []
        if len(response) < 50:
            improvements.append("Could be more detailed")
        if "?" not in response and len(response.split()) < 100:
            improvements.append("Consider adding examples")
        return improvements

    def _get_fallback_response(self, user_input: str, agent_outputs: List[Dict[str, Any]],
                               assigned_consultant: Dict[str, Any]) -> Dict[str, Any]:
        """
        Provide fallback response when synthesis fails (LLM API failure only).

        Returns:
            Result dict shaped like a successful synthesis, flagged with
            ``error: True`` and carrying a canned message in place of the
            synthesized text.
        """
        if user_input:
            fallback_text = f"Thank you for your question: '{user_input}'. I'm processing your request and will provide a detailed response shortly."
        else:
            fallback_text = "I apologize, but I encountered an issue processing your request. Please try again."

        coherence_score = 0.5
        source_references = self._extract_source_references(agent_outputs)

        return {
            "synthesized_response": fallback_text,
            "draft_response": fallback_text,
            "final_response": fallback_text,
            "assigned_consultant": assigned_consultant,
            "source_references": source_references,
            "coherence_score": coherence_score,
            "improvement_opportunities": ["LLM API error - fallback activated"],
            "synthesis_method": "expert_enhanced_fallback",
            "agent_id": self.agent_id,
            "synthesis_quality_metrics": self._calculate_quality_metrics({
                "final_response": fallback_text,
                "coherence_score": coherence_score,
                "source_references": source_references,
            }),
            "error": True,
            "synthesis_metadata": {"expert_enhanced": True, "error": True, "llm_api_failed": True}
        }
|
|
|
|
|
|
|
|
|
|
|
# Backward-compatible alias so code written against the ResponseSynthesisAgent
# name gets the enhanced agent.
ResponseSynthesisAgent = EnhancedSynthesisAgent
|
|
|
|
|
|
|
|
|
|
|
def create_synthesis_agent(llm_router) -> EnhancedSynthesisAgent:
    """
    Factory function to create an enhanced synthesis agent.

    Args:
        llm_router: Router object handed to ``EnhancedSynthesisAgent``.

    Returns:
        A new ``EnhancedSynthesisAgent`` using the default agent id.
    """
    return EnhancedSynthesisAgent(llm_router)
|
|
|