Spaces:
Sleeping
Sleeping
| """ | |
| Advanced multi-step customer support email workflow environment. | |
| OpenEnv-compliant environment with 5-step agentic workflow. | |
| """ | |
| import uuid | |
| from typing import Dict, Any, Tuple, Optional | |
| import sys | |
| import os | |
| # Add parent directory to path for imports | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from models import ( | |
| EmailObservation, EmailAction, EmailState, StepReturn, ResetReturn, | |
| ActionType, WorkflowStep, RewardWeights, ToolType, ToolAction, ToolResult | |
| ) | |
| from .grader import ( | |
| calculate_step_reward, grade_workflow_completion, | |
| analyze_customer_sentiment, extract_urgency_indicators, | |
| check_escalation_requirement, refund_grader, tech_grader, escalation_grader | |
| ) | |
| # Mandatory Task definitions for OpenEnv validation | |
| TASKS = [ | |
| {"task_id": "easy_refund", "email_id": "email_001", "difficulty": "easy"}, | |
| {"task_id": "medium_tech", "email_id": "email_002", "difficulty": "medium"}, | |
| {"task_id": "hard_escalation", "email_id": "email_003", "difficulty": "hard"}, | |
| ] | |
| def search_knowledge_base(query: str): | |
| if "refund" in query.lower(): | |
| return { | |
| "policy_id": "POLICY_REFUND_001", | |
| "content": "Refunds allowed within 30 days..." | |
| } | |
| elif "technical" in query.lower(): | |
| return { | |
| "policy_id": "POLICY_TECH_002", | |
| "content": "Restart app..." | |
| } | |
| return {"policy_id": None, "content": ""} | |
| class CustomerSupportEnv: | |
| """ | |
| OpenEnv-compliant multi-step environment for customer support email workflow. | |
| 5-step episodes: classify → prioritize → decide_strategy → respond → escalate (optional) | |
| """ | |
| def __init__(self): | |
| """Initialize environment with expanded task queue""" | |
| self.task_queue = self._load_tasks() | |
| self.current_task = None | |
| self.current_state = None | |
| self.workflow_state = {} # Track decisions across steps | |
| self.episode_count = 0 | |
| def _load_tasks(self) -> list: | |
| """ | |
| Load expanded task queue with 10+ diverse scenarios. | |
| Includes: billing, tech, complaints, spam, VIP customers, repeat issues, | |
| mixed-intent emails, ambiguous cases, emotional customers, enterprise accounts | |
| """ | |
| return [ | |
| { | |
| "id": "email_001", | |
| "difficulty": "easy", | |
| "subject": "Refund request - duplicate charge", | |
| "body": ( | |
| "Hello,\n\n" | |
| "I was charged twice for my subscription this month. " | |
| "The charge of $49.99 appeared twice in my account on March 15. " | |
| "Please refund the duplicate charge immediately.\n\n" | |
| "Thanks,\nJohn" | |
| ), | |
| "customer_history": "Premium subscriber for 2 years, excellent payment history, first complaint", | |
| "label": { | |
| "category": "billing", | |
| "priority": "high" | |
| } | |
| }, | |
| { | |
| "id": "email_002", | |
| "difficulty": "medium", | |
| "subject": "App performance issue", | |
| "body": ( | |
| "Hi Support Team,\n\n" | |
| "I've been experiencing some issues with the app lately. " | |
| "It seems to crash when I try to open the settings menu. " | |
| "This happens on both my phone and tablet. " | |
| "I'm running the latest version. " | |
| "Could you help me investigate this?\n\n" | |
| "Sarah" | |
| ), | |
| "customer_history": "Casual user, 3 months active, 2 previous tech support tickets (both resolved)", | |
| "label": { | |
| "category": "tech", | |
| "priority": "medium" | |
| } | |
| }, | |
| { | |
| "id": "email_003", | |
| "difficulty": "hard", | |
| "subject": "Completely disappointed with your service", | |
| "body": ( | |
| "This is absolutely frustrating. " | |
| "I submitted a support ticket 5 DAYS ago about my account being locked, " | |
| "and I haven't heard a single word from anyone. " | |
| "Your customer service is non-existent. " | |
| "I've recommended your product to friends, but I regret that now. " | |
| "If this isn't resolved TODAY, I'm leaving a bad review everywhere. " | |
| "I expect compensation for the inconvenience and lost time.\n\n" | |
| "Regards,\nMichael" | |
| ), | |
| "customer_history": "Enterprise customer, $500/month contract, previously submitted 7 complaints in past 3 months, escalated to management twice", | |
| "label": { | |
| "category": "complaint", | |
| "priority": "high" | |
| } | |
| }, | |
| { | |
| "id": "email_004", | |
| "difficulty": "easy", | |
| "subject": "Unsubscribe request", | |
| "body": ( | |
| "Please remove me from your mailing list. " | |
| "I no longer wish to receive your emails.\n\n" | |
| "Best,\nAnonymous" | |
| ), | |
| "customer_history": "Free tier user, signed up 6 months ago, no previous interactions", | |
| "label": { | |
| "category": "spam", | |
| "priority": "low" | |
| } | |
| }, | |
| { | |
| "id": "email_005", | |
| "difficulty": "hard", | |
| "subject": "URGENT: Account suspension affecting business operations", | |
| "body": ( | |
| "This is critical. Our company account was suspended this morning without warning. " | |
| "We have 50 employees who cannot access their work tools. " | |
| "This is causing significant business disruption. " | |
| "We need immediate resolution and compensation for lost productivity. " | |
| "Please escalate to your highest level of management.\n\n" | |
| "CEO, TechCorp Solutions" | |
| ), | |
| "customer_history": "Enterprise VIP customer, $2000/month contract, perfect payment history, first incident", | |
| "label": { | |
| "category": "complaint", | |
| "priority": "high" | |
| } | |
| }, | |
| { | |
| "id": "email_006", | |
| "difficulty": "medium", | |
| "subject": "Login issues after password reset", | |
| "body": ( | |
| "Hi,\n\n" | |
| "I reset my password yesterday but now I can't log in. " | |
| "The system says my password is incorrect, but I'm sure I'm typing it right. " | |
| "I tried resetting again but got the same result. " | |
| "Can you help me regain access to my account?\n\n" | |
| "Thanks,\nLisa" | |
| ), | |
| "customer_history": "Regular user, 1 year active, had similar login issue 3 months ago (resolved by phone support)", | |
| "label": { | |
| "category": "tech", | |
| "priority": "medium" | |
| } | |
| }, | |
| { | |
| "id": "email_007", | |
| "difficulty": "hard", | |
| "subject": "Mixed feedback - billing and feature request", | |
| "body": ( | |
| "Hello Support,\n\n" | |
| "I love your product overall, but I'm frustrated with the billing. " | |
| "The charges are confusing and I think I'm being overcharged. " | |
| "Also, could you add a feature to export data in CSV format? " | |
| "That would be really helpful for my workflow. " | |
| "Please look into both issues.\n\n" | |
| "Best,\nDavid" | |
| ), | |
| "customer_history": "Power user, 18 months active, multiple feature requests submitted, occasional billing questions", | |
| "label": { | |
| "category": "billing", # Primary issue is billing | |
| "priority": "medium" | |
| } | |
| }, | |
| { | |
| "id": "email_008", | |
| "difficulty": "easy", | |
| "subject": "Thank you for the quick resolution", | |
| "body": ( | |
| "Hi Team,\n\n" | |
| "Just wanted to say thank you for fixing the sync issue so quickly yesterday. " | |
| "Everything is working perfectly now. " | |
| "Great customer service!\n\n" | |
| "Regards,\nMaria" | |
| ), | |
| "customer_history": "Loyal customer, 3 years active, submitted 2 support tickets (both resolved quickly)", | |
| "label": { | |
| "category": "complaint", # Actually positive feedback | |
| "priority": "low" | |
| } | |
| }, | |
| { | |
| "id": "email_009", | |
| "difficulty": "hard", | |
| "subject": "Account hacked - immediate action required", | |
| "body": ( | |
| "OH MY GOD MY ACCOUNT HAS BEEN HACKED! " | |
| "Someone changed my password and email address. " | |
| "I can't get back in and I'm terrified they're going to steal my data. " | |
| "This is a nightmare. Please help me immediately! " | |
| "I need you to restore access and secure my account. " | |
| "This is unacceptable!\n\n" | |
| "Panicking,\nAlex" | |
| ), | |
| "customer_history": "Premium subscriber, 6 months active, no previous security issues, high-value account", | |
| "label": { | |
| "category": "tech", | |
| "priority": "high" | |
| } | |
| }, | |
| { | |
| "id": "email_010", | |
| "difficulty": "medium", | |
| "subject": "Question about upcoming features", | |
| "body": ( | |
| "Hello,\n\n" | |
| "I saw in your newsletter that you're working on mobile app improvements. " | |
| "Can you tell me when those will be available? " | |
| "Also, will there be any changes to the pricing structure?\n\n" | |
| "Thanks,\nRobert" | |
| ), | |
| "customer_history": "Enterprise customer, $750/month contract, active user, interested in product roadmap", | |
| "label": { | |
| "category": "spam", # Not really support, more inquiry | |
| "priority": "low" | |
| } | |
| }, | |
| { | |
| "id": "email_011", | |
| "difficulty": "hard", | |
| "subject": "Recurring billing issue - multiple failed attempts", | |
| "body": ( | |
| "This is the third time this month that my payment has failed. " | |
| "I've updated my card information twice already, but it keeps failing. " | |
| "I'm getting frustrated with this recurring problem. " | |
| "Please investigate why my payments aren't processing and fix this permanently. " | |
| "I don't want to have to deal with this every month.\n\n" | |
| "Sincerely,\nJennifer" | |
| ), | |
| "customer_history": "Long-time customer, 4 years active, multiple billing issues in past year, escalated once, high-value account", | |
| "label": { | |
| "category": "billing", | |
| "priority": "high" | |
| } | |
| }, | |
| { | |
| "id": "email_012", | |
| "difficulty": "medium", | |
| "subject": "Feature suggestion and minor bug report", | |
| "body": ( | |
| "Hi Support,\n\n" | |
| "Love the new dashboard design! One small issue though - " | |
| "the export button doesn't work when I filter the results. " | |
| "Also, it would be great if you could add keyboard shortcuts for common actions. " | |
| "Keep up the good work!\n\n" | |
| "Cheers,\nTom" | |
| ), | |
| "customer_history": "Developer account, beta tester, frequent feature suggestions, minor bug reports", | |
| "label": { | |
| "category": "tech", | |
| "priority": "low" | |
| } | |
| } | |
| ] | |
| def _prepare_task_data(self, task: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Prepare task data with additional analysis for multi-step workflow. | |
| Args: | |
| task: Raw task data | |
| Returns: | |
| Enhanced task data with sentiment and urgency analysis | |
| """ | |
| enhanced_task = task.copy() | |
| # Analyze sentiment | |
| sentiment = analyze_customer_sentiment(task["body"], task["subject"]) | |
| enhanced_task["sentiment"] = sentiment | |
| # Extract urgency indicators | |
| urgency_indicators = extract_urgency_indicators(task["body"], task["subject"]) | |
| enhanced_task["urgency_indicators"] = urgency_indicators | |
| return enhanced_task | |
| def get_current_workflow_step(self) -> WorkflowStep: | |
| """Centralized logic to determine the current workflow step based on state.""" | |
| if self.workflow_state["classification"] is None: | |
| return WorkflowStep.CLASSIFICATION | |
| if self.workflow_state["priority"] is None: | |
| return WorkflowStep.PRIORITIZATION | |
| if self.workflow_state["strategy"] is None: | |
| return WorkflowStep.STRATEGY_DECISION | |
| if self.workflow_state["response"] is None: | |
| return WorkflowStep.RESPONSE_GENERATION | |
| if self.workflow_state["escalation"] is None: | |
| return WorkflowStep.ESCALATION_DECISION | |
| return WorkflowStep.COMPLETED | |
| def reset(self) -> Dict[str, Any]: | |
| """ | |
| Reset environment and start new multi-step episode. | |
| Returns: | |
| Dict with 'observation' and 'info' keys | |
| """ | |
| if not self.task_queue: | |
| self.task_queue = self._load_tasks() | |
| # Phase 2 Fix: Wrap selection in structured TASKS context | |
| # We cycle through the 3 mandatory tasks for consistent evaluation | |
| task_idx = self.episode_count % len(TASKS) | |
| selected_task_metadata = TASKS[task_idx] | |
| task_id = selected_task_metadata["task_id"] | |
| # Find corresponding email data in current queue or database | |
| # For simplicity in this fix, we search task_queue for matching email_id | |
| email_data = next((t for t in self.task_queue if t["id"] == selected_task_metadata["email_id"]), self.task_queue[0]) | |
| self.current_task = self._prepare_task_data(email_data) | |
| self.current_task["task_id"] = task_id # Attach mandatory task_id | |
| self.episode_count += 1 | |
| # Initialize workflow state | |
| self.workflow_state = { | |
| "classification": None, | |
| "priority": None, | |
| "strategy": None, | |
| "response": None, | |
| "escalation": None, | |
| "tools_used": False, | |
| "tools_used_count": 0 | |
| } | |
| self.current_state = EmailState( | |
| episode_id=f"episode_{self.episode_count}_{uuid.uuid4().hex[:8]}", | |
| step_count=0, | |
| done=False, | |
| current_email=self.current_task["id"], | |
| total_reward=0.0 | |
| ) | |
| observation = EmailObservation( | |
| email_id=self.current_task["id"], | |
| subject=self.current_task["subject"], | |
| body=self.current_task["body"], | |
| customer_history=self.current_task["customer_history"], | |
| step_count=0, | |
| workflow_step=WorkflowStep.CLASSIFICATION, | |
| available_actions=["classify", "use_tool"], | |
| available_tools=[tool.value for tool in ToolType], | |
| previous_decisions=self.workflow_state.copy(), | |
| customer_sentiment=self.current_task["sentiment"], | |
| urgency_indicators=self.current_task["urgency_indicators"] | |
| ) | |
| return { | |
| "observation": observation, | |
| "info": { | |
| "episode_id": self.current_state.episode_id, | |
| "difficulty": self.current_task.get("difficulty", "unknown"), | |
| "email_id": self.current_task["id"], | |
| "task_id": self.current_task.get("task_id"), | |
| "workflow_step": 0, | |
| "max_steps": 5 | |
| } | |
| } | |
| def step(self, action: EmailAction) -> Dict[str, Any]: | |
| """ | |
| Process agent action in multi-step workflow. | |
| Now supports tool usage actions. | |
| Args: | |
| action: Agent's action (EmailAction with action_type and content) | |
| Returns: | |
| Dict with observation, reward, done, info | |
| """ | |
| if self.current_task is None: | |
| raise RuntimeError("Environment not reset. Call reset() first.") | |
| current_step = self.current_state.step_count | |
| # Handle tool usage (special action type) | |
| if hasattr(action, 'tool_action') and action.tool_action: | |
| tool_result = self.execute_tool(action.tool_action) | |
| # Tool usage gives small reward/penalty but doesn't advance workflow | |
| if self.workflow_state.get("tools_used_count", 0) >= 1: | |
| tool_reward = 0.0 | |
| else: | |
| tool_reward = 0.05 if tool_result.success else -0.02 | |
| tool_reward = min(tool_reward, 0.02) | |
| self.workflow_state["tools_used"] = True | |
| self.workflow_state["tools_used_count"] = self.workflow_state.get("tools_used_count", 0) + 1 | |
| # Use centralized method for step determination | |
| current_workflow_step = self.get_current_workflow_step() | |
| current_available_actions = ( | |
| ["classify", "use_tool"] if current_workflow_step == WorkflowStep.CLASSIFICATION else | |
| ["prioritize", "use_tool"] if current_workflow_step == WorkflowStep.PRIORITIZATION else | |
| ["decide_strategy", "use_tool"] if current_workflow_step == WorkflowStep.STRATEGY_DECISION else | |
| ["respond", "use_tool"] if current_workflow_step == WorkflowStep.RESPONSE_GENERATION else | |
| ["escalate", "use_tool"] if current_workflow_step == WorkflowStep.ESCALATION_DECISION else | |
| ["use_tool"] | |
| ) | |
| observation = EmailObservation( | |
| email_id=self.current_task["id"], | |
| subject=self.current_task["subject"], | |
| body=self.current_task["body"], | |
| customer_history=self.current_task["customer_history"], | |
| step_count=self.current_state.step_count, | |
| workflow_step=current_workflow_step, | |
| available_actions=current_available_actions, | |
| available_tools=[tool.value for tool in ToolType], | |
| previous_decisions=self.workflow_state.copy(), | |
| customer_sentiment=self.current_task["sentiment"], | |
| urgency_indicators=self.current_task["urgency_indicators"], | |
| tool_result=tool_result | |
| ) | |
| return { | |
| "observation": observation, | |
| "reward": tool_reward, | |
| "done": False, | |
| "info": { | |
| "tool_used": tool_result.tool_type.value, | |
| "tool_success": tool_result.success, | |
| "tool_data": tool_result.data | |
| } | |
| } | |
| # Normal workflow step processing... | |
| # Calculate step reward | |
| step_reward, reward_breakdown = calculate_step_reward( | |
| current_step, action, self.current_task, self.workflow_state | |
| ) | |
| # Update workflow state based on action | |
| if action.action_type == ActionType.CLASSIFY: | |
| self.workflow_state["classification"] = action.content | |
| elif action.action_type == ActionType.PRIORITIZE: | |
| self.workflow_state["priority"] = action.content | |
| elif action.action_type == ActionType.DECIDE_STRATEGY: | |
| self.workflow_state["strategy"] = action.content | |
| elif action.action_type == ActionType.RESPOND: | |
| self.workflow_state["response"] = action.content | |
| elif action.action_type == ActionType.ESCALATE: | |
| self.workflow_state["escalation"] = action.content | |
| # Update state | |
| self.current_state.step_count += 1 | |
| self.current_state.total_reward += step_reward | |
| # Check if episode is complete | |
| done = self._is_episode_complete() | |
| # Create observation with updated workflow context | |
| # Determine next step and available actions based on STATE, not step_count | |
| # Use centralized method for step determination | |
| current_workflow_step = self.get_current_workflow_step() | |
| current_available_actions = ( | |
| ["classify", "use_tool"] if current_workflow_step == WorkflowStep.CLASSIFICATION else | |
| ["prioritize", "use_tool"] if current_workflow_step == WorkflowStep.PRIORITIZATION else | |
| ["decide_strategy", "use_tool"] if current_workflow_step == WorkflowStep.STRATEGY_DECISION else | |
| ["respond", "use_tool"] if current_workflow_step == WorkflowStep.RESPONSE_GENERATION else | |
| ["escalate", "use_tool"] if current_workflow_step == WorkflowStep.ESCALATION_DECISION else | |
| ["use_tool"] | |
| ) | |
| # Create observation with updated workflow context | |
| observation = EmailObservation( | |
| email_id=self.current_task["id"], | |
| subject=self.current_task["subject"], | |
| body=self.current_task["body"], | |
| customer_history=self.current_task["customer_history"], | |
| step_count=self.current_state.step_count, | |
| workflow_step=current_workflow_step, | |
| available_actions=current_available_actions, | |
| available_tools=[tool.value for tool in ToolType], | |
| previous_decisions=self.workflow_state.copy(), | |
| customer_sentiment=self.current_task["sentiment"], | |
| urgency_indicators=self.current_task["urgency_indicators"] | |
| ) | |
| # Add completion bonus if episode is done | |
| if done: | |
| completion_bonus, completion_breakdown = grade_workflow_completion(self.workflow_state) | |
| # Add escalation requirement check | |
| escalation_penalty, escalation_bonus = check_escalation_requirement(self.current_task, self.workflow_state) | |
| completion_bonus += escalation_bonus - escalation_penalty | |
| self.current_state.total_reward += completion_bonus | |
| reward_breakdown["completion_bonus"] = completion_bonus | |
| reward_breakdown["escalation_penalty"] = escalation_penalty | |
| reward_breakdown["escalation_bonus"] = escalation_bonus | |
| reward_breakdown.update(completion_breakdown) | |
| info = { | |
| "workflow_state": self.workflow_state.copy(), | |
| "total_reward": self.current_state.total_reward, | |
| "reward_breakdown": reward_breakdown, | |
| "step_count": self.current_state.step_count, | |
| "episode_complete": done, | |
| "task_id": self.current_task.get("task_id") | |
| } | |
| # PHASE 2 REQUIREMENT: Return score only at end | |
| if done: | |
| info["score"] = self.compute_score() | |
| return { | |
| "observation": observation, | |
| "reward": step_reward, | |
| "done": done, | |
| "info": info | |
| } | |
| def compute_score(self) -> float: | |
| """ Programmatic score computation for OpenEnv validation. """ | |
| task_id = self.current_task.get("task_id") | |
| if task_id == "easy_refund": | |
| return refund_grader(self.workflow_state) | |
| elif task_id == "medium_tech": | |
| return tech_grader(self.workflow_state) | |
| elif task_id == "hard_escalation": | |
| return escalation_grader(self.workflow_state) | |
| return 0.0 | |
| def _is_episode_complete(self) -> bool: | |
| """ | |
| Check if the current episode is complete. | |
| Episode completes when: | |
| - All required steps (classify, prioritize, strategy, respond) are done, OR | |
| - Escalation step is taken (optional final step) | |
| Returns: | |
| True if episode should end | |
| """ | |
| required_steps = ["classification", "priority", "strategy", "response"] | |
| completed_required = all(self.workflow_state.get(step) is not None for step in required_steps) | |
| # Episode can end after required steps, or after escalation | |
| return completed_required or (self.workflow_state.get("escalation") is not None) | |
| def get_state(self) -> Dict[str, Any]: | |
| """ | |
| Get current environment state. | |
| Returns: | |
| Current state as dict | |
| """ | |
| if self.current_state is None: | |
| return {"error": "Environment not initialized. Call reset() first."} | |
| return { | |
| "episode_id": self.current_state.episode_id, | |
| "step_count": self.current_state.step_count, | |
| "done": self.current_state.done, | |
| "current_email": self.current_state.current_email, | |
| "total_reward": self.current_state.total_reward, | |
| "workflow_state": self.workflow_state.copy() | |
| } | |
| def get_stats(self) -> Dict[str, Any]: | |
| """ | |
| Get environment statistics. | |
| Returns: | |
| Stats dict | |
| """ | |
| return { | |
| "episode_count": self.episode_count, | |
| "remaining_tasks": len(self.task_queue), | |
| "current_task_id": self.current_task["id"] if self.current_task else None, | |
| "current_workflow_step": self.current_state.step_count if self.current_state else 0 | |
| } | |
| def execute_tool(self, tool_action: ToolAction) -> ToolResult: | |
| """ | |
| Execute a tool action and return results. | |
| Args: | |
| tool_action: The tool action to execute | |
| Returns: | |
| ToolResult with execution outcome | |
| """ | |
| if self.current_task is None: | |
| return ToolResult( | |
| tool_type=tool_action.tool_type, | |
| success=False, | |
| error="No active task" | |
| ) | |
| try: | |
| if tool_action.tool_type == ToolType.LOOKUP_CUSTOMER: | |
| return self._lookup_customer(tool_action.parameters) | |
| elif tool_action.tool_type == ToolType.SEARCH_HISTORY: | |
| return self._search_history(tool_action.parameters) | |
| elif tool_action.tool_type == ToolType.CHECK_POLICY: | |
| return self._check_policy(tool_action.parameters) | |
| else: | |
| return ToolResult( | |
| tool_type=tool_action.tool_type, | |
| success=False, | |
| error=f"Unknown tool: {tool_action.tool_type}" | |
| ) | |
| except Exception as e: | |
| return ToolResult( | |
| tool_type=tool_action.tool_type, | |
| success=False, | |
| error=str(e) | |
| ) | |
| def _lookup_customer(self, params: Dict[str, Any]) -> ToolResult: | |
| """Look up detailed customer information""" | |
| customer_id = params.get("customer_id", "").strip() | |
| # Simulate customer database lookup | |
| mock_customer_db = { | |
| "email_001": { | |
| "customer_id": "CUST_001", | |
| "account_type": "premium", | |
| "total_value": 2499.99, | |
| "join_date": "2022-03-15", | |
| "complaints": 1, | |
| "satisfaction_score": 4.8 | |
| }, | |
| "email_005": { | |
| "customer_id": "CUST_005", | |
| "account_type": "enterprise", | |
| "total_value": 15000.00, | |
| "join_date": "2021-01-10", | |
| "complaints": 3, | |
| "satisfaction_score": 3.2 | |
| }, | |
| "email_011": { | |
| "customer_id": "CUST_011", | |
| "account_type": "standard", | |
| "total_value": 149.99, | |
| "join_date": "2023-08-22", | |
| "complaints": 4, | |
| "satisfaction_score": 2.1 | |
| } | |
| } | |
| if customer_id in mock_customer_db: | |
| return ToolResult( | |
| tool_type=ToolType.LOOKUP_CUSTOMER, | |
| success=True, | |
| data=mock_customer_db[customer_id] | |
| ) | |
| else: | |
| return ToolResult( | |
| tool_type=ToolType.LOOKUP_CUSTOMER, | |
| success=False, | |
| error="Customer not found" | |
| ) | |
| def _search_history(self, params: Dict[str, Any]) -> ToolResult: | |
| """Search customer interaction history""" | |
| query = params.get("query", "").lower().strip() | |
| limit = params.get("limit", 5) | |
| # Simulate history search | |
| mock_history = { | |
| "email_002": [ | |
| {"date": "2024-01-15", "type": "tech_support", "summary": "App crash issue - resolved"}, | |
| {"date": "2024-02-20", "type": "feature_request", "summary": "Requested export functionality"} | |
| ], | |
| "email_003": [ | |
| {"date": "2024-01-10", "type": "complaint", "summary": "Account lock issue - escalated"}, | |
| {"date": "2024-02-05", "type": "complaint", "summary": "Response delay - escalated"}, | |
| {"date": "2024-03-01", "type": "complaint", "summary": "Service dissatisfaction - escalated"} | |
| ], | |
| "email_006": [ | |
| {"date": "2024-03-01", "type": "tech_support", "summary": "Login issue - resolved by phone"} | |
| ] | |
| } | |
| current_email = self.current_task.get("id", "") | |
| if current_email in mock_history: | |
| history = mock_history[current_email] | |
| # Filter by query if provided | |
| if query: | |
| history = [h for h in history if query in h["summary"].lower()] | |
| return ToolResult( | |
| tool_type=ToolType.SEARCH_HISTORY, | |
| success=True, | |
| data={"history": history[:limit], "total_found": len(history)} | |
| ) | |
| else: | |
| return ToolResult( | |
| tool_type=ToolType.SEARCH_HISTORY, | |
| success=True, | |
| data={"history": [], "total_found": 0} | |
| ) | |
| def _check_policy(self, params: Dict[str, Any]) -> ToolResult: | |
| """Check company policies for handling situations""" | |
| policy_type = params.get("policy_type", "").lower().strip() | |
| # Simulate policy database | |
| mock_policies = { | |
| "refund": { | |
| "description": "Refunds available within 30 days for billing errors", | |
| "conditions": ["duplicate_charge", "service_unavailable", "incorrect_billing"], | |
| "approval_required": False, | |
| "max_amount": 500.00 | |
| }, | |
| "escalation": { | |
| "description": "Escalate to management for VIP customers or severe complaints", | |
| "conditions": ["vip_customer", "enterprise_account", "angry_customer", "multiple_complaints"], | |
| "approval_required": True, | |
| "escalation_levels": ["supervisor", "manager", "executive"] | |
| }, | |
| "data_privacy": { | |
| "description": "Never share customer data without explicit consent", | |
| "conditions": ["gdpr_compliant", "ccpa_compliant"], | |
| "approval_required": True | |
| } | |
| } | |
| if policy_type in mock_policies: | |
| return ToolResult( | |
| tool_type=ToolType.CHECK_POLICY, | |
| success=True, | |
| data=mock_policies[policy_type] | |
| ) | |
| else: | |
| return ToolResult( | |
| tool_type=ToolType.CHECK_POLICY, | |
| success=False, | |
| error=f"Policy '{policy_type}' not found" | |
| ) | |