| """ | |
| In-Memory MCP Client Wrappers | |
| These clients wrap the in-memory services to provide the same interface as HTTP clients | |
| """ | |
| from typing import Dict, List, Optional | |
| from mcp.in_memory_services import ( | |
| get_in_memory_store, | |
| get_in_memory_search, | |
| get_in_memory_email, | |
| get_in_memory_calendar | |
| ) | |
| from app.schema import Prospect, Company, Contact, Fact, Thread | |
| import logging | |
| logger = logging.getLogger(__name__) | |
class InMemoryStoreClient:
    """In-memory store client (compatible with HTTP client interface).

    Thin async facade over the object returned by ``get_in_memory_store()``.
    Save methods accept either a plain ``dict`` or a schema model and always
    hand the service a ``dict``; read methods rebuild schema models from the
    mappings the service returns.
    """

    def __init__(self):
        self.service = get_in_memory_store()

    @staticmethod
    def _to_payload(record) -> Dict:
        """Return the ``dict`` form of ``record`` for the service layer.

        A ``dict`` is passed through as-is (it is not copied). Any other
        object is serialized with ``model_dump()`` when it has one, otherwise
        with ``dict()``.
        """
        if isinstance(record, dict):
            return record
        # NOTE(review): the schema types look like Pydantic models - confirm.
        # Pydantic v2 deprecates ``.dict()`` in favour of ``.model_dump()``,
        # so prefer the new name and keep ``.dict()`` as the v1 fallback.
        dump = getattr(record, "model_dump", None)
        if callable(dump):
            return dump()
        return record.dict()

    async def save_prospect(self, prospect) -> str:
        """Save a prospect given as a ``dict`` or a model; return the service's result."""
        return await self.service.save_prospect(self._to_payload(prospect))

    async def get_prospect(self, prospect_id: str) -> Optional[Prospect]:
        """Get a prospect by id, or ``None`` when the service returns nothing truthy."""
        data = await self.service.get_prospect(prospect_id)
        if data:
            return Prospect(**data)
        return None

    async def list_prospects(self) -> List[Prospect]:
        """List all prospects, rebuilt as ``Prospect`` models."""
        data = await self.service.list_prospects()
        return [Prospect(**p) for p in data]

    async def save_company(self, company) -> str:
        """Save a company given as a ``dict`` or a model; return the service's result."""
        return await self.service.save_company(self._to_payload(company))

    async def get_company(self, company_id: str) -> Optional[Company]:
        """Get a company by id, or ``None`` when the service returns nothing truthy."""
        data = await self.service.get_company(company_id)
        if data:
            return Company(**data)
        return None

    async def save_fact(self, fact) -> str:
        """Save a fact given as a ``dict`` or a model; return the service's result."""
        return await self.service.save_fact(self._to_payload(fact))

    async def save_contact(self, contact) -> str:
        """Save a contact given as a ``dict`` or a model; return the service's result."""
        return await self.service.save_contact(self._to_payload(contact))

    async def list_contacts_by_domain(self, domain: str) -> List[Contact]:
        """List the contacts the service returns for ``domain`` as ``Contact`` models."""
        data = await self.service.list_contacts_by_domain(domain)
        return [Contact(**c) for c in data]

    async def check_suppression(self, supp_type: str, value: str) -> bool:
        """Check suppression by forwarding ``supp_type`` and ``value`` to the service."""
        return await self.service.check_suppression(supp_type, value)

    async def save_handoff(self, packet: Dict) -> str:
        """Save a handoff packet; the dict is forwarded unchanged."""
        return await self.service.save_handoff(packet)

    async def clear_all(self) -> str:
        """Clear all data by delegating to the service's ``clear_all``."""
        return await self.service.clear_all()
class InMemorySearchClient:
    """Search client backed by the in-memory search service.

    Exposes ``query`` (the MCP protocol method) together with ``search`` and
    ``search_news`` so it matches the WebSearchService interface.
    """

    def __init__(self):
        # The backing service is obtained from get_in_memory_search().
        self.service = get_in_memory_search()

    async def query(self, q: str, max_results: int = 5) -> List[Dict]:
        """Forward ``q`` and ``max_results`` to the service's ``query``."""
        hits = await self.service.query(q, max_results)
        return hits

    async def search(self, query: str, max_results: int = 5, **kwargs) -> List[Dict]:
        """
        WebSearchService-compatible search.

        Delegates to ``query()``; extra keyword arguments are accepted but
        not forwarded.
        """
        hits = await self.query(query, max_results)
        return hits

    async def search_news(self, query: str, max_results: int = 5, **kwargs) -> List[Dict]:
        """
        WebSearchService-compatible news search.

        Currently the same lookup as a regular search: delegates to
        ``query()`` and does not forward extra keyword arguments.
        """
        hits = await self.query(query, max_results)
        return hits
class InMemoryEmailClient:
    """Email client backed by the in-memory email service."""

    def __init__(self):
        # The backing service is obtained from get_in_memory_email().
        self.service = get_in_memory_email()

    async def send(self, to: str, subject: str, body: str, prospect_id: str) -> str:
        """Pass the message fields to the service's ``send`` and return its result."""
        outcome = await self.service.send(to, subject, body, prospect_id)
        return outcome

    async def get_thread(self, prospect_id: str) -> Optional[Thread]:
        """Return the prospect's thread as a ``Thread``, or ``None`` if the service has nothing."""
        raw = await self.service.get_thread(prospect_id)
        if not raw:
            # Falsy result from the service: there is no thread to build.
            return None
        return Thread(**raw)
class InMemoryCalendarClient:
    """Calendar client backed by the in-memory calendar service."""

    def __init__(self):
        # The backing service is obtained from get_in_memory_calendar().
        self.service = get_in_memory_calendar()

    async def suggest_slots(self) -> List[Dict[str, str]]:
        """Return the slots proposed by the service's ``suggest_slots``."""
        slots = await self.service.suggest_slots()
        return slots

    async def generate_ics(self, slot: Dict) -> str:
        """Return what the service's ``generate_ics`` produces for ``slot``."""
        ics = await self.service.generate_ics(slot)
        return ics