File size: 3,959 Bytes
dfc6dc5
 
 
 
 
 
 
 
 
 
7a9ec21
dfc6dc5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9021b39
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7a9ec21
9021b39
7a9ec21
 
9021b39
 
 
 
 
 
 
 
 
7a9ec21
 
 
 
 
 
 
 
 
 
 
9021b39
 
 
dfc6dc5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""Unstructured file reader.

A parser for unstructured text files using Unstructured.io.
Supports .txt, .docx, .pptx, .jpg, .png, .eml, .html, and .pdf documents.

"""
from datetime import datetime
import mimetypes
import os
from pathlib import Path
import re
from typing import Any, Dict, List, Optional

from llama_index.readers.base import BaseReader
from llama_index.readers.schema.base import Document


class UnstructuredReader(BaseReader):
    """Reader that parses a file through Unstructured.io's ``partition``."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the base reader and download the needed NLTK data."""
        super().__init__(*args, **kwargs)

        # Unstructured.io needs these NLTK resources to work.
        import nltk

        for resource in ("punkt", "averaged_perceptron_tagger"):
            nltk.download(resource)

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        split_documents: Optional[bool] = True,
    ) -> List[Document]:
        """Partition ``file`` and wrap the resulting text in documents.

        Args:
            file: Path of the file to parse.
            extra_info: Optional metadata attached to every document; an
                empty dict is used when it is missing or empty.
            split_documents: When truthy, return one document per
                partitioned element; otherwise return a single document
                with all elements joined by blank lines.

        Returns:
            The list of documents built from the file.
        """
        from unstructured.partition.auto import partition

        # Collapse every run of whitespace inside an element to one space.
        normalized: List[str] = []
        for element in partition(str(file)):
            normalized.append(" ".join(str(element).split()))

        if not split_documents:
            merged = "\n\n".join(normalized)
            return [Document(text=merged, extra_info=extra_info or {})]

        documents: List[Document] = []
        for piece in normalized:
            documents.append(Document(text=piece, extra_info=extra_info or {}))
        return documents


class MarkdownReader(BaseReader):
    """Reader that splits a text file into numbered knowledge units.

    Splitting is delegated to ``parse_knowledge_units``, which starts a new
    unit at every line that begins with a number followed by a period.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Init params."""
        super().__init__(*args, **kwargs)

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        split_documents: Optional[bool] = True,
    ) -> List[Document]:
        """Parse file.

        Args:
            file: Path of the file to parse.
            extra_info: Optional metadata attached to every document; an
                empty dict is used when it is missing or empty.
            split_documents: When truthy, return one document per knowledge
                unit; otherwise return a single document containing all
                units joined by blank lines.

        Returns:
            The list of documents built from the file.
        """
        units = parse_knowledge_units(str(file))

        if split_documents:
            return [
                Document(text=unit, extra_info=extra_info or {})
                for unit in units
            ]

        # Without this branch the method returned None for
        # split_documents=False, contradicting the declared return type.
        return [Document(text="\n\n".join(units), extra_info=extra_info or {})]

def parse_knowledge_units(file_path: str) -> List[str]:
    """Split a numbered text file into its individual knowledge units.

    A new unit starts at every line whose stripped form begins with one or
    more digits, a period and a whitespace character (for example
    ``"12. Title"``). All following lines belong to that unit until the next
    such line. Any text before the first numbered line forms its own unit.

    Args:
        file_path: Path of the UTF-8 encoded text file to read.

    Returns:
        The units in file order, each stripped of surrounding whitespace.
        Units that contain only whitespace are omitted.
    """
    unit_start_pattern = re.compile(r"^\d+\.\s")
    knowledge_units: List[str] = []
    pending: List[str] = []

    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            if unit_start_pattern.match(line.strip()):
                # A numbered line closes the unit collected so far.
                unit = "".join(pending).strip()
                if unit:
                    knowledge_units.append(unit)
                pending = []
            pending.append(line)

    # Flush whatever follows the last numbered line.
    unit = "".join(pending).strip()
    if unit:
        knowledge_units.append(unit)

    return knowledge_units

def default_file_metadata_func(file_path: str) -> Dict:
    """Get some handy metadata from the filesystem.

    Args:
        file_path: Path of the file, as a string.

    Returns:
        A dict with the keys ``file_path``, ``file_name``, ``file_type``
        (MIME type guessed from the file name, or ``None``), ``file_size``
        (in bytes) and ``creation_date``, ``last_modified_date`` and
        ``last_accessed_date``, each formatted as ``YYYY-MM-DD`` in local
        time.

    Raises:
        OSError: If the file cannot be stat'ed (e.g. it does not exist).
    """
    # Stat once so that every field describes the same snapshot of the file.
    stat_result = os.stat(file_path)

    def _as_date(timestamp: float) -> str:
        """Format a POSIX timestamp as a local ``YYYY-MM-DD`` date."""
        return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d")

    return {
        "file_path": file_path,
        "file_name": os.path.basename(file_path),
        "file_type": mimetypes.guess_type(file_path)[0],
        "file_size": stat_result.st_size,
        # NOTE: st_ctime is the creation time on Windows but the time of the
        # last metadata change on most Unix systems.
        "creation_date": _as_date(stat_result.st_ctime),
        "last_modified_date": _as_date(stat_result.st_mtime),
        "last_accessed_date": _as_date(stat_result.st_atime),
    }