Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, File, UploadFile | |
| from fastapi.responses import JSONResponse | |
| import base64 | |
| import re | |
| app = FastAPI() | |
| from io import BytesIO | |
| from gmft.pdf_bindings import PyPDFium2Document | |
| from utils import get_page_text_with_tables, detector, formatter | |
def extract_text_from_pdf(pdf_bytes: bytes, page_numbers=None) -> list:
    """
    Extract per-page text from PDF bytes using gmft without temporary files.

    Args:
        pdf_bytes: Raw bytes of the PDF document.
        page_numbers: Optional iterable of page indices to extract. Indices
            are compared against each page's position in iteration order,
            counted from 0. ``None`` or an empty collection selects every
            page.

    Returns:
        A list with one entry per selected page, in document order. Each
        entry is whatever ``get_page_text_with_tables`` returns for that
        page (presumably a string -- confirm against ``utils``).
    """
    # Create a PyPDFium2Document directly from bytes
    doc = PyPDFium2Document(pdf_bytes)
    try:
        # Built inside the try so the document is still closed if len(doc)
        # or the set construction raises.
        page_set = set(page_numbers) if page_numbers else set(range(len(doc)))
        pages = []
        for page_num, page in enumerate(doc):
            try:
                # The membership test lives inside the try so that skipped
                # pages are closed as well, not only the extracted ones.
                if page_num not in page_set:
                    continue
                tables = detector.extract(page)
                fmt_tables = [
                    formatter.extract(table, margin=(0, 0, 0, 0))
                    for table in tables
                ]
                pages.append(get_page_text_with_tables(page, fmt_tables))
            finally:
                page.close()
    finally:
        doc.close()
    return pages
def greet_json() -> dict:
    """Return the fixed greeting payload ``{"Hello": "World!"}``."""
    greeting = {"Hello": "World!"}
    return greeting
async def extract_pdf_text(file: UploadFile = File(...), page_numbers: str = None):
    """
    Endpoint to extract text from an uploaded PDF file.

    Args:
        file: The uploaded file. Its name must end in ``.pdf``
            (case-insensitive); an upload without a name is rejected too.
        page_numbers: Optional comma-separated page indices, e.g. ``"0,2"``.
            When omitted or empty, every page is extracted.

    Returns:
        ``{"filename": ..., "text": ...}`` on success, where ``text`` is the
        value returned by ``extract_text_from_pdf``. Otherwise a
        ``JSONResponse`` with status 400 (not a PDF, or malformed
        ``page_numbers``) or status 500 (extraction failed).
    """
    # Check if the uploaded file is a PDF. The filename can be missing on an
    # upload, so fall back to "" instead of calling .lower() on None.
    upload_name = file.filename or ""
    if not upload_name.lower().endswith('.pdf'):
        return JSONResponse(
            status_code=400,
            content={"error": "Only PDF files are supported"}
        )

    # Read the file content
    content = await file.read()

    # Parse page_numbers if provided
    parsed_page_numbers = None
    if page_numbers:
        try:
            # Convert comma-separated string to list of integers,
            # ignoring empty segments such as a trailing comma
            parsed_page_numbers = [
                int(part.strip())
                for part in page_numbers.split(',')
                if part.strip()
            ]
        except ValueError:
            return JSONResponse(
                status_code=400,
                content={"error": "Invalid page_numbers format. Use comma-separated integers."}
            )

    try:
        # Extract text from PDF
        extracted_text = extract_text_from_pdf(content, parsed_page_numbers)
    except Exception as e:
        # Endpoint boundary: report any extraction failure as a 500
        return JSONResponse(
            status_code=500,
            content={"error": f"Failed to extract text: {e}"}
        )

    return {
        "filename": file.filename,
        "text": extracted_text
    }
async def extract_pdf_text_base64(data: dict):
    """
    Endpoint to extract text from a PDF provided as a base64 encoded string.

    Args:
        data: Request body with the keys:
            ``file`` (required): base64 string, optionally in data URL form
                (``data:application/pdf;base64,...``).
            ``filename`` (optional): name echoed back in the response;
                defaults to ``"unknown.pdf"``.
            ``page_numbers`` (optional): comma-separated string of integers
                or a list whose items are integers or integer strings.

    Returns:
        ``{"filename": ..., "text": ...}`` on success, where ``text`` is the
        value returned by ``extract_text_from_pdf``. Otherwise a
        ``JSONResponse`` with status 400 (missing or non-string ``file``,
        malformed ``page_numbers``, bad data URL, undecodable base64) or
        status 500 (extraction failed).
    """
    # Check if 'file' key exists in request
    if 'file' not in data:
        return JSONResponse(
            status_code=400,
            content={"error": "Missing 'file' field in request body"}
        )

    # Get the base64 encoded string; anything but a string is a client error
    base64_string = data['file']
    if not isinstance(base64_string, str):
        return JSONResponse(
            status_code=400,
            content={"error": "Invalid 'file' field: expected a base64 encoded string"}
        )

    # Extract filename if provided
    filename = data.get('filename', 'unknown.pdf')

    # Extract page_numbers if provided
    page_numbers = data.get('page_numbers')
    parsed_page_numbers = None
    if page_numbers:
        try:
            # Handle both string and list formats
            if isinstance(page_numbers, str):
                parsed_page_numbers = [
                    int(part.strip())
                    for part in page_numbers.split(',')
                    if part.strip()
                ]
            elif isinstance(page_numbers, list):
                parsed_page_numbers = []
                for item in page_numbers:
                    # Reject bad entries instead of dropping them: a list
                    # that filters down to empty would otherwise silently
                    # select every page. bool is excluded explicitly
                    # because it is a subclass of int.
                    if isinstance(item, bool) or not isinstance(item, (int, str)):
                        raise TypeError("page_numbers items must be integers")
                    parsed_page_numbers.append(int(item))
            else:
                raise TypeError("page_numbers must be a string or a list")
        except (ValueError, TypeError):
            return JSONResponse(
                status_code=400,
                content={"error": "Invalid page_numbers format. Use comma-separated integers or array."}
            )

    # Handle data URL format (e.g., "data:application/pdf;base64,...")
    if base64_string.startswith('data:'):
        # Take everything after the first "base64," marker. partition keeps
        # payloads that contain line breaks intact, where a "." regex
        # would stop at the first newline.
        _, marker, payload = base64_string.partition('base64,')
        if not marker:
            return JSONResponse(
                status_code=400,
                content={"error": "Invalid data URL format"}
            )
        base64_string = payload

    try:
        pdf_bytes = base64.b64decode(base64_string)
    except ValueError as e:
        # binascii.Error is a ValueError subclass; undecodable input is the
        # client's fault, so answer 400 rather than 500.
        return JSONResponse(
            status_code=400,
            content={"error": f"Invalid base64 data: {e}"}
        )

    try:
        # Extract text from PDF
        extracted_text = extract_text_from_pdf(pdf_bytes, parsed_page_numbers)
    except Exception as e:
        # Endpoint boundary: report any extraction failure as a 500
        return JSONResponse(
            status_code=500,
            content={"error": f"Failed to process base64 PDF: {e}"}
        )

    return {
        "filename": filename,
        "text": extracted_text
    }