cx_ai_agent_v1 / mcp /productivity_services.py
muzakkirhussain011's picture
Add application files (text files only)
8bab08d
"""
Productivity-Enhancing MCP Services
Real-world services that increase sales automation efficiency
"""
import asyncio
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
import re
import logging
logger = logging.getLogger(__name__)
class MCPAnalyticsService:
"""
Analytics Service - Track metrics, conversions, and performance
Real-world use case: Monitor pipeline health and ROI
"""
def __init__(self):
self.metrics = {
"pipeline_runs": 0,
"prospects_discovered": 0,
"contacts_found": 0,
"emails_generated": 0,
"emails_sent": 0,
"replies_received": 0,
"meetings_booked": 0,
"conversion_rate": 0.0,
"average_response_time": 0.0,
"top_performing_sequences": [],
"daily_stats": {}
}
self.events = []
logger.info("MCP Analytics Service initialized")
async def track_event(self, event_type: str, data: Dict) -> str:
"""Track an event for analytics"""
event = {
"type": event_type,
"data": data,
"timestamp": datetime.utcnow().isoformat()
}
self.events.append(event)
# Update aggregated metrics
today = datetime.utcnow().strftime('%Y-%m-%d')
if today not in self.metrics["daily_stats"]:
self.metrics["daily_stats"][today] = {
"pipeline_runs": 0,
"prospects": 0,
"contacts": 0,
"emails": 0
}
if event_type == "pipeline_run":
self.metrics["pipeline_runs"] += 1
self.metrics["daily_stats"][today]["pipeline_runs"] += 1
elif event_type == "prospect_discovered":
self.metrics["prospects_discovered"] += 1
self.metrics["daily_stats"][today]["prospects"] += 1
elif event_type == "contact_found":
self.metrics["contacts_found"] += 1
self.metrics["daily_stats"][today]["contacts"] += 1
elif event_type == "email_generated":
self.metrics["emails_generated"] += 1
self.metrics["daily_stats"][today]["emails"] += 1
elif event_type == "email_sent":
self.metrics["emails_sent"] += 1
elif event_type == "reply_received":
self.metrics["replies_received"] += 1
elif event_type == "meeting_booked":
self.metrics["meetings_booked"] += 1
# Calculate conversion rate
if self.metrics["emails_sent"] > 0:
self.metrics["conversion_rate"] = (
self.metrics["meetings_booked"] / self.metrics["emails_sent"]
) * 100
logger.info(f"Analytics event tracked: {event_type}")
return "tracked"
async def get_metrics(self) -> Dict:
"""Get current metrics"""
return self.metrics
async def get_dashboard_data(self) -> Dict:
"""Get formatted dashboard data"""
return {
"summary": {
"Total Pipeline Runs": self.metrics["pipeline_runs"],
"Prospects Discovered": self.metrics["prospects_discovered"],
"Contacts Found": self.metrics["contacts_found"],
"Emails Generated": self.metrics["emails_generated"],
"Emails Sent": self.metrics["emails_sent"],
"Replies Received": self.metrics["replies_received"],
"Meetings Booked": self.metrics["meetings_booked"],
"Conversion Rate": f"{self.metrics['conversion_rate']:.2f}%"
},
"daily_stats": self.metrics["daily_stats"],
"recent_events": self.events[-10:] # Last 10 events
}
class MCPEnrichmentService:
"""
Enrichment Service - Enrich prospect and contact data
Real-world use case: Add company info, social profiles, tech stack
"""
def __init__(self):
# Mock enrichment database
self.enrichment_db = {
"shopify.com": {
"employee_count": "10,000+",
"founded_year": 2006,
"funding": "$2.9B",
"tech_stack": ["Ruby on Rails", "React", "MySQL", "Redis"],
"social_profiles": {
"linkedin": "https://linkedin.com/company/shopify",
"twitter": "https://twitter.com/shopify"
},
"industry_tags": ["E-commerce", "SaaS", "Retail Tech"],
"revenue_range": "$1B - $5B"
},
"stripe.com": {
"employee_count": "8,000+",
"founded_year": 2010,
"funding": "$2.2B",
"tech_stack": ["Ruby", "Scala", "Go", "React"],
"social_profiles": {
"linkedin": "https://linkedin.com/company/stripe",
"twitter": "https://twitter.com/stripe"
},
"industry_tags": ["Fintech", "Payments", "SaaS"],
"revenue_range": "$5B+"
}
}
logger.info("MCP Enrichment Service initialized")
async def enrich_company(self, domain: str) -> Dict:
"""Enrich company data with additional information"""
logger.info(f"Enriching company data for: {domain}")
# Check if we have enrichment data
enriched_data = self.enrichment_db.get(domain, {})
if not enriched_data:
# Generate estimated data based on domain
enriched_data = {
"employee_count": "Unknown",
"founded_year": None,
"funding": "Unknown",
"tech_stack": [],
"social_profiles": {
"linkedin": f"https://linkedin.com/company/{domain.split('.')[0]}",
"twitter": f"https://twitter.com/{domain.split('.')[0]}"
},
"industry_tags": [],
"revenue_range": "Unknown",
"enrichment_source": "estimated"
}
else:
enriched_data["enrichment_source"] = "database"
return enriched_data
async def enrich_contact(self, email: str, name: str) -> Dict:
"""Enrich contact data with social profiles and background"""
logger.info(f"Enriching contact data for: {email}")
# Extract info from email
domain = email.split('@')[1] if '@' in email else ''
username = email.split('@')[0] if '@' in email else ''
return {
"email": email,
"name": name,
"linkedin_profile": f"https://linkedin.com/in/{username.replace('.', '-')}",
"twitter_profile": f"https://twitter.com/{username.replace('.', '_')}",
"github_profile": f"https://github.com/{username.replace('.', '')}",
"estimated_seniority": self._estimate_seniority(email, name),
"enrichment_timestamp": datetime.utcnow().isoformat()
}
def _estimate_seniority(self, email: str, name: str) -> str:
"""Estimate seniority based on email patterns"""
email_lower = email.lower()
if any(x in email_lower for x in ['ceo', 'founder', 'chief']):
return "Executive"
elif any(x in email_lower for x in ['vp', 'director', 'head']):
return "Senior"
elif any(x in email_lower for x in ['manager', 'lead']):
return "Mid-Level"
else:
return "Individual Contributor"
class MCPValidationService:
"""
Validation Service - Validate emails, domains, and contact information
Real-world use case: Reduce bounce rates and improve deliverability
"""
def __init__(self):
# Common disposable email domains
self.disposable_domains = [
"tempmail.com", "throwaway.email", "guerrillamail.com",
"10minutemail.com", "mailinator.com"
]
# Known invalid patterns
self.invalid_patterns = [
"noreply@", "no-reply@", "donotreply@",
"info@", "admin@", "support@", "sales@"
]
self.validation_cache = {}
logger.info("MCP Validation Service initialized")
async def validate_email(self, email: str) -> Dict:
"""Validate email address"""
logger.info(f"Validating email: {email}")
result = {
"email": email,
"is_valid": False,
"is_disposable": False,
"is_role_based": False,
"is_catchall": False,
"deliverability_score": 0,
"validation_issues": [],
"validated_at": datetime.utcnow().isoformat()
}
# Basic format validation
email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_regex, email):
result["validation_issues"].append("Invalid email format")
return result
# Extract domain
domain = email.split('@')[1] if '@' in email else ''
# Check if disposable
if domain in self.disposable_domains:
result["is_disposable"] = True
result["validation_issues"].append("Disposable email domain")
# Check if role-based
for pattern in self.invalid_patterns:
if email.lower().startswith(pattern):
result["is_role_based"] = True
result["validation_issues"].append("Role-based email (low engagement)")
break
# Calculate deliverability score
score = 100
if result["is_disposable"]:
score -= 50
if result["is_role_based"]:
score -= 30
if len(result["validation_issues"]) == 0:
result["is_valid"] = True
result["deliverability_score"] = max(0, score)
return result
async def validate_domain(self, domain: str) -> Dict:
"""Validate domain"""
logger.info(f"Validating domain: {domain}")
result = {
"domain": domain,
"is_valid": False,
"has_mx_records": False,
"is_active": False,
"validation_issues": [],
"validated_at": datetime.utcnow().isoformat()
}
# Basic domain format validation
domain_regex = r'^([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$'
if not re.match(domain_regex, domain):
result["validation_issues"].append("Invalid domain format")
return result
# For demo purposes, assume most domains are valid
# In production, would do DNS lookups
result["is_valid"] = True
result["has_mx_records"] = True
result["is_active"] = True
return result
async def batch_validate_emails(self, emails: List[str]) -> List[Dict]:
"""Batch validate multiple emails"""
logger.info(f"Batch validating {len(emails)} emails")
results = []
for email in emails:
validation = await self.validate_email(email)
results.append(validation)
return results
class MCPSummaryService:
"""
Summary Service - Generate AI-powered summaries for companies and prospects
Real-world use case: Create comprehensive, informative summaries for sales teams
ENHANCED: Now uses LLM service with strict grounding to prevent hallucination
"""
def __init__(self):
from services.llm_service import get_llm_service
self.llm = get_llm_service()
logger.info("MCP Summary Service initialized with LLM grounding support")
async def generate_company_summary(self, company_data: Dict, enrichment_data: Dict = None) -> str:
"""
ENHANCED: Generate comprehensive AI summary for a prospect company
Now uses LLM service with strict grounding
Args:
company_data: Basic company information (INCLUDING raw_facts if available)
enrichment_data: Optional enriched data from enrichment service
Returns:
Detailed summary string grounded in facts
"""
logger.info(f"Generating GROUNDED summary for company: {company_data.get('name', 'Unknown')}")
name = company_data.get('name', 'Unknown Company')
# Merge enrichment data if available
if enrichment_data:
company_data = {**company_data, **enrichment_data}
# Get raw facts for grounding
raw_facts = company_data.get('raw_facts', [])
# Use LLM service for grounded summarization
summary = await self.llm.generate_grounded_summary(
company_name=name,
extracted_data=company_data,
raw_facts=raw_facts,
summary_type="prospect"
)
return summary
async def generate_prospect_summary(
self,
prospect_data: Dict,
company_enrichment: Dict = None,
contact_data: List[Dict] = None
) -> str:
"""
Generate comprehensive AI summary for a sales prospect
Args:
prospect_data: Basic prospect information with company data
company_enrichment: Enriched company data
contact_data: List of contacts found
Returns:
Detailed prospect summary
"""
logger.info("Generating prospect summary")
company = prospect_data.get('company', {})
company_name = company.get('name', 'Unknown Company')
domain = company.get('domain', '')
industry = company.get('industry', 'Unknown')
# Start with company summary
company_summary = await self.generate_company_summary(company, company_enrichment)
# Add prospect-specific insights
fit_score = prospect_data.get('fit_score', 0.0)
status = prospect_data.get('status', 'new')
# Contacts analysis
contact_summary = ""
if contact_data:
contact_count = len(contact_data)
contact_summary = f" We have identified {contact_count} key contact{'s' if contact_count > 1 else ''}"
# Identify decision makers
decision_makers = [c for c in contact_data if any(
title_word in c.get('title', '').lower()
for title_word in ['ceo', 'cto', 'cfo', 'vp', 'director', 'head', 'chief']
)]
if decision_makers:
contact_summary += f", including {len(decision_makers)} decision-maker{'s' if len(decision_makers) > 1 else ''}"
contact_summary += "."
# Fit assessment
fit_assessment = ""
if fit_score > 0:
if fit_score >= 0.8:
fit_assessment = " **High Priority:** This prospect shows excellent fit based on company size, industry, and technology profile."
elif fit_score >= 0.6:
fit_assessment = " **Good Fit:** This prospect demonstrates strong alignment with our ideal customer profile."
elif fit_score >= 0.4:
fit_assessment = " **Moderate Fit:** This prospect shows potential but may require additional qualification."
else:
fit_assessment = " **Low Priority:** This prospect shows limited fit with our target criteria."
# Combine all sections
full_summary = company_summary + contact_summary + fit_assessment
return full_summary
async def generate_client_summary(
self,
client_data: Dict,
enrichment_data: Dict = None
) -> str:
"""
ENHANCED: Generate comprehensive AI summary for CLIENT company (the company we're selling FOR)
Now uses LLM service with strict grounding to prevent hallucination
Args:
client_data: Client profile data with offerings, value props, etc. (INCLUDING raw_facts)
enrichment_data: Optional enriched data
Returns:
Detailed client summary grounded in extracted facts
"""
logger.info(f"Generating GROUNDED client summary for: {client_data.get('name', 'Unknown')}")
name = client_data.get('name', 'Unknown Company')
# Merge enrichment data if available
if enrichment_data:
client_data = {**client_data, **enrichment_data}
# Get raw facts for grounding (if available)
raw_facts = client_data.get('raw_facts', [])
# Use LLM service for grounded summarization
summary = await self.llm.generate_grounded_summary(
company_name=name,
extracted_data=client_data,
raw_facts=raw_facts,
summary_type="client"
)
return summary
# Singleton instances
_analytics_service: Optional[MCPAnalyticsService] = None
_enrichment_service: Optional[MCPEnrichmentService] = None
_validation_service: Optional[MCPValidationService] = None
_summary_service: Optional[MCPSummaryService] = None
def get_analytics_service() -> MCPAnalyticsService:
"""Get or create analytics service instance"""
global _analytics_service
if _analytics_service is None:
_analytics_service = MCPAnalyticsService()
return _analytics_service
def get_enrichment_service() -> MCPEnrichmentService:
"""Get or create enrichment service instance"""
global _enrichment_service
if _enrichment_service is None:
_enrichment_service = MCPEnrichmentService()
return _enrichment_service
def get_validation_service() -> MCPValidationService:
"""Get or create validation service instance"""
global _validation_service
if _validation_service is None:
_validation_service = MCPValidationService()
return _validation_service
def get_summary_service() -> MCPSummaryService:
"""Get or create summary service instance"""
global _summary_service
if _summary_service is None:
_summary_service = MCPSummaryService()
return _summary_service