import re
import os
import json
import unicodedata

import fitz

from gen_ai_hub.proxy.core.proxy_clients import get_proxy_client
from gen_ai_hub.proxy.langchain.openai import ChatOpenAI


def extract_text_from_pdf(file_path: str):
    """
    Extract and clean text from a PDF using PyMuPDF.
    Handles layout artifacts, numbered sections, and headers/footers,
    then resolves a table of contents.
    Returns (clean_text, toc_entries, toc_source). English-only version.
    """
    text = ""
    try:
        with fitz.open(file_path) as pdf:
            for page_num, page in enumerate(pdf, start=1):
                page_text = page.get_text("text").strip()

                # Fall back to block extraction for pages with little or no plain text
                if not page_text or len(page_text) < 10:
                    blocks = page.get_text("blocks")
                    page_text = " ".join(
                        block[4] for block in blocks if isinstance(block[4], str)
                    )

                # Re-introduce structure around bullets and numbered sections,
                # then strip page numbers and classification/copyright banners
                page_text = page_text.replace("• ", "\n• ")
                page_text = re.sub(r"(\d+\.\d+\.\d+)", r"\n\1", page_text)
                page_text = re.sub(r"Page\s*\d+\s*(of\s*\d+)?", "", page_text, flags=re.IGNORECASE)
                page_text = re.sub(r"(PUBLIC|Confidential|© SAP.*|\bSAP\b\s*\d{4})", "", page_text, flags=re.IGNORECASE)

                text += page_text + "\n"

    except Exception as e:
        raise RuntimeError(f"❌ PDF extraction failed: {e}") from e

    text = clean_text(text)
    print("🧾 TEXT SAMPLE (first 400 chars):", text[:400])

    toc, toc_source = get_hybrid_toc(text)
    print(f"📘 TOC Source: {toc_source} | Entries: {len(toc)}")

    return text, toc, toc_source


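# Worked example for clean_text below (hypothetical snippet): the dotted TOC line,
# the PUBLIC banner, and the line-break hyphenation are all removed.
#   clean_text("2.1 Overview .......... 14\nPUBLIC\nConfig-\nuration steps")
#   -> "Configuration steps"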
def clean_text(text: str) -> str:
    """Clean noisy PDF text for English documents."""
    text = unicodedata.normalize("NFKC", text)

    # Drop TOC-style dotted leader lines, e.g. "2.1 Overview .... 14"
    text = re.sub(r"\b\d+(\.\d+){1,}\s+[A-Za-z].{0,40}\.{2,}\s*\d+\b", "", text)

    # Normalize bullets and dotted leaders
    text = text.replace("•", "- ").replace("▪", "- ").replace("‣", "- ")
    text = re.sub(r"\.{3,}", ". ", text)

    # Re-join words hyphenated across line breaks
    text = re.sub(r"-\s*\n", "", text)

    # Remove classification banners and copyright lines
    text = re.sub(r"\n\s*(PUBLIC|PRIVATE|Confidential)\s*\n", "\n", text, flags=re.IGNORECASE)
    text = re.sub(r"©\s*[A-Z].*?\d{4}", "", text)

    # Collapse whitespace
    text = text.replace("\r", " ")
    text = re.sub(r"\n{2,}", "\n", text)
    text = re.sub(r"\s{2,}", " ", text)

    # Strip stray symbols while keeping common punctuation
    text = re.sub(r"[^\w\s,;:.\-\(\)/&]", "", text)
    text = re.sub(r"(\s*\.\s*){3,}", " ", text)

    return text.strip()


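# Illustration of the line shapes the heuristic below matches (hypothetical input):
#   extract_table_of_contents("Contents\n1 Introduction\n1.1 Scope\n2 Configuration")
#   -> [("1", "Introduction"), ("1.1", "Scope"), ("2", "Configuration")]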
def extract_table_of_contents(text: str):
    """Heuristically extract a TOC as a deduplicated list of (section_number, title) tuples."""
    toc_entries = []
    lines = text.split("\n")
    toc_started = False
    toc_ended = False
    line_count = len(lines)

    for i, line in enumerate(lines):
        # Start of TOC: an explicit heading followed by numbered section lines
        if not toc_started and re.search(r"\b(table\s*of\s*contents?|contents?|index|overview)\b", line, re.IGNORECASE):
            next_lines = lines[i + 1: i + 8]
            if any(re.match(r"^\s*\d+(\.\d+)*\s+[A-Za-z]", l) for l in next_lines):
                toc_started = True
            continue

        # Start of TOC without a heading: a run of at least three numbered lines
        if not toc_started and re.match(r"^\s*\d+(\.\d+)*\s+[A-Za-z]", line):
            numbered_lines = 0
            for j in range(i, min(i + 5, line_count)):
                if re.match(r"^\s*\d+(\.\d+)*\s+[A-Za-z]", lines[j]):
                    numbered_lines += 1
            if numbered_lines >= 3:
                toc_started = True

        # End of TOC: body text such as "Step 1" or a regular sentence start
        if toc_started and re.match(r"^\s*(Step\s*\d+|[A-Z][a-z]{2,}\s[A-Z])", line):
            toc_ended = True
            break

        if toc_started and not toc_ended:
            match = re.match(
                r"^\s*(\d+(?:\.\d+)*)\s+([A-Z][A-Za-z0-9\s/&(),-]+)(?:\.+\s*\d+)?$",
                line.strip()
            )
            if match:
                section = match.group(1).strip()
                title = match.group(2).strip()
                if len(title) > 3 and not re.match(r"^\d+$", title):
                    toc_entries.append((section, title))

    # Deduplicate while preserving order
    deduped, seen = [], set()
    for sec, title in toc_entries:
        key = (sec, title.lower())
        if key not in seen:
            deduped.append((sec, title))
            seen.add(key)
    return deduped


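# Illustrative shape of the credential file read below (hypothetical values; only the
# keys actually read matter: "url", "clientid"/"client_id", "clientsecret"/"client_secret",
# and one of "base_url", "serviceurls.AI_API_URL", or "AICORE_BASE_URL"):
#   {
#     "url": "https://<auth-url>",
#     "clientid": "<client-id>",
#     "clientsecret": "<client-secret>",
#     "serviceurls": {"AI_API_URL": "https://<ai-api-url>"}
#   }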
def adaptive_fallback_toc(text: str, model_name: str = "gpt-4o"): |
|
|
snippet = text[:7000] |
|
|
creds_path = os.path.join(os.path.dirname(__file__), "GEN AI HUB PROXY.json") |
|
|
creds = {} |
|
|
base_url = "" |
|
|
|
|
|
if os.path.exists(creds_path): |
|
|
try: |
|
|
with open(creds_path, "r") as f: |
|
|
creds = json.load(f) |
|
|
base_url = ( |
|
|
creds.get("base_url") |
|
|
or creds.get("serviceurls", {}).get("AI_API_URL", "") |
|
|
or creds.get("AICORE_BASE_URL", "") |
|
|
) |
|
|
except Exception as e: |
|
|
print(f"⚠️ Could not read GenAI proxy credentials: {e}") |
|
|
else: |
|
|
print("⚠️ No SAP GenAI credentials file found — skipping AI fallback.") |
|
|
return [] |
|
|
|
|
|
if not base_url: |
|
|
print("⚠️ Missing AI_API_URL or base_url in credentials — skipping fallback.") |
|
|
return [] |
|
|
|
|
|
os.environ.update({ |
|
|
"AICORE_AUTH_URL": creds.get("url", ""), |
|
|
"AICORE_CLIENT_ID": creds.get("clientid") or creds.get("client_id", ""), |
|
|
"AICORE_CLIENT_SECRET": creds.get("clientsecret") or creds.get("client_secret", ""), |
|
|
"AICORE_RESOURCE_GROUP": "default", |
|
|
"AICORE_BASE_URL": base_url |
|
|
}) |
|
|
|
|
|
try: |
|
|
print(f"⚙️ Invoking GenAI proxy for TOC inference using model: {model_name}") |
|
|
proxy_client = get_proxy_client("gen-ai-hub", base_url=base_url) |
|
|
llm = ChatOpenAI(proxy_model_name=model_name, proxy_client=proxy_client, temperature=0.0, max_tokens=700) |
|
|
|
|
|
prompt = f""" |
|
|
You are a document structure analyzer. |
|
|
Read the following text and infer its main section titles. |
|
|
Output a numbered list of 5–10 clean section names that could appear in a Table of Contents. |
|
|
|
|
|
TEXT SAMPLE: |
|
|
{snippet} |
|
|
""" |
|
|
|
|
|
response = llm.invoke(prompt) |
|
|
response_text = getattr(response, "content", str(response)) |
|
|
lines = [ |
|
|
re.sub(r"^[0-9.\-•\s]+", "", l.strip()) |
|
|
for l in response_text.splitlines() |
|
|
if l.strip() |
|
|
] |
|
|
|
|
|
toc_ai = [(str(i + 1), l) for i, l in enumerate(lines) if len(l) > 3] |
|
|
print(f"✨ AI-inferred TOC generated with {len(toc_ai)} entries (proxy-based).") |
|
|
return toc_ai |
|
|
|
|
|
except Exception as e: |
|
|
print(f"⚠️ AI TOC fallback failed via GenAI proxy: {e}") |
|
|
return [] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
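# Usage sketch for the hybrid lookup defined below (raw_text is a placeholder for
# already-extracted document text):
#   toc, source = get_hybrid_toc(raw_text)
#   # source is "heuristic", "ai_inferred", or "none"; toc holds
#   # (section_number, title) tuples, for example ("2.1", "Integration Setup").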
def get_hybrid_toc(text: str):
    """Return (toc_entries, source): heuristic extraction first, AI fallback second."""
    toc_entries = extract_table_of_contents(text)
    if toc_entries:
        print(f"📘 TOC detected with {len(toc_entries)} entries (heuristic).")
        return toc_entries, "heuristic"

    print("⚠️ No TOC detected — invoking GenAI fallback...")
    toc_ai = adaptive_fallback_toc(text)
    if toc_ai:
        print(f"✨ AI-inferred TOC generated with {len(toc_ai)} entries.")
        return toc_ai, "ai_inferred"

    print("❌ No TOC could be detected or inferred.")
    return [], "none"


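# Usage sketch for the chunker defined below (cleaned_text is a placeholder):
#   chunks = chunk_text(cleaned_text)              # chunk_size/overlap picked from text length
#   chunks = chunk_text(cleaned_text, 1200, 100)   # explicit chunk_size and overlap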
def chunk_text(text: str, chunk_size: int = None, overlap: int = None) -> list:
    """Split cleaned text into section-aware, overlapping chunks."""
    text_length = len(text)

    # Pick chunk size and overlap from document length unless given explicitly
    if chunk_size is None:
        if text_length > 200000:
            chunk_size, overlap = 2000, 250
        elif text_length > 50000:
            chunk_size, overlap = 1500, 200
        else:
            chunk_size, overlap = 1000, 150
    elif overlap is None:
        overlap = 150

    print(f"⚙️ Auto-selected chunk_size={chunk_size}, overlap={overlap} (len={text_length})")
    text = re.sub(r"\s+", " ", text.strip())

    # First pass: split on numbered section headings such as "3.2 Configure ..."
    section_blocks = re.split(r"(?=(?:\s*\n|\s+)\d+(?:\.\d+){1,2}\s+[A-Z][A-Za-z].{0,80})", text)

    # Second pass: split further on procedure-style subsection headings
    procedure_blocks = []
    for sec in section_blocks:
        if not sec.strip():
            continue
        sub_blocks = re.split(
            r"(?=(?:\s*\n|\s+)\d+\.\d+\s+(?:Create|Configure|Set\s*up|Setup|Steps?|Process|Procedure|Integration|Replication|Connection|Mapping|Restrictions?|Limitations?|Prerequisites?|Considerations?|Guidelines?|Notes?|Cautions?|Recommendations?)\b)",
            sec, flags=re.IGNORECASE
        )
        procedure_blocks.extend(sub_blocks)

    # Keep short blocks whole; split long blocks on sentence boundaries
    chunks = []
    for block in procedure_blocks:
        if not block.strip():
            continue
        if len(block) < chunk_size * 1.5:
            chunks.append(block.strip())
        else:
            chunks.extend(_split_by_sentence(block, chunk_size, overlap))

    # Merge fragments, then prepend the tail of the previous chunk for context overlap
    chunks = _merge_small_chunks(chunks, min_len=200)
    final_chunks = []
    for i, ch in enumerate(chunks):
        if i == 0:
            final_chunks.append(ch)
        else:
            prev_tail = chunks[i - 1][-overlap:] if overlap > 0 else ""
            final_chunks.append((prev_tail + " " + ch).strip())

    print(f"✅ Final chunks created: {len(final_chunks)}")
    return final_chunks


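# Worked example for the sentence splitter below (hypothetical input; with
# chunk_size=40 and overlap=10 the tail " sentence." is carried into the next chunk):
#   _split_by_sentence("First sentence. Second sentence. Third sentence.", 40, 10)
#   -> ["First sentence. Second sentence.", "sentence. Third sentence."]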
def _split_by_sentence(text, chunk_size=800, overlap=80):
    """Split text on sentence boundaries into chunks of roughly chunk_size characters."""
    sentences = re.split(r"(?<=[.!?])\s+", text)
    chunks, current = [], ""
    for sent in sentences:
        if len(current) + len(sent) + 1 <= chunk_size:
            current += " " + sent
        else:
            if current.strip():
                chunks.append(current.strip())
            # Carry the tail of the previous chunk forward for context
            overlap_part = current[-overlap:] if overlap > 0 else ""
            current = overlap_part + " " + sent
    if current.strip():
        chunks.append(current.strip())
    return chunks


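# Worked example for the merge helper below (hypothetical strings, min_len=15):
#   _merge_small_chunks(["tiny", "bit", "a full-length chunk of text"], min_len=15)
#   -> ["tiny bit", "a full-length chunk of text"]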
def _merge_small_chunks(chunks, min_len=150):
    """Collect chunks shorter than min_len into a buffer emitted before the next full-size chunk."""
    merged, buffer = [], ""
    for ch in chunks:
        if len(ch) < min_len:
            buffer += " " + ch
        else:
            if buffer:
                merged.append(buffer.strip())
                buffer = ""
            merged.append(ch.strip())
    if buffer:
        merged.append(buffer.strip())
    return merged


if __name__ == "__main__": |
|
|
pdf_path = "sample_ai_resume_structured.pdf" |
|
|
text, toc, toc_source = extract_text_from_pdf(pdf_path) |
|
|
print("\n📚 TOC Preview:", toc[:5]) |
|
|
chunks = chunk_text(text) |
|
|
print(f"\n✅ {len(chunks)} chunks created.") |
|
|
|