Shubham170793's picture
Update src/ingestion.py
6241bc0 verified
import re
import fitz # PyMuPDF
import unicodedata
import os
import json
from gen_ai_hub.proxy.core.proxy_clients import get_proxy_client
from gen_ai_hub.proxy.langchain.openai import ChatOpenAI
# ==========================================================
# 1️⃣ TEXT EXTRACTION (Clean + TOC Detection)
# ==========================================================
def extract_text_from_pdf(file_path: str):
"""
Extracts and cleans text from a PDF using PyMuPDF.
Handles layout artifacts, numbered sections, and TOC.
Returns clean text + TOC list + source label.
English-only version.
"""
text = ""
try:
with fitz.open(file_path) as pdf:
for page_num, page in enumerate(pdf, start=1):
# Primary text extraction
page_text = page.get_text("text").strip()
# Fallback for PDFs with minimal text
if not page_text or len(page_text) < 10:
blocks = page.get_text("blocks")
page_text = " ".join(
block[4] for block in blocks if isinstance(block[4], str)
)
# Structural cleanup
page_text = page_text.replace("• ", "\n• ")
page_text = re.sub(r"(\d+\.\d+\.\d+)", r"\n\1", page_text)
page_text = re.sub(r"Page\s*\d+\s*(of\s*\d+)?", "", page_text, flags=re.IGNORECASE)
page_text = re.sub(r"(PUBLIC|Confidential|© SAP.*|\bSAP\b\s*\d{4})", "", page_text, flags=re.IGNORECASE)
text += page_text + "\n"
except Exception as e:
raise RuntimeError(f"❌ PDF extraction failed: {e}")
# Clean text (English only)
text = clean_text(text)
print("🧾 TEXT SAMPLE (first 400 chars):", text[:400])
# TOC detection
toc, toc_source = get_hybrid_toc(text)
print(f"📘 TOC Source: {toc_source} | Entries: {len(toc)}")
return text, toc, toc_source
# ==========================================================
# 2️⃣ CLEANING PIPELINE (English Only)
# ==========================================================
def clean_text(text: str) -> str:
"""Cleans noisy PDF text for English documents."""
text = unicodedata.normalize("NFKC", text)
# Remove common TOC-like artifacts
text = re.sub(r"\b\d+(\.\d+){1,}\s+[A-Za-z].{0,40}\.{2,}\s*\d+\b", "", text)
# Normalize bullets and spacing
text = text.replace("•", "- ").replace("▪", "- ").replace("‣", "- ")
text = re.sub(r"\.{3,}", ". ", text)
text = re.sub(r"-\s*\n", "", text)
text = re.sub(r"\n\s*(PUBLIC|PRIVATE|Confidential)\s*\n", "\n", text, flags=re.IGNORECASE)
text = re.sub(r"©\s*[A-Z].*?\d{4}", "", text)
text = text.replace("\r", " ")
text = re.sub(r"\n{2,}", "\n", text)
text = re.sub(r"\s{2,}", " ", text)
# English-safe filter (no Devanagari)
text = re.sub(r"[^\w\s,;:.\-\(\)/&]", "", text)
text = re.sub(r"(\s*\.\s*){3,}", " ", text)
return text.strip()
# ==========================================================
# 3️⃣ TABLE OF CONTENTS DETECTION (Heuristic)
# ==========================================================
def extract_table_of_contents(text: str):
toc_entries = []
lines = text.split("\n")
toc_started = False
toc_ended = False
line_count = len(lines)
for i, line in enumerate(lines):
if not toc_started and re.search(r"\b(table\s*of\s*contents?|contents?|index|overview)\b", line, re.IGNORECASE):
next_lines = lines[i + 1: i + 8]
if any(re.match(r"^\s*\d+(\.\d+)*\s+[A-Za-z]", l) for l in next_lines):
toc_started = True
continue
if not toc_started and re.match(r"^\s*\d+(\.\d+)*\s+[A-Za-z]", line):
numbered_lines = 0
for j in range(i, min(i + 5, line_count)):
if re.match(r"^\s*\d+(\.\d+)*\s+[A-Za-z]", lines[j]):
numbered_lines += 1
if numbered_lines >= 3:
toc_started = True
if toc_started and re.match(r"^\s*(Step\s*\d+|[A-Z][a-z]{2,}\s[A-Z])", line):
toc_ended = True
break
if toc_started and not toc_ended:
match = re.match(
r"^\s*(\d+(?:\.\d+)*)\s+([A-Z][A-Za-z0-9\s/&(),-]+)(?:\.+\s*\d+)?$",
line.strip()
)
if match:
section = match.group(1).strip()
title = match.group(2).strip()
if len(title) > 3 and not re.match(r"^\d+$", title):
toc_entries.append((section, title))
deduped, seen = [], set()
for sec, title in toc_entries:
key = (sec, title.lower())
if key not in seen:
deduped.append((sec, title))
seen.add(key)
return deduped
# ==========================================================
# 3A️⃣ HYBRID TOC FALLBACK (AI-Inferred using SAP GenAI Hub Proxy)
# ==========================================================
def adaptive_fallback_toc(text: str, model_name: str = "gpt-4o"):
snippet = text[:7000]
creds_path = os.path.join(os.path.dirname(__file__), "GEN AI HUB PROXY.json")
creds = {}
base_url = ""
if os.path.exists(creds_path):
try:
with open(creds_path, "r") as f:
creds = json.load(f)
base_url = (
creds.get("base_url")
or creds.get("serviceurls", {}).get("AI_API_URL", "")
or creds.get("AICORE_BASE_URL", "")
)
except Exception as e:
print(f"⚠️ Could not read GenAI proxy credentials: {e}")
else:
print("⚠️ No SAP GenAI credentials file found — skipping AI fallback.")
return []
if not base_url:
print("⚠️ Missing AI_API_URL or base_url in credentials — skipping fallback.")
return []
os.environ.update({
"AICORE_AUTH_URL": creds.get("url", ""),
"AICORE_CLIENT_ID": creds.get("clientid") or creds.get("client_id", ""),
"AICORE_CLIENT_SECRET": creds.get("clientsecret") or creds.get("client_secret", ""),
"AICORE_RESOURCE_GROUP": "default",
"AICORE_BASE_URL": base_url
})
try:
print(f"⚙️ Invoking GenAI proxy for TOC inference using model: {model_name}")
proxy_client = get_proxy_client("gen-ai-hub", base_url=base_url)
llm = ChatOpenAI(proxy_model_name=model_name, proxy_client=proxy_client, temperature=0.0, max_tokens=700)
prompt = f"""
You are a document structure analyzer.
Read the following text and infer its main section titles.
Output a numbered list of 5–10 clean section names that could appear in a Table of Contents.
TEXT SAMPLE:
{snippet}
"""
response = llm.invoke(prompt)
response_text = getattr(response, "content", str(response))
lines = [
re.sub(r"^[0-9.\-•\s]+", "", l.strip())
for l in response_text.splitlines()
if l.strip()
]
toc_ai = [(str(i + 1), l) for i, l in enumerate(lines) if len(l) > 3]
print(f"✨ AI-inferred TOC generated with {len(toc_ai)} entries (proxy-based).")
return toc_ai
except Exception as e:
print(f"⚠️ AI TOC fallback failed via GenAI proxy: {e}")
return []
# ==========================================================
# 3B️⃣ UNIFIED WRAPPER (Heuristic + AI Hybrid)
# ==========================================================
def get_hybrid_toc(text: str):
toc_entries = extract_table_of_contents(text)
if toc_entries:
print(f"📘 TOC detected with {len(toc_entries)} entries (heuristic).")
return toc_entries, "heuristic"
print("⚠️ No TOC detected — invoking GenAI fallback...")
toc_ai = adaptive_fallback_toc(text)
if toc_ai:
print(f"✨ AI-inferred TOC generated with {len(toc_ai)} entries.")
return toc_ai, "ai_inferred"
print("❌ No TOC could be detected or inferred.")
return [], "none"
# ==========================================================
# 4️⃣ SMART CHUNKING (Section + Procedure Aware)
# ==========================================================
def chunk_text(text: str, chunk_size: int = None, overlap: int = None) -> list:
text_length = len(text)
if chunk_size is None:
if text_length > 200000:
chunk_size, overlap = 2000, 250
elif text_length > 50000:
chunk_size, overlap = 1500, 200
else:
chunk_size, overlap = 1000, 150
elif overlap is None:
overlap = 150
print(f"⚙️ Auto-selected chunk_size={chunk_size}, overlap={overlap} (len={text_length})")
text = re.sub(r"\s+", " ", text.strip())
section_blocks = re.split(r"(?=(?:\s*\n|\s+)\d+(?:\.\d+){1,2}\s+[A-Z][A-Za-z].{0,80})", text)
procedure_blocks = []
for sec in section_blocks:
if not sec.strip():
continue
sub_blocks = re.split(
r"(?=(?:\s*\n|\s+)\d+\.\d+\s+(?:Create|Configure|Set\s*up|Setup|Steps?|Process|Procedure|Integration|Replication|Connection|Mapping|Restrictions?|Limitations?|Prerequisites?|Considerations?|Guidelines?|Notes?|Cautions?|Recommendations?)\b)",
sec, flags=re.IGNORECASE
)
procedure_blocks.extend(sub_blocks)
chunks = []
for block in procedure_blocks:
if not block.strip():
continue
if len(block) < chunk_size * 1.5:
chunks.append(block.strip())
else:
chunks.extend(_split_by_sentence(block, chunk_size, overlap))
chunks = _merge_small_chunks(chunks, min_len=200)
final_chunks = []
for i, ch in enumerate(chunks):
if i == 0:
final_chunks.append(ch)
else:
prev_tail = chunks[i - 1][-overlap:] if overlap > 0 else ""
final_chunks.append((prev_tail + " " + ch).strip())
print(f"✅ Final chunks created: {len(final_chunks)}")
return final_chunks
# ==========================================================
# 🔹 Helper Functions
# ==========================================================
def _split_by_sentence(text, chunk_size=800, overlap=80):
sentences = re.split(r"(?<=[.!?])\s+", text)
chunks, current = [], ""
for sent in sentences:
if len(current) + len(sent) + 1 <= chunk_size:
current += " " + sent
else:
if current.strip():
chunks.append(current.strip())
overlap_part = current[-overlap:] if overlap > 0 else ""
current = overlap_part + " " + sent
if current.strip():
chunks.append(current.strip())
return chunks
def _merge_small_chunks(chunks, min_len=150):
merged, buffer = [], ""
for ch in chunks:
if len(ch) < min_len:
buffer += " " + ch
else:
if buffer:
merged.append(buffer.strip())
buffer = ""
merged.append(ch.strip())
if buffer:
merged.append(buffer.strip())
return merged
# ==========================================================
# 5️⃣ DEBUGGING (Manual Test)
# ==========================================================
if __name__ == "__main__":
pdf_path = "sample_ai_resume_structured.pdf"
text, toc, toc_source = extract_text_from_pdf(pdf_path)
print("\n📚 TOC Preview:", toc[:5])
chunks = chunk_text(text)
print(f"\n✅ {len(chunks)} chunks created.")