import json
from typing import Dict, Iterator, List, Union
from urllib.parse import urljoin

import requests


class AppSearchClient:
    def __init__(self):
        # NOTE: credentials are hardcoded here; in practice they would more
        # typically be read from environment variables or a secrets store.
        self.appsearch_endpoint = "https://fgm-v2.ent.eastus2.azure.elastic-cloud.com"
        self.appsearch_private_key = "private-dzf1pbcssw97hxkm3wxbdrpu"
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.appsearch_private_key}",
        }

    def list_all_engines(self) -> Iterator[str]:
        """Yield the name of every engine, paging through the engines API."""
        ENGINES_URL = "/api/as/v1/engines/"
        request_url = urljoin(self.appsearch_endpoint, ENGINES_URL)
        MAX_DOCS_PER_PAGE = 10
        current_page = 1
        while True:
            params = (
                ("page[size]", f"{MAX_DOCS_PER_PAGE}"),
                ("page[current]", f"{current_page}"),
            )
            r = requests.get(request_url, headers=self.headers, params=params).json()
            # An empty results page means we have paged past the last engine.
            if not r["results"]:
                break
            for item in r["results"]:
                yield item["name"]
            current_page += 1

    def create_engine(self, name: str) -> requests.Response:
        """Create a new engine and return the raw response."""
        ENGINES_URL = "/api/as/v1/engines/"
        request_url = urljoin(self.appsearch_endpoint, ENGINES_URL)
        data = json.dumps({"name": name}, indent=4, sort_keys=True)
        return requests.post(request_url, headers=self.headers, data=data)

    def index_documents(self, data: Union[Dict, List[Dict]], engine_name: str) -> None:
        """Index a single document or a batch of documents into the engine."""
        INDEX_URL = f"/api/as/v1/engines/{engine_name}/documents"
        request_url = urljoin(self.appsearch_endpoint, INDEX_URL)
        requests.post(
            request_url,
            headers=self.headers,
            data=json.dumps(data, indent=4, sort_keys=True),
        )

    def list_existing_docs(self, engine_name: str) -> List[Dict]:
        """Return every document in the engine, paging through the list API."""
        LIST_URL = f"/api/as/v1/engines/{engine_name}/documents/list"
        MAX_DOCS_PER_PAGE = 100
        request_url = urljoin(self.appsearch_endpoint, LIST_URL)
        current_page = 1
        docs: List[Dict] = []
        while True:
            params = (
                ("page[size]", f"{MAX_DOCS_PER_PAGE}"),
                ("page[current]", f"{current_page}"),
            )
            page_content = requests.get(
                request_url, headers=self.headers, params=params
            ).json()["results"]
            # An empty page means all documents have been collected.
            if not page_content:
                break
            docs.extend(page_content)
            current_page += 1
        return docs

    def list_existing_manual_urls(self, engine_name: str) -> Iterator[str]:
        """Yield the IDs of documents flagged as manually curated."""
        for doc in self.list_existing_docs(engine_name):
            if doc["is_manual"] == "true":
                yield doc["id"]

    def list_existing_non_manual_urls(self, engine_name: str) -> Iterator[str]:
        """Yield the IDs of documents not flagged as manually curated."""
        for doc in self.list_existing_docs(engine_name):
            if doc["is_manual"] == "false":
                yield doc["id"]

    def list_existing_urls(self, engine_name: str) -> Iterator[str]:
        """Yield the ID of every document in the engine."""
        for doc in self.list_existing_docs(engine_name):
            yield doc["id"]

    def get_elastic_query(self, data: str, size: int) -> requests.Response:
        """Run a raw Elasticsearch query against the us-speeches-s engine."""
        return requests.post(
            url=f"{self.appsearch_endpoint}/api/as/v0/engines/us-speeches-s/elasticsearch/_search?size={size}",
            headers=self.headers,
            data=data,
        )

    def delete_existing_non_manual_docs(self, engine_name: str) -> None:
        """Delete all non-manual documents from the engine in batches."""
        non_manual_doc_ids = list(self.list_existing_non_manual_urls(engine_name))
        DELETE_URL = f"/api/as/v1/engines/{engine_name}/documents"
        MAX_DOCS_TO_DELETE_PER_REQUEST = 100
        request_url = urljoin(self.appsearch_endpoint, DELETE_URL)

        def chunker(seq, size):
            # Split a sequence into consecutive chunks of at most `size` items.
            return (seq[pos : pos + size] for pos in range(0, len(seq), size))

        for group in chunker(non_manual_doc_ids, MAX_DOCS_TO_DELETE_PER_REQUEST):
            requests.delete(
                request_url,
                headers=self.headers,
                data=json.dumps(group, indent=4, sort_keys=True),
            )
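

if __name__ == "__main__":
    # A minimal usage sketch, not part of the original module: it assumes the
    # hardcoded endpoint and key above are valid, and the engine name
    # "my-engine" and document fields below are illustrative placeholders.
    client = AppSearchClient()

    # Create an engine and index a small batch of documents. Document IDs
    # double as URLs, matching how the list_existing_*_urls helpers read them.
    client.create_engine("my-engine")
    client.index_documents(
        [
            {"id": "https://example.com/a", "title": "Doc A", "is_manual": "true"},
            {"id": "https://example.com/b", "title": "Doc B", "is_manual": "false"},
        ],
        engine_name="my-engine",
    )

    # Enumerate engines and the document IDs currently in the engine.
    print(list(client.list_all_engines()))
    print(list(client.list_existing_urls("my-engine")))

    # Remove everything that was not manually curated, in batches of 100.
    client.delete_existing_non_manual_docs("my-engine")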