File size: 3,655 Bytes
129cd69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from __future__ import annotations

from typing import TYPE_CHECKING, List, Optional

from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader
from langchain.utilities.vertexai import get_client_info

if TYPE_CHECKING:
    from google.auth.credentials import Credentials


class BigQueryLoader(BaseLoader):
    """Load from the Google Cloud Platform `BigQuery`.

    Each document represents one row of the result. The `page_content_columns`
    are written into the `page_content` of the document. The `metadata_columns`
    are written into the `metadata` of the document. By default, all columns
    are written into the `page_content` and none into the `metadata`.

    """

    def __init__(
        self,
        query: str,
        project: Optional[str] = None,
        page_content_columns: Optional[List[str]] = None,
        metadata_columns: Optional[List[str]] = None,
        credentials: Optional[Credentials] = None,
    ):
        """Initialize BigQuery document loader.

        Args:
            query: The query to run in BigQuery.
            project: Optional. The project to run the query in.
            page_content_columns: Optional. The columns to write into the `page_content`
                of the document.
            metadata_columns: Optional. The columns to write into the `metadata` of the
                document.
            credentials: Optional. Credentials for accessing Google APIs. Use this
                parameter to override default credentials, such as to use Compute
                Engine (`google.auth.compute_engine.Credentials`) or Service Account
                (`google.oauth2.service_account.Credentials`) credentials directly.
        """
        self.query = query
        self.project = project
        self.page_content_columns = page_content_columns
        self.metadata_columns = metadata_columns
        self.credentials = credentials

    def load(self) -> List[Document]:
        """Run the query and convert every result row into a `Document`.

        The `page_content` of each document is built from the selected content
        columns as one ``"<column>: <value>"`` line per column, joined with
        newlines. The `metadata` of each document maps each selected metadata
        column to its value.

        Returns:
            A list with one document per row of the query result.

        Raises:
            ImportError: If the `google-cloud-bigquery` package cannot be imported.
            ValueError: If the BigQuery client ends up without a project.
        """
        # Imported lazily so the dependency is only needed when loading.
        try:
            from google.cloud import bigquery
        except ImportError as ex:
            raise ImportError(
                "Could not import google-cloud-bigquery python package. "
                "Please install it with `pip install google-cloud-bigquery`."
            ) from ex

        bq_client = bigquery.Client(
            credentials=self.credentials,
            project=self.project,
            client_info=get_client_info(module="bigquery"),
        )
        if not bq_client.project:
            error_desc = (
                "GCP project for Big Query is not set! Either provide a "
                "`project` argument during BigQueryLoader instantiation, "
                "or set a default project with `gcloud config set project` "
                "command."
            )
            raise ValueError(error_desc)
        query_result = bq_client.query(self.query).result()

        # Build the column selections once as sets: the membership tests below
        # run for every column of every row, so list scans would be repeated
        # needlessly inside the loop.
        if self.page_content_columns is None:
            # Default: every column of the result goes into the page content.
            page_content_columns = {column.name for column in query_result.schema}
        else:
            page_content_columns = set(self.page_content_columns)

        if self.metadata_columns is None:
            # Default: no column goes into the metadata.
            metadata_columns = set()
        else:
            metadata_columns = set(self.metadata_columns)

        docs: List[Document] = []
        for row in query_result:
            # Lines follow the order in which the row yields its columns,
            # not the order of the configured column list.
            page_content = "\n".join(
                f"{k}: {v}" for k, v in row.items() if k in page_content_columns
            )
            metadata = {k: v for k, v in row.items() if k in metadata_columns}
            docs.append(Document(page_content=page_content, metadata=metadata))

        return docs