File size: 8,911 Bytes
129cd69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
import base64
import warnings
from typing import Any, Dict, Iterator, List, Optional

import requests
from langchain_core.pydantic_v1 import BaseModel, root_validator, validator
from typing_extensions import NotRequired, TypedDict

from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseBlobParser, BaseLoader
from langchain.document_loaders.blob_loaders import Blob
from langchain.text_splitter import TextSplitter
from langchain.utils import get_from_dict_or_env

EMBAAS_DOC_API_URL = "https://api.embaas.io/v1/document/extract-text/bytes/"


class EmbaasDocumentExtractionParameters(TypedDict):
    """Parameters for the embaas document extraction API."""

    mime_type: NotRequired[str]
    """The mime type of the document."""
    file_extension: NotRequired[str]
    """The file extension of the document."""
    file_name: NotRequired[str]
    """The file name of the document."""

    should_chunk: NotRequired[bool]
    """Whether to chunk the document into pages."""
    chunk_size: NotRequired[int]
    """The maximum size of the text chunks."""
    chunk_overlap: NotRequired[int]
    """The maximum overlap allowed between chunks."""
    chunk_splitter: NotRequired[str]
    """The text splitter class name for creating chunks."""
    separators: NotRequired[List[str]]
    """The separators for chunks."""

    should_embed: NotRequired[bool]
    """Whether to create embeddings for the document in the response."""
    model: NotRequired[str]
    """The model to pass to the Embaas document extraction API."""
    instruction: NotRequired[str]
    """The instruction to pass to the Embaas document extraction API."""


class EmbaasDocumentExtractionPayload(EmbaasDocumentExtractionParameters):
    """Payload for the Embaas document extraction API."""

    bytes: str
    """The base64 encoded bytes of the document to extract text from."""


class BaseEmbaasLoader(BaseModel):
    """Base loader for `Embaas` document extraction API."""

    embaas_api_key: Optional[str] = None
    """The API key for the Embaas document extraction API."""
    api_url: str = EMBAAS_DOC_API_URL
    """The URL of the Embaas document extraction API."""
    params: EmbaasDocumentExtractionParameters = EmbaasDocumentExtractionParameters()
    """Additional parameters to pass to the Embaas document extraction API."""

    @root_validator(pre=True)
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that api key and python package exists in environment."""
        embaas_api_key = get_from_dict_or_env(
            values, "embaas_api_key", "EMBAAS_API_KEY"
        )
        values["embaas_api_key"] = embaas_api_key
        return values


class EmbaasBlobLoader(BaseEmbaasLoader, BaseBlobParser):
    """Load `Embaas` blob.

    To use, you should have the
    environment variable ``EMBAAS_API_KEY`` set with your API key, or pass
    it as a named parameter to the constructor.

    Example:
        .. code-block:: python

            # Default parsing
            from langchain.document_loaders.embaas import EmbaasBlobLoader
            loader = EmbaasBlobLoader()
            blob = Blob.from_path(path="example.mp3")
            documents = loader.parse(blob=blob)

            # Custom api parameters (create embeddings automatically)
            from langchain.document_loaders.embaas import EmbaasBlobLoader
            loader = EmbaasBlobLoader(
                params={
                    "should_embed": True,
                    "model": "e5-large-v2",
                    "chunk_size": 256,
                    "chunk_splitter": "CharacterTextSplitter"
                }
            )
            blob = Blob.from_path(path="example.pdf")
            documents = loader.parse(blob=blob)
    """

    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Parses the blob lazily.

        Args:
            blob: The blob to parse.
        """
        yield from self._get_documents(blob=blob)

    @staticmethod
    def _api_response_to_documents(chunks: List[Dict[str, Any]]) -> List[Document]:
        """Convert the API response to a list of documents."""
        docs = []
        for chunk in chunks:
            metadata = chunk["metadata"]
            if chunk.get("embedding", None) is not None:
                metadata["embedding"] = chunk["embedding"]
            doc = Document(page_content=chunk["text"], metadata=metadata)
            docs.append(doc)

        return docs

    def _generate_payload(self, blob: Blob) -> EmbaasDocumentExtractionPayload:
        """Generates payload for the API request."""
        base64_byte_str = base64.b64encode(blob.as_bytes()).decode()
        payload: EmbaasDocumentExtractionPayload = EmbaasDocumentExtractionPayload(
            bytes=base64_byte_str,
            # Workaround for mypy issue: https://github.com/python/mypy/issues/9408
            # type: ignore
            **self.params,
        )

        if blob.mimetype is not None and payload.get("mime_type", None) is None:
            payload["mime_type"] = blob.mimetype

        return payload

    def _handle_request(
        self, payload: EmbaasDocumentExtractionPayload
    ) -> List[Document]:
        """Sends a request to the embaas API and handles the response."""
        headers = {
            "Authorization": f"Bearer {self.embaas_api_key}",
            "Content-Type": "application/json",
        }

        response = requests.post(self.api_url, headers=headers, json=payload)
        response.raise_for_status()

        parsed_response = response.json()
        return EmbaasBlobLoader._api_response_to_documents(
            chunks=parsed_response["data"]["chunks"]
        )

    def _get_documents(self, blob: Blob) -> Iterator[Document]:
        """Get the documents from the blob."""
        payload = self._generate_payload(blob=blob)

        try:
            documents = self._handle_request(payload=payload)
        except requests.exceptions.RequestException as e:
            if e.response is None or not e.response.text:
                raise ValueError(
                    f"Error raised by Embaas document text extraction API: {e}"
                )

            parsed_response = e.response.json()
            if "message" in parsed_response:
                raise ValueError(
                    f"Validation Error raised by Embaas document text extraction API:"
                    f" {parsed_response['message']}"
                )
            raise

        yield from documents


class EmbaasLoader(BaseEmbaasLoader, BaseLoader):
    """Load from `Embaas`.

    To use, you should have the
    environment variable ``EMBAAS_API_KEY`` set with your API key, or pass
    it as a named parameter to the constructor.

    Example:
        .. code-block:: python

            # Default parsing
            from langchain.document_loaders.embaas import EmbaasLoader
            loader = EmbaasLoader(file_path="example.mp3")
            documents = loader.load()

            # Custom api parameters (create embeddings automatically)
            from langchain.document_loaders.embaas import EmbaasBlobLoader
            loader = EmbaasBlobLoader(
                file_path="example.pdf",
                params={
                    "should_embed": True,
                    "model": "e5-large-v2",
                    "chunk_size": 256,
                    "chunk_splitter": "CharacterTextSplitter"
                }
            )
            documents = loader.load()
    """

    file_path: str
    """The path to the file to load."""
    blob_loader: Optional[EmbaasBlobLoader]
    """The blob loader to use. If not provided, a default one will be created."""

    @validator("blob_loader", always=True)
    def validate_blob_loader(
        cls, v: EmbaasBlobLoader, values: Dict
    ) -> EmbaasBlobLoader:
        return v or EmbaasBlobLoader(
            embaas_api_key=values["embaas_api_key"],
            api_url=values["api_url"],
            params=values["params"],
        )

    def lazy_load(self) -> Iterator[Document]:
        """Load the documents from the file path lazily."""
        blob = Blob.from_path(path=self.file_path)

        assert self.blob_loader is not None
        # Should never be None, but mypy doesn't know that.
        yield from self.blob_loader.lazy_parse(blob=blob)

    def load(self) -> List[Document]:
        return list(self.lazy_load())

    def load_and_split(
        self, text_splitter: Optional[TextSplitter] = None
    ) -> List[Document]:
        if self.params.get("should_embed", False):
            warnings.warn(
                "Embeddings are not supported with load_and_split."
                " Use the API splitter to properly generate embeddings."
                " For more information see embaas.io docs."
            )
        return super().load_and_split(text_splitter=text_splitter)