import os
import requests
import fitz # PyMuPDF
from docx import Document
# import mailparser
from urllib.parse import urlparse
import re
# === Base Abstract Class ===
class DocumentLoader:
    def __init__(self, source: str):
        self.source = source

    def extract(self):
        raise NotImplementedError("This method should be implemented in child classes.")
# === PDF Loader (Handles Local + URL) ===
class PDFLoader(DocumentLoader):
    def __init__(self, source: str):
        super().__init__(source)
        self.is_url = self.source.startswith("http")

    def _download_pdf(self):
        local_path = "temp_blob.pdf"
        response = requests.get(self.source)
        if response.status_code == 200:
            with open(local_path, 'wb') as f:
                f.write(response.content)
            return local_path
        else:
            raise Exception(f"Failed to download PDF. Status: {response.status_code}")

    def extract(self):
        pdf_path = self._download_pdf() if self.is_url else self.source
        doc = fitz.open(pdf_path)
        clauses = []
        current_heading = None
        current_chunk = []
        page_number = 0
        heading_pattern = re.compile(r'^(\d+(\.\d+)*[\s\-:]?|[A-Z][A-Z\s]{4,})')  # e.g., 3.1, 2.3.5, or UPPER TITLES

        for page in doc:
            page_number += 1
            blocks = page.get_text("blocks")
            blocks = sorted(blocks, key=lambda b: (b[1], b[0]))  # Sort top-to-bottom, left-to-right
            for b in blocks:
                text = b[4].strip()
                if not text or len(text) < 20:
                    continue
                if heading_pattern.match(text):
                    # Flush previous chunk
                    if current_chunk:
                        combined = " ".join(current_chunk).strip()
                        clauses.append({
                            "heading": current_heading,
                            "text": combined,
                            "page": page_number
                        })
                        current_chunk = []
                    current_heading = text  # New heading found
                else:
                    current_chunk.append(text)

        # Final chunk flush
        if current_chunk:
            combined = " ".join(current_chunk).strip()
            clauses.append({
                "heading": current_heading,
                "text": combined,
                "page": page_number
            })

        doc.close()
        if self.is_url and os.path.exists(pdf_path):
            os.remove(pdf_path)
        return clauses
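# Each clause dict produced above has three keys: "heading" (the most recently
# matched heading, or None), "text" (the merged body text of the blocks under
# that heading), and "page" (the page number at the time the chunk was flushed).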
# === DOCX Loader ===
class DOCXLoader(DocumentLoader):
    def extract(self):
        doc = Document(self.source)
        clauses = []
        for i, para in enumerate(doc.paragraphs):
            text = para.text.strip()
            if text:
                clauses.append({
                    "text": text,
                    "style": para.style.name,
                    "position": i + 1
                })
        return clauses
# === Email Loader (.eml files) ===
# class EmailLoader(DocumentLoader):
#     def extract(self):
#         mail = mailparser.parse_from_file(self.source)
#         return [{
#             "subject": mail.subject,
#             "from": mail.from_[0][1] if mail.from_ else "",
#             "to": mail.to[0][1] if mail.to else "",
#             "text": mail.body,
#             "date": str(mail.date)
#         }]
# === Main Wrapper Function ===
def load_document(source: str):
    parsed = urlparse(source)
    if source.endswith(".pdf") or parsed.scheme.startswith("http"):
        loader = PDFLoader(source)
    elif source.endswith(".docx"):
        loader = DOCXLoader(source)
    # elif source.endswith(".eml"):
    #     loader = EmailLoader(source)
    else:
        raise ValueError("Unsupported file format or source type.")
    content = loader.extract()
    return {
        "source": source,
        "clauses": content
    }
# if __name__ == '__main__':
#     output = load_document('https://hackrx.blob.core.windows.net/assets/policy.pdf?sv=2023-01-03&st=2025-07-04T09%3A11%3A24Z&se=2027-07-05T09%3A11%3A00Z&sr=b&sp=r&sig=N4a9OU0w0QXO6AOIBiu4bpl7AXvEZogeT%2FjUHNO7HzQ%3D')
#     print("hello")
#     print(output['clauses'][4])
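# --- Usage sketch ---
# A minimal example of calling load_document on a local DOCX file; the path
# "sample_contract.docx" is a hypothetical placeholder used only for illustration.
#
# result = load_document("sample_contract.docx")
# for clause in result["clauses"]:
#     print(clause["position"], clause["style"], clause["text"][:80])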