Tanishq Salkar
initial visual mapping code added to hf
db81e28
import os
import json
import asyncio
import requests
import fitz
import shutil
import tempfile
from datetime import datetime
from fastapi import FastAPI, UploadFile, File, HTTPException
import config
import utils_geometry as utils
from engine_vision import process_page_smart
from engine_mapping import map_fields_to_schema
from utils_grouping import group_fields_by_section
# FastAPI application object; the route decorators further down in this file
# (e.g. @app.post) register their handlers on it.
app = FastAPI(title="Smart Contract Processor API")
# NOTE: the original comment here said it exists only to create a new commit;
# it carries no functional meaning.
def get_fields_from_local_api(pdf_path):
    """
    Send a PDF to the model API and collect the detected fields per page.

    The file at ``pdf_path`` is POSTed (multipart, under the form key
    ``file``) to ``config.COMMON_FORMS_API_URL``. The response is read as a
    stream of lines, and every non-empty line is parsed as one JSON object:

    * ``status == "success"``: ``data["fields"]`` (default ``[]``) is stored
      under the key ``data["page"]``.
    * ``status == "error"``: the page and message are printed and the line is
      otherwise ignored.
    * any other status: the line is ignored.

    Args:
        pdf_path: Path of the PDF file to upload.

    Returns:
        A dict mapping each reported page to its list of fields, or ``None``
        if anything fails (file access, connection, timeout, malformed JSON,
        a success line without ``"page"``). Callers treat ``None`` and an
        empty dict alike as "no fields".
    """
    print(f"Sending to Model API: {config.COMMON_FORMS_API_URL}")
    fields_by_page = {}
    try:
        with open(pdf_path, 'rb') as f:
            # With stream=True the connection stays checked out until the body
            # is fully consumed or the response is closed. Using the response
            # as a context manager releases it even when parsing bails out
            # half-way through the stream.
            with requests.post(
                config.COMMON_FORMS_API_URL,
                files={'file': f},
                stream=True,
                timeout=60
            ) as response:
                # NOTE(review): the HTTP status code is not checked here; a
                # non-2xx reply is only detected if its body fails to parse.
                for line in response.iter_lines():
                    if not line:
                        # Blank keep-alive / separator lines carry no payload.
                        continue
                    data = json.loads(line)
                    status = data.get("status")
                    if status == "success":
                        fields_by_page[data["page"]] = data.get("fields", [])
                    elif status == "error":
                        print(f"Model API Error on page {data.get('page')}: {data.get('msg')}")
    except Exception as e:
        # Deliberately broad: any failure is reported to the caller as None
        # rather than raised, matching the existing contract.
        print(f"API Connection Error: {e}")
        return None
    return fields_by_page
def get_pdf_metadata(doc, filename: str):
    """
    Summarise a document as name, title and per-page geometry.

    Args:
        doc: Iterable of page objects exposing ``rotation`` and a ``rect``
            with ``width`` / ``height``; must also expose ``metadata``
            (a mapping or a falsy value).
        filename: Name of the uploaded file; may be empty or ``None``.

    Returns:
        A dict with keys ``name``, ``title`` and ``pageSizes`` (one
        ``{"rotation", "width", "height"}`` entry per page, in page order).
    """

    def _page_size(page):
        # Read the rectangle once and reuse it for both dimensions.
        box = page.rect
        return {
            "rotation": page.rotation,
            "width": box.width,
            "height": box.height,
        }

    sizes = [_page_size(page) for page in doc]

    # Title preference: embedded metadata title, then the filename without
    # its extension, then a fixed placeholder.
    embedded_title = doc.metadata.get("title", "") if doc.metadata else ""
    if embedded_title:
        title = embedded_title
    elif filename:
        title = os.path.splitext(filename)[0]
    else:
        title = "Document"

    return {
        "name": filename or "document.pdf",
        "title": title,
        "pageSizes": sizes,
    }
def resolve_intermediate_format(all_fields, pdf_metadata):
    """
    Assemble the intermediate payload returned by the API.

    Fields are first passed through ``group_fields_by_section``. Each field's
    ``role`` is then turned into a participant reference, and fields, groups
    and participants are emitted keyed by temporary ids (``tempId`` and
    friends). Per the original note, a Next.js layer
    (transform-to-clai-schema.ts) later converts this into ClaiSchema and
    generates the final ids.

    Args:
        all_fields: Field dicts to group and convert.
        pdf_metadata: Passed through unchanged under ``pdfMetadata``.

    Returns:
        A dict with keys ``participants``, ``groups``, ``fields`` and
        ``pdfMetadata``.
    """
    # Roles that do not correspond to a participant.
    non_participant_roles = ("system", "n/a", "unknown", "none", "")

    groups, updated_fields = group_fields_by_section(all_fields)

    participants_by_id = {}
    field_entries = []
    for field in updated_fields:
        role_label = str(field.get("role", "System")).strip().title()
        role_key = role_label.lower()

        if role_key in non_participant_roles:
            participant_id = None
        else:
            participant_id = f"part_{role_key.replace(' ', '_')}"
            if participant_id not in participants_by_id:
                participants_by_id[participant_id] = {
                    "tempId": participant_id,
                    "role": "signer",
                    "type": "unknown",
                    "label": role_label,
                    # Participants are numbered 1, 2, 3... in order of first
                    # appearance, i.e. one more than those already registered.
                    "routingOrder": len(participants_by_id) + 1,
                    "definer": "PREPARER"
                }

        field_entries.append({
            "tempId": field["id"],
            "aliasId": field.get("aliasId"),
            "groupTempId": field.get("groupId"),
            "participantTempId": participant_id,
            "label": field["label"],
            "semanticType": field["semanticType"],
            "isDynamic": field.get("isDynamic", False),
            "page": field["page"],
            "rect": field["rect"]
        })

    # Re-key each group so it also speaks in temp ids.
    group_entries = [
        {
            "tempId": group["id"],
            "title": group["title"],
            "fieldTempIds": group["fieldIds"]
        }
        for group in groups
    ]

    return {
        "participants": list(participants_by_id.values()),
        "groups": group_entries,
        "fields": field_entries,
        "pdfMetadata": pdf_metadata
    }
# ==============================================================================
# API ENDPOINT (Replaces async main())
# ==============================================================================
@app.post("/process-pdf")
async def process_pdf(file: UploadFile = File(...)):
    """
    Process an uploaded PDF and return the intermediate field format.

    Steps:
      1. Copy the upload to a temporary ``.pdf`` file on disk.
      2. Ask the model API for per-page fields (run in a worker thread
         because the helper performs blocking HTTP I/O).
      3. Open the PDF with fitz, collect its metadata, and build a short
         text context from the first two pages (whitespace-normalised,
         capped at 1500 characters).
      4. Run ``process_page_smart`` for every page concurrently, bounded by
         a semaphore of ``config.MAX_CONCURRENT_PAGES``.
      5. Flatten the per-page results, pass them through
         ``map_fields_to_schema`` and ``resolve_intermediate_format``.

    Args:
        file: The uploaded PDF.

    Returns:
        The dict produced by ``resolve_intermediate_format``.

    Raises:
        HTTPException: status 500 when the model API yields no fields, or
            when any other step fails (the detail then carries the
            underlying error message).
    """
    tmp_path = None
    doc = None
    try:
        # Created inside the try so a failed copy still reaches the cleanup
        # in `finally` (delete=False means nobody else removes the file).
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
            tmp_path = tmp.name
            shutil.copyfileobj(file.file, tmp)

        utils.setup_debug_dir()
        print(f"Starting process for uploaded file: {file.filename}")

        raw_fields = await asyncio.to_thread(get_fields_from_local_api, tmp_path)
        if not raw_fields:
            # Covers both None (helper failed) and an empty mapping.
            raise HTTPException(status_code=500, detail="Failed to extract fields from Model API (Local Port 8000).")

        doc = fitz.open(tmp_path)

        # Extract PDF metadata for ClaiPDFCollection
        pdf_metadata = get_pdf_metadata(doc, file.filename)

        # Extract text context for vision processing
        text_sample = "".join(
            doc[i].get_text() for i in range(min(2, len(doc)))
        )
        global_ctx = " ".join(text_sample.split())[:1500]

        # Process pages with vision and mapping
        semaphore = asyncio.Semaphore(config.MAX_CONCURRENT_PAGES)
        tasks = [
            process_page_smart(semaphore, doc, page_num, fields, global_ctx)
            for page_num, fields in raw_fields.items()
        ]
        results = await asyncio.gather(*tasks)
        flat_results = [item for sublist in results for item in sublist]
        mapped_results = await map_fields_to_schema(flat_results)

        # Return intermediate format for Next.js transformation
        return resolve_intermediate_format(mapped_results, pdf_metadata)
    except HTTPException:
        # Already a well-formed HTTP error; re-wrapping it would replace the
        # detail with str(exception) and log a misleading traceback.
        raise
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Explicit None check: close whatever was opened, regardless of how
        # the document object evaluates in a boolean context.
        if doc is not None:
            doc.close()
        if tmp_path is not None and os.path.exists(tmp_path):
            os.remove(tmp_path)
            print(f"Cleanup complete for {tmp_path}")