"""Unstructured file reader. A parser for unstructured text files using Unstructured.io. Supports .txt, .docx, .pptx, .jpg, .png, .eml, .html, and .pdf documents. """ from datetime import datetime import mimetypes import os from pathlib import Path import re from typing import Any, Dict, List, Optional from llama_index.readers.base import BaseReader from llama_index.readers.schema.base import Document class UnstructuredReader(BaseReader): """General unstructured text reader for a variety of files.""" def __init__(self, *args: Any, **kwargs: Any) -> None: """Init params.""" super().__init__(*args, **kwargs) # Prerequisite for Unstructured.io to work import nltk nltk.download("punkt") nltk.download("averaged_perceptron_tagger") def load_data( self, file: Path, extra_info: Optional[Dict] = None, split_documents: Optional[bool] = True, ) -> List[Document]: """Parse file.""" from unstructured.partition.auto import partition elements = partition(str(file)) text_chunks = [" ".join(str(el).split()) for el in elements] if split_documents: return [ Document(text=chunk, extra_info=extra_info or {}) for chunk in text_chunks ] else: return [ Document(text="\n\n".join(text_chunks), extra_info=extra_info or {}) ] class MarkdownReader(BaseReader): """General unstructured text reader for a variety of files.""" def __init__(self, *args: Any, **kwargs: Any) -> None: """Init params.""" super().__init__(*args, **kwargs) def load_data( self, file: Path, extra_info: Optional[Dict] = None, split_documents: Optional[bool] = True, ) -> List[Document]: """Parse file.""" from unstructured.partition.auto import partition elements = parse_knowledge_units(str(file)) if split_documents: return [ Document(text=ele, extra_info=extra_info or {}) for ele in elements ] def parse_knowledge_units(file_path): with open(file_path, 'r', encoding='utf-8') as file: lines = file.readlines() knowledge_units = [] current_unit = "" unit_start_pattern = re.compile(r'^\d+\.\s') for line in lines: stripped_line = line.strip() if unit_start_pattern.match(stripped_line): if current_unit: knowledge_units.append(current_unit.strip()) current_unit = "" current_unit += line else: current_unit += line if current_unit: knowledge_units.append(current_unit.strip()) # for line in lines: # if line.strip() and line[0].isdigit() and '.' in line: # if current_unit: # knowledge_units.append(current_unit.strip()) # current_unit = "" # current_unit += line # else: # current_unit += line # if current_unit: # knowledge_units.append(current_unit.strip()) return knowledge_units def default_file_metadata_func(file_path: str) -> Dict: """Get some handy metadate from filesystem. Args: file_path: str: file path in str """ return { "file_path": file_path, "file_name": os.path.basename(file_path), "file_type": mimetypes.guess_type(file_path)[0], "file_size": os.path.getsize(file_path), "creation_date": datetime.fromtimestamp( Path(file_path).stat().st_ctime ).strftime("%Y-%m-%d"), "last_modified_date": datetime.fromtimestamp( Path(file_path).stat().st_mtime ).strftime("%Y-%m-%d"), "last_accessed_date": datetime.fromtimestamp( Path(file_path).stat().st_atime ).strftime("%Y-%m-%d"), }