File size: 3,028 Bytes
129cd69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from __future__ import annotations

from typing import List, Optional

import aiohttp
import requests
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever

from langchain.callbacks.manager import (
    AsyncCallbackManagerForRetrieverRun,
    CallbackManagerForRetrieverRun,
)


class ChatGPTPluginRetriever(BaseRetriever):
    """`ChatGPT plugin` retriever.

    Sends the query to a ChatGPT retrieval plugin's ``/query`` endpoint and
    converts each returned hit into a ``Document``. Synchronous retrieval
    uses ``requests``; asynchronous retrieval uses ``aiohttp``, reusing
    ``aiosession`` when one is supplied.
    """

    url: str
    """URL of the ChatGPT plugin."""
    bearer_token: str
    """Bearer token for the ChatGPT plugin."""
    top_k: int = 3
    """Number of documents to return."""
    filter: Optional[dict] = None
    """Filter to apply to the results."""
    aiosession: Optional[aiohttp.ClientSession] = None
    """Aiohttp session to use for requests."""

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True
        """Allow arbitrary types."""

    @staticmethod
    def _parse_results(results: List[dict]) -> List[Document]:
        """Convert raw plugin hits into ``Document`` objects.

        Each hit's ``"text"`` becomes the page content. The ``"metadata"``
        sub-dict (or, if absent, the remaining keys of the hit itself)
        becomes the document metadata, with ``"source_id"`` renamed to
        ``"source"`` to follow LangChain conventions.
        """
        docs = []
        for d in results:
            content = d.pop("text")
            # Fall back to the stripped hit dict itself when the plugin
            # returns no explicit "metadata" field.
            metadata = d.pop("metadata", d)
            if metadata.get("source_id"):
                metadata["source"] = metadata.pop("source_id")
            docs.append(Document(page_content=content, metadata=metadata))
        return docs

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Synchronously query the plugin and return matching documents."""
        url, json, headers = self._create_request(query)
        response = requests.post(url, json=json, headers=headers)
        # Surface HTTP failures explicitly instead of letting them appear
        # as an opaque KeyError / JSON decode error on the line below.
        response.raise_for_status()
        results = response.json()["results"][0]["results"]
        return self._parse_results(results)

    async def _aget_relevant_documents(
        self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Asynchronously query the plugin and return matching documents."""
        url, json, headers = self._create_request(query)

        if not self.aiosession:
            # No shared session configured: open a short-lived one.
            async with aiohttp.ClientSession() as session:
                async with session.post(url, headers=headers, json=json) as response:
                    res = await response.json()
        else:
            async with self.aiosession.post(
                url, headers=headers, json=json
            ) as response:
                res = await response.json()

        return self._parse_results(res["results"][0]["results"])

    def _create_request(self, query: str) -> tuple[str, dict, dict]:
        """Build the ``(url, json payload, headers)`` triple for ``/query``."""
        url = f"{self.url}/query"
        json = {
            "queries": [
                {
                    "query": query,
                    "filter": self.filter,
                    "top_k": self.top_k,
                }
            ]
        }
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.bearer_token}",
        }
        return url, json, headers