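"""Thin client for the Elastic App Search REST API.

Covers engine listing and creation, document indexing, paginated document
listing, raw Elasticsearch passthrough queries, and batched deletion of
non-manual documents.
"""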
import json
from typing import Dict, Iterator, List, Union
from urllib.parse import urljoin

import requests


class AppSearchClient:
    def __init__(self):
        # NOTE: the endpoint and private key are hardcoded here; in practice
        # they would come from configuration or environment variables.
        self.appsearch_endpoint = "https://fgm-v2.ent.eastus2.azure.elastic-cloud.com"
        self.appsearch_private_key = "private-dzf1pbcssw97hxkm3wxbdrpu"
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.appsearch_private_key}",
        }

    def list_all_engines(self) -> Iterator[str]:
        """Yield the name of every engine, paging through the engines endpoint."""
        ENGINES_URL = "/api/as/v1/engines/"
        request_url = urljoin(self.appsearch_endpoint, ENGINES_URL)
        MAX_DOCS_PER_PAGE = 10
        current_page = 1
        while True:
            params = {
                "page[size]": MAX_DOCS_PER_PAGE,
                "page[current]": current_page,
            }
            r = requests.get(request_url, headers=self.headers, params=params).json()
            # An empty results page means we have walked past the last page.
            if not r["results"]:
                break
            for item in r["results"]:
                yield item["name"]
            current_page += 1

    def create_engine(self, name: str) -> requests.Response:
        """Create a new engine with the given name."""
        ENGINES_URL = "/api/as/v1/engines/"
        request_url = urljoin(self.appsearch_endpoint, ENGINES_URL)
        data = json.dumps({"name": name})
        return requests.post(request_url, headers=self.headers, data=data)

    def index_documents(self, data: Union[Dict, List[Dict]], engine_name: str) -> None:
        """Index a single document or a list of documents into the given engine."""
        INDEX_URL = f"/api/as/v1/engines/{engine_name}/documents"
        request_url = urljoin(self.appsearch_endpoint, INDEX_URL)
        requests.post(request_url, headers=self.headers, data=json.dumps(data))

    def list_existing_docs(self, engine_name: str) -> List[Dict]:
        """Return every document in the engine by paging through the list endpoint."""
        LIST_URL = f"/api/as/v1/engines/{engine_name}/documents/list"
        request_url = urljoin(self.appsearch_endpoint, LIST_URL)
        MAX_DOCS_PER_PAGE = 100
        current_page = 1
        docs: List[Dict] = []
        while True:
            params = {
                "page[size]": MAX_DOCS_PER_PAGE,
                "page[current]": current_page,
            }
            page_content = requests.get(
                request_url, headers=self.headers, params=params
            ).json()["results"]
            if not page_content:
                break
            docs.extend(page_content)
            current_page += 1
        return docs

    def list_existing_manual_urls(self, engine_name: str) -> Iterator[str]:
        """Yield the ids of documents flagged as manually curated."""
        for doc in self.list_existing_docs(engine_name):
            if doc["is_manual"] == "true":
                yield doc["id"]

    def list_existing_non_manual_urls(self, engine_name: str) -> Iterator[str]:
        """Yield the ids of documents not flagged as manually curated."""
        for doc in self.list_existing_docs(engine_name):
            if doc["is_manual"] == "false":
                yield doc["id"]

    def list_existing_urls(self, engine_name: str) -> Iterator[str]:
        """Yield the ids of all documents in the engine."""
        for doc in self.list_existing_docs(engine_name):
            yield doc["id"]

    def get_elastic_query(self, data: str, size: int) -> requests.Response:
        """Run a raw Elasticsearch query through App Search's passthrough endpoint.

        NOTE: the engine name is hardcoded to "us-speeches-s" here.
        """
        return requests.post(
            url=f"{self.appsearch_endpoint}/api/as/v0/engines/us-speeches-s/elasticsearch/_search?size={size}",
            headers=self.headers,
            data=data,
        )

    def delete_existing_non_manual_docs(self, engine_name: str) -> None:
        """Delete all non-manual documents from the engine in batches."""
        non_manual_doc_ids = list(self.list_existing_non_manual_urls(engine_name))
        DELETE_URL = f"/api/as/v1/engines/{engine_name}/documents"
        MAX_DOCS_TO_DELETE_PER_REQUEST = 100
        request_url = urljoin(self.appsearch_endpoint, DELETE_URL)

        def chunker(seq, size):
            # Split seq into consecutive chunks of at most `size` items.
            return (seq[pos : pos + size] for pos in range(0, len(seq), size))

        for group in chunker(non_manual_doc_ids, MAX_DOCS_TO_DELETE_PER_REQUEST):
            requests.delete(
                request_url,
                headers=self.headers,
                data=json.dumps(group),
            )
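

# --- Usage sketch -----------------------------------------------------------
# A minimal, hypothetical example of how this client might be driven; the
# engine name "demo-engine" and the sample document below are illustrative,
# not part of the original module.
if __name__ == "__main__":
    client = AppSearchClient()

    # Create the engine only if it does not exist yet.
    if "demo-engine" not in set(client.list_all_engines()):
        client.create_engine("demo-engine")

    # Index a single document; App Search uses the "id" field as the key,
    # and this module stores "is_manual" as a string flag.
    client.index_documents(
        {"id": "https://example.com/page", "title": "Example", "is_manual": "false"},
        engine_name="demo-engine",
    )

    # Enumerate what is stored, then clear out the non-manual documents.
    print(list(client.list_existing_urls("demo-engine")))
    client.delete_existing_non_manual_docs("demo-engine")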