| """
|
| Context Engine for Intelligent Response Generation.
|
|
|
| Provides deep understanding of conversation context:
|
| - Multi-turn context tracking
|
| - Scam narrative understanding
|
| - Information gap detection
|
| - Strategic question generation
|
| - Response coherence validation
|
|
|
| This makes our responses contextually perfect, never generic.
|
| """
|
|
|
| import re
|
| from typing import Dict, List, Optional, Set, Tuple
|
| from dataclasses import dataclass, field
|
| from enum import Enum
|
|
|
| from app.utils.logger import get_logger
|
|
|
| logger = get_logger(__name__)
|
|
|
|
|
| class ScamNarrativeStage(Enum):
|
| """Stages of a typical scam narrative."""
|
| HOOK = "hook"
|
| BUILD_UP = "build_up"
|
| DEMAND = "demand"
|
| PRESSURE = "pressure"
|
| COLLECTION = "collection"
|
| CLOSING = "closing"
|
|
|
|
|
| class InformationType(Enum):
|
| """Types of information in conversation."""
|
| MONEY_AMOUNT = "money_amount"
|
| PAYMENT_DEADLINE = "payment_deadline"
|
| THREAT_TYPE = "threat_type"
|
| REWARD_TYPE = "reward_type"
|
| PAYMENT_METHOD = "payment_method"
|
| CONTACT_INFO = "contact_info"
|
| INSTRUCTIONS = "instructions"
|
| URGENCY_REASON = "urgency_reason"
|
| AUTHORITY_CLAIM = "authority_claim"
|
|
|
|
|
| @dataclass
|
| class ConversationContext:
|
| """Complete context of the conversation."""
|
|
|
| turn_count: int = 0
|
| language: str = "en"
|
|
|
|
|
| narrative_stage: ScamNarrativeStage = ScamNarrativeStage.HOOK
|
| claimed_amounts: List[str] = field(default_factory=list)
|
| claimed_deadlines: List[str] = field(default_factory=list)
|
| claimed_authorities: List[str] = field(default_factory=list)
|
| claimed_threats: List[str] = field(default_factory=list)
|
| claimed_rewards: List[str] = field(default_factory=list)
|
|
|
|
|
| requested_info: Set[str] = field(default_factory=set)
|
| requested_actions: List[str] = field(default_factory=list)
|
|
|
|
|
| agreed_actions: List[str] = field(default_factory=list)
|
|
|
|
|
| info_gaps: Set[str] = field(default_factory=set)
|
|
|
|
|
| mentioned_names: List[str] = field(default_factory=list)
|
| mentioned_companies: List[str] = field(default_factory=list)
|
| mentioned_locations: List[str] = field(default_factory=list)
|
|
|
|
|
| topics_discussed: List[str] = field(default_factory=list)
|
| pending_questions: List[str] = field(default_factory=list)
|
|
|
|
|
| contradictions: List[Tuple[str, str]] = field(default_factory=list)
|
| repeated_claims: Dict[str, int] = field(default_factory=dict)
|
|
|
|
|
| @dataclass
|
| class ResponseSuggestion:
|
| """A suggested response based on context."""
|
| response: str
|
| strategy: str
|
| targets_info: List[str]
|
| coherence_score: float
|
| priority: int
|
|
|
|
|
|
|
| CONTEXT_PATTERNS = {
|
| "money_amount": [
|
| r"(?:rs\.?|₹|rupees?)\s*(\d+(?:,\d{3})*(?:\.\d{2})?)",
|
| r"(\d+)\s*(?:lakh|lac|crore|cr|million)",
|
| r"amount\s*(?:of|is)?\s*(?:rs\.?|₹)?\s*(\d+)",
|
| ],
|
| "deadline": [
|
| r"(today|tonight|tomorrow|within\s+\d+\s+(?:hour|minute|day))",
|
| r"before\s+(midnight|\d+\s*(?:am|pm)|end\s+of\s+day)",
|
| r"deadline\s*(?:is)?\s*(.+?)(?:\.|,|$)",
|
| ],
|
| "authority": [
|
| r"(reserve\s+bank|rbi|police|court|cbi|ed|income\s+tax|government)",
|
| r"(customer\s+(?:care|support|service))",
|
| r"department\s+of\s+(\w+)",
|
| r"(telecom|trai|doi|cyber\s+cell)",
|
| ],
|
| "threat": [
|
| r"(arrest|jail|prison|court\s+case|legal\s+action)",
|
| r"(block|freeze|suspend|deactivate)\s+(?:your\s+)?(?:account|number)",
|
| r"(penalty|fine|charges?)\s+of",
|
| ],
|
| "reward": [
|
| r"(?:won|winner\s+of|prize\s+of)\s+(.+?)(?:\.|!|$)",
|
| r"(?:reward|bonus|cashback)\s+of\s+(.+?)(?:\.|!|$)",
|
| ],
|
| "payment_request": [
|
| r"(?:pay|send|transfer)\s+(?:rs\.?|₹)?\s*(\d+)",
|
| r"(?:processing|registration|verification)\s+fee",
|
| r"(?:send|share)\s+(?:your\s+)?otp",
|
| ],
|
| "contact_request": [
|
| r"call\s+(?:this\s+)?(?:number|on)\s*[:\-]?\s*(\+?[\d\s\-]+)",
|
| r"(?:whatsapp|contact)\s+(?:on|at)?\s*(\+?[\d\s\-]+)",
|
| ],
|
| }
|
|
|
|
|
| STRATEGIC_QUESTIONS = {
|
| "upi": [
|
| "Okay I'll send! What's your UPI ID?",
|
| "I want to pay now! Give me your UPI!",
|
| "Tell me your UPI, I'll transfer immediately!",
|
| "What's the UPI ID? I'm opening my app!",
|
| ],
|
| "phone": [
|
| "Can I call you? What's your number?",
|
| "Give me your phone number in case payment fails!",
|
| "Let me save your number for later!",
|
| "What's your WhatsApp number?",
|
| ],
|
| "bank_account": [
|
| "UPI not working! Give me bank account number!",
|
| "I'll do NEFT transfer. Account number and IFSC?",
|
| "Tell me account details, I'll transfer directly!",
|
| ],
|
| "name": [
|
| "What name should I put for the transfer?",
|
| "App is asking for beneficiary name. What is it?",
|
| "Whose name is the account in?",
|
| ],
|
| "verification": [
|
| "How do I know this is real?",
|
| "Can you send me official letter?",
|
| "What's your employee ID?",
|
| ],
|
| }
|
|
|
|
|
| STAGE_RESPONSES = {
|
| ScamNarrativeStage.HOOK: {
|
| "curious": [
|
| "What? Tell me more!",
|
| "Really? What's this about?",
|
| "Oh? Go on...",
|
| ],
|
| "excited": [
|
| "Wow! Is this real?!",
|
| "Amazing! Tell me everything!",
|
| ],
|
| },
|
| ScamNarrativeStage.BUILD_UP: {
|
| "engaged": [
|
| "Okay okay, I'm listening!",
|
| "Yes yes, continue!",
|
| "Then what happened?",
|
| ],
|
| "eager": [
|
| "I want this! What do I do?",
|
| "Tell me the process!",
|
| ],
|
| },
|
| ScamNarrativeStage.DEMAND: {
|
| "willing": [
|
| "Okay I'll do it! Just tell me how!",
|
| "Yes, I'm ready! What's next?",
|
| ],
|
| "extracting": [
|
| "I'll pay right now! Where should I send?",
|
| "Give me your UPI, I'll transfer immediately!",
|
| ],
|
| },
|
| ScamNarrativeStage.PRESSURE: {
|
| "compliant": [
|
| "Okay okay! I'm doing it! Just give me the details!",
|
| "Please don't cancel! I'm ready to pay!",
|
| ],
|
| "fearful": [
|
| "Please don't arrest me! I'll pay now!",
|
| "I'm scared! Tell me where to send money!",
|
| ],
|
| },
|
| ScamNarrativeStage.COLLECTION: {
|
| "giving": [
|
| "I'm sending now! What's the UPI?",
|
| "Payment is going! Also give me phone number for confirmation!",
|
| ],
|
| },
|
| }
|
|
|
|
|
| class ContextEngine:
|
| """
|
| Engine for deep context understanding and intelligent response generation.
|
|
|
| Tracks the entire conversation context to:
|
| - Understand where we are in the scam narrative
|
| - Identify what information we still need
|
| - Generate contextually appropriate responses
|
| - Ensure response coherence
|
| """
|
|
|
| def __init__(self):
|
| """Initialize the context engine."""
|
| self._compile_patterns()
|
| self.context = ConversationContext()
|
| logger.info("ContextEngine initialized")
|
|
|
| def _compile_patterns(self) -> None:
|
| """Pre-compile regex patterns."""
|
| self.compiled_patterns: Dict[str, List] = {}
|
|
|
| for pattern_type, patterns in CONTEXT_PATTERNS.items():
|
| self.compiled_patterns[pattern_type] = [
|
| re.compile(p, re.IGNORECASE) for p in patterns
|
| ]
|
|
|
| def analyze_message(
|
| self,
|
| message: str,
|
| sender: str,
|
| turn_count: int,
|
| language: str = "en",
|
| ) -> ConversationContext:
|
| """
|
| Analyze a message and update conversation context.
|
|
|
| Args:
|
| message: The message to analyze
|
| sender: Who sent it ('scammer' or 'agent')
|
| turn_count: Current turn number
|
| language: Message language
|
|
|
| Returns:
|
| Updated ConversationContext
|
| """
|
| self.context.turn_count = turn_count
|
| self.context.language = language
|
|
|
| if sender == "scammer":
|
| self._analyze_scammer_message(message)
|
| else:
|
| self._analyze_agent_message(message)
|
|
|
|
|
| self._update_narrative_stage(turn_count)
|
|
|
|
|
| self._identify_info_gaps()
|
|
|
| return self.context
|
|
|
| def _analyze_scammer_message(self, message: str) -> None:
|
| """Extract context from scammer message."""
|
| message_lower = message.lower()
|
|
|
|
|
| for pattern in self.compiled_patterns["money_amount"]:
|
| matches = pattern.findall(message_lower)
|
| for match in matches:
|
| if match and match not in self.context.claimed_amounts:
|
| self.context.claimed_amounts.append(match)
|
|
|
|
|
| for pattern in self.compiled_patterns["deadline"]:
|
| matches = pattern.findall(message_lower)
|
| for match in matches:
|
| if match and match not in self.context.claimed_deadlines:
|
| self.context.claimed_deadlines.append(match)
|
|
|
|
|
| for pattern in self.compiled_patterns["authority"]:
|
| matches = pattern.findall(message_lower)
|
| for match in matches:
|
| if match and match not in self.context.claimed_authorities:
|
| self.context.claimed_authorities.append(match)
|
|
|
|
|
| for pattern in self.compiled_patterns["threat"]:
|
| matches = pattern.findall(message_lower)
|
| for match in matches:
|
| if match and match not in self.context.claimed_threats:
|
| self.context.claimed_threats.append(match)
|
|
|
|
|
| for pattern in self.compiled_patterns["reward"]:
|
| matches = pattern.findall(message_lower)
|
| for match in matches:
|
| if match and match not in self.context.claimed_rewards:
|
| self.context.claimed_rewards.append(match)
|
|
|
|
|
| if any(w in message_lower for w in ["otp", "send otp", "share otp"]):
|
| self.context.requested_info.add("otp")
|
| if any(w in message_lower for w in ["pay", "send", "transfer", "fee"]):
|
| self.context.requested_info.add("payment")
|
| if any(w in message_lower for w in ["click", "link", "url"]):
|
| self.context.requested_info.add("click_link")
|
| if any(w in message_lower for w in ["call", "phone", "dial"]):
|
| self.context.requested_info.add("call")
|
|
|
|
|
| topic = self._identify_topic(message_lower)
|
| if topic and topic not in self.context.topics_discussed:
|
| self.context.topics_discussed.append(topic)
|
|
|
| def _analyze_agent_message(self, message: str) -> None:
|
| """Track what our agent has said."""
|
| message_lower = message.lower()
|
|
|
|
|
| if any(w in message_lower for w in ["i'll pay", "i will pay", "i'm paying", "sending"]):
|
| self.context.agreed_actions.append("pay")
|
| if any(w in message_lower for w in ["i'll send", "i will send"]):
|
| self.context.agreed_actions.append("send")
|
| if any(w in message_lower for w in ["i'll call", "let me call"]):
|
| self.context.agreed_actions.append("call")
|
|
|
| def _identify_topic(self, message: str) -> Optional[str]:
|
| """Identify the main topic of the message."""
|
| if any(w in message for w in ["lottery", "prize", "won", "winner"]):
|
| return "lottery"
|
| if any(w in message for w in ["kyc", "verify", "verification", "aadhar", "pan"]):
|
| return "kyc"
|
| if any(w in message for w in ["police", "arrest", "court", "cbi", "ed"]):
|
| return "authority_threat"
|
| if any(w in message for w in ["account", "block", "suspend", "freeze"]):
|
| return "account_threat"
|
| if any(w in message for w in ["job", "work", "salary", "earning"]):
|
| return "job_offer"
|
| if any(w in message for w in ["refund", "cashback", "return"]):
|
| return "refund"
|
| if any(w in message for w in ["courier", "parcel", "delivery", "customs"]):
|
| return "courier"
|
| return None
|
|
|
| def _update_narrative_stage(self, turn_count: int) -> None:
|
| """Update the narrative stage based on conversation progress."""
|
|
|
| has_threats = len(self.context.claimed_threats) > 0
|
| has_rewards = len(self.context.claimed_rewards) > 0
|
| has_deadlines = len(self.context.claimed_deadlines) > 0
|
| requested_payment = "payment" in self.context.requested_info
|
| requested_otp = "otp" in self.context.requested_info
|
|
|
|
|
| if turn_count <= 2:
|
| self.context.narrative_stage = ScamNarrativeStage.HOOK
|
| elif turn_count <= 5:
|
| if has_rewards or has_threats:
|
| self.context.narrative_stage = ScamNarrativeStage.BUILD_UP
|
| else:
|
| self.context.narrative_stage = ScamNarrativeStage.HOOK
|
| elif turn_count <= 10:
|
| if requested_payment or requested_otp:
|
| self.context.narrative_stage = ScamNarrativeStage.DEMAND
|
| else:
|
| self.context.narrative_stage = ScamNarrativeStage.BUILD_UP
|
| elif turn_count <= 15:
|
| if has_deadlines or has_threats:
|
| self.context.narrative_stage = ScamNarrativeStage.PRESSURE
|
| else:
|
| self.context.narrative_stage = ScamNarrativeStage.DEMAND
|
| else:
|
| self.context.narrative_stage = ScamNarrativeStage.COLLECTION
|
|
|
| def _identify_info_gaps(self) -> None:
|
| """Identify what information we still need to extract."""
|
| gaps = set()
|
|
|
|
|
| gaps.add("upi")
|
| gaps.add("phone")
|
| gaps.add("bank_account")
|
|
|
|
|
| if self.context.claimed_authorities:
|
| gaps.add("verification")
|
|
|
|
|
| if "name" not in [a.lower() for a in self.context.mentioned_names]:
|
| gaps.add("name")
|
|
|
| self.context.info_gaps = gaps
|
|
|
| def get_strategic_question(
|
| self,
|
| target_info: str,
|
| language: str = "en",
|
| ) -> Optional[str]:
|
| """
|
| Get a strategic question to extract specific information.
|
|
|
| Args:
|
| target_info: What info to extract ('upi', 'phone', 'bank_account', etc.)
|
| language: Response language
|
|
|
| Returns:
|
| Strategic question string
|
| """
|
| import random
|
|
|
| questions = STRATEGIC_QUESTIONS.get(target_info, [])
|
| if not questions:
|
| return None
|
|
|
| return random.choice(questions)
|
|
|
| def get_context_appropriate_response(
|
| self,
|
| emotion: str = "eager",
|
| language: str = "en",
|
| ) -> Optional[str]:
|
| """
|
| Get a response appropriate for current context.
|
|
|
| Args:
|
| emotion: Current emotional state
|
| language: Response language
|
|
|
| Returns:
|
| Context-appropriate response
|
| """
|
| import random
|
|
|
| stage = self.context.narrative_stage
|
| stage_responses = STAGE_RESPONSES.get(stage, {})
|
|
|
|
|
| for category, responses in stage_responses.items():
|
| if emotion.lower() in category.lower() or category.lower() in emotion.lower():
|
| return random.choice(responses)
|
|
|
|
|
| if stage_responses:
|
| first_category = list(stage_responses.values())[0]
|
| return random.choice(first_category)
|
|
|
| return None
|
|
|
| def should_extract_now(self) -> Tuple[bool, str]:
|
| """
|
| Determine if we should actively extract information now.
|
|
|
| Returns:
|
| Tuple of (should_extract, target_info)
|
| """
|
|
|
| if self.context.narrative_stage in [
|
| ScamNarrativeStage.DEMAND,
|
| ScamNarrativeStage.PRESSURE,
|
| ScamNarrativeStage.COLLECTION,
|
| ]:
|
|
|
| if "upi" in self.context.info_gaps:
|
| return True, "upi"
|
| if "phone" in self.context.info_gaps:
|
| return True, "phone"
|
| if "bank_account" in self.context.info_gaps:
|
| return True, "bank_account"
|
|
|
|
|
| if self.context.narrative_stage == ScamNarrativeStage.BUILD_UP:
|
| if "payment" in self.context.requested_info:
|
| return True, "upi"
|
|
|
| return False, ""
|
|
|
| def get_coherent_follow_up(self, last_scammer_message: str) -> Optional[str]:
|
| """
|
| Generate a coherent follow-up based on what scammer just said.
|
|
|
| Args:
|
| last_scammer_message: The scammer's last message
|
|
|
| Returns:
|
| Coherent follow-up response
|
| """
|
| message_lower = last_scammer_message.lower()
|
|
|
|
|
| if "@" in last_scammer_message:
|
| return "Okay noted! Let me try sending. What's your phone number in case it fails?"
|
|
|
|
|
| if re.search(r"\d{10}", last_scammer_message):
|
| return "Saved! Now give me UPI or account number for the transfer!"
|
|
|
|
|
| if any(w in message_lower for w in ["today", "now", "immediately", "urgent"]):
|
| return "Okay okay! I'm trying! Just give me the payment details quickly!"
|
|
|
|
|
| if any(w in message_lower for w in ["arrest", "police", "block"]):
|
| return "Please don't! I'll pay right now! Just tell me where to send!"
|
|
|
|
|
| if re.search(r"(?:rs\.?|₹)\s*\d+|\d+\s*(?:lakh|crore)", message_lower):
|
| return "Yes! I want to claim that! Tell me how to proceed!"
|
|
|
| return None
|
|
|
| def validate_response_coherence(
|
| self,
|
| proposed_response: str,
|
| last_scammer_message: str,
|
| ) -> Tuple[bool, float, str]:
|
| """
|
| Validate if a proposed response is coherent with context.
|
|
|
| Args:
|
| proposed_response: The response we're considering
|
| last_scammer_message: What scammer just said
|
|
|
| Returns:
|
| Tuple of (is_coherent, score, reason)
|
| """
|
| response_lower = proposed_response.lower()
|
| message_lower = last_scammer_message.lower()
|
|
|
| score = 1.0
|
| reasons = []
|
|
|
|
|
| scammer_topic = self._identify_topic(message_lower)
|
| agent_topic = self._identify_topic(response_lower)
|
|
|
| if scammer_topic and agent_topic and scammer_topic != agent_topic:
|
| score -= 0.3
|
| reasons.append(f"Topic mismatch: scammer={scammer_topic}, agent={agent_topic}")
|
|
|
|
|
| if any(w in message_lower for w in ["arrest", "police", "jail"]):
|
| if any(w in response_lower for w in ["excited", "happy", "wow"]):
|
| score -= 0.4
|
| reasons.append("Excited response to threat")
|
|
|
|
|
|
|
|
|
|
|
| if "i'll pay" in response_lower and self.context.turn_count < 3:
|
| score -= 0.2
|
| reasons.append("Payment offer too early")
|
|
|
| is_coherent = score >= 0.7
|
| reason = "; ".join(reasons) if reasons else "Coherent"
|
|
|
| return is_coherent, score, reason
|
|
|
| def get_context_summary(self) -> Dict:
|
| """Get summary of current context."""
|
| return {
|
| "turn_count": self.context.turn_count,
|
| "narrative_stage": self.context.narrative_stage.value,
|
| "claimed_amounts": self.context.claimed_amounts[-3:],
|
| "claimed_threats": self.context.claimed_threats[-3:],
|
| "claimed_rewards": self.context.claimed_rewards[-3:],
|
| "claimed_authorities": self.context.claimed_authorities[-3:],
|
| "scammer_requested": list(self.context.requested_info),
|
| "info_gaps": list(self.context.info_gaps),
|
| "topics_discussed": self.context.topics_discussed[-5:],
|
| }
|
|
|
| def reset(self) -> None:
|
| """Reset context for new conversation."""
|
| self.context = ConversationContext()
|
|
|
|
|
|
|
| _context_engine: Optional[ContextEngine] = None
|
|
|
|
|
| def get_context_engine() -> ContextEngine:
|
| """Get singleton ContextEngine instance."""
|
| global _context_engine
|
| if _context_engine is None:
|
| _context_engine = ContextEngine()
|
| return _context_engine
|
|
|
|
|
| def analyze_context(
|
| message: str,
|
| sender: str,
|
| turn_count: int,
|
| language: str = "en",
|
| ) -> ConversationContext:
|
| """Convenience function to analyze message context."""
|
| engine = get_context_engine()
|
| return engine.analyze_message(message, sender, turn_count, language)
|
|
|
|
|
| def get_strategic_response(
|
| target_info: str = "upi",
|
| language: str = "en",
|
| ) -> Optional[str]:
|
| """Get a strategic question to extract specific information."""
|
| engine = get_context_engine()
|
| return engine.get_strategic_question(target_info, language)
|
|
|
|
|
| def reset_context_engine() -> None:
|
| """Reset the context engine for new conversation."""
|
| global _context_engine
|
| if _context_engine is not None:
|
| _context_engine.reset()
|
|
|