import json
import os
import base64
from io import BytesIO
from typing import List, Dict, Any

from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import (
    PdfPipelineOptions,
    PictureDescriptionApiOptions,
    OcrAutoOptions,
)
from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
from docling.datamodel.settings import settings
from docling.chunking import HybridChunker
from docling_core.types.doc.labels import DocItemLabel
from docling_core.types.doc.document import SectionHeaderItem, TitleItem, PictureDescriptionData

from config import GROQ_API_KEY


class EnrichedRagParser:
    """
    Parser using Docling's HybridChunker for Multimodal RAG.
    Modified from sonnet_export.py for modular use.
    """

    def __init__(self, groq_api_key: str = GROQ_API_KEY):
        self.groq_api_key = groq_api_key
        self.converter = self._setup_converter()
        self.chunker = HybridChunker(merge_peers=True)
    def _setup_converter(self) -> DocumentConverter:
        # CPU configuration
        accelerator_options = AcceleratorOptions(
            num_threads=min(12, os.cpu_count()),
            device=AcceleratorDevice.CPU,
        )

        # Smart OCR configuration:
        # only triggers when >50% of a page is scanned/bitmap content.
        ocr_options = OcrAutoOptions(
            lang=["en"],                 # ✅ specify language
            force_full_page_ocr=False,   # ⚡ don't force OCR on all pages
            bitmap_area_threshold=0.5,   # ⚡ smart: only OCR if >50% scanned
        )
        # Pipeline configuration
        pipeline_options = PdfPipelineOptions(
            # Features
            do_ocr=True,                 # enable OCR (smart triggering via ocr_options)
            do_table_structure=True,
            generate_picture_images=True,
            images_scale=1,
            ocr_options=ocr_options,     # ⚡ smart OCR config
            # Skip full-page images; keep remote services enabled for the API-based descriptions
            generate_page_images=False,
            enable_remote_services=True,
            # Picture descriptions via remote VLM API
            do_picture_description=True,
            # Resource management
            queue_max_size=10,
            document_timeout=300.0,
        )
        pipeline_options.accelerator_options = accelerator_options
        settings.debug.profile_pipeline_timings = True

        pipeline_options.picture_description_options = PictureDescriptionApiOptions(
            url="https://api.groq.com/openai/v1/chat/completions",
            params={
                "model": "meta-llama/llama-4-scout-17b-16e-instruct",  # double-check this model string
                "temperature": 0.2,
                "max_tokens": 500,
            },
            prompt=(
                "Describe this image in detail for a RAG knowledge base. "
                "Include all visible text, numbers, and chart trends."
            ),
            headers={"Authorization": f"Bearer {self.groq_api_key}"},
        )

        return DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
            }
        )
    def _determine_chunk_type(self, chunk) -> str:
        # Classify the chunk based on the labels of the doc items it contains.
        chunk_type = "text"
        if hasattr(chunk.meta, "doc_items") and chunk.meta.doc_items:
            labels = [item.label for item in chunk.meta.doc_items]
            if DocItemLabel.TABLE in labels:
                chunk_type = "table"
            elif DocItemLabel.LIST_ITEM in labels:
                chunk_type = "list"
            elif any(l in (DocItemLabel.TITLE, DocItemLabel.SECTION_HEADER) for l in labels):
                chunk_type = "header"
            elif DocItemLabel.CODE in labels:
                chunk_type = "code"
        return chunk_type
    def _get_base64_image(self, pic) -> str:
        # Encode the picture's PIL image as a base64 PNG string.
        try:
            if hasattr(pic, "image") and pic.image and hasattr(pic.image, "pil_image"):
                img = pic.image.pil_image
                if img:
                    buffered = BytesIO()
                    if img.mode != "RGB":
                        img = img.convert("RGB")
                    img.save(buffered, format="PNG")
                    return base64.b64encode(buffered.getvalue()).decode("utf-8")
        except Exception as e:
            print(f"Failed to convert image to base64: {e}")
        return ""
    def _find_image_heading(self, doc, pic_item) -> str:
        # Walk the document in reading order, tracking the most recent heading,
        # and return it once the picture item is reached.
        current_heading = "Unknown"
        for item, level in doc.iterate_items():
            if isinstance(item, (SectionHeaderItem, TitleItem)):
                if hasattr(item, "text"):
                    current_heading = item.text
            if item == pic_item:
                return current_heading
        return current_heading
    def process_document(self, file_path: str, save_json: bool = True, output_dir: str = "rag_data", max_page: int = 10) -> Dict[str, Any]:
        """Converts the document and returns structured chunk and image data. (max_page is currently unused.)"""
        print(f"Testing Docling Parser on: {file_path}...")
        result = self.converter.convert(file_path)
        doc = result.document

        doc_conversion_secs = result.timings["pipeline_total"].times
        print(f"Doc conversion time: {doc_conversion_secs} seconds")
        chunk_iter = self.chunker.chunk(dl_doc=doc)
        structured_chunks = []
        for i, chunk in enumerate(chunk_iter):
            heading = chunk.meta.headings[0] if chunk.meta.headings else "Unknown"

            # Take the page number from the first doc item that carries provenance.
            page_num = 0
            if hasattr(chunk.meta, "doc_items") and chunk.meta.doc_items:
                for item in chunk.meta.doc_items:
                    if hasattr(item, "prov") and item.prov:
                        if len(item.prov) > 0 and hasattr(item.prov[0], "page_no"):
                            page_num = item.prov[0].page_no
                            break
            structured_chunks.append({
                "chunk_id": f"chunk_{i}",
                "type": self._determine_chunk_type(chunk),
                "text": chunk.text,
                "metadata": {
                    "source": os.path.basename(file_path),
                    "page_number": page_num,
                    "section_header": heading,
                },
            })
        images_data = []
        for i, pic in enumerate(doc.pictures):
            # Picture descriptions produced by the enrichment step are stored as
            # PictureDescriptionData entries in pic.annotations.
            description = "No description"
            for annotation in getattr(pic, "annotations", []):
                if isinstance(annotation, PictureDescriptionData):
                    description = annotation.text
                    break
            images_data.append({
                "image_id": f"img_{i}",
                "description": description,
                "page_number": pic.prov[0].page_no if pic.prov else 0,
                "section_header": self._find_image_heading(doc, pic),
                "image_base64": self._get_base64_image(pic),
            })
        final_output = {"chunks": structured_chunks, "images": images_data}

        if save_json:
            os.makedirs(output_dir, exist_ok=True)
            with open(os.path.join(output_dir, "parsed_knowledge.json"), "w", encoding="utf-8") as f:
                json.dump(final_output, f, indent=2, ensure_ascii=False)
            print(f"Saved parsed knowledge to {output_dir}/parsed_knowledge.json")

        return final_output
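

# Minimal usage sketch (assumptions: config.py defines GROQ_API_KEY, and
# "sample.pdf" is a hypothetical input path — substitute your own document).
if __name__ == "__main__":
    parser = EnrichedRagParser()
    output = parser.process_document("sample.pdf", save_json=True, output_dir="rag_data")
    print(f"Parsed {len(output['chunks'])} text chunks and {len(output['images'])} images")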