# fgmonitor-semantic/appsearch.py
import json
from typing import Dict, Iterator, List, Union
from urllib.parse import urljoin
import requests


class AppSearchClient:
    """Thin client for the Elastic App Search REST API."""

    def __init__(self):
        # NOTE: the endpoint and private key are hardcoded here; consider
        # loading them from configuration so the key is not kept in source.
        self.appsearch_endpoint = "https://fgm-v2.ent.eastus2.azure.elastic-cloud.com"
        self.appsearch_private_key = "private-dzf1pbcssw97hxkm3wxbdrpu"
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.appsearch_private_key}",
        }

    def list_all_engines(self) -> Iterator[str]:
        """Yield the name of every engine in the deployment, one page at a time."""
        ENGINES_URL = "/api/as/v1/engines/"
        request_url = urljoin(self.appsearch_endpoint, ENGINES_URL)
        MAX_DOCS_PER_PAGE = 10
        current_page = 1
        while True:
            params = (
                ("page[size]", f"{MAX_DOCS_PER_PAGE}"),
                ("page[current]", f"{current_page}"),
            )
            r = requests.get(request_url, headers=self.headers, params=params).json()
            if not r["results"]:
                break  # an empty page means the listing is exhausted
            for item in r["results"]:
                yield item["name"]
            current_page += 1

    def create_engine(self, name: str) -> requests.Response:
        """Create a new engine and return the raw HTTP response."""
        ENGINES_URL = "/api/as/v1/engines/"
        request_url = urljoin(self.appsearch_endpoint, ENGINES_URL)
        data = json.dumps({"name": name})
        return requests.post(request_url, headers=self.headers, data=data)

    def index_documents(self, data: Union[Dict, List[Dict]], engine_name: str) -> None:
        """Index a single document or a batch of documents into the engine."""
        INDEX_URL = f"/api/as/v1/engines/{engine_name}/documents"
        request_url = urljoin(self.appsearch_endpoint, INDEX_URL)
        r = requests.post(request_url, headers=self.headers, data=json.dumps(data))
        r.raise_for_status()  # surface indexing failures instead of discarding them

    def list_existing_docs(self, engine_name: str) -> List[Dict]:
        """Return every document in the engine by paging through the list endpoint."""
        LIST_URL = f"/api/as/v1/engines/{engine_name}/documents/list"
        MAX_DOCS_PER_PAGE = 100
        request_url = urljoin(self.appsearch_endpoint, LIST_URL)
        current_page = 1
        docs: List[Dict] = []
        while True:
            params = (
                ("page[size]", f"{MAX_DOCS_PER_PAGE}"),
                ("page[current]", f"{current_page}"),
            )
            page_content = requests.get(
                request_url, headers=self.headers, params=params
            ).json()["results"]
            if not page_content:
                break
            docs.extend(page_content)
            current_page += 1
        return docs

    def list_existing_manual_urls(self, engine_name: str) -> Iterator[str]:
        """Yield the ids of documents flagged as manually curated."""
        for doc in self.list_existing_docs(engine_name):
            # App Search stores the flag as the string "true"/"false".
            if doc["is_manual"] == "true":
                yield doc["id"]

    def list_existing_non_manual_urls(self, engine_name: str) -> Iterator[str]:
        """Yield the ids of documents that were not manually curated."""
        for doc in self.list_existing_docs(engine_name):
            if doc["is_manual"] == "false":
                yield doc["id"]

    def list_existing_urls(self, engine_name: str) -> Iterator[str]:
        """Yield the ids of all documents in the engine."""
        for doc in self.list_existing_docs(engine_name):
            yield doc["id"]

    def get_elastic_query(
        self, data: str, size: int, engine_name: str = "us-speeches-s"
    ) -> requests.Response:
        """Run a raw Elasticsearch query through the App Search passthrough endpoint."""
        request_url = (
            f"{self.appsearch_endpoint}/api/as/v0/engines/{engine_name}"
            f"/elasticsearch/_search?size={size}"
        )
        return requests.post(url=request_url, headers=self.headers, data=data)

    def delete_existing_non_manual_docs(self, engine_name: str) -> None:
        """Delete all non-manual documents from the engine in batches."""
        non_manual_doc_ids = list(self.list_existing_non_manual_urls(engine_name))
        DELETE_URL = f"/api/as/v1/engines/{engine_name}/documents"
        MAX_DOCS_TO_DELETE_PER_REQUEST = 100
        request_url = urljoin(self.appsearch_endpoint, DELETE_URL)

        def chunker(seq, size):
            # Split seq into consecutive slices of at most `size` elements.
            return (seq[pos : pos + size] for pos in range(0, len(seq), size))

        for group in chunker(non_manual_doc_ids, MAX_DOCS_TO_DELETE_PER_REQUEST):
            r = requests.delete(
                request_url, headers=self.headers, data=json.dumps(group)
            )
            r.raise_for_status()  # fail loudly if a batch delete is rejected
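

# A minimal usage sketch, not part of the original module: the engine name
# "my-engine" and the sample document below are hypothetical placeholders.
if __name__ == "__main__":
    client = AppSearchClient()

    # Create an engine; requests does not raise on HTTP error statuses, so
    # inspect the response payload for errors such as a name collision.
    print(client.create_engine("my-engine").json())

    # Index one document; "id" doubles as the URL key that the listing
    # helpers yield, and "is_manual" is stored as the string "true"/"false".
    client.index_documents(
        {"id": "https://example.com/doc-1", "is_manual": "false", "title": "Demo"},
        engine_name="my-engine",
    )

    # Enumerate engines and the ids of the documents just indexed.
    print(list(client.list_all_engines()))
    print(list(client.list_existing_urls("my-engine")))

    # Remove everything that was not manually curated.
    client.delete_existing_non_manual_docs("my-engine")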