AWEsumCare-Demo / custom_io.py
ray
v2
7a9ec21
raw history blame
No virus
3.96 kB
"""Unstructured file reader.
A parser for unstructured text files using Unstructured.io.
Supports .txt, .docx, .pptx, .jpg, .png, .eml, .html, and .pdf documents.
"""
from datetime import datetime
import mimetypes
import os
from pathlib import Path
import re
from typing import Any, Dict, List, Optional
from llama_index.readers.base import BaseReader
from llama_index.readers.schema.base import Document
class UnstructuredReader(BaseReader):
    """Reader that parses many file types through Unstructured.io."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the base reader and download the required NLTK data."""
        super().__init__(*args, **kwargs)

        # Prerequisite for Unstructured.io to work: these NLTK resources
        # are downloaded every time a reader is constructed.
        import nltk

        for resource in ("punkt", "averaged_perceptron_tagger"):
            nltk.download(resource)

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        split_documents: Optional[bool] = True,
    ) -> List[Document]:
        """Partition ``file`` with Unstructured.io and wrap the text.

        Args:
            file: Path of the file to parse; passed to ``partition`` as a
                string.
            extra_info: Optional metadata attached to every returned
                document. An empty dict is used when it is missing/empty.
            split_documents: When truthy, return one document per
                partitioned element; otherwise return a single document
                whose text is all elements joined by blank lines.

        Returns:
            The list of documents built from the partitioned elements.
        """
        from unstructured.partition.auto import partition

        # Collapse every run of whitespace inside an element to one space.
        normalized = []
        for element in partition(str(file)):
            words = str(element).split()
            normalized.append(" ".join(words))

        if not split_documents:
            merged_text = "\n\n".join(normalized)
            return [Document(text=merged_text, extra_info=extra_info or {})]

        documents = []
        for piece in normalized:
            # ``extra_info or {}`` is evaluated per document on purpose so
            # that the fallback dict is never shared between documents.
            documents.append(Document(text=piece, extra_info=extra_info or {}))
        return documents
class MarkdownReader(BaseReader):
    """Reader for text files organised as numbered knowledge units.

    The file is split with ``parse_knowledge_units``: a new unit begins at
    every line that starts with a number followed by a period and
    whitespace (for example ``1. ``).
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Init params."""
        super().__init__(*args, **kwargs)

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        split_documents: Optional[bool] = True,
    ) -> List[Document]:
        """Parse ``file`` into documents, one per knowledge unit.

        Args:
            file: Path of the UTF-8 text file to parse.
            extra_info: Optional metadata attached to every returned
                document. An empty dict is used when it is missing/empty.
            split_documents: When truthy, return one document per
                knowledge unit; otherwise return a single document whose
                text is all units joined by blank lines.

        Returns:
            The list of documents built from the parsed knowledge units.
        """
        elements = parse_knowledge_units(str(file))
        if split_documents:
            return [
                Document(text=ele, extra_info=extra_info or {})
                for ele in elements
            ]
        # Previously this path fell through and returned None, which broke
        # the declared List[Document] contract. Mirror UnstructuredReader
        # and return one combined document instead.
        return [
            Document(text="\n\n".join(elements), extra_info=extra_info or {})
        ]
def parse_knowledge_units(file_path):
    """Split a UTF-8 text file into numbered knowledge units.

    A unit starts at any line that, once surrounding whitespace is
    stripped, begins with one or more digits, a period and a whitespace
    character (e.g. ``"12. Title"``). All following lines belong to that
    unit until the next such line. Text that appears before the first
    numbered line is returned as a unit of its own.

    Args:
        file_path: Path of the file to read.

    Returns:
        A list of unit strings in file order, each stripped of leading and
        trailing whitespace. Units that are empty after stripping (for
        instance leading blank lines) are omitted.
    """
    unit_start_pattern = re.compile(r'^\d+\.\s')
    knowledge_units = []
    pending_lines = []

    def flush_pending():
        # Strip before testing so whitespace-only chunks never become
        # empty-string units.
        unit = "".join(pending_lines).strip()
        if unit:
            knowledge_units.append(unit)
        pending_lines.clear()

    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            # The pattern is matched against the stripped line, so a bare
            # "3." with nothing after it does not start a new unit.
            if unit_start_pattern.match(line.strip()):
                flush_pending()
            pending_lines.append(line)

    flush_pending()
    return knowledge_units
def default_file_metadata_func(file_path: str) -> Dict:
    """Get some handy metadata from the filesystem.

    Args:
        file_path: Path of the file to describe, as a string.

    Returns:
        A dict with the keys ``file_path``, ``file_name``, ``file_type``
        (MIME type guessed from the name, or ``None`` when unknown),
        ``file_size`` (bytes) and ``creation_date``,
        ``last_modified_date`` and ``last_accessed_date`` formatted as
        ``YYYY-MM-DD`` in local time.

    Raises:
        OSError: If the file cannot be stat'ed (e.g. it does not exist).
    """
    # Stat the file once so size and all three timestamps come from the
    # same snapshot instead of four separate filesystem calls.
    file_stat = Path(file_path).stat()

    def _format_date(timestamp: float) -> str:
        return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d")

    return {
        "file_path": file_path,
        "file_name": os.path.basename(file_path),
        "file_type": mimetypes.guess_type(file_path)[0],
        "file_size": file_stat.st_size,
        # st_ctime is taken as-is from the stat result and reported under
        # the "creation_date" key, exactly as before.
        "creation_date": _format_date(file_stat.st_ctime),
        "last_modified_date": _format_date(file_stat.st_mtime),
        "last_accessed_date": _format_date(file_stat.st_atime),
    }