"""Convert a PDF into retrieval-ready text chunks.

Pipeline: extract prose and tables per page, drop boilerplate pages, clean
the text, strip repeated headers/footers, append tables as markdown, split
into titled sections and finally cut each section into token-bounded chunks.
"""

import json
import re
from collections import Counter
from io import BytesIO
from pathlib import Path
from typing import Dict, List

import pdfplumber
import tiktoken

TOKENIZER = tiktoken.get_encoding("cl100k_base")
CHUNK_SIZE = 512
CHUNK_OVERLAP = 80

# Settings passed to pdfplumber's table finder.
_TABLE_SETTINGS = {
    "vertical_strategy": "lines_strict",
    "horizontal_strategy": "lines_strict",
    "intersection_tolerance": 10,
    "snap_tolerance": 5,
    "join_tolerance": 5,
    "edge_min_length": 10,
    "min_words_vertical": 3,
    "min_words_horizontal": 2,
}


def _count_tokens(text: str) -> int:
    """Return the number of TOKENIZER tokens in *text*.

    ``disallowed_special=()`` makes tiktoken encode strings that look like
    special tokens as ordinary text instead of raising ValueError.
    """
    return len(TOKENIZER.encode(text, disallowed_special=()))


def _in_any_bbox(obj, bboxes) -> bool:
    """Return True if the object's top-left corner lies inside any bbox."""
    x0, top = obj["x0"], obj["top"]
    return any(
        bb[0] <= x0 <= bb[2] and bb[1] <= top <= bb[3] for bb in bboxes
    )


def extract_raw_pages(pdf_bytes: bytes) -> List[Dict]:
    """Extract prose text and tables from every page of a PDF.

    Args:
        pdf_bytes: Raw bytes of the PDF document.

    Returns:
        One dict per page with keys ``page_num`` (1-based), ``raw_text``
        (page text with objects inside detected table regions removed) and
        ``tables`` (list of non-empty tables, each a list of rows).
    """
    pages_data = []
    with pdfplumber.open(BytesIO(pdf_bytes)) as pdf:
        for page_num, page in enumerate(pdf.pages, start=1):
            tables_on_page = page.find_tables(_TABLE_SETTINGS)
            bboxes = tuple(t.bbox for t in tables_on_page)

            # Remove table content from the prose so it is not emitted twice.
            prose_page = page
            if bboxes:
                prose_page = page.filter(
                    lambda obj, bbs=bboxes: not _in_any_bbox(obj, bbs)
                )
            prose_text = prose_page.extract_text() or ""

            # Extract each table once and keep only non-empty results.
            extracted_tables = []
            for table in tables_on_page:
                rows = table.extract()
                if rows:
                    extracted_tables.append(rows)

            pages_data.append({
                "page_num": page_num,
                "raw_text": prose_text,
                "tables": extracted_tables,
            })
    return pages_data


SKIP_PAGE_PATTERNS = [
    r"^\s*$",
    r"(?i)this\s+page\s+intentionally\s+left\s+blank",
    r"(?i)^(table\s+of\s+contents?|contents?)\s*$",
    r"(?i)forward.?looking",
    r"(?i)safe\s+harbor",
]


def is_boilerplate_page(text: str) -> bool:
    """Return True if *text* is empty or matches any SKIP_PAGE_PATTERNS entry.

    NOTE(review): the "forward looking" and "safe harbor" patterns are
    searched anywhere in the page, so a content page that merely mentions
    either phrase is treated as boilerplate - confirm this is intended.
    """
    if not text or not text.strip():
        return True
    return any(re.search(pat, text) for pat in SKIP_PAGE_PATTERNS)


# Typographic characters mapped to plain ASCII equivalents in one pass.
_CHAR_NORMALIZATION = str.maketrans({
    "\u2013": "-",
    "\u2014": "-",
    "\u2018": "'",
    "\u2019": "'",
    "\u201c": '"',
    "\u201d": '"',
    "\u2022": "-",
    "\u00b7": "-",
    "\u00a0": " ",
})


def clean_text(text: str) -> str:
    """Normalize punctuation and strip page furniture from extracted text.

    Removes "Page N" lines, bare page-number lines and separator rules,
    rejoins words hyphenated across a line break and collapses runs of
    blank lines.

    Args:
        text: Raw page text; may be empty or None.

    Returns:
        The cleaned text, or "" for empty input.
    """
    if not text:
        return ""
    text = text.translate(_CHAR_NORMALIZATION)
    text = re.sub(r'\b(k\s+no\s+wn|kno wn)\b', 'known', text, flags=re.IGNORECASE)
    text = re.sub(r"(?m)^[\s\-]*Page\s+\d+.*$", "", text, flags=re.IGNORECASE)
    text = re.sub(r"(?m)^\s*\d{1,4}\s*$", "", text)
    text = re.sub(r"(?m)^[\s\-=_\.\*]{4,}$", "", text)
    # Only rejoin letter-hyphen-newline-lowercase. Dashes were normalized to
    # "-" above, so a looser rule would fuse ranges such as "2019-\n2020".
    text = re.sub(r"(?<=[A-Za-z])-\n(?=[a-z])", "", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    # NOTE(review): the source was truncated here. At least one further
    # substitution (its pattern began with "(?") and the original return
    # statement were lost; restore them from version control.
    return text.strip()


def remove_repeated_headers_footers(pages_data: List[Dict]) -> List[Dict]:
    """Drop short lines that recur on many pages (running headers/footers).

    Mutates each page's ``clean_text`` in place and returns *pages_data*.

    NOTE(review): the first half of this function was lost to truncation in
    the source. The counting logic and the threshold below are a
    reconstruction; only the ``count >= freq_threshold and len(line) < 120``
    test and the filtering loop are original. Verify against version control.
    """
    line_counts = Counter()
    for p in pages_data:
        # Count each distinct line once per page. Blank lines are excluded
        # so paragraph breaks are never classified as a repeated line.
        distinct = {line.strip() for line in p["clean_text"].splitlines()}
        distinct.discard("")
        line_counts.update(distinct)

    # Assumed threshold: on at least half of the pages and never fewer than 3.
    freq_threshold = max(3, (len(pages_data) + 1) // 2)
    repeated = {
        line
        for line, count in line_counts.items()
        if count >= freq_threshold and len(line) < 120
    }
    for p in pages_data:
        p["clean_text"] = "\n".join(
            line
            for line in p["clean_text"].splitlines()
            if line.strip() not in repeated
        )
    return pages_data


def table_to_markdown(table_rows) -> str:
    """Render a table (list of rows of cells) as a markdown table.

    Rows whose cells are all None or blank are dropped, short rows are
    padded to the widest row, and the first remaining row is the header.

    Returns:
        The markdown text with one trailing newline per row, or "" if there
        are no non-empty rows.
    """
    if not table_rows:
        return ""
    rows = [
        [str(c).replace("\n", " ").strip() if c is not None else "" for c in row]
        for row in table_rows
        # str(None) is "None" (truthy), so None must be tested explicitly.
        if any(c is not None and str(c).strip() for c in row)
    ]
    if not rows:
        return ""
    col_count = max(len(r) for r in rows)
    rows = [r + [""] * (col_count - len(r)) for r in rows]
    lines = ["| " + " | ".join(rows[0]) + " |"]
    lines.append("| " + " | ".join(["---"] * col_count) + " |")
    lines.extend("| " + " | ".join(row) + " |" for row in rows[1:])
    return "\n".join(lines) + "\n"


def attach_tables_to_pages(pages_data: List[Dict]) -> List[Dict]:
    """Append each page's tables, as labelled markdown, to its ``clean_text``.

    Mutates the page dicts in place and returns *pages_data*. The label
    format must stay in sync with the table pattern used by chunk_text.
    """
    for p in pages_data:
        table_blocks = []
        for i, tbl in enumerate(p.get("tables", []), 1):
            markdown = table_to_markdown(tbl)  # rendered once per table
            if markdown:
                table_blocks.append(
                    f"\n[Table {i} — Page {p['page_num']}]\n{markdown}"
                )
        if table_blocks:
            p["clean_text"] += "\n" + "\n".join(table_blocks)
    return pages_data


# Case-insensitivity is scoped to the "NOTE - n" and "Item No. n" forms.
# Applying it globally would let [A-Z] match lowercase and turn ordinary
# prose lines into headings.
SECTION_HEADING_RE = re.compile(
    r"(?m)^(?:"
    r"[A-Z][A-Z0-9\s\-&,()]{5,80}[A-Z]$"
    r"|\d{1,2}\.?\s+[A-Z][A-Za-z\s\-&,]{8,}"
    r"|(?i:NOTE\s+-\s?\d+)"
    r"|(?i:Item No\.\s?\d+)"
    r")"
)


def _append_section(sections: List[Dict], title: str, pages, lines) -> None:
    """Append a section dict to *sections* unless its text is empty."""
    text = "\n".join(lines).strip()
    if text:
        sections.append({
            "section": title,
            "pages": sorted(pages),
            "text": text,
        })


def split_into_sections(pages_data: List[Dict]) -> List[Dict]:
    """Split the cleaned page text into sections at heading lines.

    Text before the first heading is collected under "Preamble". A heading
    line becomes the title of the following section and is not part of its
    text.

    Returns:
        List of dicts with keys ``section`` (title), ``pages`` (sorted page
        numbers the section touches) and ``text``. Sections without text
        are omitted.
    """
    sections = []
    current_title = "Preamble"
    current_pages = set()
    current_lines = []

    for p in pages_data:
        page_num = p["page_num"]
        for line in p.get("clean_text", "").splitlines():
            stripped = line.strip()
            if len(stripped) > 5 and SECTION_HEADING_RE.match(stripped):
                _append_section(
                    sections, current_title, current_pages, current_lines
                )
                current_title = stripped
                # The heading's page belongs to the new section, not the
                # one that just ended.
                current_pages = {page_num}
                current_lines = []
            else:
                current_lines.append(line)
                if stripped:
                    current_pages.add(page_num)

    _append_section(sections, current_title, current_pages, current_lines)
    return sections


# Matches a table label followed by its markdown rows. The last row may end
# the string without a newline because section text is stripped.
_TABLE_BLOCK_RE = re.compile(
    r"(\[Table \d+ — Page \d+\]\n(?:\|.*\|(?:\n|\Z))+)"
)


def _split_oversized(paragraph: str, max_tokens: int) -> List[str]:
    """Split a paragraph longer than CHUNK_SIZE on word boundaries.

    Paragraphs that already fit are returned unchanged. The pieces of a
    longer one hold roughly *max_tokens* tokens each (per-word counts are
    an approximation of the joined text's count).
    """
    if _count_tokens(paragraph) <= CHUNK_SIZE:
        return [paragraph]
    pieces = []
    current = []
    current_tokens = 0
    for word in paragraph.split():
        word_tokens = _count_tokens(" " + word)
        if current and current_tokens + word_tokens > max_tokens:
            pieces.append(" ".join(current))
            current = []
            current_tokens = 0
        current.append(word)
        current_tokens += word_tokens
    if current:
        pieces.append(" ".join(current))
    return pieces


def _chunk_prose(block: str) -> List[str]:
    """Pack the paragraphs of *block* into chunks of about CHUNK_SIZE tokens."""
    # The overlap is measured in words; half of CHUNK_OVERLAP words is used
    # as a rough stand-in for CHUNK_OVERLAP tokens.
    overlap_words = CHUNK_OVERLAP // 2
    piece_budget = max(1, CHUNK_SIZE - CHUNK_OVERLAP)

    paragraphs = []
    for para in re.split(r"\n{2,}", block):
        para = para.strip()
        if para:
            paragraphs.extend(_split_oversized(para, piece_budget))

    chunks = []
    current = []
    current_tokens = 0
    for para in paragraphs:
        para_tokens = _count_tokens(para)
        if current and current_tokens + para_tokens > CHUNK_SIZE:
            joined = " ".join(current)
            chunks.append(joined)
            # Seed the next chunk with the tail of this one. Guard against
            # a zero overlap, where [-0:] would copy the whole chunk.
            tail = joined.split()[-overlap_words:] if overlap_words > 0 else []
            current = [" ".join(tail)] if tail else []
            current_tokens = _count_tokens(current[0]) if current else 0
        current.append(para)
        current_tokens += para_tokens
    if current:
        chunks.append(" ".join(current))
    return chunks


def chunk_text(text: str) -> List[str]:
    """Split section text into chunks for embedding.

    Labelled markdown tables are kept whole as their own chunks, whatever
    their size. Prose is packed paragraph by paragraph up to about
    CHUNK_SIZE tokens, with a word-based overlap between consecutive
    chunks; paragraphs larger than CHUNK_SIZE are split on word boundaries.

    Returns:
        Non-empty, stripped chunk strings in document order.
    """
    chunks = []
    for block in _TABLE_BLOCK_RE.split(text):
        if not block.strip():
            continue
        if block.startswith("[Table"):
            chunks.append(block.strip())
        else:
            chunks.extend(_chunk_prose(block))
    return [c.strip() for c in chunks if c.strip()]


def build_rag_chunks(pages_data: List[Dict], filename: str) -> List[Dict]:
    """Build chunk records with section and page metadata.

    Args:
        pages_data: Pages carrying ``page_num`` and ``clean_text``.
        filename: Source file name; its stem (spaces replaced by "_",
            round and square brackets removed) becomes the document id.

    Returns:
        One dict per chunk with a running ``chunk_id``, its position within
        the section, token count, text and a ``metadata`` sub-dict.
    """
    sections = split_into_sections(pages_data)
    doc_id = re.sub(r"[()\[\]]", "", Path(filename).stem.replace(" ", "_"))

    all_chunks = []
    chunk_id = 0
    for sec in sections:
        text_chunks = chunk_text(sec["text"])
        for idx, chunk in enumerate(text_chunks):
            all_chunks.append({
                "chunk_id": chunk_id,
                "document": doc_id,
                "section": sec["section"][:120],
                "chunk_index": idx,
                "total_chunks": len(text_chunks),
                # Copies, so editing one record cannot affect the others.
                "pages": list(sec["pages"]),
                "token_count": _count_tokens(chunk),
                "text": chunk,
                "metadata": {
                    "document": doc_id,
                    "source_file": filename,
                    "section": sec["section"],
                    "pages": list(sec["pages"]),
                },
            })
            chunk_id += 1
    return all_chunks


def _is_content_page(page: Dict) -> bool:
    """Return True if a page should be kept for chunking."""
    text = page["raw_text"]
    if not text or not text.strip():
        # A page that is only a table has no prose left after table regions
        # are filtered out; keep it so its tables are not lost.
        return bool(page.get("tables"))
    return not is_boilerplate_page(text)


def process_pdf_to_chunks(pdf_bytes: bytes, filename: str) -> List[Dict]:
    """Run the full PDF-to-chunks pipeline.

    Args:
        pdf_bytes: Raw bytes of the PDF document.
        filename: Original file name, recorded in every chunk's metadata.

    Returns:
        The chunk records produced by build_rag_chunks.
    """
    print(f"converting pdf to json {filename}")
    raw_pages = extract_raw_pages(pdf_bytes)
    content_pages = [p for p in raw_pages if _is_content_page(p)]
    for p in content_pages:
        p["clean_text"] = clean_text(p["raw_text"])
    content_pages = remove_repeated_headers_footers(content_pages)
    content_pages = attach_tables_to_pages(content_pages)
    return build_rag_chunks(content_pages, filename)