File size: 4,115 Bytes
ed4d993
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""Airbyte vector stores."""

from __future__ import annotations

from typing import (
    TYPE_CHECKING,
    Any,
    AsyncIterator,
    Dict,
    Iterator,
    List,
    Mapping,
    Optional,
    TypeVar,
)

import airbyte as ab
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import run_in_executor
from langchain_core.vectorstores import VectorStore

if TYPE_CHECKING:
    from langchain_text_splitters import TextSplitter

VST = TypeVar("VST", bound=VectorStore)


class AirbyteLoader:
    """Load records from an Airbyte source stream as LangChain Documents.

    Example:
        .. code-block:: python

            from langchain_airbyte import AirbyteLoader

            loader = AirbyteLoader(
                source="github",
                stream="pull_requests",
            )
            documents = loader.lazy_load()
    """

    def __init__(
        self,
        source: str,
        stream: str,
        *,
        config: Optional[Dict] = None,
        include_metadata: bool = True,
        template: Optional[PromptTemplate] = None,
    ):
        """Resolve the Airbyte source connector and remember loader settings.

        Args:
            source: Name of the Airbyte source connector (e.g. ``"github"``).
            stream: Single stream of that source to read records from.
            config: Connector configuration passed through to Airbyte.
            include_metadata: Whether to populate ``Document.metadata``.
            template: Optional prompt template; when given, each record is
              rendered through it instead of using Airbyte's document view.
        """
        # Resolving the connector happens eagerly so config errors surface here.
        self._airbyte_source = ab.get_source(source, config=config, streams=[stream])
        self._stream = stream
        self._template = template
        self._include_metadata = include_metadata

    def load(self) -> List[Document]:
        """Eagerly load the whole stream into a list of Documents."""
        return [doc for doc in self.lazy_load()]

    def load_and_split(
        self, text_splitter: Optional[TextSplitter] = None
    ) -> List[Document]:
        """Load Documents and split into chunks. Chunks are returned as Documents.

        Args:
            text_splitter: TextSplitter instance to use for splitting documents.
              Defaults to RecursiveCharacterTextSplitter.

        Returns:
            List of Documents.
        """
        if text_splitter is not None:
            splitter: TextSplitter = text_splitter
        else:
            # The splitter package is an optional dependency; import lazily and
            # point the user at the install command if it is missing.
            try:
                from langchain_text_splitters import RecursiveCharacterTextSplitter
            except ImportError as e:
                raise ImportError(
                    "Unable to import from langchain_text_splitters. Please specify "
                    "text_splitter or install langchain_text_splitters with "
                    "`pip install -U langchain-text-splitters`."
                ) from e
            splitter = RecursiveCharacterTextSplitter()
        return splitter.split_documents(self.lazy_load())

    def lazy_load(self) -> Iterator[Document]:
        """Lazily yield one Document per record in the configured stream."""
        if self._template:
            # Template mode: render every raw record through the prompt template.
            records: Iterator[Mapping[str, Any]] = self._airbyte_source.get_records(
                self._stream
            )
            for record in records:
                metadata = dict(record) if self._include_metadata else {}
                yield Document(
                    page_content=self._template.format(**record), metadata=metadata
                )
        else:
            # Default mode: reuse Airbyte's own document representation.
            for document in self._airbyte_source.get_documents(self._stream):
                if self._include_metadata:
                    metadata = {
                        **document.metadata,
                        "_last_modified": document.last_modified,
                        "_id": document.id,
                    }
                else:
                    metadata = {}
                yield Document(
                    page_content=document.content,
                    metadata=metadata,
                )

    async def alazy_load(self) -> AsyncIterator[Document]:
        """Asynchronously yield Documents by driving ``lazy_load`` in an executor."""
        sync_iter = await run_in_executor(None, self.lazy_load)
        # Sentinel object lets us detect exhaustion without catching StopIteration,
        # which must not propagate out of an async generator.
        sentinel = object()
        while True:
            item = await run_in_executor(None, next, sync_iter, sentinel)  # type: ignore[call-arg, arg-type]
            if item is sentinel:
                break
            yield item  # type: ignore[misc]