""" | |
Document MCP Server | |
This module provides MCP server functionality for document processing and analysis. | |
It handles various document formats including: | |
- Text files | |
- PDF documents | |
- Word documents (DOCX) | |
- Excel spreadsheets | |
- PowerPoint presentations | |
- JSON and XML files | |
- Source code files | |
Each document type has specialized processing functions that extract content, | |
structure, and metadata. The server focuses on local file processing with | |
appropriate validation and error handling. | |
Main functions: | |
- mcpreadtext: Reads plain text files | |
- mcpreadpdf: Reads PDF files with optional image extraction | |
- mcpreaddocx: Reads Word documents | |
- mcpreadexcel: Reads Excel spreadsheets | |
- mcpreadpptx: Reads PowerPoint presentations | |
- mcpreadjson: Reads and parses JSON/JSONL files | |
- mcpreadxml: Reads and parses XML files | |
- mcpreadsourcecode: Reads and analyzes source code files | |
""" | |
import io
import json
import os
import sys
import tempfile
import traceback
from datetime import date, datetime
from typing import Any, Dict, List, Optional

import fitz
import html2text
import pandas as pd
import xmltodict
from bs4 import BeautifulSoup
from docx2markdown._docx_to_markdown import docx_to_markdown
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from PIL import Image, ImageDraw, ImageFont
from pptx import Presentation
from pydantic import BaseModel, Field
from PyPDF2 import PdfReader
from tabulate import tabulate
from xls2xlsx import XLS2XLSX

from aworld.logs.util import logger
from aworld.utils import import_package
from mcp_servers.image_server import encode_images

mcp = FastMCP("document-server")
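# Each mcp* function below is registered on this FastMCP instance with
# @mcp.tool(). FastMCP builds the tool's input schema from the type hints and
# pydantic Field defaults, and uses the function docstring as the tool
# description shown to clients.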
# Define model classes for different document types
class TextDocument(BaseModel):
    """Model representing a text document"""

    content: str
    file_path: str
    file_name: str
    file_size: int
    last_modified: str


class HtmlDocument(BaseModel):
    """Model representing an HTML document"""

    content: str  # Extracted text content
    html_content: str  # Original HTML content
    file_path: str
    file_name: str
    file_size: int
    last_modified: str
    title: Optional[str] = None
    links: Optional[List[Dict[str, str]]] = None
    images: Optional[List[Dict[str, str]]] = None
    tables: Optional[List[str]] = None
    markdown: Optional[str] = None  # HTML converted to Markdown format


class JsonDocument(BaseModel):
    """Model representing a JSON document"""

    format: str  # "json" or "jsonl"
    type: Optional[str] = None  # "array" or "object" for standard JSON
    count: Optional[int] = None
    keys: Optional[List[str]] = None
    data: Any
    file_path: str
    file_name: str


class XmlDocument(BaseModel):
    """Model representing an XML document"""

    content: Dict
    file_path: str
    file_name: str


class PdfImage(BaseModel):
    """Model representing an image extracted from a PDF"""

    page: int
    format: str
    width: int
    height: int
    path: str


class PdfDocument(BaseModel):
    """Model representing a PDF document"""

    content: str
    file_path: str
    file_name: str
    page_count: int
    images: Optional[List[PdfImage]] = None
    image_count: Optional[int] = None
    image_dir: Optional[str] = None
    error: Optional[str] = None


class PdfResult(BaseModel):
    """Model representing results from processing multiple PDF documents"""

    total_files: int
    success_count: int
    failed_count: int
    results: List[PdfDocument]


class DocxDocument(BaseModel):
    """Model representing a Word document"""

    content: str
    file_path: str
    file_name: str


class ExcelSheet(BaseModel):
    """Model representing a sheet in an Excel file"""

    name: str
    data: List[Dict[str, Any]]
    markdown_table: str
    row_count: int
    column_count: int


class ExcelDocument(BaseModel):
    """Model representing an Excel document"""

    file_name: str
    file_path: str
    processed_path: Optional[str] = None
    file_type: str
    sheet_count: int
    sheet_names: List[str]
    sheets: List[ExcelSheet]
    success: bool = True
    error: Optional[str] = None


class ExcelResult(BaseModel):
    """Model representing results from processing multiple Excel documents"""

    total_files: int
    success_count: int
    failed_count: int
    results: List[ExcelDocument]


class PowerPointSlide(BaseModel):
    """Model representing a slide in a PowerPoint presentation"""

    slide_number: int
    image: str  # Base64 encoded image


class PowerPointDocument(BaseModel):
    """Model representing a PowerPoint document"""

    file_path: str
    file_name: str
    slide_count: int
    slides: List[PowerPointSlide]


class SourceCodeDocument(BaseModel):
    """Model representing a source code document"""

    content: str
    file_type: str
    file_path: str
    file_name: str
    line_count: int
    size_bytes: int
    last_modified: str
    classes: Optional[List[str]] = None
    functions: Optional[List[str]] = None
    imports: Optional[List[str]] = None
    package: Optional[List[str]] = None
    methods: Optional[List[str]] = None
    includes: Optional[List[str]] = None


class DocumentError(BaseModel):
    """Model representing an error in document processing"""

    error: str
    file_path: Optional[str] = None
    file_name: Optional[str] = None
class ComplexEncoder(json.JSONEncoder):
    """JSON encoder that serializes datetime and date objects to strings."""

    def default(self, o):
        if isinstance(o, datetime):
            return o.strftime("%Y-%m-%d %H:%M:%S")
        elif isinstance(o, date):
            return o.strftime("%Y-%m-%d")
        else:
            return json.JSONEncoder.default(self, o)
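# ComplexEncoder is not referenced elsewhere in this module. A typical use
# (illustrative) would be serializing payloads that contain datetimes:
#
#   json.dumps({"created": datetime.now()}, cls=ComplexEncoder)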
def handle_error(e: Exception, error_type: str, file_path: Optional[str] = None) -> str:
    """Unified error handling: log the traceback and return a standard-format error JSON."""
    error_msg = f"{error_type} error: {str(e)}"
    logger.error(traceback.format_exc())
    error = DocumentError(
        error=error_msg,
        file_path=file_path,
        file_name=os.path.basename(file_path) if file_path else None,
    )
    return error.model_dump_json()


def check_file_readable(document_path: str) -> Optional[str]:
    """Check if the file exists and is readable; return an error message, or None if OK."""
    if not os.path.exists(document_path):
        return f"File does not exist: {document_path}"
    if not os.access(document_path, os.R_OK):
        return f"File is not readable: {document_path}"
    return None
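# Every tool below funnels failures through DocumentError / handle_error, so a
# failed call returns a uniform JSON payload, e.g. (illustrative values):
#
#   {"error": "Text file reading error: ...", "file_path": "/tmp/a.txt", "file_name": "a.txt"}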
@mcp.tool()
def mcpreadtext(
    document_path: str = Field(description="The input local text file path."),
) -> str:
    """Read and return content from a local text file. Cannot process remote https:// URLs."""
    error = check_file_readable(document_path)
    if error:
        return DocumentError(error=error, file_path=document_path).model_dump_json()

    try:
        with open(document_path, "r", encoding="utf-8") as f:
            content = f.read()

        result = TextDocument(
            content=content,
            file_path=document_path,
            file_name=os.path.basename(document_path),
            file_size=os.path.getsize(document_path),
            last_modified=datetime.fromtimestamp(
                os.path.getmtime(document_path)
            ).strftime("%Y-%m-%d %H:%M:%S"),
        )
        return result.model_dump_json()
    except Exception as e:
        return handle_error(e, "Text file reading", document_path)
@mcp.tool()
def mcpreadjson(
    document_path: str = Field(description="Local path to JSON or JSONL file"),
    is_jsonl: bool = Field(
        default=False,
        description="Whether the file is in JSONL format (one JSON object per line)",
    ),
) -> str:
    """Read and parse a JSON or JSONL file and return the parsed content. Cannot process remote https:// URLs."""
    error = check_file_readable(document_path)
    if error:
        return DocumentError(error=error, file_path=document_path).model_dump_json()

    try:
        # Choose processing method based on file type
        if is_jsonl:
            # Process JSONL file (one JSON object per line)
            results = []
            with open(document_path, "r", encoding="utf-8") as f:
                for line_num, line in enumerate(f, 1):
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        json_obj = json.loads(line)
                        results.append(json_obj)
                    except json.JSONDecodeError as e:
                        logger.warning(f"JSON parsing error at line {line_num}: {str(e)}")

            # Create result model
            result = JsonDocument(
                format="jsonl",
                count=len(results),
                data=results,
                file_path=document_path,
                file_name=os.path.basename(document_path),
            )
        else:
            # Process standard JSON file
            with open(document_path, "r", encoding="utf-8") as f:
                data = json.load(f)

            # Create result model based on data type
            if isinstance(data, list):
                result = JsonDocument(
                    format="json",
                    type="array",
                    count=len(data),
                    data=data,
                    file_path=document_path,
                    file_name=os.path.basename(document_path),
                )
            else:
                result = JsonDocument(
                    format="json",
                    type="object",
                    keys=list(data.keys()) if isinstance(data, dict) else [],
                    data=data,
                    file_path=document_path,
                    file_name=os.path.basename(document_path),
                )

        return result.model_dump_json()
    except json.JSONDecodeError as e:
        return handle_error(e, "JSON parsing", document_path)
    except Exception as e:
        return handle_error(e, "JSON file reading", document_path)
@mcp.tool()
def mcpreadxml(
    document_path: str = Field(description="The local input XML file path."),
) -> str:
    """Read and return content from an XML file. Cannot process remote https:// URLs."""
    error = check_file_readable(document_path)
    if error:
        return DocumentError(error=error, file_path=document_path).model_dump_json()

    try:
        with open(document_path, "r", encoding="utf-8") as f:
            data = f.read()

        result = XmlDocument(
            content=xmltodict.parse(data),
            file_path=document_path,
            file_name=os.path.basename(document_path),
        )
        return result.model_dump_json()
    except Exception as e:
        return handle_error(e, "XML file reading", document_path)
@mcp.tool()
def mcpreadpdf(
    document_paths: List[str] = Field(description="The local input PDF file paths."),
    extract_images: bool = Field(
        default=False, description="Whether to extract images from PDF (default: False)"
    ),
) -> str:
    """Read and return content from PDF files with optional image extraction. Cannot process remote https:// URLs."""
    try:
        results = []
        success_count = 0
        failed_count = 0

        for document_path in document_paths:
            error = check_file_readable(document_path)
            if error:
                results.append(
                    PdfDocument(
                        content="",
                        file_path=document_path,
                        file_name=os.path.basename(document_path),
                        page_count=0,
                        error=error,
                    )
                )
                failed_count += 1
                continue

            try:
                with open(document_path, "rb") as f:
                    reader = PdfReader(f)
                    # extract_text() may return None for pages without a text layer
                    content = " ".join(
                        (page.extract_text() or "") for page in reader.pages
                    )
                    page_count = len(reader.pages)

                pdf_result = PdfDocument(
                    content=content,
                    file_path=document_path,
                    file_name=os.path.basename(document_path),
                    page_count=page_count,
                )

                # Extract images if requested
                if extract_images:
                    images_data = []
                    # Use /tmp directory for storing images
                    output_dir = "/tmp/pdf_images"
                    # Create output directory if it doesn't exist
                    os.makedirs(output_dir, exist_ok=True)

                    # Generate a unique subfolder based on filename to avoid conflicts
                    pdf_name = os.path.splitext(os.path.basename(document_path))[0]
                    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
                    image_dir = os.path.join(output_dir, f"{pdf_name}_{timestamp}")
                    os.makedirs(image_dir, exist_ok=True)

                    try:
                        # Open PDF with PyMuPDF
                        pdf_document = fitz.open(document_path)

                        # Iterate through each page
                        for page_index in range(len(pdf_document)):
                            page = pdf_document[page_index]

                            # Get image list
                            image_list = page.get_images(full=True)

                            # Process each image
                            for img_index, img in enumerate(image_list):
                                # img[0] is the xref (cross-reference id) of the image object
                                xref = img[0]
                                base_image = pdf_document.extract_image(xref)
                                image_bytes = base_image["image"]
                                image_ext = base_image["ext"]

                                # Save image to file in /tmp directory
                                img_filename = (
                                    f"pdf_image_p{page_index + 1}_{img_index + 1}.{image_ext}"
                                )
                                img_path = os.path.join(image_dir, img_filename)
                                with open(img_path, "wb") as img_file:
                                    img_file.write(image_bytes)
                                logger.success(f"Image saved: {img_path}")

                                # Get image dimensions
                                with Image.open(img_path) as pil_img:
                                    width, height = pil_img.size

                                # Add to results with file path instead of base64
                                images_data.append(
                                    PdfImage(
                                        page=page_index + 1,
                                        format=image_ext,
                                        width=width,
                                        height=height,
                                        path=img_path,
                                    )
                                )

                        pdf_document.close()
                        pdf_result.images = images_data
                        pdf_result.image_count = len(images_data)
                        pdf_result.image_dir = image_dir
                    except Exception as img_error:
                        logger.error(f"Error extracting images: {str(img_error)}")
                        # Don't clean up on error so we keep any successfully extracted images
                        pdf_result.error = str(img_error)

                results.append(pdf_result)
                success_count += 1
            except Exception as e:
                results.append(
                    PdfDocument(
                        content="",
                        file_path=document_path,
                        file_name=os.path.basename(document_path),
                        page_count=0,
                        error=str(e),
                    )
                )
                failed_count += 1

        # Create final result
        pdf_result = PdfResult(
            total_files=len(document_paths),
            success_count=success_count,
            failed_count=failed_count,
            results=results,
        )
        return pdf_result.model_dump_json()
    except Exception as e:
        return handle_error(e, "PDF file reading")
@mcp.tool()
def mcpreaddocx(
    document_path: str = Field(description="The local input Word file path."),
) -> str:
    """Read and return content from a Word (.docx) file. Cannot process remote https:// URLs."""
    error = check_file_readable(document_path)
    if error:
        return DocumentError(error=error, file_path=document_path).model_dump_json()

    try:
        file_name = os.path.basename(document_path)
        # Convert the document to a temporary Markdown file, read it, then clean up
        md_file_path = f"{file_name}.md"
        docx_to_markdown(document_path, md_file_path)
        with open(md_file_path, "r", encoding="utf-8") as f:
            content = f.read()
        os.remove(md_file_path)

        result = DocxDocument(
            content=content, file_path=document_path, file_name=file_name
        )
        return result.model_dump_json()
    except Exception as e:
        return handle_error(e, "Word file reading", document_path)
@mcp.tool()
def mcpreadexcel(
    document_paths: List[str] = Field(
        description="List of local input Excel/CSV file paths."
    ),
    max_rows: int = Field(
        1000, description="Maximum number of rows to read per sheet (default: 1000)"
    ),
    convert_xls_to_xlsx: bool = Field(
        False,
        description="Whether to convert XLS files to XLSX format (default: False)",
    ),
) -> str:
    """Read multiple Excel/CSV files and convert sheets to Markdown tables. Cannot process remote https:// URLs."""
    try:
        # Ensure required packages are installed
        import_package("tabulate")
        # Import xls2xlsx package if conversion is requested
        if convert_xls_to_xlsx:
            import_package("xls2xlsx")

        all_results = []
        temp_files = []  # Track temporary files for cleanup
        success_count = 0
        failed_count = 0

        # Process each file
        for document_path in document_paths:
            # Check if file exists and is readable
            error = check_file_readable(document_path)
            if error:
                all_results.append(
                    ExcelDocument(
                        file_name=os.path.basename(document_path),
                        file_path=document_path,
                        file_type="UNKNOWN",
                        sheet_count=0,
                        sheet_names=[],
                        sheets=[],
                        success=False,
                        error=error,
                    )
                )
                failed_count += 1
                continue

            try:
                # Check file extension
                file_ext = os.path.splitext(document_path)[1].lower()

                # Validate file type
                if file_ext not in [".csv", ".xls", ".xlsx", ".xlsm"]:
                    error_msg = (
                        f"Unsupported file format: {file_ext}. "
                        "Only CSV, XLS, XLSX, and XLSM formats are supported."
                    )
                    all_results.append(
                        ExcelDocument(
                            file_name=os.path.basename(document_path),
                            file_path=document_path,
                            file_type=file_ext.replace(".", "").upper(),
                            sheet_count=0,
                            sheet_names=[],
                            sheets=[],
                            success=False,
                            error=error_msg,
                        )
                    )
                    failed_count += 1
                    continue

                # Convert XLS to XLSX if requested and file is XLS
                processed_path = document_path
                if convert_xls_to_xlsx and file_ext == ".xls":
                    try:
                        logger.info(f"Converting XLS to XLSX: {document_path}")
                        converter = XLS2XLSX(document_path)
                        # Create temp file with xlsx extension
                        xlsx_path = os.path.splitext(document_path)[0] + "_converted.xlsx"
                        converter.to_xlsx(xlsx_path)
                        processed_path = xlsx_path
                        temp_files.append(xlsx_path)  # Track for cleanup
                        logger.success(f"Converted XLS to XLSX: {xlsx_path}")
                    except Exception as conv_error:
                        logger.error(f"XLS to XLSX conversion error: {str(conv_error)}")
                        # Continue with original file if conversion fails

                excel_sheets = []
                sheet_names = []

                # Handle CSV files differently
                if file_ext == ".csv":
                    # For CSV files, create a single sheet with the file name
                    sheet_name = os.path.basename(document_path).replace(".csv", "")
                    df = pd.read_csv(processed_path, nrows=max_rows)

                    # Create markdown table
                    markdown_table = "*Empty table*"
                    if not df.empty:
                        headers = df.columns.tolist()
                        table_data = df.values.tolist()
                        markdown_table = tabulate(table_data, headers=headers, tablefmt="pipe")
                        if len(df) >= max_rows:
                            markdown_table += f"\n\n*Note: Table truncated to {max_rows} rows*"

                    # Create sheet model
                    excel_sheets.append(
                        ExcelSheet(
                            name=sheet_name,
                            data=df.to_dict(orient="records"),
                            markdown_table=markdown_table,
                            row_count=len(df),
                            column_count=len(df.columns),
                        )
                    )
                    sheet_names = [sheet_name]
                else:
                    # For Excel files, process all sheets
                    with pd.ExcelFile(processed_path) as xls:
                        sheet_names = xls.sheet_names

                        for sheet_name in sheet_names:
                            # Read Excel sheet into DataFrame with row limit
                            df = pd.read_excel(xls, sheet_name=sheet_name, nrows=max_rows)

                            # Create markdown table
                            markdown_table = "*Empty table*"
                            if not df.empty:
                                headers = df.columns.tolist()
                                table_data = df.values.tolist()
                                markdown_table = tabulate(
                                    table_data, headers=headers, tablefmt="pipe"
                                )
                                if len(df) >= max_rows:
                                    markdown_table += (
                                        f"\n\n*Note: Table truncated to {max_rows} rows*"
                                    )

                            # Create sheet model
                            excel_sheets.append(
                                ExcelSheet(
                                    name=sheet_name,
                                    data=df.to_dict(orient="records"),
                                    markdown_table=markdown_table,
                                    row_count=len(df),
                                    column_count=len(df.columns),
                                )
                            )

                # Create result for this file
                file_result = ExcelDocument(
                    file_name=os.path.basename(document_path),
                    file_path=document_path,
                    processed_path=(
                        processed_path if processed_path != document_path else None
                    ),
                    file_type=file_ext.replace(".", "").upper(),
                    sheet_count=len(sheet_names),
                    sheet_names=sheet_names,
                    sheets=excel_sheets,
                    success=True,
                )
                all_results.append(file_result)
                success_count += 1
            except Exception as file_error:
                # Handle errors for individual files
                error_msg = str(file_error)
                logger.error(f"File reading error for {document_path}: {error_msg}")
                all_results.append(
                    ExcelDocument(
                        file_name=os.path.basename(document_path),
                        file_path=document_path,
                        file_type=os.path.splitext(document_path)[1]
                        .replace(".", "")
                        .upper(),
                        sheet_count=0,
                        sheet_names=[],
                        sheets=[],
                        success=False,
                        error=error_msg,
                    )
                )
                failed_count += 1

        # Clean up temporary files
        for temp_file in temp_files:
            try:
                if os.path.exists(temp_file):
                    os.remove(temp_file)
                    logger.info(f"Removed temporary file: {temp_file}")
            except Exception as cleanup_error:
                logger.warning(
                    f"Error cleaning up temporary file {temp_file}: {str(cleanup_error)}"
                )

        # Create final result
        excel_result = ExcelResult(
            total_files=len(document_paths),
            success_count=success_count,
            failed_count=failed_count,
            results=all_results,
        )
        return excel_result.model_dump_json()
    except Exception as e:
        return handle_error(e, "Excel/CSV files processing")
@mcp.tool()
def mcpreadpptx(
    document_path: str = Field(description="The local input PowerPoint file path."),
) -> str:
    """Read and convert PowerPoint slides to base64-encoded images. Cannot process remote https:// URLs."""
    error = check_file_readable(document_path)
    if error:
        return DocumentError(error=error, file_path=document_path).model_dump_json()

    # Create temporary directory
    temp_dir = tempfile.mkdtemp()
    slides_data = []

    try:
        presentation = Presentation(document_path)
        total_slides = len(presentation.slides)

        if total_slides == 0:
            raise ValueError("PPT file does not contain any slides")

        # Process each slide
        for i, slide in enumerate(presentation.slides):
            # Set slide dimensions
            slide_width_px = 1920  # 16:9 ratio
            slide_height_px = 1080

            # Create blank image
            slide_img = Image.new("RGB", (slide_width_px, slide_height_px), "white")
            draw = ImageDraw.Draw(slide_img)
            font = ImageFont.load_default()

            # Draw slide number
            draw.text((20, 20), f"Slide {i + 1}/{total_slides}", fill="black", font=font)

            # Process shapes in the slide. Shape positions are in EMUs; dividing
            # by the presentation dimensions scales them to the pixel canvas.
            for shape in slide.shapes:
                try:
                    # Process images
                    if hasattr(shape, "image") and shape.image:
                        image_stream = io.BytesIO(shape.image.blob)
                        img = Image.open(image_stream)
                        left = int(shape.left * slide_width_px / presentation.slide_width)
                        top = int(shape.top * slide_height_px / presentation.slide_height)
                        slide_img.paste(img, (left, top))
                    # Process text
                    elif hasattr(shape, "text") and shape.text:
                        text_left = int(
                            shape.left * slide_width_px / presentation.slide_width
                        )
                        text_top = int(
                            shape.top * slide_height_px / presentation.slide_height
                        )
                        draw.text(
                            (text_left, text_top),
                            shape.text,
                            fill="black",
                            font=font,
                        )
                except Exception as shape_error:
                    logger.warning(
                        f"Error processing shape in slide {i + 1}: {str(shape_error)}"
                    )

            # Save slide image
            img_path = os.path.join(temp_dir, f"slide_{i + 1}.jpg")
            slide_img.save(img_path, "JPEG")

            # Convert to base64
            base64_image = encode_images(img_path)
            slides_data.append(
                PowerPointSlide(
                    slide_number=i + 1, image=f"data:image/jpeg;base64,{base64_image}"
                )
            )

        # Create result
        result = PowerPointDocument(
            file_path=document_path,
            file_name=os.path.basename(document_path),
            slide_count=total_slides,
            slides=slides_data,
        )
        return result.model_dump_json()
    except Exception as e:
        return handle_error(e, "PowerPoint processing", document_path)
    finally:
        # Clean up temporary files
        try:
            for file in os.listdir(temp_dir):
                os.remove(os.path.join(temp_dir, file))
            os.rmdir(temp_dir)
        except Exception as cleanup_error:
            logger.warning(f"Error cleaning up temporary files: {str(cleanup_error)}")
@mcp.tool()
def mcpreadhtmltext(
    document_path: str = Field(description="The local input HTML file path."),
    extract_links: bool = Field(
        default=True, description="Whether to extract link information"
    ),
    extract_images: bool = Field(
        default=True, description="Whether to extract image information"
    ),
    extract_tables: bool = Field(
        default=True, description="Whether to extract table information"
    ),
    convert_to_markdown: bool = Field(
        default=True, description="Whether to convert HTML to Markdown format"
    ),
) -> str:
    """Read an HTML file and extract its text content; optionally extract link, image,
    and table information, and convert the HTML to Markdown. Cannot process remote
    https:// URLs."""
    error = check_file_readable(document_path)
    if error:
        return DocumentError(error=error, file_path=document_path).model_dump_json()

    try:
        # Read HTML file
        with open(document_path, "r", encoding="utf-8") as f:
            html_content = f.read()

        # Parse HTML using BeautifulSoup
        soup = BeautifulSoup(html_content, "html.parser")

        # Extract text content (remove script and style content)
        for script in soup(["script", "style"]):
            script.extract()
        text_content = soup.get_text(separator="\n", strip=True)

        # Extract title
        title = soup.title.string if soup.title else None

        # Initialize result object
        result = HtmlDocument(
            content=text_content,
            html_content=html_content,
            file_path=document_path,
            file_name=os.path.basename(document_path),
            file_size=os.path.getsize(document_path),
            last_modified=datetime.fromtimestamp(
                os.path.getmtime(document_path)
            ).strftime("%Y-%m-%d %H:%M:%S"),
            title=title,
        )

        # Extract links
        if extract_links:
            links = []
            for link in soup.find_all("a"):
                href = link.get("href")
                text = link.get_text(strip=True)
                if href:
                    links.append({"url": href, "text": text})
            result.links = links

        # Extract images
        if extract_images:
            images = []
            for img in soup.find_all("img"):
                src = img.get("src")
                alt = img.get("alt", "")
                if src:
                    images.append({"src": src, "alt": alt})
            result.images = images

        # Extract tables
        if extract_tables:
            tables = []
            for table in soup.find_all("table"):
                tables.append(str(table))
            result.tables = tables

        # Convert to Markdown
        if convert_to_markdown:
            h = html2text.HTML2Text()
            h.ignore_links = False
            h.ignore_images = False
            h.ignore_tables = False
            markdown_content = h.handle(html_content)
            result.markdown = markdown_content

        return result.model_dump_json()
    except Exception as e:
        return handle_error(e, "HTML file reading", document_path)
def main():
    load_dotenv()
    print("Starting Document MCP Server...", file=sys.stderr)
    mcp.run(transport="stdio")


# Make the module callable
def __call__():
    """
    Make the module callable for uvx.

    This function is called when the module is executed directly.
    """
    main()


sys.modules[__name__].__call__ = __call__

# Run the server when the script is executed directly
if __name__ == "__main__":
    main()
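# Example stdio server entry for an MCP client configuration (illustrative;
# the exact keys depend on the client, and the script path is hypothetical):
#
#   {
#     "mcpServers": {
#       "document-server": {
#         "command": "python",
#         "args": ["mcp_servers/document_server.py"]
#       }
#     }
#   }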