|
|
|
|
|
|
|
|
import base64 |
|
|
import json |
|
|
import asyncio |
|
|
import re |
|
|
import os |
|
|
import html |
|
|
import requests |
|
|
import httpx |
|
|
import uuid |
|
|
import tempfile |
|
|
import io |
|
|
import traceback |
|
|
import atexit |
|
|
import functools |
|
|
from queue import Queue |
|
|
from threading import Event, Thread |
|
|
|
|
|
|
|
|
from bs4 import BeautifulSoup |
|
|
|
|
|
|
|
|
from fastapi import FastAPI, File, Form, UploadFile, HTTPException, Request, Header |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from fastapi.responses import JSONResponse, HTMLResponse |
|
|
from fastapi import Depends |
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer |
|
|
|
|
|
|
|
|
from pydantic import BaseModel |
|
|
|
|
|
|
|
|
from requests.exceptions import RequestException |
|
|
|
|
|
|
|
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
import google.generativeai as genai |
|
|
from google.api_core import exceptions as google_exceptions |
|
|
|
|
|
|
|
|
from gradio_client import Client, handle_file |
|
|
|
|
|
|
|
|
from PIL import Image |
|
|
|
|
|
|
|
|
import pytesseract |
|
|
from auth.clerk import verify_clerk_jwt |
|
|
|
|
|
|
|
|
from tools.tools import analyze_contract |
|
|
|
|
|
|
|
|
import numpy as np |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Application object -----------------------------------------------------
app = FastAPI(
    title="Document Translator (Final Architecture)",
    description="Pipeline: Nemo (JSON) -> Sea-Lion (Translate JSON) -> Gemini (HTML)",
    version="10.0.1",
)

# --- CORS: only the deployed front-end origin is allowed ---------------------
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://fair-work-contract.vercel.app"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Bearer-token extractor used as a dependency by authenticated routes.
security = HTTPBearer()

# Populate os.environ from a .env file before any settings are read below.
load_dotenv()

# Supabase connection settings; either may be None when the variable is unset.
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.environ.get("SUPABASE_SERVICE_ROLE_KEY")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/api/analyze_contract")
async def analyze_contract_endpoint(file: UploadFile = File(...)):
    """
    Analyze an uploaded HTML contract and return the analysis result.

    The upload is decoded as UTF-8 and passed to ``analyze_contract``; the
    value that coroutine returns is sent back to the client unchanged.
    (The original author describes it as a structured JSON response that
    contains a user-friendly HTML summary sheet.)

    Args:
        file: The uploaded document; its content type must be ``text/html``.

    Returns:
        The object returned by ``analyze_contract``.

    Raises:
        HTTPException: 400 when the content type is not ``text/html`` or the
            bytes are not valid UTF-8; 500 when the analysis result contains
            an ``"error"`` key or any unexpected exception occurs.
    """
    if file.content_type != "text/html":
        raise HTTPException(
            status_code=400, detail="Unsupported file type. Please upload a .html file."
        )

    try:
        html_content_bytes = await file.read()
        try:
            html_content = html_content_bytes.decode("utf-8")
        except UnicodeDecodeError as e:
            # A badly encoded upload is a client error, not a server fault.
            raise HTTPException(
                status_code=400,
                detail="Uploaded file is not valid UTF-8 encoded HTML.",
            ) from e

        analysis_results = await analyze_contract(html_content)

        if "error" in analysis_results:
            raise HTTPException(status_code=500, detail=analysis_results["error"])

        return analysis_results

    except HTTPException:
        # Deliberate HTTP errors raised above must reach the client as-is;
        # without this clause the generic handler below would re-wrap them.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"An unexpected server error occurred: {str(e)}"
        ) from e
|
|
|
|
|
|
|
|
@app.post("/upload")
async def upload_file(authorization: str = Header(...), file: UploadFile = File(...)):
    """
    Store an uploaded file in Supabase Storage and record it in ``documents``.

    The bearer token from the ``Authorization`` header is verified with
    ``verify_clerk_jwt``; the ``sub`` claim is used as the user id. The file
    bytes are posted to the ``user-documents`` bucket under
    ``<user_id>/<uuid4>.png`` and a metadata row (``user_id``, ``filename``,
    ``file_url``) is inserted through the Supabase REST endpoint.

    Args:
        authorization: Raw ``Authorization`` header, expected ``Bearer <token>``.
        file: The uploaded file; its content type is forwarded to Storage.

    Returns:
        ``{"message": "File uploaded as <user_id>/<uuid>.png"}``.

    Raises:
        HTTPException: 401 when the header is not a bearer token or the token
            carries no ``sub`` claim; 500 when the storage upload does not
            return 200 or the metadata insert returns a status >= 300.
    """
    bearer_prefix = "Bearer "
    if not authorization.startswith(bearer_prefix):
        raise HTTPException(status_code=401, detail="Missing Bearer token")

    # Slice instead of split(" ")[1] so extra whitespace cannot yield an
    # empty or truncated token.
    token = authorization[len(bearer_prefix):].strip()
    if not token:
        raise HTTPException(status_code=401, detail="Missing Bearer token")

    claims = await verify_clerk_jwt(token)

    user_id = claims.get("sub")
    if not user_id:
        # Same guard as get_user_documents; prevents a "None/..." object path.
        raise HTTPException(status_code=401, detail="Invalid user")

    # NOTE(review): the stored object name always ends in ".png" regardless of
    # the uploaded content type — confirm this is intended.
    filename = f"{user_id}/{uuid.uuid4()}.png"
    file_url = f"user-documents/{filename}"
    file_bytes = await file.read()

    # One client (and connection pool) serves both Supabase requests.
    async with httpx.AsyncClient() as client:
        upload_resp = await client.post(
            f"{SUPABASE_URL}/storage/v1/object/user-documents/{filename}",
            headers={
                "Authorization": f"Bearer {SUPABASE_SERVICE_ROLE_KEY}",
                "Content-Type": file.content_type,
            },
            content=file_bytes,
        )

        if upload_resp.status_code != 200:
            raise HTTPException(
                status_code=500, detail="Failed to upload to Supabase Storage"
            )

        insert_resp = await client.post(
            f"{SUPABASE_URL}/rest/v1/documents",
            headers={
                "Authorization": f"Bearer {SUPABASE_SERVICE_ROLE_KEY}",
                "apikey": SUPABASE_SERVICE_ROLE_KEY,
                "Content-Type": "application/json",
                "Prefer": "return=representation",
            },
            json={
                "user_id": user_id,
                "filename": filename.split("/")[-1],
                "file_url": file_url,
            },
        )

        if insert_resp.status_code >= 300:
            raise HTTPException(
                status_code=500, detail="Failed to insert document metadata"
            )

    return {"message": f"File uploaded as {filename}"}
|
|
|
|
|
|
|
|
@app.get("/api/documents")
async def get_user_documents(
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """
    List the caller's documents, each enriched with a signed download URL.

    The bearer token is verified with ``verify_clerk_jwt`` and its ``sub``
    claim is used to filter rows of the Supabase ``documents`` table. For
    every row a signed URL is requested from Supabase Storage with
    ``expiresIn`` set to 3600 (presumably seconds — confirm against the
    Storage API) and stored under the row's ``signed_url`` key. Rows whose
    file path is empty, or whose signing request fails, get
    ``signed_url = None``.

    Args:
        credentials: Bearer credentials extracted by the ``security`` dependency.

    Returns:
        The list of document rows returned by Supabase, each with an added
        ``signed_url`` entry.

    Raises:
        HTTPException: 401 when the token has no ``sub`` claim; 500 when the
            documents query does not return 200.
    """
    token = credentials.credentials
    claims = await verify_clerk_jwt(token)
    user_id = claims.get("sub")
    if not user_id:
        raise HTTPException(status_code=401, detail="Invalid user")

    service_headers = {
        "apikey": SUPABASE_SERVICE_ROLE_KEY,
        "Authorization": f"Bearer {SUPABASE_SERVICE_ROLE_KEY}",
    }

    async with httpx.AsyncClient() as client:
        # Pass the filter through `params` so the user id is URL-encoded
        # rather than interpolated raw into the query string.
        resp = await client.get(
            f"{SUPABASE_URL}/rest/v1/documents",
            params={"user_id": f"eq.{user_id}"},
            headers={**service_headers, "Accept": "application/json"},
        )

        if resp.status_code != 200:
            raise HTTPException(status_code=500, detail="Failed to fetch documents")

        documents = resp.json()

        for doc in documents:
            # Tolerate rows with a missing or null file_url instead of raising.
            file_path = (doc.get("file_url") or "").split("user-documents/", 1)[-1]
            if not file_path:
                doc["signed_url"] = None
                continue

            signed_url_resp = await client.post(
                f"{SUPABASE_URL}/storage/v1/object/sign/user-documents/{file_path}",
                headers=service_headers,
                json={"expiresIn": 3600},
            )

            signed_path = None
            if signed_url_resp.status_code == 200:
                # Parse the body once; the key may be absent.
                signed_path = signed_url_resp.json().get("signedURL")

            # Signed URLs grant access to the file, so they are deliberately
            # not written to stdout/logs.
            doc["signed_url"] = (
                f"{SUPABASE_URL}/storage/v1{signed_path}" if signed_path else None
            )

    return documents
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def call_sealion_for_translation(text_to_translate: str, lang: str) -> str:
    """
    Translate a single piece of text through the SEA-LION chat-completions API.

    Args:
        text_to_translate: Text to translate; blank/whitespace-only input
            short-circuits to an empty string without any network call.
        lang: Target language name, inserted verbatim into the prompt.

    Returns:
        The translated text with one leading/trailing double quote removed.
        Fallbacks (no exception is raised to the caller):
          * ``"<text> (Translation Skipped)"`` when ``SEALION_API_KEY`` is unset;
          * ``"Translation Error: <text>"`` on transport errors or non-2xx
            responses;
          * ``"Translation Parsing Error: <text>"`` when the response body
            does not have the expected shape.
    """
    if not text_to_translate.strip():
        return ""

    url = "https://api.sea-lion.ai/v1/chat/completions"
    api_key = os.getenv("SEALION_API_KEY")
    if not api_key:
        print("Warning: SEALION_API_KEY not set. Skipping translation.")
        return f"{text_to_translate} (Translation Skipped)"

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }

    prompt = (
        f"Translate the following text to {lang}. Return ONLY the translated text, "
        "without any additional explanations, formatting, or quotation marks:"
        f'\n\n"{text_to_translate}"'
    )
    payload = {
        "max_completion_tokens": 2048,
        "messages": [{"role": "user", "content": prompt}],
        "model": "aisingapore/Llama-SEA-LION-v3-70B-IT",
    }

    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                url, headers=headers, json=payload, timeout=45.0
            )
            response.raise_for_status()
            response_json = response.json()
            translated_text = response_json["choices"][0]["message"]["content"].strip()

            # The model sometimes echoes the quotes used in the prompt.
            return re.sub(r'^"|"$', "", translated_text)
        except httpx.HTTPError as e:
            # HTTPError is the common base of RequestError (transport
            # failures) and HTTPStatusError (raised by raise_for_status()),
            # so 4xx/5xx responses are handled here as well.
            print(f"Translation request failed: {e}")
            return f"Translation Error: {text_to_translate}"
        except (KeyError, IndexError, TypeError, AttributeError, ValueError) as e:
            # ValueError covers a non-JSON body; TypeError/AttributeError
            # cover unexpected shapes such as a null "content".
            print(f"Could not parse translation response: {e}")
            return f"Translation Parsing Error: {text_to_translate}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def translate_texts_with_gemini(texts: list[str], target_language: str) -> list[str]:
    """
    Translate a list of texts with Gemini in a single batch API call.

    The texts are embedded in the prompt as a JSON array and the model is
    asked to answer with a JSON array of the same length. The blocking SDK
    call runs in a worker thread via ``asyncio.to_thread``.

    Args:
        texts: Strings to translate; an empty list returns ``[]`` immediately.
        target_language: Target language name, inserted into the prompt.

    Returns:
        The translated strings, index-aligned with ``texts``. The original
        ``texts`` list is returned unchanged when the API key is missing, the
        call fails, no JSON array can be found in the reply, the array is not
        a list of strings, or its length differs from the input.
    """
    if not texts:
        return []

    try:
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            raise ValueError("GEMINI_API_KEY not found in environment variables.")

        genai.configure(api_key=api_key)
        model = genai.GenerativeModel(model_name="gemini-2.5-flash")

        prompt = f"""
        Translate each string in the following JSON array of strings to {target_language}.
        Return a single JSON array where each element is the translated string corresponding
        to the original at the same index. Your output MUST be only the JSON array and nothing else.

        Example Input:
        ["Hello world", "How are you?"]

        Example Output for target language 'Spanish':
        ["Hola mundo", "¿Cómo estás?"]

        Input for this task:
        {json.dumps(texts)}
        """

        def do_request() -> str:
            """Synchronous function to be run in a separate thread."""
            response = model.generate_content(prompt)
            return response.text.strip()

        response_text = await asyncio.to_thread(do_request)

        # The reply may wrap the array in prose or a code fence; keep only
        # the outermost [...] span.
        json_response_match = re.search(r'\[.*\]', response_text, re.DOTALL)
        if not json_response_match:
            print(f"Warning: Gemini did not return a valid JSON array. Response: {response_text}")
            return texts

        translated_texts = json.loads(json_response_match.group(0))

        # Callers inject these values into HTML/text fields, so anything
        # other than a flat list of strings is treated as a failed reply.
        if not isinstance(translated_texts, list) or not all(
            isinstance(item, str) for item in translated_texts
        ):
            print(f"Warning: Gemini did not return a valid JSON array. Response: {response_text}")
            return texts

        if len(translated_texts) != len(texts):
            print(f"Warning: Mismatch in translation count. Expected {len(texts)}, got {len(translated_texts)}.")
            return texts

        return translated_texts

    except Exception as e:
        # Best-effort by design: any failure falls back to the untranslated input.
        print(f"An error occurred during Gemini translation: {e}")
        return texts
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def get_hocr_from_image(image_bytes: bytes) -> str:
    """
    Run Tesseract OCR on an in-memory image and return the raw hOCR markup.

    The image is opened with Pillow from ``image_bytes``; the blocking
    ``pytesseract.image_to_pdf_or_hocr`` call is executed in the event loop's
    default executor so the loop is not blocked.

    Args:
        image_bytes: Encoded image data (must be non-empty).

    Returns:
        The hOCR output decoded as UTF-8.

    Raises:
        ValueError: If ``image_bytes`` is empty.
        HTTPException: 400 when Pillow cannot open the image.
    """
    if not image_bytes:
        raise ValueError("Image bytes cannot be empty.")

    try:
        image = Image.open(io.BytesIO(image_bytes))
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Cannot open image for Tesseract. It may be corrupted or unsupported. Error: {e}",
        ) from e

    def run_tesseract() -> bytes:
        """Synchronous OCR call, executed in a worker thread."""
        return pytesseract.image_to_pdf_or_hocr(image, extension="hocr")

    loop = asyncio.get_running_loop()
    try:
        hocr_bytes = await loop.run_in_executor(None, run_tesseract)
    finally:
        # Release the image's underlying buffer whether or not OCR succeeded.
        image.close()

    return hocr_bytes.decode("utf-8")
|
|
|
|
|
|
|
|
async def extract_text_and_boxes_with_paddle(image_bytes: bytes) -> list[dict]:
    """
    Send an image to the ``kevansoon/PaddleOCR`` Gradio app and return its result.

    The bytes are written to a temporary ``.png`` file, which is passed to the
    app's ``/predict`` endpoint with ``lang="en"`` from a worker thread. The
    temporary file is always removed afterwards.

    NOTE: a function with this same name is defined again later in this file;
    the later definition is the one bound to the name once the module has
    finished loading.

    Args:
        image_bytes: Encoded image data to OCR.

    Returns:
        Whatever the remote endpoint returned (annotated as a list of dicts),
        or ``[]`` when the result is empty/falsy.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as scratch:
        scratch.write(image_bytes)
        image_path = scratch.name

    def _run_remote_ocr(path: str) -> list[dict]:
        """Blocking Gradio round-trip; meant to run off the event loop."""
        return Client("kevansoon/PaddleOCR").predict(
            img=handle_file(path),
            lang="en",
            api_name="/predict",
        )

    try:
        ocr_items = await asyncio.get_running_loop().run_in_executor(
            None, _run_remote_ocr, image_path
        )
        if ocr_items:
            return ocr_items

        print("Warning: PaddleOCR returned no data.")
        return []
    finally:
        # delete=False above means cleanup is this function's responsibility.
        os.unlink(image_path)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def translate_hocr_html_with_gemini(hocr_html: str, target_language: str) -> str:
    """
    Translate the text nodes of an hOCR document in one Gemini batch call.

    Elements with class ``ocrx_word`` are collected; if there are none,
    elements with class ``ocr_line`` are used instead. Their stripped texts
    are sent to ``translate_texts_with_gemini`` and each translation is
    written back into the element at the same index, but only for elements
    whose ``.string`` is set.

    Args:
        hocr_html: hOCR markup to translate.
        target_language: Target language name passed to the translator.

    Returns:
        The serialized document with translations substituted in place.
    """
    document = BeautifulSoup(hocr_html, "html.parser")

    # Word-level nodes are preferred; fall back to whole lines.
    nodes = document.find_all(class_="ocrx_word") or document.find_all(class_="ocr_line")

    source_texts = [node.get_text(strip=True) for node in nodes]
    replacements = await translate_texts_with_gemini(source_texts, target_language)

    # zip stops at the shorter sequence, so nodes without a matching
    # translation are left untouched.
    for node, replacement in zip(nodes, replacements):
        if node.string:
            node.string.replace_with(replacement)

    return str(document)
|
|
|
|
|
|
|
|
async def translate_paddle_data_with_gemini(
    paddle_data: list[dict], target_language: str
) -> list[dict]:
    """
    Translate the ``text`` of every PaddleOCR item with one Gemini batch call.

    NOTE: a function with this same name is defined again later in this file;
    the later definition is the one bound to the name once the module has
    finished loading.

    Args:
        paddle_data: OCR items; each is read through its ``text`` and ``box`` keys.
        target_language: Target language name passed to the translator.

    Returns:
        A new list of ``{"text": ..., "box": ...}`` dicts in input order. An
        item keeps its original text when no translation exists at its index.
    """
    source_texts = [entry.get("text", "") for entry in paddle_data]

    translations = await translate_texts_with_gemini(source_texts, target_language)
    available = len(translations)

    return [
        {
            "text": translations[idx] if idx < available else source_texts[idx],
            "box": entry.get("box"),
        }
        for idx, entry in enumerate(paddle_data)
    ]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def generate_html_from_dual_ocr(
    translated_hocr_html: str, translated_paddle_data: list[dict]
) -> str:
    """
    Ask Gemini to merge translated hOCR and PaddleOCR data into one HTML page.

    Both inputs are embedded in a single prompt; the blocking SDK call runs in
    a worker thread. A markdown code fence around the reply (``` or ```html)
    is removed before returning, matching ``generate_html_from_paddle_ocr``.

    Args:
        translated_hocr_html: Translated hOCR markup.
        translated_paddle_data: Translated PaddleOCR items (``text``/``box``
            dicts), embedded via ``str()``.

    Returns:
        The generated HTML. On any failure (including a missing
        ``GEMINI_API_KEY``) a small HTML error page containing the escaped
        error message is returned instead of raising.
    """
    try:
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            raise ValueError("GEMINI_API_KEY not found in environment variables.")

        genai.configure(api_key=api_key)
        model = genai.GenerativeModel(model_name="gemini-2.5-flash")

        prompt = f"""
        You are provided with two different translated OCR outputs for the same document.
        Your task is to MERGE them into a SINGLE, CLEAN, and WELL-STYLED HTML document that can be rendered directly in an iframe.

        Input 1: Translated hOCR HTML
        --- HOCR START ---
        {translated_hocr_html}
        --- HOCR END ---

        Input 2: Translated PaddleOCR data (Python list of dicts with 'text' and 'box'):
        --- PADDLEOCR START ---
        {str(translated_paddle_data)}
        --- PADDLEOCR END ---

        STRICT RULES:
        1. You MUST output ONLY the FINAL RAW HTML code.
        - No ```html, no triple quotes, no markdown, no explanations.
        - Output must begin with <!DOCTYPE html> and end with </html>.
        2. ALL text from the second input (PaddleOCR) MUST be included in the final HTML without omission.
        - Every PaddleOCR text must appear exactly once in the correct order and location.
        3. The HTML must be fully self-contained:
        - Include <html>, <head>, <style>, and <body>.
        - Include CSS in a <style> block so it renders exactly in an iframe.
        4. Table structure requirement:
        - Use <table>, <tbody>, <tr>, and <td> to organize words into rows and columns.
        - Each PaddleOCR word must be placed in a separate <td> within the correct row based on vertical alignment.
        - Apply CSS for borders, padding, and cell alignment to ensure readability.
        - Use colspan/rowspan where necessary to match the original layout.
        5. Positioning:
        - Use bounding box data to size and place each cell proportionally.
        - Avoid text overlap — if bounding boxes would overlap, adjust table cell spans or widths.
        6. Before outputting:
        - Validate internally that the HTML is valid.
        - Confirm every PaddleOCR text appears in the table.
        - Confirm the table renders correctly in an iframe.

        FINAL OUTPUT REQUIREMENT:
        - Output ONLY the complete, valid HTML — no commentary, no extra text.
        """

        def do_request() -> str:
            """Synchronous function to be run in a separate thread."""
            response = model.generate_content(prompt)
            return response.text.strip()

        generated = await asyncio.to_thread(do_request)

        # Despite the prompt, the model may still wrap its answer in a
        # markdown fence; drop an opening ```/```lang marker and a closing ```.
        generated = re.sub(r"\A```[A-Za-z0-9_-]*[ \t]*\r?\n?", "", generated)
        generated = re.sub(r"\r?\n?```\Z", "", generated)
        return generated.strip()

    except Exception as e:
        error_message = f"An error occurred while generating the HTML structure with Gemini: {str(e)}"
        traceback.print_exc()
        # Escape the message: it may contain text echoed from the inputs.
        return f"<html><body><h1>HTML Generation Error</h1><p>{html.escape(error_message)}</p></body></html>"
|
|
|
|
|
|
|
|
@app.post("/api/translate_file_gemini", response_class=HTMLResponse)
async def translate_document_dual_ocr(
    target_language: str = Form(...), file: UploadFile = File(...)
):
    """
    Translate an uploaded image through the dual-OCR pipeline.

    Steps:
      1. Tesseract (hOCR) and PaddleOCR run concurrently on the image bytes.
      2. Both OCR outputs are translated concurrently.
      3. Gemini merges the two translated outputs into the final HTML.

    Args:
        target_language: Target language name (form field).
        file: Uploaded image; PNG, JPEG, BMP and TIFF content types are accepted.

    Returns:
        An ``HTMLResponse`` carrying the generated HTML.

    Raises:
        HTTPException: 400 for an unsupported content type, an empty upload,
            or when neither OCR engine produced data; 500 for any other
            failure during processing.
    """
    accepted_types = ("image/png", "image/jpeg", "image/bmp", "image/tiff")
    if file.content_type not in accepted_types:
        raise HTTPException(
            status_code=400,
            detail="Unsupported file type. Please use PNG, JPG, BMP or TIFF.",
        )

    try:
        await file.seek(0)
        payload = await file.read()
        if not payload:
            raise HTTPException(status_code=400, detail="Uploaded file is empty.")

        # ---- Step 1: OCR -------------------------------------------------
        print(
            "***** Step 1: Starting concurrent OCR extraction (Tesseract & PaddleOCR) ******"
        )
        hocr_markup, paddle_items = await asyncio.gather(
            get_hocr_from_image(payload),
            extract_text_and_boxes_with_paddle(payload),
        )

        tesseract_empty = not hocr_markup or "ocr_page" not in hocr_markup
        if tesseract_empty and not paddle_items:
            raise HTTPException(
                status_code=400,
                detail="Neither Tesseract nor PaddleOCR could extract any data from the image.",
            )
        print("***** Step 1 Done: Finished OCR extraction ******")

        # ---- Step 2: translation -----------------------------------------
        print("***** Step 2: Starting concurrent translation with Gemini ******")
        hocr_translated, paddle_translated = await asyncio.gather(
            translate_hocr_html_with_gemini(hocr_markup, target_language),
            translate_paddle_data_with_gemini(paddle_items, target_language),
        )
        print("***** Step 2 Done: Finished translation ******")

        # ---- Step 3: HTML generation -------------------------------------
        print(
            "***** Step 3: Generating final HTML from dual OCR data via Gemini ******"
        )
        rendered_page = await generate_html_from_dual_ocr(
            hocr_translated, paddle_translated
        )
        print("***** Step 3 Done: Generated final HTML ******")

        return HTMLResponse(content=rendered_page)

    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(
            status_code=500,
            detail=f"An unexpected error occurred during processing: {str(e)}",
        )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def translate_texts_with_gemini_flash(texts: list[str], target_language: str) -> list[str]:
    """
    Translate a list of texts with the ``gemini-2.5-flash`` model in JSON mode.

    The model is configured with ``response_mime_type="application/json"`` and
    asked to return a JSON array with one translated string per input string.

    Args:
        texts: Strings to translate; an empty list returns ``[]`` immediately.
        target_language: Target language name, inserted into the prompt.

    Returns:
        The translated strings, index-aligned with ``texts``. The original
        ``texts`` list is returned unchanged when the API key is missing, the
        call fails, the reply is not a JSON list of strings, or its length
        differs from the input.
    """
    if not texts:
        return []

    print(f"--- Starting translation of {len(texts)} text snippets to {target_language} ---")

    try:
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            raise ValueError("GEMINI_API_KEY not found in environment variables.")

        genai.configure(api_key=api_key)

        generation_config = genai.GenerationConfig(response_mime_type="application/json")
        model = genai.GenerativeModel(
            model_name="gemini-2.5-flash",
            generation_config=generation_config,
        )

        prompt = f"""
        You are an expert translator. Your task is to translate a JSON array of English strings to {target_language}.

        **Instructions:**
        1. Translate each string in the input JSON array to {target_language}.
        2. If a string appears to be nonsensical, a random sequence of characters, or an OCR error,
        return the original string as-is without attempting to translate it.
        3. Your response MUST be a single, valid JSON array containing the translated strings.
        4. The output array must have the exact same number of elements as the input array.
        5. Do not include any explanatory text, markdown, or anything other than the JSON array itself.

        **Example Input:**
        ["EMPLOYMENT CONTRACT", "by and between:.", "fne eahondwn aen nq awn"]

        **Example Output for a target language of 'Spanish':**
        ["CONTRATO DE EMPLEO", "por y entre:.", "fne eahondwn aen nq awn"]

        **Input for this task:**
        {json.dumps(texts)}
        """

        response = await model.generate_content_async(prompt)
        translated_texts = json.loads(response.text)

        # JSON mode guarantees valid JSON, not the requested shape: a dict or
        # a list with non-string items must not be passed on as translations.
        if not isinstance(translated_texts, list) or not all(
            isinstance(item, str) for item in translated_texts
        ):
            print("Warning: Gemini did not return a JSON array of strings.")
            return texts

        if len(translated_texts) != len(texts):
            print(f"Warning: Mismatch in translation count. Expected {len(texts)}, got {len(translated_texts)}.")
            return texts

        print("--- Successfully translated texts. ---")
        return translated_texts

    except Exception as e:
        # Best-effort by design: any failure falls back to the untranslated input.
        print(f"An error occurred during Gemini translation: {e}")
        traceback.print_exc()
        return texts
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def extract_text_and_boxes_with_paddle(image_bytes: bytes) -> list[dict]:
    """
    Run the remote ``kevansoon/PaddleOCR`` Gradio app on an image.

    The image bytes are spooled to a temporary ``.png`` file that is handed to
    the app's ``/predict`` endpoint (``lang="en"``). The blocking client call
    is pushed to the default executor, and the temporary file is deleted once
    the call has finished or failed.

    NOTE: this redefines a function of the same name that appears earlier in
    this file.

    Args:
        image_bytes: Encoded image data to OCR.

    Returns:
        The value returned by the remote endpoint (annotated as a list of
        dicts), or ``[]`` when that value is empty/falsy.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as spool:
        spool.write(image_bytes)
        spooled_path = spool.name

    try:
        event_loop = asyncio.get_running_loop()
        # The Gradio client is synchronous, so build and call it in a thread.
        ocr_output = await event_loop.run_in_executor(
            None,
            lambda: Client("kevansoon/PaddleOCR").predict(
                img=handle_file(spooled_path),
                lang="en",
                api_name="/predict",
            ),
        )

        if not ocr_output:
            print("Warning: PaddleOCR returned no data.")
            ocr_output = []
        return ocr_output
    finally:
        # The file was created with delete=False, so remove it explicitly.
        os.unlink(spooled_path)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def translate_paddle_data_with_gemini(
    paddle_data: list[dict], target_language: str
) -> list[dict]:
    """
    Translate the ``text`` of every PaddleOCR item via ``translate_texts_with_gemini_flash``.

    NOTE: this redefines a function of the same name that appears earlier in
    this file.

    Args:
        paddle_data: OCR items; each is read through its ``text`` and ``box`` keys.
        target_language: Target language name passed to the translator.

    Returns:
        ``[]`` for empty input; otherwise a new list of
        ``{"text": ..., "box": ...}`` dicts in input order, where an item
        keeps its original text if no translation exists at its index.
    """
    source_texts = [record.get("text", "") for record in paddle_data]
    if not source_texts:
        return []

    # One batch call covers every snippet.
    translations = await translate_texts_with_gemini_flash(source_texts, target_language)

    result: list[dict] = []
    for position, record in enumerate(paddle_data):
        if position < len(translations):
            chosen_text = translations[position]
        else:
            chosen_text = source_texts[position]
        result.append({"text": chosen_text, "box": record.get("box")})

    return result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def generate_html_from_paddle_ocr(translated_paddle_data: list[dict]) -> str:
    """
    Ask Gemini to turn translated PaddleOCR data into a layout-aware HTML page.

    The OCR items are embedded in the prompt via ``str()``. A markdown code
    fence around the reply is removed before returning: the opening marker may
    be a bare ``` or carry a language tag such as ```html.

    Args:
        translated_paddle_data: Translated PaddleOCR items (``text``/``box`` dicts).

    Returns:
        The generated HTML. On any failure (including a missing
        ``GEMINI_API_KEY``) a small HTML error page containing the escaped
        error message is returned instead of raising.
    """
    try:
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            raise ValueError("GEMINI_API_KEY not found in environment variables.")

        genai.configure(api_key=api_key)

        model = genai.GenerativeModel(model_name="gemini-2.5-flash")

        prompt = f"""
        You are an expert web developer specializing in converting structured data into clean HTML.
        Your task is to convert the provided OCR data into a SINGLE, WELL-STYLED HTML document that can be rendered directly in an iframe.

        Input: A Python list of dictionaries with 'text' and 'box' coordinates:
        --- OCR DATA START ---
        {str(translated_paddle_data)}
        --- OCR DATA END ---

        STRICT RULES:
        1. You MUST output ONLY the FINAL RAW HTML code.
        - No ```html, no triple quotes, no markdown, no explanations.
        - Output must begin with <!DOCTYPE html> and end with </html>.
        2. ALL text from the input (PaddleOCR) MUST be included in the final HTML without omission.
        - Every PaddleOCR text must appear exactly once in the correct order and location.
        3. The HTML must be fully self-contained:
        - Include <html>, <head>, <style>, and <body>.
        - Include CSS in a <style> block so it renders exactly in an iframe.
        4. Table structure requirement:
        - Use <table>, <tbody>, <tr>, and <td> to organize words into rows and columns.
        - Each PaddleOCR word must be placed in a separate <td> within the correct row based on vertical alignment.
        - Apply CSS for borders, padding, and cell alignment to ensure readability.
        - Use colspan/rowspan where necessary to match the original layout.
        5. Positioning:
        - Use bounding box data to size and place each cell proportionally.
        - Avoid text overlap — if bounding boxes would overlap, adjust table cell spans or widths.
        6. Before outputting:
        - Validate internally that the HTML is valid.
        - Confirm every PaddleOCR text appears in the table.
        - Confirm the table renders correctly in an iframe.

        FINAL OUTPUT REQUIREMENT:
        - Output ONLY the complete, valid HTML — no commentary, no extra text.
        """

        response = await model.generate_content_async(prompt)
        text = response.text.strip()

        # Remove an opening fence with any (or no) language tag, then a
        # closing fence. Handling only "```html" would leave a bare "```"
        # opener in the returned HTML.
        text = re.sub(r"\A```[A-Za-z0-9_-]*[ \t]*\r?\n?", "", text)
        text = re.sub(r"\r?\n?```\Z", "", text)
        return text.strip()

    except Exception as e:
        error_message = f"An error occurred while generating the HTML structure with Gemini: {str(e)}"
        traceback.print_exc()
        # Escape the message: it may contain text echoed from the inputs.
        return f"<html><body><h1>HTML Generation Error</h1><p>{html.escape(error_message)}</p></body></html>"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/api/translate_file_gemini_paddle", response_class=HTMLResponse)
async def translate_document_paddle_ocr(
    target_language: str = Form(...), file: UploadFile = File(...)
):
    """
    Translate an uploaded image through the PaddleOCR-only pipeline.

    Steps:
      1. PaddleOCR extracts text and coordinates from the image.
      2. The extracted text is translated in one batch call.
      3. Gemini renders the translated data as the final HTML.

    Args:
        target_language: Target language name (form field).
        file: Uploaded image; PNG, JPEG, BMP and TIFF content types are accepted.

    Returns:
        An ``HTMLResponse`` carrying the generated HTML.

    Raises:
        HTTPException: 400 for an unsupported content type, an empty upload,
            or when PaddleOCR produced no data; 500 for any other failure
            during processing.
    """
    accepted_types = ("image/png", "image/jpeg", "image/bmp", "image/tiff")
    if file.content_type not in accepted_types:
        raise HTTPException(
            status_code=400,
            detail="Unsupported file type. Please use PNG, JPG, BMP or TIFF.",
        )

    try:
        await file.seek(0)
        payload = await file.read()
        if not payload:
            raise HTTPException(status_code=400, detail="Uploaded file is empty.")

        # ---- Step 1: OCR -------------------------------------------------
        print("***** Step 1: Starting PaddleOCR extraction ******")
        ocr_items = await extract_text_and_boxes_with_paddle(payload)
        if not ocr_items:
            raise HTTPException(
                status_code=400,
                detail="PaddleOCR could not extract any data from the image.",
            )
        print("***** Step 1 Done: Finished OCR extraction ******")

        # ---- Step 2: translation -----------------------------------------
        print("***** Step 2: Starting translation with Gemini ******")
        translated_items = await translate_paddle_data_with_gemini(
            ocr_items, target_language
        )
        print("***** Step 2 Done: Finished translation ******")

        # ---- Step 3: HTML generation -------------------------------------
        print("***** Step 3: Generating final HTML from PaddleOCR data via Gemini ******")
        rendered_page = await generate_html_from_paddle_ocr(translated_items)
        print("***** Step 3 Done: Generated final HTML ******")

        return HTMLResponse(content=rendered_page)

    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(
            status_code=500,
            detail=f"An unexpected error occurred during processing: {str(e)}",
        )
|
|
|
|
|
|
|
|
|