|
from unstructured.partition.auto import partition |
|
from unstructured.chunking.title import chunk_by_title |
|
from unstructured.chunking.basic import chunk_elements |
|
from unstructured.documents.elements import Element, Title, CompositeElement |
|
from unstructured.staging.base import convert_to_dataframe |
|
from typing import Type, List, Literal, Tuple |
|
|
|
from unstructured.cleaners.core import (
    replace_unicode_quotes,
    clean_non_ascii_chars,
    clean_ordered_bullets,
    group_broken_paragraphs,
    clean,
    clean_trailing_punctuation,
    remove_punctuation,
    bytes_string_to_string,
)
|
import gradio as gr |
|
import time |
|
import pandas as pd |
|
import re |
|
import gzip |
|
import pickle |
|
from pydantic import BaseModel, Field |
|
|
|
from tools.helper_functions import get_file_path_end, get_file_path_end_with_ext |
|
|
|
|
|
PandasDataFrame = Type[pd.DataFrame] |
|
|
|
|
|
|
|
pdf_partition_strat = "ocr_only"  # unstructured partition() strategy: "auto", "fast", "hi_res" or "ocr_only"
|
|
|
|
|
|
|
meta_keys_to_filter = ["file_directory", "filetype"]  # metadata keys removed from elements by default

element_types_to_filter = ['UncategorizedText', 'Header']  # element categories filtered out by default
|
|
|
|
|
|
|
|
|
# Default toggles for the cleaning steps applied in clean_elements()

bytes_to_string=False
|
replace_quotes=True |
|
clean_non_ascii=False |
|
clean_ordered_list=True |
|
group_paragraphs=True |
|
trailing_punctuation=False |
|
all_punctuation=False |
|
clean_text=True |
|
extra_whitespace=True |
|
dashes=True |
|
bullets=True |
|
lowercase=False |
|
|
|
|
|
|
|
|
|
minimum_chunk_length = 2000  # combine_text_under_n_chars: merge sections smaller than this

start_new_chunk_after_end_of_this_element_length = 2000  # new_after_n_chars: soft maximum chunk size

hard_max_character_length_chunks = 3000  # max_characters: hard maximum chunk size
|
multipage_sections=True |
|
overlap_all=True |
|
include_orig_elements=True |
|
|
|
|
|
class Document(BaseModel): |
|
"""Class for storing a piece of text and associated metadata. Implementation adapted from Langchain code: https://github.com/langchain-ai/langchain/blob/master/libs/core/langchain_core/documents/base.py""" |
|
|
|
page_content: str |
|
"""String text.""" |
|
metadata: dict = Field(default_factory=dict) |
|
"""Arbitrary metadata about the page content (e.g., source, relationships to other |
|
documents, etc.). |
|
""" |
|
type: Literal["Document"] = "Document" |
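

# A minimal construction example (illustrative values only, not from a real file):
# Document(page_content="Some text", metadata={"source": "file.pdf", "page_number": 1})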
|
|
|
|
|
def create_title_id_dict(elements:List[Element]):

    '''

    Create two mappings from a parsed element list: element ID -> title text, and title text -> element ID.

    '''

    titles = [item.text for item in elements if isinstance(item, Title)]
|
|
|
|
|
chapter_ids = {} |
|
for element in elements: |
|
for chapter in titles: |
|
if element.text == chapter and element.category == "Title": |
|
chapter_ids[element._element_id] = chapter |
|
break |
|
|
|
chapter_to_id = {v: k for k, v in chapter_ids.items()} |
|
|
|
return chapter_ids, chapter_to_id |
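

# The returned mappings have the following shapes (IDs below are made up for illustration):
# chapter_ids   -> {"f1a2...": "Introduction", ...}   # element_id -> title text
# chapter_to_id -> {"Introduction": "f1a2...", ...}   # title text -> element_id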
|
|
|
|
|
def filter_elements(elements:List[Element], excluded_elements: List[str] = ['']): |
|
""" |
|
Filter out elements from a list based on their categories. |
|
|
|
Args: |
|
elements: The list of elements to filter. |
|
excluded_elements: A list of element categories to exclude. |
|
|
|
Returns: |
|
A new list containing the filtered elements. |
|
""" |
|
filtered_elements = [] |
|
for element in elements: |
|
if element.category not in excluded_elements: |
|
filtered_elements.append(element) |
|
return filtered_elements |
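

# Example: keep everything except headers and uncategorised text:
# filtered = filter_elements(elements, excluded_elements=["Header", "UncategorizedText"])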
|
|
|
|
|
def remove_keys_from_meta( |
|
elements: List[Element], |
|
meta_remove_keys: List[str], |
|
excluded_element_types: List[str] = [] |
|
) -> List[Element]: |
|
    '''

    Remove specified metadata keys from a list of Unstructured Element objects.

    '''
|
|
|
for element in elements: |
|
if element.category not in excluded_element_types: |
|
for key in meta_remove_keys: |
|
try: |
|
del element.metadata.__dict__[key] |
|
except KeyError: |
|
print(f"Key '{key}' not found in element metadata.") |
|
|
|
return elements |
|
|
|
def filter_elements_and_metadata( |
|
elements: List[Element], |
|
excluded_categories: List[str] = [], |
|
meta_remove_keys: List[str] = [], |
|
) -> List[Element]: |
|
""" |
|
Filters elements based on categories and removes specified metadata keys. |
|
|
|
Args: |
|
elements: The list of elements to process. |
|
excluded_categories: A list of element categories to exclude. |
|
meta_remove_keys: A list of metadata keys to remove. |
|
|
|
Returns: |
|
A new list containing the processed elements. |
|
""" |
|
|
|
filtered_elements = [] |
|
for element in elements: |
|
if element.category not in excluded_categories: |
|
for key in meta_remove_keys: |
|
try: |
|
del element.metadata.__dict__[key] |
|
                except KeyError:

                    pass
|
filtered_elements.append(element) |
|
|
|
return filtered_elements |
|
|
|
|
|
def add_parent_title_to_meta(elements:List[Element], chapter_ids:List[str], excluded_element_types:List[str]=['']) -> List[Element]: |
|
    '''

    Add parent title names to Unstructured element metadata.

    '''

    for element in elements:

        if element.category not in excluded_element_types:

            meta = element.metadata.to_dict()
|
|
|
if "parent_id" in meta and meta["parent_id"] in chapter_ids and "title_name" not in meta: |
|
title_name = chapter_ids[meta["parent_id"]] |
|
|
|
element.metadata.title_name = title_name |
|
|
|
return elements |
|
|
|
|
|
def chunk_all_elements(elements:List[Element],
                       file_name_base:str,
                       chunk_type:str = "Basic_chunking",
                       minimum_chunk_length:int=minimum_chunk_length,
                       start_new_chunk_after_end_of_this_element_length:int=start_new_chunk_after_end_of_this_element_length,
                       hard_max_character_length_chunks:int=hard_max_character_length_chunks,
                       multipage_sections:bool=multipage_sections,
                       overlap_all:bool=overlap_all,
                       include_orig_elements:bool=include_orig_elements):

    '''

    Use Unstructured.io functions to chunk a list of elements by title or across all elements.

    '''
|
output_files = [] |
|
output_summary = "" |
|
|
|
chapter_ids, chapter_to_id = create_title_id_dict(elements) |
|
|
|
|
|
|
|
try: |
|
|
|
if chunk_type == "Chunk within title": |
|
chunks = chunk_by_title( |
|
elements, |
|
include_orig_elements=include_orig_elements, |
|
combine_text_under_n_chars=minimum_chunk_length, |
|
new_after_n_chars=start_new_chunk_after_end_of_this_element_length, |
|
max_characters=hard_max_character_length_chunks, |
|
multipage_sections=multipage_sections, |
|
overlap_all=overlap_all |
|
) |
|
|
|
else: |
|
chunks = chunk_elements( |
|
elements, |
|
include_orig_elements=include_orig_elements, |
|
new_after_n_chars=start_new_chunk_after_end_of_this_element_length, |
|
max_characters=hard_max_character_length_chunks, |
|
overlap_all=overlap_all |
|
) |
|
|
|
    except Exception as e:

        output_summary = f"Chunking failed: {e}"

        print(output_summary)

        return output_summary, output_files, file_name_base
|
|
|
chunk_sections, chunk_df, chunks_out = element_chunks_to_document(chunks, chapter_ids) |
|
|
|
file_name_suffix = "_chunk" |
|
|
|
|
|
    output_summary, output_files, file_name_base_new = export_elements_as_table_to_file(chunks_out, file_name_base, file_name_suffix, chunk_sections)

    return output_summary, output_files, file_name_base_new
|
|
|
|
|
def element_chunks_to_document(chunks:List[CompositeElement], chapter_ids:dict) -> Tuple[List[Document], PandasDataFrame, List[CompositeElement]]:

    '''

    Convert Unstructured.io chunking output (with the original parsed elements attached) into the Document format commonly used by vector databases, plus a Pandas dataframe.

    '''
|
chunk_sections = [] |
|
current_title_id = '' |
|
current_title = '' |
|
last_page = '' |
|
chunk_df_list = [] |
|
|
|
for chunk in chunks: |
|
chunk_meta = chunk.metadata.to_dict() |
|
true_element_ids = [] |
|
element_categories = [] |
|
titles = [] |
|
titles_id = [] |
|
|
|
if "page_number" in chunk_meta: |
|
last_page = chunk_meta["page_number"] |
|
|
|
chunk_text = chunk.text |
|
|
|
|
|
|
|
for element in chunk.metadata.orig_elements: |
|
|
|
|
|
element_id = element._element_id |
|
element_category = element.category |
|
element_meta = element.metadata.to_dict() |
|
|
|
if "page_number" in element_meta: |
|
element_page_number = element_meta["page_number"] |
|
last_page = element_page_number |
|
|
|
true_element_ids.append(element_id) |
|
element_categories.append(element_category) |
|
|
|
|
|
|
|
if "page_number" in element_meta: |
|
chunk_meta["last_page_number"] = last_page |
|
|
|
chunk_meta["true_element_ids"] = true_element_ids |
|
|
|
for loop_id in chunk_meta['true_element_ids']: |
|
if loop_id in chapter_ids: |
|
current_title = chapter_ids[loop_id] |
|
current_title_id = loop_id |
|
|
|
titles.append(current_title) |
|
titles_id.append(current_title_id) |
|
|
|
chunk_meta['titles'] = titles |
|
chunk_meta['titles_id'] = titles_id |
|
|
|
|
|
chunk_meta.pop('orig_elements') |
|
|
|
chunk_dict_for_df = chunk_meta.copy() |
|
chunk_dict_for_df['text'] = chunk.text |
|
|
|
chunk_df_list.append(chunk_dict_for_df) |
|
|
|
|
|
chunk_doc = [Document(page_content=chunk_text, metadata=chunk_meta)] |
|
chunk_sections.extend(chunk_doc) |
|
|
|
|
|
chunk.metadata.__dict__ = chunk_meta |
|
|
|
chunk_df = pd.DataFrame(chunk_df_list) |
|
|
|
|
|
|
|
return chunk_sections, chunk_df, chunks |
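

# Each returned Document carries the chunk text plus the metadata assembled above, e.g.:
# {"last_page_number": 3, "true_element_ids": [...], "titles": [...], "titles_id": [...]}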
|
|
|
|
|
def write_elements_to_documents(elements:List[Element]) -> List[Document]:

    '''

    Take Unstructured.io parsed elements and write them into the 'Document' format commonly used by vector databases.

    '''
|
|
|
doc_sections = [] |
|
|
|
for element in elements: |
|
meta = element.metadata.to_dict() |
|
|
|
meta["type"] = element.category |
|
meta["element_id"] = element._element_id |
|
|
|
element_doc = [Document(page_content=element.text, metadata= meta)] |
|
doc_sections.extend(element_doc) |
|
|
|
|
|
|
|
|
|
return doc_sections |
|
|
|
|
|
def clean_elements(elements:List[Element], dropdown_options: List[str] = [''], |
|
output_name:str = "combined_elements", |
|
bytes_to_string:bool=False, |
|
replace_quotes:bool=True, |
|
clean_non_ascii:bool=False, |
|
clean_ordered_list:bool=True, |
|
group_paragraphs:bool=True, |
|
trailing_punctuation:bool=False, |
|
all_punctuation:bool=False, |
|
clean_text:bool=True, |
|
extra_whitespace:bool=True, |
|
dashes:bool=True, |
|
bullets:bool=True, |
|
lowercase:bool=False) -> List[Element]: |
|
|
|
''' |
|
    Apply Unstructured cleaning processes to a list of parsed elements.
|
''' |
|
|
|
out_files = [] |
|
output_summary = "" |
|
|
|
|
|
    # Turn on any cleaning steps selected in the dropdown (defaults above still apply)

    bytes_to_string = bytes_to_string or "Convert bytes to string" in dropdown_options

    replace_quotes = replace_quotes or "Replace quotes" in dropdown_options

    clean_non_ascii = clean_non_ascii or "Clean non ASCII" in dropdown_options

    clean_ordered_list = clean_ordered_list or "Clean ordered list" in dropdown_options

    group_paragraphs = group_paragraphs or "Group paragraphs" in dropdown_options

    trailing_punctuation = trailing_punctuation or "Remove trailing punctuation" in dropdown_options

    all_punctuation = all_punctuation or "Remove all punctuation" in dropdown_options

    clean_text = clean_text or "Clean text" in dropdown_options

    extra_whitespace = extra_whitespace or "Remove extra whitespace" in dropdown_options

    dashes = dashes or "Remove dashes" in dropdown_options

    bullets = bullets or "Remove bullets" in dropdown_options

    lowercase = lowercase or "Make lowercase" in dropdown_options
|
|
|
|
|
cleaned_elements = elements.copy() |
|
|
|
for element in cleaned_elements: |
|
|
|
try: |
|
if element: |
|
if bytes_to_string: |
|
element.apply(bytes_string_to_string) |
|
if replace_quotes: |
|
element.apply(replace_unicode_quotes) |
|
if clean_non_ascii: |
|
element.apply(clean_non_ascii_chars) |
|
if clean_ordered_list: |
|
element.apply(clean_ordered_bullets) |
|
if group_paragraphs: |
|
element.apply(group_broken_paragraphs) |
|
if trailing_punctuation: |
|
element.apply(clean_trailing_punctuation) |
|
                if all_punctuation:

                    element.apply(remove_punctuation)

                if clean_text:

                    element.apply(lambda x: clean(x, extra_whitespace=extra_whitespace, dashes=dashes, bullets=bullets, lowercase=lowercase))
|
            except Exception as e:

                print(e)
|
|
|
alt_out_message, out_files, output_file_base = export_elements_as_table_to_file(cleaned_elements, output_name, file_name_suffix="_clean") |
|
|
|
output_summary = "Text elements successfully cleaned." |
|
print(output_summary) |
|
|
|
return cleaned_elements, output_summary, out_files, output_file_base |
|
|
|
|
|
def export_elements_as_table_to_file(elements:List[Element], file_name_base:str, file_name_suffix:str="", chunk_documents:List[Document]=[]): |
|
    '''

    Export elements as a table (CSV), plus a pickled list of Document objects.

    '''
|
output_summary = "" |
|
out_files = [] |
|
|
|
|
|
out_table = convert_to_dataframe(elements) |
|
|
|
|
|
if file_name_suffix not in file_name_base: |
|
out_file_name_base = file_name_base + file_name_suffix |
|
|
|
else: |
|
out_file_name_base = file_name_base |
|
|
|
out_file_name = "output/" + out_file_name_base + ".csv" |
|
|
|
out_table.to_csv(out_file_name) |
|
out_files.append(out_file_name) |
|
|
|
|
|
if chunk_documents: |
|
out_documents = chunk_documents |
|
else: |
|
out_documents = write_elements_to_documents(elements) |
|
|
|
|
|
|
|
out_file_name_docs = "output/" + out_file_name_base + "_docs.pkl.gz" |
|
with gzip.open(out_file_name_docs, 'wb') as file: |
|
pickle.dump(out_documents, file) |
|
|
|
out_files.append(out_file_name_docs) |
|
|
|
output_summary = "File successfully exported." |
|
|
|
return output_summary, out_files, out_file_name_base |
|
|
|
|
|
|
|
def get_file_type(filename):

    '''

    Return the file extension (without the dot) from a filename, or an empty string if none is found.

    '''

    file_type = ""

    pattern = r"\.(\w+)$"

    match = re.search(pattern, filename)

    if match:

        file_type = match.group(1)

        print(file_type)

    else:

        print("No file type found.")

    return file_type
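

# Example: get_file_type("report.pdf") prints and returns "pdf"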
|
|
|
|
|
def partition_file(filenames:List[str], pdf_partition_strat:str = pdf_partition_strat, progress = gr.Progress()): |
|
''' |
|
    Partition document files into text elements using the Unstructured package. Currently supports PDF, docx, pptx, html, several image and plain-text file types, email messages, and code files.
|
''' |
|
|
|
out_message = "" |
|
combined_elements = [] |
|
out_files = [] |
|
|
|
for file in progress.tqdm(filenames, desc="Partitioning files", unit="files"): |
|
|
|
try: |
|
|
|
tic = time.perf_counter() |
|
print(file) |
|
|
|
file_name = get_file_path_end_with_ext(file) |
|
file_name_base = get_file_path_end(file) |
|
file_type = get_file_type(file_name) |
|
|
|
image_file_type_list = ["jpg", "jpeg", "png", "heic"] |
|
|
|
if file_type in image_file_type_list: |
|
print("File is an image. Using OCR method to partition.") |
|
file_elements = partition(file, strategy="ocr_only") |
|
else: |
|
file_elements = partition(file, strategy=pdf_partition_strat) |
|
|
|
toc = time.perf_counter() |
|
|
|
|
|
new_out_message = f"Successfully partitioned file: {file_name} in {toc - tic:0.1f} seconds\n" |
|
print(new_out_message) |
|
|
|
out_message = out_message + new_out_message |
|
combined_elements.extend(file_elements) |
|
|
|
except Exception as e: |
|
            new_out_message = f"Failed to partition file: {file} due to {e}. Partitioning halted."
|
print(new_out_message) |
|
out_message = out_message + new_out_message |
|
break |
|
|
|
out_table = convert_to_dataframe(combined_elements) |
|
|
|
|
|
if len(filenames) > 1: |
|
file_name_base = "combined_files" |
|
|
|
alt_out_message, out_files, output_file_base = export_elements_as_table_to_file(combined_elements, file_name_base, file_name_suffix="_elements") |
|
|
|
return out_message, combined_elements, out_files, output_file_base, out_table |
|
|
|
|
|
def modify_metadata_elements(elements_out_cleaned:List[Element], meta_keys_to_filter:List[str]=meta_keys_to_filter, element_types_to_filter:List[str]=element_types_to_filter) -> List[Element]: |
|
|
|
    '''

    Take a list of elements and add parent title names to their metadata, then remove specified metadata keys and element types from the list.

    '''
|
|
|
chapter_ids, chapter_to_id = create_title_id_dict(elements_out_cleaned.copy()) |
|
elements_out_meta_mod = add_parent_title_to_meta(elements_out_cleaned.copy(), chapter_ids) |
|
elements_out_meta_mod_meta_filt = remove_keys_from_meta(elements_out_meta_mod.copy(), meta_keys_to_filter) |
|
elements_out_filtered_meta_mod = filter_elements(elements_out_meta_mod_meta_filt, element_types_to_filter) |
|
|
|
return elements_out_filtered_meta_mod |
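

# A minimal end-to-end sketch of the pipeline in this module. "example.pdf" is a
# placeholder, and an "output/" directory is assumed to exist for the export steps:
#
# out_message, elements, out_files, file_base, out_table = partition_file(["example.pdf"])
# cleaned, summary, clean_files, clean_base = clean_elements(elements, dropdown_options=["Replace quotes"], output_name=file_base)
# modified = modify_metadata_elements(cleaned)
# chunk_summary, chunk_files, chunk_base = chunk_all_elements(modified, clean_base, chunk_type="Chunk within title")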
|