docling-processor / docstrange /processors /excel_processor.py
arjunbhargav212's picture
Upload 63 files
5b14aa2 verified
"""Excel file processor."""
import os
import logging
from typing import Dict, Any
from .base import BaseProcessor
from ..result import ConversionResult
from ..exceptions import ConversionError, FileNotFoundError
# Module-level logger
logger = logging.getLogger(__name__)
class ExcelProcessor(BaseProcessor):
    """Processor for Excel files (XLSX, XLS) and CSV files.

    Files are loaded with pandas and rendered as Markdown tables. pandas is
    imported lazily inside the processing methods so that a missing install
    is reported as a ``ConversionError`` rather than failing at import time.
    """

    # Lower-case file extensions (including the leading dot) handled here.
    _SPREADSHEET_EXTENSIONS = frozenset({'.xlsx', '.xls', '.csv'})

    @staticmethod
    def _spreadsheet_extension(file_path) -> str:
        """Return the lower-cased extension of ``file_path`` (e.g. ``'.csv'``)."""
        # str() so that non-string path objects are accepted as well.
        return os.path.splitext(str(file_path))[1].lower()

    @staticmethod
    def _escape_markdown_cell(text: str) -> str:
        """Make ``text`` safe to embed in a single Markdown table cell."""
        # An unescaped pipe would start a new column and a line break would
        # end the table row, so neutralise both.
        return (
            text.replace('|', '\\|')
            .replace('\r\n', ' ')
            .replace('\n', ' ')
            .replace('\r', ' ')
        )

    def can_process(self, file_path: str) -> bool:
        """Check if this processor can handle the given file.

        Args:
            file_path: Path to the file to check

        Returns:
            True if the path exists and has a .xlsx, .xls or .csv extension
        """
        if not os.path.exists(file_path):
            return False
        return self._spreadsheet_extension(file_path) in self._SPREADSHEET_EXTENSIONS

    def process(self, file_path: str) -> ConversionResult:
        """Process the Excel or CSV file and return a conversion result.

        Args:
            file_path: Path to the file to process

        Returns:
            ConversionResult containing the processed content

        Raises:
            FileNotFoundError: If the file doesn't exist
            ConversionError: If processing fails
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        # Anything that is not a CSV is handed to the Excel reader.
        if self._spreadsheet_extension(file_path) == '.csv':
            return self._process_csv(file_path)
        return self._process_excel(file_path)

    def _process_csv(self, file_path: str) -> ConversionResult:
        """Process a CSV file and return a conversion result.

        Args:
            file_path: Path to the CSV file to process

        Returns:
            ConversionResult containing a title line followed by the data as
            a Markdown table, plus row/column metadata

        Raises:
            ConversionError: If pandas is missing or the file cannot be read
        """
        try:
            import pandas as pd

            df = pd.read_csv(file_path)

            content_parts = [
                f"# CSV Data: {os.path.basename(file_path)}",
                "",
                self._dataframe_to_markdown(df, pd),
            ]
            metadata = {
                "row_count": len(df),
                "column_count": len(df.columns),
                "columns": df.columns.tolist(),
                "extractor": "pandas"
            }
            return ConversionResult('\n'.join(content_parts), metadata)
        except ImportError as e:
            raise ConversionError(
                "pandas is required for CSV processing. Install it with: pip install pandas"
            ) from e
        except Exception as e:
            # Chain the original exception so the root cause stays visible.
            raise ConversionError(f"Failed to process CSV file {file_path}: {str(e)}") from e

    def _process_excel(self, file_path: str) -> ConversionResult:
        """Process an Excel file and return a conversion result.

        Every non-empty sheet is rendered as a ``## Sheet: <name>`` section
        containing a Markdown table; empty sheets are skipped.

        Args:
            file_path: Path to the Excel file to process

        Returns:
            ConversionResult containing the processed content and per-sheet
            metadata

        Raises:
            ConversionError: If a dependency is missing or the file cannot
                be read
        """
        try:
            import pandas as pd

            content_parts = []
            # The context manager closes the workbook handle even on failure.
            with pd.ExcelFile(file_path) as excel_file:
                sheet_names = excel_file.sheet_names
                metadata = {
                    "sheet_count": len(sheet_names),
                    "sheet_names": sheet_names,
                    "extractor": "pandas"
                }

                for sheet_name in sheet_names:
                    # Parse from the already-open workbook instead of
                    # re-opening the file once per sheet.
                    df = excel_file.parse(sheet_name)
                    if df.empty:
                        continue

                    content_parts.append(f"\n## Sheet: {sheet_name}")
                    content_parts.append("")
                    content_parts.append(self._dataframe_to_markdown(df, pd))
                    content_parts.append("")

                    # Add metadata for this sheet
                    metadata.update({
                        f"sheet_{sheet_name}_rows": len(df),
                        f"sheet_{sheet_name}_columns": len(df.columns),
                        f"sheet_{sheet_name}_columns_list": df.columns.tolist()
                    })

            return ConversionResult('\n'.join(content_parts), metadata)
        except ImportError as e:
            raise ConversionError(
                "pandas and openpyxl are required for Excel processing. Install them with: pip install pandas openpyxl"
            ) from e
        except (FileNotFoundError, ConversionError):
            # Already one of the package's own errors; propagate untouched.
            raise
        except Exception as e:
            raise ConversionError(f"Failed to process Excel file {file_path}: {str(e)}") from e

    def _dataframe_to_markdown(self, df, pd) -> str:
        """Convert pandas DataFrame to markdown table.

        Args:
            df: pandas DataFrame
            pd: pandas module reference

        Returns:
            Markdown table string, or a placeholder when the frame is empty
        """
        if df.empty:
            return "*No data available*"

        escape = self._escape_markdown_cell
        column_count = len(df.columns)

        markdown_parts = [
            "| " + " | ".join(escape(str(col)) for col in df.columns) + " |",
            "| " + " | ".join(["---"] * column_count) + " |",
        ]

        # itertuples keeps each column's own dtype; iterrows would coerce a
        # whole row to one dtype and print integers as e.g. "1.0".
        for row in df.itertuples(index=False, name=None):
            row_data = ["" if pd.isna(cell) else escape(str(cell)) for cell in row]
            markdown_parts.append("| " + " | ".join(row_data) + " |")

        return "\n".join(markdown_parts)

    def _clean_content(self, content: str) -> str:
        """Clean up the extracted Excel content.

        Collapses runs of whitespace inside each line, drops blank lines and
        puts one blank line in front of every Markdown header line.

        Args:
            content: Raw Excel text content

        Returns:
            Cleaned text content
        """
        cleaned_lines = []
        for raw_line in content.split('\n'):
            # Remove excessive whitespace
            line = ' '.join(raw_line.split())
            if not line:
                continue

            # A header is 1-6 leading '#' followed by a space. Checking whole
            # lines (rather than replacing '# ' anywhere) keeps '## ' headers
            # intact and leaves '# ' inside table cells alone.
            hash_count = len(line) - len(line.lstrip('#'))
            is_header = 1 <= hash_count <= 6 and line[hash_count:hash_count + 1] == ' '
            if is_header and cleaned_lines:
                cleaned_lines.append('')
            cleaned_lines.append(line)

        return '\n'.join(cleaned_lines).strip()