"""Streamlit UI components for the Historical OCR app."""
import streamlit as st | |
import os | |
import io | |
import base64 | |
import logging | |
import re | |
from datetime import datetime | |
from pathlib import Path | |
import json | |
# Public names exported by this module (what ``import *`` exposes).
__all__ = [
    'ProgressReporter',
    'create_sidebar_options',
    'create_file_uploader',
    'display_document_with_images',
    'display_previous_results',
    'display_about_tab',
    'display_results'  # Re-export from utils.ui_utils
]
from constants import ( | |
DOCUMENT_TYPES, | |
DOCUMENT_LAYOUTS, | |
CUSTOM_PROMPT_TEMPLATES, | |
LAYOUT_PROMPT_ADDITIONS, | |
DEFAULT_PDF_DPI, | |
MIN_PDF_DPI, | |
MAX_PDF_DPI, | |
DEFAULT_MAX_PAGES, | |
PERFORMANCE_MODES, | |
PREPROCESSING_DOC_TYPES, | |
ROTATION_OPTIONS | |
) | |
from utils.text_utils import format_ocr_text, clean_raw_text, format_markdown_text # Import from text_utils | |
from utils.content_utils import ( | |
classify_document_content, | |
extract_document_text, | |
extract_image_description | |
) | |
from utils.ui_utils import display_results | |
from preprocessing import preprocess_image | |
class ProgressReporter:
    """Drive a progress bar and a one-line status message inside a placeholder.

    The widgets are only created by :meth:`setup`; until then
    :meth:`update` and :meth:`complete` do nothing.
    """

    def __init__(self, placeholder):
        """Remember the placeholder; no widgets are created yet.

        Args:
            placeholder: Object whose ``container()`` can be used as a
                context manager (presumably an ``st.empty()`` slot --
                confirm against callers).
        """
        self.placeholder = placeholder
        self.progress_bar = None
        self.status_text = None

    def setup(self):
        """Create the progress bar and status slot, returning ``self`` for chaining."""
        with self.placeholder.container():
            self.progress_bar = st.progress(0)
            self.status_text = st.empty()
        return self

    def update(self, percent, status_text):
        """Show ``percent`` on the bar and ``status_text`` underneath it.

        Args:
            percent: Progress on a 0-100 scale. Values outside that range
                are clamped, because the progress widget rejects fractions
                outside 0.0-1.0.
            status_text: Message to display below the bar.
        """
        if self.progress_bar is not None:
            clamped = min(max(percent, 0), 100)
            self.progress_bar.progress(clamped / 100)
        if self.status_text is not None:
            self.status_text.text(status_text)

    def complete(self, success=True):
        """Show the final state briefly, then clear both widgets.

        Args:
            success: When true the bar is filled and "Processing complete!"
                is shown; otherwise only "Processing failed." is shown.
        """
        if self.progress_bar is None and self.status_text is None:
            # Nothing was ever rendered, so there is nothing to show or clear
            # and no reason to pause.
            return

        if success:
            if self.progress_bar is not None:
                self.progress_bar.progress(100)
            if self.status_text is not None:
                self.status_text.text("Processing complete!")
        elif self.status_text is not None:
            self.status_text.text("Processing failed.")

        # Keep the final state visible for a moment before clearing it.
        import time
        time.sleep(0.8)

        if self.progress_bar is not None:
            self.progress_bar.empty()
        if self.status_text is not None:
            self.status_text.empty()
def create_sidebar_options():
    """Render the OCR settings sidebar and return the selected options.

    Returns:
        dict: Keys ``use_vision``, ``perf_mode``, ``pdf_dpi``, ``max_pages``,
        ``pdf_rotation``, ``custom_prompt``, ``preprocessing_options`` and
        ``use_segmentation``. ``preprocessing_options`` is itself a dict with
        ``document_type``, ``grayscale``, ``denoise``, ``contrast`` and
        ``rotation``.
    """
    # Ordered keyword rules mapping the UI document type onto the
    # preprocessing document type; the first matching rule wins.
    preprocessing_type_rules = (
        (("Handwritten",), "handwritten"),
        (("Newspaper", "Magazine"), "newspaper"),
        (("Book", "Publication"), "book"),
    )

    with st.sidebar:
        st.markdown("## OCR Settings")

        with st.container():
            doc_type = st.selectbox(
                "Document Type",
                DOCUMENT_TYPES,
                help="Select the type of document you're processing for better results",
            )
            doc_layout = st.selectbox(
                "Document Layout",
                DOCUMENT_LAYOUTS,
                help="Select the layout of your document",
            )

            # Pre-fill the prompt from the template of the chosen type. The
            # first entry of each list is treated as the neutral choice
            # (auto-detect type / standard layout) and adds nothing.
            custom_prompt = ""
            if doc_type != DOCUMENT_TYPES[0]:
                custom_prompt = CUSTOM_PROMPT_TEMPLATES.get(doc_type, "")
                if doc_layout != DOCUMENT_LAYOUTS[0]:
                    layout_addition = LAYOUT_PROMPT_ADDITIONS.get(doc_layout, "")
                    if layout_addition:
                        custom_prompt += " " + layout_addition

            # The prompt stays editable regardless of the selection above.
            custom_prompt = st.text_area(
                "Custom Processing Instructions",
                value=custom_prompt,
                help="Customize the instructions for processing this document",
                height=80,
            )

            st.markdown("### Image Preprocessing")
            grayscale = st.checkbox(
                "Convert to Grayscale",
                value=True,
                help="Convert color images to grayscale for better text recognition",
            )
            denoise = st.checkbox(
                "Light Denoising",
                value=True,
                help="Apply gentle denoising to improve text clarity",
            )
            contrast = st.slider(
                "Contrast Adjustment",
                min_value=-20,
                max_value=20,
                value=5,
                step=5,
                help="Adjust image contrast (limited range)",
            )

            doc_type_for_preprocessing = next(
                (
                    kind
                    for keywords, kind in preprocessing_type_rules
                    if any(keyword in doc_type for keyword in keywords)
                ),
                "standard",
            )

            preprocessing_options = {
                "document_type": doc_type_for_preprocessing,
                "grayscale": grayscale,
                "denoise": denoise,
                "contrast": contrast,
                "rotation": 0,  # rotation is not exposed in the UI
            }

            st.markdown("### PDF Options")
            max_pages = st.number_input(
                "Maximum Pages to Process",
                min_value=1,
                max_value=20,
                value=DEFAULT_MAX_PAGES,
                help="Limit the number of pages to process (for multi-page PDFs)",
            )

        # Settings that are no longer exposed in the UI keep fixed values so
        # the returned dict has the same keys as before.
        options = {
            "use_vision": True,
            "perf_mode": "Quality",
            "pdf_dpi": DEFAULT_PDF_DPI,
            "max_pages": max_pages,
            "pdf_rotation": 0,
            "custom_prompt": custom_prompt,
            "preprocessing_options": preprocessing_options,
            "use_segmentation": False,
        }
        return options
def create_file_uploader():
    """Show the app banner and intro text, then return the upload widget's value."""
    # Banner: icon plus app title.
    banner_html = '<div style="display: flex; align-items: center; gap: 10px;"><div style="font-size: 32px;">📜</div><div><h2 style="margin: 0; padding: 10px 0 0 0;">Historical OCR</h2></div></div>'
    st.markdown(banner_html, unsafe_allow_html=True)

    credit_html = "<p style='font-size: 0.8em; color: #666; text-align: left;'>Made possible by Mistral AI</p>"
    st.markdown(credit_html, unsafe_allow_html=True)

    # Short framing of what the tool is for.
    intro_markdown = """
This tool assists scholars in historical research by extracting text from challenging documents. While it may not achieve 100% accuracy, it helps navigate:
- **Historical newspapers** with complex layouts
- **Handwritten documents** from various periods
- **Photos of archival materials**
Upload a document to begin, or explore the examples.
"""
    st.markdown(intro_markdown)

    accepted_types = ["pdf", "png", "jpg"]
    return st.file_uploader(
        "Select file",
        type=accepted_types,
        help="Upload a PDF or image file for OCR processing",
    )
def _pages_from_raw_response(raw_pages):
    """Build per-page dicts (page_number, markdown, images) from raw OCR pages."""
    pages_data = []
    for page_idx, page in enumerate(raw_pages):
        if not isinstance(page, dict):
            continue

        images = []
        raw_images = page.get('images')
        if isinstance(raw_images, list):
            for img_idx, img in enumerate(raw_images):
                if not isinstance(img, dict):
                    continue
                # Either key may carry the payload; prefer 'image_base64'.
                img_base64 = img.get('image_base64') or img.get('base64')
                if img_base64:
                    images.append({
                        'id': img.get('id', f"img_{page_idx}_{img_idx}"),
                        'image_base64': img_base64,
                    })

        markdown = page.get('markdown', '')
        if markdown or images:
            pages_data.append({
                'page_number': page_idx + 1,
                'markdown': markdown,
                'images': images,
            })
    return pages_data


def _first_raw_response_image(result):
    """Return the first image payload found in the raw OCR response, or None."""
    raw_data = result.get('raw_response_data')
    if not isinstance(raw_data, dict):
        return None
    for raw_page in raw_data.get('pages') or []:
        if not isinstance(raw_page, dict):
            continue
        for img in raw_page.get('images') or []:
            if isinstance(img, dict):
                payload = img.get('image_base64') or img.get('base64')
                if payload:
                    return payload
    return None


def _alt_text_from_markdown(page_text):
    """Return the alt text of a page that is one ``![alt](target)`` image, else ""."""
    if not isinstance(page_text, str):
        return ""
    if not (page_text.startswith("![") and page_text.endswith(")")):
        return ""
    closing = page_text.find("]")
    if closing == -1:
        return ""
    return page_text[2:closing]


def display_document_with_images(result):
    """Render one image per page of an OCR result, with an optional caption.

    Pages come from ``result['pages_data']`` when present and non-empty;
    otherwise they are rebuilt from ``result['raw_response_data']['pages']``.
    For each page the image is looked up, in order, in the page's
    ``image_data`` field, the page's ``images`` list, and finally the raw
    OCR response.

    Args:
        result: Mapping holding the OCR result (``pages_data`` and/or
            ``raw_response_data``).
    """
    pages_data = result.get('pages_data')
    if not pages_data:
        raw_data = result.get('raw_response_data')
        if isinstance(raw_data, dict) and 'pages' in raw_data:
            pages_data = _pages_from_raw_response(raw_data['pages'] or [])

    if not pages_data:
        st.info("No image data available.")
        return

    for i, page_data in enumerate(pages_data):
        st.markdown(f"### Page {i+1}")
        image_displayed = False

        # 1) Base64 payload stored directly on the page.
        if 'image_data' in page_data:
            try:
                image_bytes = base64.b64decode(page_data['image_data'])
                st.image(io.BytesIO(image_bytes), use_container_width=True)
                image_displayed = True
            except Exception as e:
                st.error(f"Error displaying image from image_data: {str(e)}")

        # 2) First displayable entry of the page's images list.
        if not image_displayed:
            for img in page_data.get('images') or []:
                if isinstance(img, dict) and 'image_base64' in img:
                    try:
                        st.image(img['image_base64'], use_container_width=True)
                        image_displayed = True
                        break
                    except Exception as e:
                        st.error(f"Error displaying image from images array: {str(e)}")

        # 3) Last resort: first image anywhere in the raw OCR response.
        # NOTE(review): this fallback is not page-specific, so every page
        # without its own image shows the same picture -- confirm intent.
        if not image_displayed:
            fallback_image = _first_raw_response_image(result)
            if fallback_image:
                try:
                    st.image(fallback_image, use_container_width=True)
                    st.caption("Image from OCR response")
                    image_displayed = True
                except Exception as e:
                    st.error(f"Error displaying image from OCR response: {str(e)}")

        if not image_displayed:
            st.info("No image available for this page.")

        # Surface the markdown alt text as a caption when it looks meaningful.
        if 'text' in page_data:
            page_text = page_data['text']
        else:
            page_text = page_data.get('markdown', "")
        alt_text = _alt_text_from_markdown(page_text)
        if len(alt_text) > 5:
            st.caption(f"Image description: {alt_text}")
def _render_previous_results_download(previous_results):
    """Render an "All Results" zip download link; skip it if the zip cannot be built."""
    try:
        from utils.image_utils import create_results_zip_in_memory
        zip_data = create_results_zip_in_memory(previous_results)
        zip_b64 = base64.b64encode(zip_data).decode()
    except Exception:
        # The download link is best-effort: keep the page clean for the user
        # but leave a trace in the log instead of hiding the failure.
        logging.getLogger(__name__).warning(
            "Could not build the previous-results zip", exc_info=True
        )
        return

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    zip_filename = f"ocr_results_{timestamp}.zip"
    download_html = "".join([
        '<div style="display: flex; align-items: center; margin: 0.5rem 0; flex-wrap: wrap;">',
        '<div style="margin-right: 0.3rem; font-weight: bold;">Download:</div>',
        f'<a href="data:application/zip;base64,{zip_b64}" download="{zip_filename}" class="subject-tag tag-download">All Results</a>',
        '</div>',
    ])
    st.markdown(download_html, unsafe_allow_html=True)


def _render_previous_result_cards(previous_results, num_columns=2):
    """Render one card plus a "View" button per stored result, in a column grid."""
    from html import escape

    image_suffixes = (".jpg", ".jpeg", ".png", ".gif")

    for row_start in range(0, len(previous_results), num_columns):
        cols = st.columns(num_columns)
        # zip() stops at the shorter sequence, so a partial last row is fine.
        for col, index in zip(cols, range(row_start, len(previous_results))):
            result = previous_results[index]
            file_name = str(result.get("file_name") or f"Document {index+1}")
            timestamp = str(result.get("timestamp") or "")
            # Only the first whitespace-separated token of the timestamp is shown.
            date_label = timestamp.split()[0] if timestamp.strip() else ""

            lowered = file_name.lower()
            if lowered.endswith(".pdf"):
                icon = "📄"
            elif lowered.endswith(image_suffixes):
                icon = "🖼️"
            else:
                icon = "📝"

            with col:
                with st.container():
                    # File names come from uploads, so escape them before
                    # embedding in raw HTML.
                    st.markdown(f"""
<div style="padding: 10px; border: 1px solid #e0e0e0; border-radius: 6px; margin-bottom: 10px;">
<div style="display: flex; justify-content: space-between; align-items: center; margin-bottom: 5px;">
<div style="font-weight: 500; font-size: 14px; overflow: hidden; text-overflow: ellipsis; white-space: nowrap;">{icon} {escape(file_name)}</div>
<div style="color: #666; font-size: 12px;">{escape(date_label)}</div>
</div>
</div>
""", unsafe_allow_html=True)
                    if st.button("View", key=f"view_{index}", help=f"View {file_name}"):
                        st.session_state.selected_previous_result = result
                        st.rerun()


def _render_ocr_sections(ocr_contents):
    """Render OCR content sections, priority sections first, then the rest."""
    if not isinstance(ocr_contents, dict):
        return

    priority_sections = ("title", "content", "transcript", "summary")
    displayed_sections = set()

    for section in priority_sections:
        content = ocr_contents.get(section)
        if isinstance(content, str) and content.strip():
            st.markdown(f"##### {section.replace('_', ' ').title()}")
            st.markdown(format_ocr_text(content, for_display=True))
            displayed_sections.add(section)

    for section, content in ocr_contents.items():
        if section in displayed_sections or section in ('error', 'partial_text'):
            continue
        if not content or (isinstance(content, str) and not content.strip()):
            # Avoid a heading with nothing underneath it.
            continue
        st.markdown(f"##### {section.replace('_', ' ').title()}")
        if isinstance(content, str):
            st.markdown(format_ocr_text(content, for_display=True))
        elif isinstance(content, list):
            for item in content:
                st.markdown(f"- {item}")
        elif isinstance(content, dict):
            for k, v in content.items():
                st.markdown(f"**{k}:** {v}")
        else:
            st.markdown(str(content))


def _render_result_json_tab(selected_result):
    """Render a JSON view of the result with image payloads reduced to counts."""
    metadata_fields = (
        'file_name', 'timestamp', 'processing_time', 'languages', 'topics',
        'subjects', 'detected_document_type', 'text',
    )
    json_data = {
        field: selected_result[field]
        for field in metadata_fields
        if field in selected_result
    }

    if 'ocr_contents' in selected_result:
        json_data['ocr_contents'] = selected_result['ocr_contents']

    if 'pages_data' in selected_result:
        # Summarize pages rather than dumping base64 image data.
        simplified_pages = []
        for page in selected_result['pages_data'] or []:
            if not isinstance(page, dict):
                continue
            images = page.get('images') or []
            simplified_pages.append({
                'page_number': page.get('page_number', 0),
                'has_text': bool(page.get('markdown', '')),
                'has_images': bool(images),
                'image_count': len(images),
            })
        json_data['pages_summary'] = simplified_pages

    # This view is display-only: keep non-ASCII text readable and stringify
    # anything json cannot serialize instead of failing the whole tab.
    json_str = json.dumps(json_data, indent=2, ensure_ascii=False, default=str)
    st.code(json_str, language="json")


def _render_result_images_tab(pages_data):
    """Render each page's images followed by a collapsed view of its text."""
    for i, page_data in enumerate(pages_data or []):
        for img in page_data.get('images') or []:
            if 'image_base64' in img:
                st.image(img['image_base64'], use_container_width=True)

        page_text = page_data.get('markdown', "")
        if page_text:
            with st.expander(f"Page {i+1} Text", expanded=False):
                st.text(page_text)


def _render_selected_previous_result(selected_result):
    """Render the detail view (header, metadata line, tabs) for one stored result."""
    from html import escape

    st.markdown("<hr style='margin: 20px 0 15px 0; border: none; height: 1px; background-color: #eee;'>", unsafe_allow_html=True)

    file_name = selected_result.get('file_name', 'Document')
    st.subheader(f"{file_name}")

    if st.button("← Back to Results", key="back_to_results"):
        if 'selected_previous_result' in st.session_state:
            del st.session_state.selected_previous_result
        st.session_state.perform_reset = True
        st.rerun()

    # One-line metadata strip; values are escaped because the strip is
    # rendered as raw HTML.
    meta_parts = []
    if 'timestamp' in selected_result:
        timestamp_text = escape(str(selected_result['timestamp']))
        meta_parts.append(f'<div>{timestamp_text}</div>')

    languages = [
        str(lang)
        for lang in (selected_result.get('languages') or [])
        if lang is not None
    ]
    if languages:
        languages_text = escape(", ".join(languages))
        meta_parts.append(f'<div>Language: {languages_text}</div>')

    limited_pages = selected_result.get('limited_pages')
    if isinstance(limited_pages, dict) and 'processed' in limited_pages and 'total' in limited_pages:
        processed = escape(str(limited_pages['processed']))
        total = escape(str(limited_pages['total']))
        meta_parts.append(f'<div>Pages: {processed}/{total}</div>')

    meta_html = (
        '<div style="display: flex; flex-wrap: wrap; gap: 12px; margin: 8px 0 15px 0; font-size: 14px; color: #666;">'
        + "".join(meta_parts)
        + '</div>'
    )
    st.markdown(meta_html, unsafe_allow_html=True)

    has_images = selected_result.get('has_images', False)
    tab_labels = ["Document Content", "Raw JSON"]
    if has_images:
        tab_labels.append("Images")
    view_tabs = st.tabs(tab_labels)

    with view_tabs[0]:
        _render_ocr_sections(selected_result.get('ocr_contents'))
    with view_tabs[1]:
        _render_result_json_tab(selected_result)
    if has_images:
        with view_tabs[2]:
            _render_result_images_tab(selected_result.get('pages_data'))


def display_previous_results():
    """Render the "Previous Results" tab.

    Shows an empty-state panel when ``st.session_state`` holds no previous
    results. Otherwise renders a zip download link, a two-column grid of
    result cards, and the detail view of the result stored under
    ``selected_previous_result`` (if any).
    """
    st.header("Previous Results")

    # .get() tolerates a session in which the key was never initialized.
    previous_results = st.session_state.get("previous_results")
    if not previous_results:
        st.markdown("""
<div style="text-align: center; padding: 30px 20px; background-color: #f8f9fa; border-radius: 6px; margin-top: 10px;">
<div style="font-size: 36px; margin-bottom: 15px;">📄</div>
<h3 style="margin-bottom: 16px; font-weight: 500;">No Previous Results</h3>
<p style="font-size: 14px; color: #666;">Process a document to see your results history.</p>
</div>
""", unsafe_allow_html=True)
        return

    _render_previous_results_download(previous_results)
    _render_previous_result_cards(previous_results)

    selected_result = st.session_state.get("selected_previous_result")
    if selected_result:
        _render_selected_previous_result(selected_result)
def display_about_tab():
    """Render the static "Learn More" tab: description, purpose, features, usage, tech."""
    st.header("Learn More")

    # Markdown fragments, emitted top to bottom in exactly this order.
    fragments = (
        """
**Historical OCR** is a tailored academic tool for extracting text from historical documents, manuscripts, and printed materials.
""",
        "### Purpose",
        """
This tool is designed to assist scholars in historical research by extracting text from challenging documents.
While it may not achieve full accuracy for all materials, it serves as a tailored research aid for navigating
historical documents, particularly:
""",
        """
- **Historical newspapers** with complex layouts and aged text
- **Handwritten documents** from various time periods
- **Photos of archival materials** that may be difficult to read
""",
        "### Features",
        """
- **Advanced Image Preprocessing**: Optimize historical documents for better OCR results
- **Custom Document Type Processing**: Specialized handling for newspapers, letters, books, and more
- **Editable Results**: Review and edit extracted text directly in the interface
- **Structured Content Analysis**: Automatic organization of document content
- **Multi-language Support**: Process documents in various languages
- **PDF Processing**: Handle multi-page historical documents
""",
        "### How to Use",
        """
1. Upload a document (PDF or image)
2. Select the document type and adjust preprocessing options if needed
3. Add custom processing instructions for specialized documents
4. Process the document
5. Review, edit, and download the results
""",
        "### Technologies",
        """
- OCR processing using Mistral AI's advanced document understanding capabilities
- Image preprocessing with OpenCV
- PDF handling with pdf2image
- Web interface with Streamlit
""",
        "**Version:** 1.0.0",
    )
    for fragment in fragments:
        st.markdown(fragment)