| """ |
| FastAPI application for PDF redaction using NER |
| """ |
| from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks |
| from fastapi.responses import FileResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel |
| from typing import List, Optional, Dict |
| import uvicorn |
| import os |
| import uuid |
| import shutil |
| from pathlib import Path |
| import logging |
| import sys |
| from app.redaction import PDFRedactor |
| from client_supabase import supabase |
|
|
| |
# Root logging config: stream to stdout so container platforms capture logs;
# force=True replaces any handlers installed earlier (e.g. by uvicorn).
logging.basicConfig(
    level=logging.INFO,
    stream=sys.stdout,
    force=True,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
|
|
| |
# FastAPI application instance; title/description/version feed the OpenAPI docs.
app = FastAPI(
    title="PDF Redaction API",
    description="Redact sensitive information from PDFs using Named Entity Recognition",
    version="1.0.0"
)
|
|
| |
# Allow cross-origin requests from any origin.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# disallowed by the CORS spec (credentialed requests need an explicit origin);
# Starlette copes by echoing the request origin, but consider pinning the
# allowed origins in production — confirm intended deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
# Local scratch directories for in-flight PDFs, created at import time
# (relative to the process working directory).
UPLOAD_DIR = Path("uploads")
OUTPUT_DIR = Path("outputs")
UPLOAD_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)


# Single shared redactor instance used by every endpoint.
# NOTE(review): presumably loads the NER model eagerly at startup — confirm
# in PDFRedactor.
redactor = PDFRedactor()
|
|
| |
class RedactionEntity(BaseModel):
    """One detected entity in a redacted document, as returned by /redact."""
    entity_type: str  # NER label (e.g. a category produced by the model)
    entity_text: str  # the matched text span
    page: int         # page of the entity's first word; 0 when no words located
    word_count: int   # number of words covered by the redaction
|
|
class RedactionResponse(BaseModel):
    """Response body for the synchronous /redact endpoint."""
    job_id: str                                        # UUID identifying this job's files
    status: str                                        # "completed" on success
    message: str                                       # human-readable summary
    entities: Optional[List[RedactionEntity]] = None   # entities that were redacted
    redacted_file_url: Optional[str] = None            # relative /download/{job_id} URL
|
|
class RedactionStatusResponse(BaseModel):
    """Response body for the request-level redaction endpoints."""
    request_id: str   # document_requests row id
    status: str       # pending | redacting | redacted | failed | not_found
    files: List[str]  # public URLs of redacted files (populated when redacted)
    message: str      # human-readable status description
|
|
class HealthResponse(BaseModel):
    """Response body for the health-check endpoints (/ and /health)."""
    status: str        # always "healthy" when the app can respond
    version: str       # API version string
    model_loaded: bool # whether the NER model is ready
|
|
| |
def set_request_status(request_id: str, status: str):
    """Write *status* into the document_requests row matching *request_id*."""
    update = supabase.from_("document_requests").update({"status": status})
    update.eq("id", request_id).execute()
    logger.info(f"Request {request_id} status -> {status}")
|
|
def get_request_status(request_id: str) -> str:
    """Return the status of *request_id* from document_requests, or "not_found"."""
    result = (
        supabase.from_("document_requests")
        .select("status")
        .eq("id", request_id)
        .maybe_single()
        .execute()
    )
    if not result.data:
        return "not_found"
    return result.data["status"]
|
|
| |
def get_public_url(bucket: str, storage_path: str) -> str:
    """Build the public Supabase Storage URL for an object in *bucket*."""
    base = os.getenv('SUPABASE_URL')
    return "/".join([f"{base}/storage/v1/object/public", bucket, storage_path])
|
|
def cleanup_files(job_id: str):
    """Best-effort removal of the uploaded PDF for *job_id* (errors are logged)."""
    target = UPLOAD_DIR / f"{job_id}.pdf"
    try:
        if target.exists():
            target.unlink()
        logger.info(f"Cleaned up files for job {job_id}")
    except Exception as e:
        logger.error(f"Error cleaning up files for job {job_id}: {str(e)}")
|
|
def cleanup_temp_files(paths: List[Path]):
    """Delete every path in *paths*, silently skipping files already gone.

    Uses unlink(missing_ok=True) instead of an exists()/unlink() pair, which
    avoids the race where the file disappears between the check and the delete.
    """
    for path in paths:
        path.unlink(missing_ok=True)
|
|
def download_file_from_supabase(bucket: str, storage_path: str, local_path: Path):
    """Fetch *storage_path* from the given bucket and write it to *local_path*.

    Raises a plain Exception when the download returns no data.
    """
    logger.info(f"Downloading {storage_path} to {local_path}")
    payload = supabase.storage.from_(bucket).download(storage_path)
    if not payload:
        raise Exception(f"Failed to download {storage_path}")
    local_path.write_bytes(payload)
|
|
def upload_file_to_supabase(bucket: str, storage_path: str, local_path: Path):
    """Upload *local_path* to *storage_path* in the bucket, overwriting (upsert)."""
    logger.info(f"Uploading {local_path} to {storage_path}")
    payload = local_path.read_bytes()
    options = {
        "upsert": "true",
        "content-type": "application/pdf"
    }
    supabase.storage.from_(bucket).upload(
        path=storage_path,
        file=payload,
        file_options=options
    )
|
|
def redact_request(request_id: str, bucket: str = "doc_storage"):
    """
    Background task: redact all files for a given request_id.
    DB writes: 2 total — one at start (redacting), one at end (redacted | failed).
    The 'pending' write is done by the endpoint before this task is dispatched.

    Each file is downloaded to a scratch path, redacted, and re-uploaded over
    its original storage_path; scratch files are removed even on failure.
    """
    try:
        logger.info("Redaction task started for request %s", request_id)
        set_request_status(request_id, "redacting")

        response = (
            supabase
            .from_("request_files")
            .select("id, storage_path")
            .eq("request_id", request_id)
            .execute()
        )

        files = response.data
        if not files:
            # Nothing to redact: restore the pre-redaction status and stop.
            # (Previously this raised after the status write, so the except
            # handler immediately overwrote "approved" with "failed".)
            set_request_status(request_id, "approved")
            logger.warning("No files found for request %s", request_id)
            return

        for file in files:
            storage_path = file["storage_path"]
            local_upload = UPLOAD_DIR / f"{uuid.uuid4()}.pdf"
            local_output = OUTPUT_DIR / f"{uuid.uuid4()}_redacted.pdf"
            try:
                download_file_from_supabase(bucket, storage_path, local_upload)
                redactor.redact_document(pdf_path=str(local_upload), output_path=str(local_output))
                upload_file_to_supabase(bucket, storage_path, local_output)
            finally:
                # Remove scratch files even when a step above raises,
                # so failed runs don't leak disk space.
                cleanup_temp_files([local_upload, local_output])

        set_request_status(request_id, "redacted")

    except Exception:
        # logger.exception records the traceback; the old print() duplicate
        # is dropped since logging already streams to stdout.
        logger.exception("Redaction failed for %s", request_id)
        set_request_status(request_id, "failed")
|
|
| |
@app.get("/", response_model=HealthResponse)
async def root():
    """Health probe exposed at the API root."""
    model_ready = redactor.is_model_loaded()
    return HealthResponse(status="healthy", version="1.0.0", model_loaded=model_ready)
|
|
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Dedicated health-check endpoint (same payload as the root probe)."""
    model_ready = redactor.is_model_loaded()
    return HealthResponse(status="healthy", version="1.0.0", model_loaded=model_ready)
|
|
@app.post("/redact", response_model=RedactionResponse)
async def redact_pdf(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    dpi: int = 300,
    entity_types: Optional[str] = None
):
    """
    Synchronously redact one uploaded PDF and report the detected entities.

    Args:
        background_tasks: used to schedule removal of the uploaded copy
            after the response is sent.
        file: the PDF to redact (multipart upload).
        dpi: rendering resolution passed through to the redactor.
        entity_types: optional comma-separated entity-type filter;
            None redacts every detected entity.

    Returns:
        RedactionResponse with the entity list and a /download/{job_id} URL.

    Raises:
        HTTPException: 400 for non-PDF uploads, 500 on processing errors.
    """
    # file.filename can be None on malformed multipart requests (previously an
    # AttributeError -> 500), and the extension check is now case-insensitive
    # so ".PDF" uploads are accepted too.
    if not file.filename or not file.filename.lower().endswith('.pdf'):
        raise HTTPException(status_code=400, detail="Only PDF files are supported")
    job_id = str(uuid.uuid4())
    upload_path = UPLOAD_DIR / f"{job_id}.pdf"
    output_path = OUTPUT_DIR / f"{job_id}_redacted.pdf"
    try:
        # Persist the upload to disk so the redactor can work on a real file.
        with upload_path.open("wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        entity_filter = None
        if entity_types:
            entity_filter = [et.strip() for et in entity_types.split(',')]

        result = redactor.redact_document(
            pdf_path=str(upload_path),
            output_path=str(output_path),
            dpi=dpi,
            entity_filter=entity_filter
        )

        response_entities = [
            RedactionEntity(
                entity_type=e['entity_type'],
                entity_text=e['entity_text'],
                # Page 0 is the sentinel for entities with no located words.
                page=e['words'][0]['page'] if e['words'] else 0,
                word_count=len(e['words'])
            ) for e in result['entities']
        ]

        # Remove the uploaded copy once the response has been delivered.
        background_tasks.add_task(cleanup_files, job_id)

        return RedactionResponse(
            job_id=job_id,
            status="completed",
            message=f"Successfully redacted {len(result['entities'])} entities",
            entities=response_entities,
            redacted_file_url=f"/download/{job_id}"
        )

    except Exception as e:
        logger.error(f"Error processing job {job_id}: {str(e)}")
        # Best-effort cleanup of partial artifacts before surfacing the error.
        if upload_path.exists():
            upload_path.unlink()
        if output_path.exists():
            output_path.unlink()
        raise HTTPException(status_code=500, detail=f"Error processing PDF: {str(e)}")
|
|
@app.get("/download/{job_id}")
async def download_redacted_pdf(job_id: str):
    """Serve the redacted PDF produced for *job_id*; 404 when it is missing."""
    target = OUTPUT_DIR / f"{job_id}_redacted.pdf"
    if target.exists():
        return FileResponse(
            path=target,
            media_type="application/pdf",
            filename=f"redacted_{job_id}.pdf"
        )
    raise HTTPException(status_code=404, detail="Redacted file not found")
|
|
@app.delete("/cleanup/{job_id}")
async def cleanup_job(job_id: str):
    """Remove both the uploaded and the redacted file for *job_id*."""
    try:
        cleanup_files(job_id)
        redacted = OUTPUT_DIR / f"{job_id}_redacted.pdf"
        if redacted.exists():
            redacted.unlink()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error cleaning up: {str(e)}")
    return {"message": f"Successfully cleaned up files for job {job_id}"}
|
|
@app.get("/stats")
async def get_stats():
    """Report PDF counts in the scratch directories plus model readiness."""
    pending = sum(1 for _ in UPLOAD_DIR.glob("*.pdf"))
    processed = sum(1 for _ in OUTPUT_DIR.glob("*.pdf"))
    return {
        "pending_uploads": pending,
        "processed_files": processed,
        "model_loaded": redactor.is_model_loaded()
    }
|
|
| |
@app.post("/redact_by_request/{request_id}", response_model=RedactionStatusResponse)
async def redact_by_request(request_id: str, background_tasks: BackgroundTasks):
    """
    Start background redaction of every file attached to *request_id*.

    Writes status 'pending' before dispatch; the background task then moves
    it to 'redacting' and finally 'redacted' or 'failed'.

    Raises:
        HTTPException: 404 when the request id does not exist.
    """
    current_status = get_request_status(request_id)

    # Don't dispatch work (or clobber status) for a request that doesn't exist.
    if current_status == "not_found":
        raise HTTPException(status_code=404, detail="Request not found")

    if current_status == "redacting":
        return RedactionStatusResponse(
            request_id=request_id,
            status="redacting",
            files=[],
            message="Redaction already in progress"
        )

    set_request_status(request_id, "pending")
    background_tasks.add_task(redact_request, request_id)

    return RedactionStatusResponse(
        request_id=request_id,
        status="pending",
        files=[],
        message="Redaction started in background"
    )
|
|
@app.get("/redaction_status/{request_id}", response_model=RedactionStatusResponse)
async def get_redaction_status(request_id: str):
    """Return the current redaction status, with public file URLs once redacted."""
    status = get_request_status(request_id)

    files: List[str] = []
    if status == "redacted":
        rows = (
            supabase
            .from_("request_files")
            .select("storage_path")
            .eq("request_id", request_id)
            .execute()
        )
        for row in (rows.data or []):
            files.append(get_public_url("doc_storage", row["storage_path"]))

    # Map known statuses to friendly text; unknown statuses echo themselves.
    status_messages = {
        "redacted": "Redaction completed",
        "pending": "Redaction pending",
        "redacting": "Redaction in progress",
        "failed": "Redaction failed",
        "not_found": "Request not found",
    }

    return RedactionStatusResponse(
        request_id=request_id,
        status=status,
        files=files,
        message=status_messages.get(status, status)
    )