Spaces:
Runtime error
Runtime error
File size: 5,904 Bytes
129cd69 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
import concurrent
import logging
import random
from pathlib import Path
from typing import Any, List, Optional, Type, Union
from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader
from langchain.document_loaders.html_bs import BSHTMLLoader
from langchain.document_loaders.text import TextLoader
from langchain.document_loaders.unstructured import UnstructuredFileLoader
# Loader classes accepted for the ``loader_cls`` argument of DirectoryLoader.
FILE_LOADER_TYPE = Union[
Type[UnstructuredFileLoader], Type[TextLoader], Type[BSHTMLLoader]
]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def _is_visible(p: Path) -> bool:
    """Return True when no component of *p* starts with a dot.

    Args:
        p: Path to inspect; every element of ``p.parts`` is checked, so a
            file inside a hidden directory counts as hidden too.

    Returns:
        False if any path component begins with ".", True otherwise
        (including for a path with no components).
    """
    return not any(part.startswith(".") for part in p.parts)
class DirectoryLoader(BaseLoader):
    """Load documents from the files of a directory.

    Entries are discovered with a glob pattern, optionally sub-sampled, and
    every regular file is handed to ``loader_cls``; the documents returned by
    each loader are collected into a single list.
    """

    def __init__(
        self,
        path: str,
        glob: str = "**/[!.]*",
        silent_errors: bool = False,
        load_hidden: bool = False,
        loader_cls: FILE_LOADER_TYPE = UnstructuredFileLoader,
        loader_kwargs: Union[dict, None] = None,
        recursive: bool = False,
        show_progress: bool = False,
        use_multithreading: bool = False,
        max_concurrency: int = 4,
        *,
        sample_size: int = 0,
        randomize_sample: bool = False,
        sample_seed: Union[int, None] = None,
    ):
        """Initialize with a path to directory and how to glob over it.

        Args:
            path: Path to directory.
            glob: Glob pattern to use to find files. Defaults to "**/[!.]*"
                (all files except hidden).
            silent_errors: Whether to log errors instead of raising them.
                Defaults to False.
            load_hidden: Whether to load hidden files. Defaults to False.
            loader_cls: Loader class to use for loading files.
                Defaults to UnstructuredFileLoader.
            loader_kwargs: Keyword arguments to pass to loader_cls.
                Defaults to None (no extra arguments).
            recursive: Whether to recursively search for files.
                Defaults to False.
            show_progress: Whether to show a progress bar. Defaults to False.
            use_multithreading: Whether to use multithreading.
                Defaults to False.
            max_concurrency: The maximum number of threads to use.
                Defaults to 4.
            sample_size: The maximum number of glob matches to keep.
                0 (the default) disables sampling.
            randomize_sample: Shuffle the matches before truncating them to
                ``sample_size`` so that a random sample is taken.
            sample_seed: Seed of the random shuffle, for reproducibility.
                Any integer (including 0) is honoured; None uses the global
                ``random`` module state.
        """
        if loader_kwargs is None:
            loader_kwargs = {}
        self.path = path
        self.glob = glob
        self.load_hidden = load_hidden
        self.loader_cls = loader_cls
        self.loader_kwargs = loader_kwargs
        self.silent_errors = silent_errors
        self.recursive = recursive
        self.show_progress = show_progress
        self.use_multithreading = use_multithreading
        self.max_concurrency = max_concurrency
        self.sample_size = sample_size
        self.randomize_sample = randomize_sample
        self.sample_seed = sample_seed

    def load_file(
        self, item: Path, path: Path, docs: List[Document], pbar: Optional[Any]
    ) -> None:
        """Load a single file and append its documents to *docs*.

        Entries that are not regular files, and hidden files when
        ``load_hidden`` is False, are skipped without touching *docs* or
        the progress bar.

        Args:
            item: File path.
            path: Directory path that *item* is made relative to for the
                hidden-file check.
            docs: List of documents to extend in place.
            pbar: Progress bar, or None when progress is not displayed.

        Raises:
            Exception: Whatever the loader raised, unless ``silent_errors``
                is set, in which case the error is logged as a warning.
        """
        if not item.is_file():
            return
        if not (_is_visible(item.relative_to(path)) or self.load_hidden):
            return
        try:
            logger.debug(f"Processing file: {str(item)}")
            sub_docs = self.loader_cls(str(item), **self.loader_kwargs).load()
            docs.extend(sub_docs)
        except Exception as e:
            if self.silent_errors:
                logger.warning(f"Error loading file {str(item)}: {e}")
            else:
                # Bare raise keeps the original traceback intact.
                raise
        finally:
            # Compare with None: a progress bar object may define its own
            # truthiness, which must not decide whether it gets updated.
            if pbar is not None:
                pbar.update(1)

    def load(self) -> List[Document]:
        """Load documents from every matching file in the directory.

        Returns:
            The documents produced by all successfully loaded files.

        Raises:
            FileNotFoundError: If ``path`` does not exist.
            ValueError: If ``path`` is not a directory.
            ImportError: If ``show_progress`` is set, tqdm is not installed
                and ``silent_errors`` is False.
            Exception: Any loader error when ``silent_errors`` is False,
                in both the sequential and the multithreaded mode.
        """
        # ``import concurrent`` alone does not load the ``futures``
        # submodule, so import the executor explicitly where it is used.
        from concurrent.futures import ThreadPoolExecutor

        p = Path(self.path)
        if not p.exists():
            raise FileNotFoundError(f"Directory not found: '{self.path}'")
        if not p.is_dir():
            raise ValueError(f"Expected directory, got file: '{self.path}'")

        docs: List[Document] = []
        items = list(p.rglob(self.glob) if self.recursive else p.glob(self.glob))

        if self.sample_size > 0:
            if self.randomize_sample:
                # ``is not None`` so that a seed of 0 is still honoured.
                randomizer = (
                    random.Random(self.sample_seed)
                    if self.sample_seed is not None
                    else random
                )
                randomizer.shuffle(items)  # type: ignore
            # Slicing already clamps to the list length.
            items = items[: self.sample_size]

        pbar = None
        if self.show_progress:
            try:
                from tqdm import tqdm

                # NOTE: the total counts every glob match, including
                # directories and hidden entries that load_file skips, so
                # the bar may finish below 100%.
                pbar = tqdm(total=len(items))
            except ImportError as e:
                logger.warning(
                    "To log the progress of DirectoryLoader you need to install tqdm, "
                    "`pip install tqdm`"
                )
                if self.silent_errors:
                    logger.warning(e)
                else:
                    raise ImportError(
                        "To log the progress of DirectoryLoader "
                        "you need to install tqdm, "
                        "`pip install tqdm`"
                    ) from e

        try:
            if self.use_multithreading:
                with ThreadPoolExecutor(
                    max_workers=self.max_concurrency
                ) as executor:
                    results = executor.map(
                        lambda i: self.load_file(i, p, docs, pbar), items
                    )
                    # Drain the iterator: exceptions raised in worker
                    # threads only surface when their result is retrieved.
                    for _ in results:
                        pass
            else:
                for i in items:
                    self.load_file(i, p, docs, pbar)
        finally:
            # Always release the progress bar, even for an empty match set
            # or when a loader error propagates.
            if pbar is not None:
                pbar.close()

        return docs
|