|
|
import asyncio |
|
|
import logging |
|
|
from typing import Dict, Any, List, Optional |
|
|
from pathlib import Path |
|
|
|
|
|
from mcp.server.fastmcp import FastMCP |
|
|
|
|
|
from services.vector_store_service import VectorStoreService |
|
|
from services.document_store_service import DocumentStoreService |
|
|
from services.embedding_service import EmbeddingService |
|
|
from services.llm_service import LLMService |
|
|
from services.ocr_service import OCRService |
|
|
|
|
|
from mcp_tools.ingestion_tool import IngestionTool |
|
|
from mcp_tools.search_tool import SearchTool |
|
|
from mcp_tools.generative_tool import GenerativeTool |
|
|
|
|
|
|
|
|
from services.llamaindex_service import LlamaIndexService |
|
|
from services.elevenlabs_service import ElevenLabsService |
|
|
from services.podcast_generator_service import PodcastGeneratorService |
|
|
from mcp_tools.voice_tool import VoiceTool |
|
|
from mcp_tools.podcast_tool import PodcastTool |
|
|
|
|
|
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Core services (Phase 1): vector search, document storage, embeddings,
# LLM access, and OCR. Instantiated once at import time and shared by the
# tool wrappers below.
# ---------------------------------------------------------------------------
logger.info("Initializing services for FastMCP...")
vector_store_service = VectorStoreService()
document_store_service = DocumentStoreService()
embedding_service_instance = EmbeddingService()
llm_service_instance = LLMService()
ocr_service_instance = OCRService()

# Tool objects that implement the actual ingestion / search / generation
# logic; the @mcp.tool functions below are thin async adapters over these.
ingestion_tool_instance = IngestionTool(
    vector_store=vector_store_service,
    document_store=document_store_service,
    embedding_service=embedding_service_instance,
    ocr_service=ocr_service_instance,
)
search_tool_instance = SearchTool(
    vector_store=vector_store_service,
    embedding_service=embedding_service_instance,
    document_store=document_store_service,
)
generative_tool_instance = GenerativeTool(
    llm_service=llm_service_instance,
    search_tool=search_tool_instance,
)

# ---------------------------------------------------------------------------
# Phase 2 & 3 services: agentic indexing (LlamaIndex), voice (ElevenLabs)
# and podcast generation.
# ---------------------------------------------------------------------------
logger.info("Initializing Phase 2 & 3 services...")
llamaindex_service_instance = LlamaIndexService(document_store_service)
elevenlabs_service_instance = ElevenLabsService(llamaindex_service_instance)
podcast_generator_instance = PodcastGeneratorService(
    llamaindex_service=llamaindex_service_instance,
    llm_service=llm_service_instance,
)

voice_tool_instance = VoiceTool(elevenlabs_service_instance)
podcast_tool_instance = PodcastTool(podcast_generator_instance)

# FIX: FastMCP("") registered the server under an empty name, which makes it
# indistinguishable in MCP client listings. Give it a descriptive name.
mcp = FastMCP("document-rag")
logger.info("FastMCP server initialized.")
|
|
|
|
|
@mcp.tool()
async def ingest_document(file_path: str, file_type: Optional[str] = None) -> Dict[str, Any]:
    """
    Process and index a document from a local file path for searching.

    If `file_type` is omitted, it is inferred from the file's extension
    (lowercased, leading dot removed). Returns the ingestion result dict,
    or {"success": False, "error": ...} on failure.
    """
    logger.info(f"Tool 'ingest_document' called with file_path: {file_path}, file_type: {file_type}")
    try:
        # Lazily infer the type from the path only when the caller did not
        # supply one ('or' short-circuits, matching the original branch).
        resolved_type = file_type or Path(file_path).suffix.lower().strip('.')
        if not file_type:
            logger.info(f"Inferred file_type: {resolved_type}")
        outcome = await ingestion_tool_instance.process_document(file_path, resolved_type)
        logger.info(f"Ingestion result: {outcome}")
        return outcome
    except Exception as e:
        logger.error(f"Error in 'ingest_document' tool: {str(e)}", exc_info=True)
        return {"success": False, "error": str(e)}
|
|
|
|
|
@mcp.tool()
async def semantic_search(query: str, top_k: int = 5, filters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Search through indexed content using natural language.

    `filters` optionally narrows the search. Returns serialized hits under
    "results" plus "total_results"; on failure, an error dict with an empty
    result list.
    """
    logger.info(f"Tool 'semantic_search' called with query: {query}, top_k: {top_k}, filters: {filters}")
    try:
        hits = await search_tool_instance.search(query, top_k, filters)
        serialized = [hit.to_dict() for hit in hits]
        return {
            "success": True,
            "query": query,
            "results": serialized,
            "total_results": len(hits),
        }
    except Exception as e:
        logger.error(f"Error in 'semantic_search' tool: {str(e)}", exc_info=True)
        return {"success": False, "error": str(e), "results": []}
|
|
|
|
|
@mcp.tool()
async def summarize_content(
    content: Optional[str] = None,
    document_id: Optional[str] = None,
    style: str = "concise"
) -> Dict[str, Any]:
    """
    Generate a summary of provided content or a stored document.

    Exactly one of `content` / `document_id` is needed; `content` wins if
    both are given. Available styles: concise, detailed, bullet_points,
    executive. Input longer than 10,000 chars is truncated before the LLM
    call. Returns the summary plus length metadata, or an error dict.
    """
    logger.info(f"Tool 'summarize_content' called. doc_id: {document_id}, style: {style}, has_content: {content is not None}")
    try:
        text_to_summarize = content
        if document_id and not text_to_summarize:
            doc = await document_store_service.get_document(document_id)
            if not doc:
                return {"success": False, "error": f"Document {document_id} not found"}
            text_to_summarize = doc.content
        if not text_to_summarize:
            return {"success": False, "error": "No content provided for summarization"}
        # FIX: capture the length *before* truncation; previously
        # "original_length" reported the truncated length (+ "...") for
        # long inputs, which was misleading.
        original_length = len(text_to_summarize)
        max_length = 10000
        if original_length > max_length:
            logger.warning(f"Content for summarization is long ({original_length} chars), truncating to {max_length}")
            text_to_summarize = text_to_summarize[:max_length] + "..."
        summary = await generative_tool_instance.summarize(text_to_summarize, style)
        return {
            "success": True,
            "summary": summary,
            "original_length": original_length,
            "summary_length": len(summary),
            "style": style
        }
    except Exception as e:
        logger.error(f"Error in 'summarize_content' tool: {str(e)}", exc_info=True)
        return {"success": False, "error": str(e)}
|
|
|
|
|
@mcp.tool()
async def generate_tags(
    content: Optional[str] = None,
    document_id: Optional[str] = None,
    max_tags: int = 5
) -> Dict[str, Any]:
    """
    Generate relevant tags for content or a stored document.

    `content` takes precedence over `document_id`. When a `document_id` is
    supplied and tags are produced, they are persisted into the document's
    metadata. Returns the tags plus context info, or an error dict.
    """
    logger.info(f"Tool 'generate_tags' called. doc_id: {document_id}, max_tags: {max_tags}, has_content: {content is not None}")
    try:
        source_text = content
        if document_id and not source_text:
            record = await document_store_service.get_document(document_id)
            if not record:
                return {"success": False, "error": f"Document {document_id} not found"}
            source_text = record.content
        if not source_text:
            return {"success": False, "error": "No content provided for tag generation"}

        tags = await generative_tool_instance.generate_tags(source_text, max_tags)

        # Persist the tags onto the document only when both a target
        # document and a non-empty tag list exist.
        if document_id and tags:
            await document_store_service.update_document_metadata(document_id, {"tags": tags})
            logger.info(f"Tags {tags} saved for document {document_id}")

        return {
            "success": True,
            "tags": tags,
            "content_length": len(source_text),
            "document_id": document_id,
        }
    except Exception as e:
        logger.error(f"Error in 'generate_tags' tool: {str(e)}", exc_info=True)
        return {"success": False, "error": str(e)}
|
|
|
|
|
@mcp.tool()
async def answer_question(question: str, context_filter: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Answer questions using RAG (Retrieval Augmented Generation) over
    indexed content.

    `context_filter` optionally narrows the context search. The answer is
    returned together with the source chunks; a coarse confidence label is
    derived from how many sources were retrieved.
    """
    logger.info(f"Tool 'answer_question' called with question: {question}, context_filter: {context_filter}")
    try:
        context_hits = await search_tool_instance.search(question, top_k=5, filters=context_filter)

        # Guard clause: without retrieved context the RAG answer would be
        # ungrounded, so bail out with an explanatory payload instead.
        if not context_hits:
            return {
                "success": False,
                "error": "No relevant context found. Please upload relevant documents.",
                "question": question,
                "answer": "I could not find enough information in the documents to answer your question."
            }

        answer = await generative_tool_instance.answer_question(question, context_hits)
        confidence = "high" if len(context_hits) >= 3 else "medium"
        return {
            "success": True,
            "question": question,
            "answer": answer,
            "sources": [hit.to_dict() for hit in context_hits],
            "confidence": confidence,
        }
    except Exception as e:
        logger.error(f"Error in 'answer_question' tool: {str(e)}", exc_info=True)
        return {"success": False, "error": str(e)}
|
|
|
|
|
@mcp.tool()
async def voice_qa(question: str, session_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Ask a question using the AI voice assistant with RAG capabilities.

    Thin adapter that delegates to VoiceTool (LlamaIndex agentic search);
    `session_id` lets the underlying tool keep per-session context.
    """
    logger.info(f"Tool 'voice_qa' called with question: {question}")
    try:
        return await voice_tool_instance.voice_qa(question, session_id)
    except Exception as e:
        logger.error(f"Error in 'voice_qa' tool: {str(e)}", exc_info=True)
        return {"success": False, "error": str(e)}
|
|
|
|
|
@mcp.tool()
async def generate_podcast(
    document_ids: List[str],
    style: str = "conversational",
    duration_minutes: int = 10,
    host1_voice: str = "Rachel",
    host2_voice: str = "Adam"
) -> Dict[str, Any]:
    """
    Generate a podcast from selected documents.

    Styles: conversational, educational, technical, casual.
    Duration: 5-30 minutes recommended.
    Voices: Rachel, Adam, Domi, Bella, Antoni, Josh, Sam, Emily, etc.
    Returns the podcast tool's result dict, or an error dict on failure.
    """
    logger.info(f"Tool 'generate_podcast' called with {len(document_ids)} docs, style: {style}")
    try:
        result = await podcast_tool_instance.generate_podcast(
            document_ids=document_ids,
            style=style,
            duration_minutes=duration_minutes,
            # FIX: host1_voice was accepted but never forwarded, so the
            # caller's choice for host 1 was silently ignored.
            host1_voice=host1_voice,
            host2_voice=host2_voice
        )
        return result
    except Exception as e:
        logger.error(f"Error in 'generate_podcast' tool: {str(e)}", exc_info=True)
        return {"success": False, "error": str(e)}
|
|
|
|
|
@mcp.tool()
async def generate_podcast_transcript(
    document_ids: List[str],
    style: str = "conversational",
    duration_minutes: int = 10
) -> Dict[str, Any]:
    """
    Generate a podcast script/transcript WITHOUT generating audio.

    Useful for previewing content before spending credits on audio
    generation. Returns the transcript result dict, or an error dict.
    """
    logger.info(f"Tool 'generate_podcast_transcript' called")
    try:
        return await podcast_tool_instance.generate_transcript(
            document_ids=document_ids,
            style=style,
            duration_minutes=duration_minutes
        )
    except Exception as e:
        # Consistency fix: include the traceback like every other tool
        # handler in this module does.
        logger.error(f"Error in 'generate_podcast_transcript': {str(e)}", exc_info=True)
        return {"success": False, "error": str(e)}
|
|
|
|
|
@mcp.tool()
async def list_podcasts(limit: int = 10) -> Dict[str, Any]:
    """
    List previously generated podcasts with their metadata.

    `limit` caps the number of entries returned by the podcast tool.
    """
    logger.info(f"Tool 'list_podcasts' called")
    try:
        # NOTE(review): list_podcasts appears to be synchronous on
        # PodcastTool (no await) — confirm against its definition.
        return podcast_tool_instance.list_podcasts(limit)
    except Exception as e:
        # Consistency fix: include the traceback like the other handlers.
        logger.error(f"Error in 'list_podcasts': {str(e)}", exc_info=True)
        return {"success": False, "error": str(e)}
|
|
|
|
|
@mcp.tool()
async def get_podcast(podcast_id: str) -> Dict[str, Any]:
    """
    Get metadata for a specific podcast.

    Delegates to PodcastTool; returns its result dict, or an error dict.
    """
    logger.info(f"Tool 'get_podcast' called for {podcast_id}")
    try:
        return podcast_tool_instance.get_podcast(podcast_id)
    except Exception as e:
        # Consistency fix: include the traceback like the other handlers.
        logger.error(f"Error in 'get_podcast': {str(e)}", exc_info=True)
        return {"success": False, "error": str(e)}
|
|
|
|
|
@mcp.tool()
async def get_podcast_audio(podcast_id: str) -> Dict[str, Any]:
    """
    Get the audio file path for a generated podcast.

    Delegates to PodcastTool; returns its result dict, or an error dict.
    """
    logger.info(f"Tool 'get_podcast_audio' called for {podcast_id}")
    try:
        return podcast_tool_instance.get_podcast_audio(podcast_id)
    except Exception as e:
        # Consistency fix: include the traceback like the other handlers.
        logger.error(f"Error in 'get_podcast_audio': {str(e)}", exc_info=True)
        return {"success": False, "error": str(e)}
|
|
|
|
|
@mcp.tool()
async def list_documents_for_ui(limit: int = 100, offset: int = 0) -> Dict[str, Any]:
    """
    (UI Helper) List documents from the document store.

    Not a standard processing tool, but useful for UI population.
    Supports simple pagination via `limit` and `offset`.
    """
    logger.info(f"Tool 'list_documents_for_ui' called with limit: {limit}, offset: {offset}")
    try:
        page = await document_store_service.list_documents(limit, offset)
        payload = [entry.to_dict() for entry in page]
        # "total" reflects the size of this page, not the store-wide count.
        return {
            "success": True,
            "documents": payload,
            "total": len(payload),
        }
    except Exception as e:
        logger.error(f"Error in 'list_documents_for_ui' tool: {str(e)}", exc_info=True)
        return {"success": False, "error": str(e), "documents": []}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|