| """Background page metadata generator — extracts per-page descriptions from the full PDF.
|
|
|
| Uses parallel batch processing: the PDF is split into 5-page chunks and each
|
| chunk is sent to Gemini concurrently for faster metadata extraction.
|
| """
|
| from __future__ import annotations
|
|
|
| import json
|
| import logging
|
| import math
|
| from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
| from google import genai
|
| from google.genai import types
|
|
|
| from config import GOOGLE_API_KEY, METADATA_MODEL
|
| from prompts.metadata import METADATA_SYSTEM_PROMPT
|
| from tools.pdf_processor import extract_page_range_bytes
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
| # Pages per Gemini request; the PDF is chunked into ranges of this size and
| # each chunk is processed concurrently (see generate_page_metadata).
| BATCH_SIZE = 5
|
|
|
|
|
|
|
|
|
|
|
|
|
| def _extract_json_array(response_text: str) -> list[dict]:
|
| """Extract the outermost balanced JSON array from a response string."""
|
| start = response_text.find("[")
|
| if start == -1:
|
| raise ValueError("No JSON array found in metadata generation response")
|
|
|
| depth = 0
|
| end = None
|
| for i in range(start, len(response_text)):
|
| if response_text[i] == "[":
|
| depth += 1
|
| elif response_text[i] == "]":
|
| depth -= 1
|
| if depth == 0:
|
| end = i
|
| break
|
|
|
| if end is None:
|
| raise ValueError("No matching closing bracket found in metadata response")
|
|
|
| result = json.loads(response_text[start : end + 1])
|
| if not isinstance(result, list):
|
| raise ValueError(f"Expected list, got {type(result)}")
|
| return result
|
|
|
|
|
|
|
|
|
|
|
|
|
| def _generate_batch(
|
| pdf_path: str,
|
| page_start_0: int,
|
| page_end_0: int,
|
| page_start_1: int,
|
| page_end_1: int,
|
| ) -> list[dict]:
|
| """Generate metadata for a contiguous range of pages.
|
|
|
| Args:
|
| pdf_path: Path to the full PDF on disk.
|
| page_start_0: First page (0-indexed, inclusive) for PDF extraction.
|
| page_end_0: Last page (0-indexed, inclusive) for PDF extraction.
|
| page_start_1: First page (1-indexed) β used in the prompt text.
|
| page_end_1: Last page (1-indexed) β used in the prompt text.
|
|
|
| Returns:
|
| List of metadata dicts for the pages in this batch.
|
| """
|
| client = genai.Client(api_key=GOOGLE_API_KEY)
|
|
|
| batch_pdf_bytes = extract_page_range_bytes(pdf_path, page_start_0, page_end_0)
|
| pdf_part = types.Part.from_bytes(data=batch_pdf_bytes, mime_type="application/pdf")
|
|
|
| num_batch_pages = page_end_1 - page_start_1 + 1
|
| instruction_text = (
|
| f"This PDF excerpt contains {num_batch_pages} page(s), "
|
| f"corresponding to pages {page_start_1} through {page_end_1} of the full drawing set.\n"
|
| f"Generate structured metadata for ALL {num_batch_pages} page(s). "
|
| f"Use page numbers {page_start_1} through {page_end_1} (1-indexed). "
|
| f"Return a JSON array with exactly {num_batch_pages} objects."
|
| )
|
| instruction_part = types.Part.from_text(text=instruction_text)
|
|
|
| response = client.models.generate_content(
|
| model=METADATA_MODEL,
|
| contents=[types.Content(role="user", parts=[pdf_part, instruction_part])],
|
| config=types.GenerateContentConfig(
|
| system_instruction=METADATA_SYSTEM_PROMPT,
|
| ),
|
| )
|
|
|
| return _extract_json_array(response.text.strip())
|
|
|
|
|
|
|
|
|
|
|
|
|
| def generate_page_metadata(
|
| pdf_path: str,
|
| num_pages: int,
|
| progress_callback=None,
|
| ) -> list[dict]:
|
| """Extract per-page structured metadata from a PDF using parallel batches.
|
|
|
| The PDF is split into chunks of ``BATCH_SIZE`` pages. Each chunk is sent to
|
| Gemini concurrently via a thread pool. Results are merged, any missing
|
| pages are back-filled, and the list is returned sorted by page number.
|
|
|
| Args:
|
| pdf_path: Path to the full PDF.
|
| num_pages: Total number of pages.
|
| progress_callback: Optional ``(completed_batches, total_batches, page_range_str) -> None``
|
| called after each batch finishes.
|
|
|
| Returns a list of dicts (1-indexed page_num), one per page.
|
| Raises on failure (caller is responsible for error handling).
|
| """
|
| num_batches = math.ceil(num_pages / BATCH_SIZE)
|
| logger.info(
|
| "Starting parallel metadata generation: %d pages in %d batches of %d",
|
| num_pages, num_batches, BATCH_SIZE,
|
| )
|
|
|
| all_results: list[dict] = []
|
| errors: list[str] = []
|
| completed_count = 0
|
|
|
| with ThreadPoolExecutor(max_workers=num_batches) as executor:
|
| futures = {}
|
| for batch_idx in range(num_batches):
|
| page_start_0 = batch_idx * BATCH_SIZE
|
| page_end_0 = min(page_start_0 + BATCH_SIZE - 1, num_pages - 1)
|
| page_start_1 = page_start_0 + 1
|
| page_end_1 = page_end_0 + 1
|
|
|
| future = executor.submit(
|
| _generate_batch,
|
| pdf_path,
|
| page_start_0,
|
| page_end_0,
|
| page_start_1,
|
| page_end_1,
|
| )
|
| futures[future] = (page_start_1, page_end_1)
|
|
|
| for future in as_completed(futures):
|
| batch_range = futures[future]
|
| try:
|
| batch_results = future.result()
|
| all_results.extend(batch_results)
|
| completed_count += 1
|
| logger.info("Batch pages %d-%d complete: %d entries", batch_range[0], batch_range[1], len(batch_results))
|
| if progress_callback is not None:
|
| progress_callback(
|
| completed_count,
|
| num_batches,
|
| f"Pages {batch_range[0]}-{batch_range[1]}",
|
| )
|
| except Exception as e:
|
| completed_count += 1
|
| errors.append(f"Batch pages {batch_range[0]}-{batch_range[1]} failed: {e}")
|
| logger.exception("Batch pages %d-%d failed", batch_range[0], batch_range[1])
|
| if progress_callback is not None:
|
| progress_callback(
|
| completed_count,
|
| num_batches,
|
| f"Pages {batch_range[0]}-{batch_range[1]} (failed)",
|
| )
|
|
|
| if errors and not all_results:
|
| raise RuntimeError(
|
| f"All metadata batches failed:\n" + "\n".join(errors)
|
| )
|
|
|
| if errors:
|
| logger.warning("Some batches failed (results will have gaps): %s", errors)
|
|
|
|
|
|
|
|
|
|
|
|
|
| covered_pages = {item.get("page_num") for item in all_results}
|
| for p in range(1, num_pages + 1):
|
| if p not in covered_pages:
|
| all_results.append({
|
| "page_num": p,
|
| "sheet_id": "unknown",
|
| "sheet_title": "Unknown",
|
| "discipline": "other",
|
| "page_type": "other",
|
| "description": "Metadata not extracted for this page.",
|
| "key_elements": [],
|
| "spatial_coverage": "",
|
| })
|
|
|
|
|
| all_results.sort(key=lambda x: x.get("page_num", 0))
|
|
|
| return all_results
|
|
|