| """ |
| GUVI Hackathon Final Result Callback Module. |
| |
| Implements the mandatory callback to GUVI's evaluation endpoint |
| as specified in the problem statement. |
| |
| Requirement: "Once the system detects scam intent and the AI Agent |
| completes the engagement, participants must send the final extracted |
| intelligence to the GUVI evaluation endpoint." |
| |
| Callback Endpoint: POST https://hackathon.guvi.in/api/updateHoneyPotFinalResult |
| """ |
|
|
| import requests |
| from typing import Dict, List, Optional |
| from datetime import datetime |
|
|
| from app.config import settings |
| from app.utils.logger import get_logger |
|
|
| logger = get_logger(__name__) |
|
|
| |
| DEFAULT_GUVI_CALLBACK_URL = "https://hackathon.guvi.in/api/updateHoneyPotFinalResult" |
|
|
|
|
| def identify_red_flags(messages: List[Dict]) -> List[str]: |
| """ |
| Identify explicit red flags from scammer messages. |
| |
| Returns a list of identified red flags for scoring. |
| GUVI Doc: "Red Flag Identification | 8 pts | >=5 flags = 8pts" |
| |
| Args: |
| messages: List of conversation messages |
| |
| Returns: |
| List of identified red flag descriptions |
| """ |
| red_flags: List[str] = [] |
| |
| scammer_messages = [ |
| m.get("message", "") for m in messages if m.get("sender") == "scammer" |
| ] |
| full_text_lower = " ".join(scammer_messages).lower() |
| full_text_raw = " ".join(scammer_messages) |
| |
| |
| red_flag_patterns = { |
| "Urgency/Time Pressure": [ |
| "urgent", "immediately", "now", "today", "hurry", "quick", |
| "fast", "expire", "last chance", "limited time", "deadline", |
| "turant", "jaldi", "abhi", "foran", |
| ], |
| "Authority Impersonation": [ |
| "police", "court", "government", "bank official", "rbi", |
| "investigation", "arrest", "legal", "warrant", "department", |
| "officer", "inspector", "commissioner", |
| ], |
| "Account/Service Threat": [ |
| "block", "suspend", "deactivate", "freeze", "seize", |
| "terminate", "close", "disable", "restrict", |
| ], |
| "OTP/Credential Request": [ |
| "otp", "password", "pin", "cvv", "verify", "confirm", |
| "share otp", "send otp", "tell otp", |
| ], |
| "Prize/Lottery Lure": [ |
| "won", "winner", "prize", "lottery", "jackpot", "lucky", |
| "congratulations", "reward", "selected", "chosen", |
| ], |
| "Payment/Fee Demand": [ |
| "processing fee", "transfer fee", "tax", "charges", |
| "pay first", "send money", "registration fee", |
| ], |
| "Suspicious Link": [ |
| "http://", "https://", "click here", "click link", |
| "www.", ".xyz", ".tk", "bit.ly", "tinyurl", |
| ], |
| "KYC/Document Request": [ |
| "kyc", "aadhaar", "pan card", "pan number", "update kyc", |
| "verify identity", "link expired", |
| ], |
| "False Urgency Claim": [ |
| "within 24 hours", "within 1 hour", "today only", |
| "expires today", "last warning", "final notice", |
| ], |
| "Impersonation of Known Entity": [ |
| "sbi", "hdfc", "icici", "axis", "rbi", "amazon", |
| "flipkart", "paytm", "phonepe", "gpay", |
| ], |
| } |
| |
| for flag_name, patterns in red_flag_patterns.items(): |
| for pattern in patterns: |
| if pattern in full_text_lower or pattern in full_text_raw: |
| if flag_name not in red_flags: |
| red_flags.append(flag_name) |
| break |
| |
| return red_flags |
|
|
|
|
| def count_elicitation_attempts(messages: List[Dict]) -> int: |
| """ |
| Count the number of elicitation attempts made by the agent. |
| |
| GUVI Doc: "Information Elicitation | 7 pts | Each elicitation attempt earns 1.5pts (max 7)" |
| Max 5 attempts for full 7 points (5 * 1.5 = 7.5, capped at 7). |
| |
| Elicitation = asking questions to extract scammer's financial details. |
| |
| Args: |
| messages: List of conversation messages |
| |
| Returns: |
| Number of elicitation attempts detected |
| """ |
| elicitation_patterns = [ |
| |
| r"upi\s*(id)?[\?\s]", |
| r"phone\s*(number)?[\?\s]", |
| r"account\s*(number)?[\?\s]", |
| r"ifsc[\?\s]", |
| r"bank\s*(details|account)[\?\s]", |
| r"what.{0,20}(upi|phone|number|account|ifsc)", |
| r"give.{0,15}(upi|phone|number|account|ifsc)", |
| r"tell.{0,15}(upi|phone|number|account|ifsc)", |
| r"send.{0,15}(upi|phone|number|account|details)", |
| r"share.{0,15}(upi|phone|number|account|details)", |
| |
| r"where.{0,30}\?", |
| r"what.{0,30}\?", |
| r"how.{0,30}\?", |
| r"which.{0,30}\?", |
| |
| r"kya\s*hai", |
| r"batao", |
| r"bolo", |
| r"dijiye", |
| r"bhejo", |
| ] |
| |
| import re |
| |
| agent_messages = [ |
| m.get("message", "") for m in messages if m.get("sender") == "agent" |
| ] |
| |
| count = 0 |
| for msg in agent_messages: |
| msg_lower = msg.lower() |
| for pattern in elicitation_patterns: |
| if re.search(pattern, msg_lower): |
| count += 1 |
| break |
| |
| return min(count, 5) |
|
|
|
|
| def generate_agent_notes( |
| messages: List[Dict], |
| extracted_intel: Dict, |
| scam_indicators: List[str], |
| ) -> str: |
| """ |
| Generate a detailed summary of scammer behavior for agent notes. |
| |
| Produces a law-enforcement-friendly summary covering: |
| - Identified red flags (explicitly enumerated for scoring) |
| - Identified scam type |
| - Tactics used (urgency, threats, impersonation, etc.) |
| - Elicitation attempts count |
| - Extracted intelligence summary |
| - Conversation depth |
| |
| Args: |
| messages: List of conversation messages |
| extracted_intel: Extracted intelligence dictionary |
| scam_indicators: List of detected scam indicators/keywords |
| |
| Returns: |
| Agent notes string with explicit red flag enumeration for GUVI scoring |
| """ |
| notes_parts: List[str] = [] |
|
|
| scammer_messages = [ |
| m.get("message", "") for m in messages if m.get("sender") == "scammer" |
| ] |
| full_scammer_text = " ".join(scammer_messages).lower() |
| full_scammer_raw = " ".join(scammer_messages) |
| |
| |
| red_flags = identify_red_flags(messages) |
| if red_flags: |
| flags_str = ", ".join(f"[{i+1}] {flag}" for i, flag in enumerate(red_flags)) |
| notes_parts.append(f"RED FLAGS DETECTED ({len(red_flags)}): {flags_str}") |
| |
| |
| elicitation_count = count_elicitation_attempts(messages) |
| if elicitation_count > 0: |
| notes_parts.append(f"ELICITATION ATTEMPTS: {elicitation_count} direct questions asked to extract scammer details") |
|
|
| |
| scam_type = identify_scam_type(full_scammer_text, full_scammer_raw) |
| if scam_type: |
| notes_parts.append(f"Scam type: {scam_type}") |
|
|
| |
| urgency_words = [ |
| "urgent", "immediately", "now", "today", "hurry", "quick", |
| "fast", "expire", "last chance", "turant", "jaldi", "abhi", |
| "\u0924\u0941\u0930\u0902\u0924", "\u091c\u0932\u094d\u0926\u0940", |
| ] |
| if any(w in full_scammer_text or w in full_scammer_raw for w in urgency_words): |
| notes_parts.append("Used urgency tactics to pressure victim") |
|
|
| authority_words = [ |
| "police", "court", "government", "bank official", "rbi", |
| "investigation", "arrest", "legal", "warrant", "department", |
| "\u092a\u0941\u0932\u093f\u0938", |
| "\u0917\u093f\u0930\u092b\u094d\u0924\u093e\u0930", |
| ] |
| if any(w in full_scammer_text or w in full_scammer_raw for w in authority_words): |
| notes_parts.append("Attempted authority/official impersonation") |
|
|
| prize_words = [ |
| "won", "winner", "prize", "lottery", "jackpot", "lucky", |
| "congratulations", "reward", "jeeta", "jeet", "inaam", |
| "\u091c\u0940\u0924\u093e", "\u0907\u0928\u093e\u092e", |
| ] |
| if any(w in full_scammer_text or w in full_scammer_raw for w in prize_words): |
| notes_parts.append("Used prize/lottery lure") |
|
|
| payment_words = [ |
| "upi", "transfer", "send money", "pay", "account number", |
| "bank details", "paise bhejo", "transfer karo", |
| "\u092a\u0948\u0938\u0947 \u092d\u0947\u091c\u094b", |
| ] |
| if any(w in full_scammer_text or w in full_scammer_raw for w in payment_words): |
| notes_parts.append("Attempted payment/money redirection") |
|
|
| credential_words = [ |
| "otp", "password", "pin", "cvv", "verify", "confirm", |
| "otp bhejo", "verify karo", "\u0913\u091f\u0940\u092a\u0940", |
| ] |
| if any(w in full_scammer_text or w in full_scammer_raw for w in credential_words): |
| notes_parts.append("Attempted OTP/credential harvesting") |
|
|
| threat_words = [ |
| "block", "suspend", "deactivate", "arrest", "fine", |
| "penalty", "legal action", "case file", "fir", |
| "\u092c\u094d\u0932\u0949\u0915", "\u092c\u0902\u0926", |
| ] |
| if any(w in full_scammer_text or w in full_scammer_raw for w in threat_words): |
| notes_parts.append("Used threat/fear tactics") |
|
|
| kyc_words = ["kyc", "aadhaar", "pan card", "pan number", "link expired", "update kyc"] |
| if any(w in full_scammer_text for w in kyc_words): |
| notes_parts.append("Used KYC/document verification lure") |
|
|
| loan_words = ["loan approved", "pre-approved", "emi", "interest rate", "loan offer"] |
| if any(w in full_scammer_text for w in loan_words): |
| notes_parts.append("Used fake loan/credit offer") |
|
|
| delivery_words = ["delivery failed", "customs", "parcel", "courier", "shipment"] |
| if any(w in full_scammer_text for w in delivery_words): |
| notes_parts.append("Used fake delivery/parcel scam") |
|
|
| |
| intel_items: List[str] = [] |
| if extracted_intel.get("upi_ids"): |
| items = extracted_intel["upi_ids"] |
| intel_items.append(f"{len(items)} UPI ID(s): {', '.join(items[:3])}") |
| if extracted_intel.get("bank_accounts"): |
| items = extracted_intel["bank_accounts"] |
| intel_items.append(f"{len(items)} bank account(s)") |
| if extracted_intel.get("ifsc_codes"): |
| items = extracted_intel["ifsc_codes"] |
| intel_items.append(f"{len(items)} IFSC code(s): {', '.join(items[:3])}") |
| if extracted_intel.get("phone_numbers"): |
| items = extracted_intel["phone_numbers"] |
| intel_items.append(f"{len(items)} phone number(s): {', '.join(items[:3])}") |
| if extracted_intel.get("phishing_links"): |
| items = extracted_intel["phishing_links"] |
| intel_items.append(f"{len(items)} phishing link(s)") |
| if extracted_intel.get("email_addresses"): |
| items = extracted_intel["email_addresses"] |
| intel_items.append(f"{len(items)} email address(es): {', '.join(items[:3])}") |
| if extracted_intel.get("case_ids"): |
| items = extracted_intel["case_ids"] |
| intel_items.append(f"{len(items)} case/reference ID(s): {', '.join(items[:3])}") |
| if extracted_intel.get("policy_numbers"): |
| items = extracted_intel["policy_numbers"] |
| intel_items.append(f"{len(items)} policy number(s): {', '.join(items[:3])}") |
| if extracted_intel.get("order_numbers"): |
| items = extracted_intel["order_numbers"] |
| intel_items.append(f"{len(items)} order/transaction ID(s): {', '.join(items[:3])}") |
|
|
| if intel_items: |
| notes_parts.append(f"Extracted intelligence: {'; '.join(intel_items)}") |
|
|
| |
| total_turns = len(scammer_messages) |
| if total_turns > 0: |
| notes_parts.append(f"Conversation depth: {total_turns} scammer message(s) analyzed") |
|
|
| if notes_parts: |
| return ". ".join(notes_parts) + "." |
| return "Scam engagement completed. Limited intelligence extracted." |
|
|
|
|
| def identify_scam_type(text_lower: str, text_raw: str = "") -> Optional[str]: |
| """ |
| Identify the primary scam type from scammer text. |
| |
| Returns a human-readable scam type label or None if unknown. |
| """ |
| |
| if any(w in text_lower for w in ["kyc", "aadhaar", "pan card", "update kyc"]): |
| return "KYC/Document Verification Fraud" |
| if any(w in text_lower for w in ["loan approved", "pre-approved", "emi", "loan offer"]): |
| return "Fake Loan/Credit Offer" |
| if any(w in text_lower for w in ["delivery failed", "customs", "parcel", "courier"]): |
| return "Fake Delivery/Parcel Scam" |
| if any(w in text_lower for w in ["won", "winner", "prize", "lottery", "jackpot"]): |
| return "Prize/Lottery Scam" |
| if any(w in text_lower for w in [ |
| "police", "arrest", "warrant", "court", "legal action", "investigation", |
| "\u092a\u0941\u0932\u093f\u0938", "\u0917\u093f\u0930\u092b\u094d\u0924\u093e\u0930", |
| ]) or any(w in text_raw for w in [ |
| "\u092a\u0941\u0932\u093f\u0938", "\u0917\u093f\u0930\u092b\u094d\u0924\u093e\u0930", |
| ]): |
| return "Authority/Police Impersonation" |
| if any(w in text_lower for w in [ |
| "bank official", "rbi", "bank manager", "account blocked", "account suspended", |
| ]): |
| return "Bank Official Impersonation" |
| if any(w in text_lower for w in ["otp", "password", "pin", "cvv"]): |
| return "Credential/OTP Harvesting" |
| if any(w in text_lower for w in ["refund", "cashback", "insurance claim"]): |
| return "Refund/Insurance Scam" |
| if any(w in text_lower for w in ["investment", "returns", "crypto", "trading", "profit"]): |
| return "Investment/Trading Scam" |
| if any(w in text_lower for w in ["electricity", "electric bill", "power bill", "power cut", "power disconnection"]): |
| return "Electricity Bill Scam" |
| if any(w in text_lower for w in ["utility", "water bill", "gas bill"]): |
| return "Utility Bill Scam" |
| if any(w in text_lower for w in ["job", "employment", "hiring", "work from home", "earn from home"]): |
| return "Job/Employment Scam" |
| if any(w in text_lower for w in ["income tax", "tax notice", "tax department", "it department"]): |
| return "Income Tax Scam" |
| if any(w in text_lower for w in ["tech support", "computer problem", "virus", "microsoft", "windows"]): |
| return "Tech Support Scam" |
| if any(w in text_lower for w in ["government scheme", "govt scheme", "subsidy", "pm scheme"]): |
| return "Government Scheme Scam" |
| if any(w in text_lower for w in ["upi", "send money", "transfer", "pay"]): |
| return "Payment Redirection Fraud" |
| return None |
|
|
|
|
| def extract_suspicious_keywords( |
| messages: List[Dict], |
| scam_indicators: List[str], |
| ) -> List[str]: |
| """ |
| Extract suspicious keywords from the conversation. |
| |
| Checks scammer messages for English, Hindi, and Hinglish scam keywords |
| so that multilingual conversations produce meaningful keyword lists. |
| |
| Args: |
| messages: List of conversation messages |
| scam_indicators: List of detected scam indicators from detector |
| |
| Returns: |
| List of suspicious keywords found in messages (up to 25) |
| """ |
| keywords = set(scam_indicators) if scam_indicators else set() |
|
|
| |
| en_patterns = [ |
| "urgent", "immediately", "now", "today", "hurry", "fast", "quick", |
| "won", "winner", "prize", "lottery", "jackpot", "congratulations", |
| "otp", "verify", "confirm", "blocked", "suspended", "deactivated", |
| "police", "arrest", "court", "legal", "investigation", "warrant", |
| "transfer", "send money", "pay now", "account blocked", |
| "free", "gift", "reward", "selected", "lucky", |
| "click here", "call now", "limited time", "expire", |
| "kyc", "aadhaar", "pan card", "link expired", |
| "upi", "bank account", "ifsc", "cvv", "pin", |
| "loan approved", "credit card", "insurance", "refund", |
| "delivery failed", "customs", "parcel", |
| ] |
|
|
| |
| hi_patterns = [ |
| "turant", "jaldi", "abhi", |
| "jeeta", "jeet", "inaam", "lottery", |
| "otp bhejo", "verify karo", "confirm karo", |
| "block", "suspend", "band", |
| "police", "giraftaar", "giraftari", "court", "kanoon", |
| "paise bhejo", "transfer karo", "pay karo", |
| "muft", "free", "gift", |
| "link pe click", "call karo", |
| "kyc update", "aadhaar", "pan", |
| "loan", "insurance", "refund", |
| |
| "\u0924\u0941\u0930\u0902\u0924", |
| "\u091c\u0932\u094d\u0926\u0940", |
| "\u0905\u092d\u0940", |
| "\u091c\u0940\u0924\u093e", |
| "\u0907\u0928\u093e\u092e", |
| "\u0932\u0949\u091f\u0930\u0940", |
| "\u092a\u0941\u0932\u093f\u0938", |
| "\u0917\u093f\u0930\u092b\u094d\u0924\u093e\u0930", |
| "\u092a\u0948\u0938\u0947 \u092d\u0947\u091c\u094b", |
| "\u091f\u094d\u0930\u093e\u0902\u0938\u092b\u0930", |
| "\u092c\u094d\u0932\u0949\u0915", |
| "\u092c\u0948\u0902\u0915", |
| "\u0916\u093e\u0924\u093e", |
| "\u092f\u0942\u092a\u0940\u0906\u0908", |
| "\u0913\u091f\u0940\u092a\u0940", |
| ] |
|
|
| scammer_messages = [ |
| m.get("message", "") for m in messages if m.get("sender") == "scammer" |
| ] |
| full_text = " ".join(scammer_messages).lower() |
|
|
| for pattern in en_patterns: |
| if pattern in full_text: |
| keywords.add(pattern) |
|
|
| |
| full_text_raw = " ".join(scammer_messages) |
| for pattern in hi_patterns: |
| if pattern.lower() in full_text or pattern in full_text_raw: |
| keywords.add(pattern) |
|
|
| return sorted(keywords)[:25] |
|
|
|
|
| def send_final_result_to_guvi( |
| session_id: str, |
| scam_detected: bool, |
| total_messages: int, |
| extracted_intel: Dict, |
| messages: List[Dict], |
| scam_indicators: List[str] = None, |
| agent_notes: str = None, |
| engagement_duration_seconds: int = 0, |
| ) -> bool: |
| """ |
| Send final result to GUVI evaluation endpoint. |
| |
| This is MANDATORY for the hackathon submission. The platform uses |
| this data to measure engagement depth, intelligence quality, and |
| agent effectiveness. |
| |
| Args: |
| session_id: Unique session ID for the conversation |
| scam_detected: Whether scam intent was confirmed |
| total_messages: Total number of messages exchanged |
| extracted_intel: Dictionary of extracted intelligence |
| messages: Full conversation history |
| scam_indicators: Optional list of detected scam indicators |
| agent_notes: Optional pre-generated agent notes |
| engagement_duration_seconds: Duration of engagement in seconds |
| |
| Returns: |
| True if callback was successful, False otherwise |
| """ |
| if not settings.GUVI_CALLBACK_ENABLED: |
| logger.info("GUVI callback disabled, skipping") |
| return True |
| |
| callback_url = settings.GUVI_CALLBACK_URL or DEFAULT_GUVI_CALLBACK_URL |
| |
| suspicious_keywords = extract_suspicious_keywords( |
| messages, |
| scam_indicators or [], |
| ) |
| |
| if not agent_notes: |
| agent_notes = generate_agent_notes( |
| messages, |
| extracted_intel, |
| scam_indicators or [], |
| ) |
| |
| |
| scammer_messages = [m.get("message", "") for m in messages if m.get("sender") == "scammer"] |
| scammer_text = " ".join(scammer_messages) |
| scam_type = identify_scam_type(scammer_text.lower(), scammer_text) |
| |
| |
| payload = { |
| "sessionId": session_id, |
| "status": "success", |
| "scamDetected": scam_detected, |
| "scamType": scam_type or "Financial Fraud", |
| "confidenceLevel": 0.95, |
| "totalMessagesExchanged": total_messages, |
| "engagementDurationSeconds": engagement_duration_seconds, |
| "extractedIntelligence": { |
| "bankAccounts": extracted_intel.get("bank_accounts", []), |
| "upiIds": extracted_intel.get("upi_ids", []), |
| "ifscCodes": extracted_intel.get("ifsc_codes", []), |
| "phishingLinks": extracted_intel.get("phishing_links", []), |
| "phoneNumbers": extracted_intel.get("phone_numbers", []), |
| "emailAddresses": extracted_intel.get("email_addresses", []), |
| "caseIds": extracted_intel.get("case_ids", []), |
| "policyNumbers": extracted_intel.get("policy_numbers", []), |
| "orderNumbers": extracted_intel.get("order_numbers", []), |
| "suspiciousKeywords": suspicious_keywords, |
| }, |
| "engagementMetrics": { |
| "engagementDurationSeconds": engagement_duration_seconds, |
| "totalMessagesExchanged": total_messages, |
| }, |
| "agentNotes": agent_notes, |
| } |
| |
| logger.info(f"Sending GUVI callback for session {session_id}") |
| logger.debug(f"GUVI callback payload: {payload}") |
| |
| try: |
| response = requests.post( |
| callback_url, |
| json=payload, |
| timeout=10, |
| headers={ |
| "Content-Type": "application/json", |
| }, |
| ) |
| |
| if response.status_code == 200: |
| logger.info(f"GUVI callback successful for session {session_id}") |
| return True |
| else: |
| logger.warning( |
| f"GUVI callback returned status {response.status_code}: {response.text}" |
| ) |
| return False |
| |
| except requests.exceptions.Timeout: |
| logger.error(f"GUVI callback timed out for session {session_id}") |
| return False |
| except requests.exceptions.RequestException as e: |
| logger.error(f"GUVI callback failed for session {session_id}: {e}") |
| return False |
| except Exception as e: |
| logger.error(f"Unexpected error in GUVI callback: {e}") |
| return False |
|
|
|
|
| def should_send_callback( |
| turn_count: int, |
| max_turns_reached: bool, |
| extraction_confidence: float, |
| terminated: bool, |
| ) -> bool: |
| """ |
| Determine if GUVI callback should be sent based on conversation state. |
| |
| Callback should be sent when: |
| - Turn count >= 5 (GUVI runs 10 turns max, send callback frequently) |
| - Max turns (10 or 20) is reached |
| - High extraction confidence (>= 0.5) achieved |
| - Session is explicitly terminated |
| |
| Args: |
| turn_count: Current turn count |
| max_turns_reached: Whether max turns limit was hit |
| extraction_confidence: Confidence in extracted intelligence |
| terminated: Whether session is terminated |
| |
| Returns: |
| True if callback should be sent |
| """ |
| |
| |
| if turn_count >= 5: |
| logger.info(f"Callback trigger: turn count >= 5 ({turn_count})") |
| return True |
| |
| |
| if max_turns_reached or turn_count >= 10: |
| logger.info(f"Callback trigger: max turns reached ({turn_count})") |
| return True |
| |
| |
| if extraction_confidence >= 0.5: |
| logger.info(f"Callback trigger: extraction confidence ({extraction_confidence:.2f})") |
| return True |
| |
| |
| if terminated: |
| logger.info("Callback trigger: session terminated") |
| return True |
| |
| return False |
|
|