from __future__ import annotations

import re
from typing import Iterable

import pymupdf

import proto.chunk_pb2 as chunk_pb2
from domain.chunk_d import ChunkD, DocumentD
from extraction_pipeline.base_stage import BaseStage, BaseTransform
from storage.domain_dao import CacheDomainDAO


class IDTagger:
    """Callable that records each chunk in a cache-backed DAO and passes it through."""

    _uuid_to_tag: CacheDomainDAO[ChunkD]

    def __init__(self, cache_save_path: str):
        self._uuid_to_tag = CacheDomainDAO(f"{cache_save_path}.json", ChunkD)

    def __call__(self, element: ChunkD) -> ChunkD:
        # Persist the chunk so its UUID can be looked up later, then return it
        # unchanged so the tagger can be dropped into a processing chain.
        self._uuid_to_tag.insert([element])
        return element
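
# A minimal usage sketch ("page_chunks" is a hypothetical cache name; chunks
# come from the stages below):
#
#     tagger = IDTagger("page_chunks")
#     chunk = tagger(chunk)  # chunk is returned as-is, but is now cached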


class PdfPageChunkerStage(BaseStage[DocumentD, ChunkD]):
    """Splits a PDF document into one page-level chunk per page."""

    def __init__(self):
        self._pdf_cache_dao = CacheDomainDAO("pdf_chunk_id_map.json", DocumentD)
        self.id_tagger = IDTagger(f"{self.__class__.__name__.lower()}_chunk_id_map")
    def _process_element(self, element: DocumentD) -> Iterable[ChunkD]:
        pdf_document: Iterable = pymupdf.open(element.file_path)  # type: ignore
        for page_index, pdf_page in enumerate(pdf_document):
            # Extract the page's raw text; layout information is discarded.
            page_text: str = pdf_page.get_textpage().extractText()  # type: ignore
            yield self.id_tagger(
                ChunkD(parent_reference=element,
                       chunk_text=page_text,
                       chunk_type=chunk_pb2.ChunkType.CHUNK_TYPE_PAGE,
                       chunk_index=page_index,
                       chunk_id=self._pdf_cache_dao.set(element)))


class ParagraphChunkerStage(BaseStage[ChunkD, ChunkD]):
    """Splits a page-level chunk into paragraph-level chunks on newline runs."""

    def __init__(self):
        self.id_tagger = IDTagger(f"{self.__class__.__name__.lower()}_chunk_id_map")
    def _process_element(self, element: ChunkD) -> Iterable[ChunkD]:
        # Treat any run of newlines as a paragraph boundary and drop empty pieces.
        paragraphs = re.split(r'\n+', element.chunk_text)
        paragraphs = [para.strip() for para in paragraphs if para.strip()]
        for chunk_index, paragraph in enumerate(paragraphs):
            yield self.id_tagger(
                ChunkD(parent_reference=element.chunk_id,
                       chunk_text=paragraph,
                       chunk_type=chunk_pb2.ChunkType.CHUNK_TYPE_PARAGRAPH,
                       chunk_index=chunk_index))
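
# For example, re.split(r'\n+', "Intro.\n\nBody text.") yields the two
# paragraph texts "Intro." and "Body text.".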


class SentenceChunkerStage(BaseStage[ChunkD, ChunkD]):
    """Splits a paragraph-level chunk into sentence-level chunks."""

    def __init__(self):
        self.id_tagger = IDTagger(f"{self.__class__.__name__.lower()}_chunk_id_map")
    def _process_element(self, element: ChunkD) -> Iterable[ChunkD]:
        # Split after ., ?, or ! followed by whitespace or end of text; the
        # negative lookbehinds avoid splitting inside abbreviations such as
        # "e.g." or "Dr.". The separator group is non-capturing so re.split
        # returns only the sentences themselves.
        sentence_endings = r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|!)(?:\s|$)'
        sentences = re.split(sentence_endings, element.chunk_text)
        sentences = [sentence.strip() for sentence in sentences if sentence.strip()]
        for chunk_index, sentence in enumerate(sentences):
            yield self.id_tagger(
                ChunkD(parent_reference=element.chunk_id,
                       chunk_text=sentence,
                       chunk_type=chunk_pb2.ChunkType.CHUNK_TYPE_SENTENCE,
                       chunk_index=chunk_index))
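
# A sketch of the splitter's behavior: "Dr. Smith arrived. He left!" is kept
# as the two sentences "Dr. Smith arrived." and "He left!", since the
# lookbehind on "[A-Z][a-z]\." blocks a split after "Dr.".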


class PdfToSentencesTransform(BaseTransform[DocumentD, ChunkD]):
    """Chains page, paragraph, and sentence chunking: PDF documents in, sentences out."""

    _pdf_page_chunker: PdfPageChunkerStage
    _paragraph_chunker: ParagraphChunkerStage
    _sentence_chunker: SentenceChunkerStage

    def __init__(self):
        self._pdf_page_chunker = PdfPageChunkerStage()
        self._paragraph_chunker = ParagraphChunkerStage()
        self._sentence_chunker = SentenceChunkerStage()

    def _process_collection(self, collection: Iterable[DocumentD]) -> Iterable[ChunkD]:
        for pdf_document in collection:
            for pdf_page in self._pdf_page_chunker.process_element(pdf_document):
                for paragraph in self._paragraph_chunker.process_element(pdf_page):
                    yield from self._sentence_chunker.process_element(paragraph)


class PdfToParagraphTransform(BaseTransform[DocumentD, ChunkD]):
    """Chains page and paragraph chunking: PDF documents in, paragraphs out."""

    _pdf_page_chunker: PdfPageChunkerStage
    _paragraph_chunker: ParagraphChunkerStage

    def __init__(self):
        self._pdf_page_chunker = PdfPageChunkerStage()
        self._paragraph_chunker = ParagraphChunkerStage()

    def _process_collection(self, collection: Iterable[DocumentD]) -> Iterable[ChunkD]:
        for pdf_document in collection:
            for pdf_page in self._pdf_page_chunker.process_element(pdf_document):
                yield from self._paragraph_chunker.process_element(pdf_page)


class PdfToPageTransform(BaseTransform[DocumentD, ChunkD]):
    """Page-level chunking only: PDF documents in, page chunks out."""

    _pdf_page_chunker: PdfPageChunkerStage

    def __init__(self):
        # Only the page chunker is needed at this granularity.
        self._pdf_page_chunker = PdfPageChunkerStage()

    def _process_collection(self, collection: Iterable[DocumentD]) -> Iterable[ChunkD]:
        for pdf_document in collection:
            yield from self._pdf_page_chunker.process_element(pdf_document)
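

# A minimal end-to-end sketch. Assumptions: DocumentD accepts a file_path
# keyword (only the element.file_path attribute is used above), and
# "example.pdf" is a hypothetical input file. _process_collection is called
# directly rather than assuming BaseTransform's public entry point.
if __name__ == "__main__":
    docs = [DocumentD(file_path="example.pdf")]
    transform = PdfToSentencesTransform()
    for chunk in transform._process_collection(docs):
        print(chunk.chunk_index, chunk.chunk_text[:60])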