import json
from typing import Dict, Iterator, List, Union
from urllib.parse import urljoin

import requests


class AppSearchClient:
    """Minimal client for the Elastic App Search REST API."""

    def __init__(self):
        self.appsearch_endpoint = "https://fgm-v2.ent.eastus2.azure.elastic-cloud.com"
        self.appsearch_private_key = "private-dzf1pbcssw97hxkm3wxbdrpu"
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.appsearch_private_key}",
        }
        # Fail fast if the endpoint or key is ever left unset.
        assert self.appsearch_endpoint is not None
        assert self.appsearch_private_key is not None

    def list_all_engines(self) -> Iterator[str]:
        """Yield the name of every engine, paging through the engines API."""
        ENGINES_URL = "/api/as/v1/engines/"
        request_url = urljoin(self.appsearch_endpoint, ENGINES_URL)
        MAX_DOCS_PER_PAGE = 10
        current_page = 1
        while True:
            params = (
                ("page[size]", f"{MAX_DOCS_PER_PAGE}"),
                ("page[current]", f"{current_page}"),
            )
            r = requests.get(request_url, headers=self.headers, params=params).json()
            for item in r["results"]:
                yield item["name"]
            current_page += 1
            # An empty page of results means every engine has been listed.
            if not r["results"]:
                break

    def create_engine(self, name: str) -> requests.Response:
        """Create a new engine and return the raw API response."""
        ENGINES_URL = "/api/as/v1/engines/"
        request_url = urljoin(self.appsearch_endpoint, ENGINES_URL)
        data = json.dumps({"name": name}, indent=4, sort_keys=True)
        r = requests.post(request_url, headers=self.headers, data=data)
        return r

    def index_documents(self, data: Union[Dict, List[Dict]], engine_name: str) -> None:
        """Index a single document or a list of documents into the given engine."""
        INDEX_URL = f"/api/as/v1/engines/{engine_name}/documents"
        request_url = urljoin(self.appsearch_endpoint, INDEX_URL)
        requests.post(
            request_url,
            headers=self.headers,
            data=json.dumps(data, indent=4, sort_keys=True),
        )

    def list_existing_docs(self, engine_name: str) -> List[Dict]:
        """Return every document in the engine by paging through the documents list API."""
        LIST_URL = f"/api/as/v1/engines/{engine_name}/documents/list"
        MAX_DOCS_PER_PAGE = 100
        request_url = urljoin(self.appsearch_endpoint, LIST_URL)
        current_page = 1
        docs = []
        while True:
            params = (
                ("page[size]", f"{MAX_DOCS_PER_PAGE}"),
                ("page[current]", f"{current_page}"),
            )
            page_content = requests.get(
                request_url, headers=self.headers, params=params
            ).json()["results"]
            docs.extend(page_content)
            current_page += 1
            # An empty page means all documents have been collected.
            if not page_content:
                break
        return docs

    def list_existing_manual_urls(self, engine_name: str) -> Iterator[str]:
        """Yield the ids (URLs) of documents flagged as manually added."""
        for doc in self.list_existing_docs(engine_name):
            if doc["is_manual"] == "true":
                yield doc["id"]

    def list_existing_non_manual_urls(self, engine_name: str) -> Iterator[str]:
        """Yield the ids (URLs) of documents that were not manually added."""
        for doc in self.list_existing_docs(engine_name):
            if doc["is_manual"] == "false":
                yield doc["id"]

    def list_existing_urls(self, engine_name: str) -> Iterator[str]:
        """Yield the ids (URLs) of all documents in the engine."""
        for doc in self.list_existing_docs(engine_name):
            yield doc["id"]

    def get_elastic_query(self, data: str, size: int) -> requests.Response:
        """Run a raw Elasticsearch query against the us-speeches-s engine (v0 API)."""
        return requests.post(
            url=f"{self.appsearch_endpoint}/api/as/v0/engines/us-speeches-s/elasticsearch/_search?size={size}",
            headers=self.headers,
            data=data,
        )

    def delete_existing_non_manual_docs(self, engine_name: str) -> None:
        """Delete every non-manual document from the engine in batches."""
        non_manual_doc_ids = list(self.list_existing_non_manual_urls(engine_name))
        DELETE_URL = f"/api/as/v1/engines/{engine_name}/documents"
        MAX_DOCS_TO_DELETE_PER_REQUEST = 100
        request_url = urljoin(self.appsearch_endpoint, DELETE_URL)

        def chunker(seq, size):
            # Split the id list into consecutive batches of at most `size` ids.
            return (seq[pos: pos + size] for pos in range(0, len(seq), size))

        for group in chunker(non_manual_doc_ids, MAX_DOCS_TO_DELETE_PER_REQUEST):
            requests.delete(
                request_url,
                headers=self.headers,
                data=json.dumps(group, indent=4, sort_keys=True),
            )
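

# Minimal usage sketch, illustrative only: it assumes the hardcoded endpoint and
# private key above are still valid and reachable. It simply prints the name of
# every engine visible to the client.
if __name__ == "__main__":
    client = AppSearchClient()
    for engine_name in client.list_all_engines():
        print(engine_name)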