cx_ai_agent_v1 / mcp /in_memory_clients.py
muzakkirhussain011's picture
Add application files (text files only)
8bab08d
"""
In-Memory MCP Client Wrappers
These clients wrap the in-memory services to provide the same interface as HTTP clients
"""
from typing import Dict, List, Optional
from mcp.in_memory_services import (
get_in_memory_store,
get_in_memory_search,
get_in_memory_email,
get_in_memory_calendar
)
from app.schema import Prospect, Company, Contact, Fact, Thread
import logging
logger = logging.getLogger(__name__)
class InMemoryStoreClient:
"""In-memory store client (compatible with HTTP client interface)"""
def __init__(self):
self.service = get_in_memory_store()
async def save_prospect(self, prospect) -> str:
"""Save a prospect"""
if isinstance(prospect, dict):
return await self.service.save_prospect(prospect)
return await self.service.save_prospect(prospect.dict())
async def get_prospect(self, prospect_id: str) -> Optional[Prospect]:
"""Get a prospect"""
data = await self.service.get_prospect(prospect_id)
if data:
return Prospect(**data)
return None
async def list_prospects(self) -> List[Prospect]:
"""List all prospects"""
data = await self.service.list_prospects()
return [Prospect(**p) for p in data]
async def save_company(self, company: Company) -> str:
"""Save a company"""
if isinstance(company, dict):
return await self.service.save_company(company)
return await self.service.save_company(company.dict())
async def get_company(self, company_id: str) -> Optional[Company]:
"""Get a company"""
data = await self.service.get_company(company_id)
if data:
return Company(**data)
return None
async def save_fact(self, fact) -> str:
"""Save a fact"""
if isinstance(fact, dict):
return await self.service.save_fact(fact)
return await self.service.save_fact(fact.dict())
async def save_contact(self, contact) -> str:
"""Save a contact"""
if isinstance(contact, dict):
return await self.service.save_contact(contact)
return await self.service.save_contact(contact.dict())
async def list_contacts_by_domain(self, domain: str) -> List[Contact]:
"""List contacts by domain"""
data = await self.service.list_contacts_by_domain(domain)
return [Contact(**c) for c in data]
async def check_suppression(self, supp_type: str, value: str) -> bool:
"""Check suppression"""
return await self.service.check_suppression(supp_type, value)
async def save_handoff(self, packet: Dict) -> str:
"""Save handoff packet"""
return await self.service.save_handoff(packet)
async def clear_all(self) -> str:
"""Clear all data"""
return await self.service.clear_all()
class InMemorySearchClient:
"""In-memory search client (compatible with WebSearchService interface)"""
def __init__(self):
self.service = get_in_memory_search()
async def query(self, q: str, max_results: int = 5) -> List[Dict]:
"""Search query (MCP protocol method)"""
return await self.service.query(q, max_results)
async def search(self, query: str, max_results: int = 5, **kwargs) -> List[Dict]:
"""
Search method (compatible with WebSearchService interface)
Maps to query() for MCP compatibility
"""
return await self.query(query, max_results)
async def search_news(self, query: str, max_results: int = 5, **kwargs) -> List[Dict]:
"""
News search method (compatible with WebSearchService interface)
Falls back to regular search for now
"""
return await self.query(query, max_results)
class InMemoryEmailClient:
"""In-memory email client"""
def __init__(self):
self.service = get_in_memory_email()
async def send(self, to: str, subject: str, body: str, prospect_id: str) -> str:
"""Send email"""
return await self.service.send(to, subject, body, prospect_id)
async def get_thread(self, prospect_id: str) -> Optional[Thread]:
"""Get email thread"""
data = await self.service.get_thread(prospect_id)
if data:
return Thread(**data)
return None
class InMemoryCalendarClient:
"""In-memory calendar client"""
def __init__(self):
self.service = get_in_memory_calendar()
async def suggest_slots(self) -> List[Dict[str, str]]:
"""Suggest calendar slots"""
return await self.service.suggest_slots()
async def generate_ics(self, slot: Dict) -> str:
"""Generate ICS file"""
return await self.service.generate_ics(slot)