Hoghoghi / app /api /ocr.py
Really-amin's picture
Upload 46 files
922c3ba verified
raw
history blame
11.2 kB
"""
OCR API Router
=============
PDF processing and text extraction endpoints.
"""
from fastapi import APIRouter, HTTPException, UploadFile, File, Depends, BackgroundTasks
from typing import List, Dict, Any
import tempfile
import os
import logging
from pathlib import Path
from ..models.document_models import OCRRequest, OCRResponse
from ..services.ocr_service import OCRPipeline
from ..services.database_service import DatabaseManager
from ..services.ai_service import AIScoringEngine
logger = logging.getLogger(__name__)
router = APIRouter()
# Dependency injection
def get_ocr_pipeline():
return OCRPipeline()
def get_db():
return DatabaseManager()
def get_ai_engine():
return AIScoringEngine()
@router.post("/process", response_model=OCRResponse)
async def process_pdf(
file: UploadFile = File(...),
language: str = "fa",
model_name: str = None,
ocr_pipeline: OCRPipeline = Depends(get_ocr_pipeline)
):
"""Process a PDF file and extract text"""
try:
# Validate file type
if not file.filename.lower().endswith('.pdf'):
raise HTTPException(
status_code=400, detail="Only PDF files are supported")
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
content = await file.read()
temp_file.write(content)
temp_file_path = temp_file.name
try:
# Process PDF with OCR
result = ocr_pipeline.extract_text_from_pdf(temp_file_path)
# Create response
response = OCRResponse(
success=result.get('success', False),
extracted_text=result.get('extracted_text', ''),
confidence=result.get('confidence', 0.0),
processing_time=result.get('processing_time', 0.0),
language_detected=result.get('language_detected', language),
page_count=result.get('page_count', 0),
error_message=result.get('error_message')
)
return response
finally:
# Clean up temporary file
if os.path.exists(temp_file_path):
os.unlink(temp_file_path)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error processing PDF: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/process-and-save")
async def process_and_save_document(
file: UploadFile = File(...),
title: str = None,
source: str = None,
category: str = None,
background_tasks: BackgroundTasks = None,
ocr_pipeline: OCRPipeline = Depends(get_ocr_pipeline),
db: DatabaseManager = Depends(get_db),
ai_engine: AIScoringEngine = Depends(get_ai_engine)
):
"""Process PDF and save as document in database"""
try:
# Validate file type
if not file.filename.lower().endswith('.pdf'):
raise HTTPException(
status_code=400, detail="Only PDF files are supported")
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
content = await file.read()
temp_file.write(content)
temp_file_path = temp_file.name
try:
# Process PDF with OCR
ocr_result = ocr_pipeline.extract_text_from_pdf(temp_file_path)
if not ocr_result.get('success', False):
raise HTTPException(
status_code=400,
detail=f"OCR processing failed: {ocr_result.get('error_message', 'Unknown error')}"
)
# Prepare document data
document_data = {
'title': title or file.filename,
'source': source or 'Uploaded',
'category': category or 'عمومی',
'full_text': ocr_result.get('extracted_text', ''),
'ocr_confidence': ocr_result.get('confidence', 0.0),
'processing_time': ocr_result.get('processing_time', 0.0),
'file_path': temp_file_path,
'file_size': os.path.getsize(temp_file_path),
'language': ocr_result.get('language_detected', 'fa'),
'page_count': ocr_result.get('page_count', 0)
}
# Calculate AI score
final_score = ai_engine.calculate_score(document_data)
document_data['final_score'] = final_score
# Predict category if not provided
if not document_data.get('category') or document_data['category'] == 'عمومی':
document_data['category'] = ai_engine.predict_category(
document_data.get('title', ''),
document_data.get('full_text', '')
)
# Extract keywords
keywords = ai_engine.extract_keywords(
document_data.get('full_text', ''))
document_data['keywords'] = keywords
# Save to database
document_id = db.insert_document(document_data)
# Get the created document
created_document = db.get_document_by_id(document_id)
return {
"message": "Document processed and saved successfully",
"document_id": document_id,
"document": created_document,
"ocr_result": ocr_result
}
finally:
# Clean up temporary file
if os.path.exists(temp_file_path):
os.unlink(temp_file_path)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error processing and saving document: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/batch-process")
async def batch_process_pdfs(
files: List[UploadFile] = File(...),
background_tasks: BackgroundTasks = None,
ocr_pipeline: OCRPipeline = Depends(get_ocr_pipeline)
):
"""Process multiple PDF files"""
try:
results = []
for file in files:
try:
# Validate file type
if not file.filename.lower().endswith('.pdf'):
results.append({
"filename": file.filename,
"success": False,
"error": "Only PDF files are supported"
})
continue
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
content = await file.read()
temp_file.write(content)
temp_file_path = temp_file.name
try:
# Process PDF with OCR
result = ocr_pipeline.extract_text_from_pdf(temp_file_path)
results.append({
"filename": file.filename,
"success": result.get('success', False),
"extracted_text": result.get('extracted_text', ''),
"confidence": result.get('confidence', 0.0),
"processing_time": result.get('processing_time', 0.0),
"page_count": result.get('page_count', 0),
"error_message": result.get('error_message')
})
finally:
# Clean up temporary file
if os.path.exists(temp_file_path):
os.unlink(temp_file_path)
except Exception as e:
results.append({
"filename": file.filename,
"success": False,
"error": str(e)
})
return {
"total_files": len(files),
"processed_files": len([r for r in results if r.get('success', False)]),
"results": results
}
except Exception as e:
logger.error(f"Error in batch processing: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/quality-metrics")
async def get_ocr_quality_metrics(
document_id: str,
ocr_pipeline: OCRPipeline = Depends(get_ocr_pipeline),
db: DatabaseManager = Depends(get_db)
):
"""Get OCR quality metrics for a document"""
try:
# Get document
document = db.get_document_by_id(document_id)
if not document:
raise HTTPException(status_code=404, detail="Document not found")
# Create extraction result for metrics
extraction_result = {
"extracted_text": document.get('full_text', ''),
"confidence": document.get('ocr_confidence', 0.0)
}
# Calculate quality metrics
metrics = ocr_pipeline.get_ocr_quality_metrics(extraction_result)
return {
"document_id": document_id,
"metrics": metrics,
"document_info": {
"title": document.get('title'),
"file_size": document.get('file_size'),
"processing_time": document.get('processing_time'),
"page_count": document.get('page_count', 0)
}
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting OCR quality metrics: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/models")
async def get_available_models():
"""Get available OCR models"""
return {
"models": [
{
"name": "microsoft/trocr-base-stage1",
"description": "Microsoft TrOCR base model for printed text",
"language": "multilingual",
"type": "printed"
},
{
"name": "microsoft/trocr-base-handwritten",
"description": "Microsoft TrOCR base model for handwritten text",
"language": "multilingual",
"type": "handwritten"
},
{
"name": "microsoft/trocr-large-stage1",
"description": "Microsoft TrOCR large model for better accuracy",
"language": "multilingual",
"type": "printed"
}
],
"current_model": "microsoft/trocr-base-stage1"
}
@router.get("/status")
async def get_ocr_status(ocr_pipeline: OCRPipeline = Depends(get_ocr_pipeline)):
"""Get OCR pipeline status"""
return {
"initialized": ocr_pipeline.initialized,
"model_name": ocr_pipeline.model_name,
"initialization_attempted": ocr_pipeline.initialization_attempted
}