import threading |
import chromadb |
import posthog |
import torch |
import math |
import numpy as np |
import extensions.superboogav2.parameters as parameters |
from chromadb.config import Settings |
from sentence_transformers import SentenceTransformer |
from modules.logging_colors import logger |
from modules.text_generation import encode, decode |
# Replace posthog's module-level ``capture`` with a function that ignores its
# arguments, so any later call to ``posthog.capture(...)`` in this process
# does nothing. NOTE(review): presumably this silences analytics events sent
# through posthog by a dependency -- confirm which caller this targets.
logger.debug('Intercepting all calls to posthog.')


def _discard_posthog_event(*args, **kwargs):
    """Accept any arguments, do nothing, and return None."""
    return None


posthog.capture = _discard_posthog_event
class Collecter:
    """Interface for a store of text chunks that can be queried by search strings.

    Every method here is a no-op returning None; subclasses such as
    ``ChromaCollector`` override them with a real implementation.
    """

    def __init__(self):
        """Create the collector; the base class keeps no state."""

    def add(self, texts: list[str], texts_with_context: list[str], starting_indices: list[int]):
        """Store chunks (no-op in the base class).

        Args:
            texts: The chunk texts.
            texts_with_context: One context-extended text per chunk.
            starting_indices: One start index per chunk.
        """

    def get(self, search_strings: list[str], n_results: int) -> list[str]:
        """Look up chunks for the search strings (no-op here, returns None).

        Args:
            search_strings: Query strings.
            n_results: Requested number of results.
        """

    def clear(self):
        """Remove everything stored (no-op in the base class)."""
class Embedder:
    """Interface for objects that turn text into embeddings.

    The base class does nothing; subclasses provide a working ``embed``.
    """

    def __init__(self):
        """Create the embedder; the base class keeps no state."""

    def embed(self, text: str) -> list[torch.Tensor]:
        """Embed ``text`` (no-op in the base class, returns None).

        Args:
            text: The text to embed.
        """
class Info:
    """One retrieved chunk: its context-extended text, start index, distance and id."""

    def __init__(self, start_index, text_with_context, distance, id):
        """Store the chunk's fields unchanged.

        Args:
            start_index: Position used to order chunks and to detect overlap.
            text_with_context: The chunk text including its surrounding context.
            distance: Distance of the chunk from the query.
            id: Identifier of the chunk. The name shadows the builtin but is
                kept because callers pass it by keyword.
        """
        self.text_with_context = text_with_context
        self.start_index = start_index
        self.distance = distance
        self.id = id

    def calculate_distance(self, other_info):
        """Combine this chunk's distance with ``other_info``'s.

        The rule is selected by ``parameters.get_new_dist_strategy()``:
        minimum, harmonic mean, geometric mean or arithmetic mean. Any other
        value falls back to the minimum.

        Args:
            other_info: The ``Info`` being merged with this one.

        Returns:
            The combined distance.
        """
        # Read the strategy once instead of once per branch.
        strategy = parameters.get_new_dist_strategy()
        d1 = self.distance
        d2 = other_info.distance

        if strategy == parameters.DIST_MIN_STRATEGY:
            return min(d1, d2)
        if strategy == parameters.DIST_HARMONIC_STRATEGY:
            total = d1 + d2
            if total == 0:
                # Both distances are zero: the harmonic mean is taken to be 0
                # rather than dividing by zero.
                return 0.0
            return 2 * (d1 * d2) / total
        if strategy == parameters.DIST_GEOMETRIC_STRATEGY:
            return (d1 * d2) ** 0.5
        if strategy == parameters.DIST_ARITHMETIC_STRATEGY:
            return (d1 + d2) / 2
        return min(d1, d2)

    def merge_with(self, other_info):
        """Merge with ``other_info`` when the two texts touch or overlap.

        The merged chunk starts at the smaller start index and takes the id of
        the chunk that starts first (``self`` on a tie). If the earlier text
        already reaches at least as far as the later one it is used as is;
        otherwise the non-overlapping tail of the later text is appended.

        Args:
            other_info: The ``Info`` to merge with.

        Returns:
            A new ``Info`` for the merged span, or None if the spans are
            separated by a gap.
        """
        if not self.should_merge(self.text_with_context, other_info.text_with_context,
                                 self.start_index, other_info.start_index):
            return None

        # Only needed once we know a merge happens.
        new_dist = self.calculate_distance(other_info)

        if self.start_index <= other_info.start_index:
            first, second = self, other_info
        else:
            first, second = other_info, self

        first_text = first.text_with_context
        second_text = second.text_with_context
        first_end = first.start_index + len(first_text)
        second_end = second.start_index + len(second_text)

        if first_end >= second_end:
            # The later chunk lies entirely inside the earlier one.
            merged_text = first_text
        else:
            overlap = max(0, first_end - second.start_index)
            merged_text = first_text + second_text[overlap:]

        return Info(first.start_index, merged_text, new_dist, first.id)

    @staticmethod
    def should_merge(s1, s2, s1_start, s2_start):
        """Return True when the two spans overlap or are directly adjacent.

        Args:
            s1: First text.
            s2: Second text.
            s1_start: Start index of ``s1``.
            s2_start: Start index of ``s2``.
        """
        s1_end = s1_start + len(s1)
        s2_end = s2_start + len(s2)
        # Equivalent to: not (s1 ends before s2 starts or s2 ends before s1 starts).
        return s1_end >= s2_start and s2_end >= s1_start
class ChromaCollector(Collecter):
    """Collector that stores text chunks in a ChromaDB collection named "context".

    Besides the collection it keeps:
      * ``ids``: every id currently stored, as decimal strings;
      * ``id_to_info``: id -> {'text_with_context', 'start_index'};
      * ``embeddings_cache``: text -> embedding, so a text that was embedded
        before is not embedded again (``clear`` leaves this cache in place).

    Public methods acquire ``self.lock``; the underscore-prefixed helpers do
    not and are only called with the lock already held.
    """

    def __init__(self, embedder: Embedder):
        """Create the client and the "context" collection.

        Args:
            embedder: Object whose ``embed`` attribute is used both as the
                collection's embedding function and to embed new texts.
        """
        super().__init__()
        self.chroma_client = chromadb.Client(Settings(anonymized_telemetry=False))
        self.embedder = embedder
        self.collection = self.chroma_client.create_collection(name="context", embedding_function=self.embedder.embed)

        self.ids = []
        self.id_to_info = {}
        self.embeddings_cache = {}
        # Plain (non-reentrant) lock: public methods must not call each other.
        self.lock = threading.Lock()

    def add(self, texts: list[str], texts_with_context: list[str], starting_indices: list[int], metadatas: list[dict] = None):
        """Add chunks to the collection, reusing cached embeddings where possible.

        Args:
            texts: Chunk texts; these are what gets embedded and stored.
            texts_with_context: Context-extended text for each chunk.
            starting_indices: Start index for each chunk.
            metadatas: Optional metadata dict per chunk; when given it must
                have the same length as ``texts``.

        Raises:
            AssertionError: If ``metadatas`` is given with a different length
                than ``texts``.
        """
        with self.lock:
            assert metadatas is None or len(metadatas) == len(texts), "metadatas must be None or have the same length as texts"

            if len(texts) == 0:
                return

            new_ids = self._get_new_ids(len(texts))

            (existing_texts, existing_embeddings, existing_ids, existing_metas), \
                (non_existing_texts, non_existing_ids, non_existing_metas) = self._split_texts_by_cache_hit(texts, new_ids, metadatas)

            # Texts already embedded earlier: add them with the cached vectors.
            if existing_texts:
                logger.info(f'Adding {len(existing_embeddings)} cached embeddings.')
                args = {'embeddings': existing_embeddings, 'documents': existing_texts, 'ids': existing_ids}
                if metadatas is not None:
                    args['metadatas'] = existing_metas
                self.collection.add(**args)

            # Texts not seen before: embed, remember the vectors, then add.
            if non_existing_texts:
                non_existing_embeddings = self.embedder.embed(non_existing_texts).tolist()
                for text, embedding in zip(non_existing_texts, non_existing_embeddings):
                    self.embeddings_cache[text] = embedding

                logger.info(f'Adding {len(non_existing_embeddings)} new embeddings.')
                args = {'embeddings': non_existing_embeddings, 'documents': non_existing_texts, 'ids': non_existing_ids}
                if metadatas is not None:
                    args['metadatas'] = non_existing_metas
                self.collection.add(**args)

            # Record context and position for every new id.
            new_info = {
                id_: {'text_with_context': context, 'start_index': start_index}
                for id_, context, start_index in zip(new_ids, texts_with_context, starting_indices)
            }

            self.id_to_info.update(new_info)
            self.ids.extend(new_ids)

    def _split_texts_by_cache_hit(self, texts: list[str], new_ids: list[str], metadatas: list[dict]):
        """Partition texts by whether ``embeddings_cache`` already has them.

        Args:
            texts: Chunk texts.
            new_ids: Id for each text, same order as ``texts``.
            metadatas: Metadata for each text, or None.

        Returns:
            ``(texts, embeddings, ids, metas)`` for cache hits and
            ``(texts, ids, metas)`` for misses. The metadata entries are None
            when ``metadatas`` is None.
        """
        existing_texts, non_existing_texts = [], []
        existing_embeddings = []
        existing_ids, non_existing_ids = [], []
        existing_metas, non_existing_metas = [], []

        for i, text in enumerate(texts):
            id_ = new_ids[i]
            metadata = metadatas[i] if metadatas is not None else None
            embedding = self.embeddings_cache.get(text)
            # Test for presence explicitly rather than by truthiness.
            if embedding is not None:
                existing_texts.append(text)
                existing_embeddings.append(embedding)
                existing_ids.append(id_)
                existing_metas.append(metadata)
            else:
                non_existing_texts.append(text)
                non_existing_ids.append(id_)
                non_existing_metas.append(metadata)

        return (existing_texts, existing_embeddings, existing_ids, existing_metas), \
               (non_existing_texts, non_existing_ids, non_existing_metas)

    def _get_new_ids(self, num_new_ids: int):
        """Return ``num_new_ids`` fresh ids, continuing after the largest stored id.

        Ids are decimal strings; numbering starts at "0" when nothing is stored.
        """
        max_existing_id = max((int(id_) for id_ in self.ids), default=-1)
        return [str(i + max_existing_id + 1) for i in range(num_new_ids)]

    def _find_min_max_start_index(self):
        """Return ``(min, max)`` of the stored start indices.

        With nothing stored this is ``(inf, 0)``; the maximum is never
        reported below 0.
        """
        start_indices = [val['start_index'] for val in self.id_to_info.values()]
        min_index = min(start_indices, default=float('inf'))
        max_index = max([0, *start_indices])
        return min_index, max_index

    def _apply_sigmoid_time_weighing(self, infos: list[Info], document_len: int, time_steepness: float, time_power: float, start_offset: int = 0):
        """Scale each info's distance by a position-dependent sigmoid weight, in place.

        A sigmoid is sampled at ``document_len`` points, rescaled to the range
        ``[1 - time_power, 1]`` and reversed, so that for a positive
        ``time_steepness`` larger positions receive smaller multipliers.

        Args:
            infos: Infos whose ``distance`` is multiplied in place.
            document_len: Number of positions covered by the weight curve.
            time_steepness: Factor applied to the sigmoid's input.
            time_power: Width of the weight range; 0 leaves distances unchanged.
            start_offset: Value subtracted from ``start_index`` to get the
                position on the curve. Callers that size ``document_len`` as
                ``max - min + 1`` must pass ``min`` here, otherwise positions
                run past the end of the curve.
        """
        if not infos:
            return

        curve = 1 / (1 + np.exp(-(time_steepness * np.linspace(-10, 10, document_len))))

        # Scale to [0, time_power] and shift up to [1 - time_power, 1].
        curve = curve - curve.min()
        peak = curve.max()
        if peak > 0:
            weights = curve * (time_power / peak) + (1 - time_power)
        else:
            # Flat curve (single position or zero steepness): rescaling would
            # divide by zero and turn every distance into NaN, so apply no
            # weighting at all.
            weights = np.ones(document_len)

        # Reverse the weights.
        weights = weights[::-1]

        for info in infos:
            info.distance *= weights[info.start_index - start_offset]

    def _filter_outliers_by_median_distance(self, infos: list[Info], significant_level: float):
        """Keep infos whose distance is at most ``significant_level`` times the median.

        The info with the smallest distance is always kept, so a non-empty
        input never yields an empty result.
        """
        if not infos:
            return []

        # Find the info with the minimum distance.
        min_info = min(infos, key=lambda x: x.distance)

        median_distance = np.median([inf.distance for inf in infos])

        filtered_infos = [inf for inf in infos if inf.distance <= significant_level * median_distance]

        if min_info not in filtered_infos:
            filtered_infos.append(min_info)

        return filtered_infos

    def _merge_infos(self, infos: list[Info]):
        """Fold neighbouring infos together wherever ``Info.merge_with`` allows it.

        Args:
            infos: Infos in the order they should be considered (the caller
                sorts them by start index).

        Returns:
            The merged list; empty when ``infos`` is empty.
        """
        if not infos:
            return []

        merged_infos = []
        current_info = infos[0]

        for next_info in infos[1:]:
            merged = current_info.merge_with(next_info)
            if merged is not None:
                current_info = merged
            else:
                merged_infos.append(current_info)
                current_info = next_info

        merged_infos.append(current_info)
        return merged_infos

    def _get_documents_ids_distances(self, search_strings: list[str], n_results: int):
        """Query the collection and return merged, time-weighted results.

        The result budget is split evenly (rounded up) across the search
        strings. Each query's hits are time-weighted and outlier-filtered,
        then all hits are sorted by start index and merged.

        Args:
            search_strings: Query strings; a single string is also accepted.
            n_results: Maximum total number of results, capped at the number
                of stored ids.

        Returns:
            ``(texts_with_context, ids, distances)`` as three parallel lists;
            three empty lists when nothing is stored or ``n_results`` is 0.
        """
        n_results = min(len(self.ids), n_results)
        if n_results == 0:
            return [], [], []

        if isinstance(search_strings, str):
            search_strings = [search_strings]

        infos = []
        min_start_index, max_start_index = self._find_min_max_start_index()

        for search_string in search_strings:
            result = self.collection.query(query_texts=search_string, n_results=math.ceil(n_results / len(search_strings)), include=['distances'])
            curr_infos = [Info(start_index=self.id_to_info[chunk_id]['start_index'],
                               text_with_context=self.id_to_info[chunk_id]['text_with_context'],
                               distance=distance, id=chunk_id)
                          for chunk_id, distance in zip(result['ids'][0], result['distances'][0])]

            # The curve spans min..max start index, so positions on it are
            # measured from min_start_index.
            self._apply_sigmoid_time_weighing(infos=curr_infos,
                                              document_len=max_start_index - min_start_index + 1,
                                              time_steepness=parameters.get_time_steepness(),
                                              time_power=parameters.get_time_power(),
                                              start_offset=min_start_index)
            curr_infos = self._filter_outliers_by_median_distance(curr_infos, parameters.get_significant_level())
            infos.extend(curr_infos)

        infos.sort(key=lambda x: x.start_index)
        infos = self._merge_infos(infos)

        texts_with_context = [inf.text_with_context for inf in infos]
        ids = [inf.id for inf in infos]
        distances = [inf.distance for inf in infos]

        return texts_with_context, ids, distances

    def get(self, search_strings: list[str], n_results: int) -> list[str]:
        """Return the merged context texts matching the search strings."""
        with self.lock:
            documents, _, _ = self._get_documents_ids_distances(search_strings, n_results)
            return documents

    def get_ids(self, search_strings: list[str], n_results: int) -> list[str]:
        """Return the ids of the merged results matching the search strings."""
        with self.lock:
            _, ids, _ = self._get_documents_ids_distances(search_strings, n_results)
            return ids

    def _get_documents_up_to_token_count(self, documents: list[str], max_token_count: int):
        """Take documents in order until ``max_token_count`` tokens are used.

        The first document that does not fit is cut to the remaining token
        budget and ends the selection; if no budget remains it is dropped
        instead of being added as an empty string.

        Args:
            documents: Candidate documents in priority order.
            max_token_count: Total token budget.

        Returns:
            The selected documents, the last one possibly truncated.
        """
        current_token_count = 0
        return_documents = []

        for doc in documents:
            doc_tokens = encode(doc)[0]
            doc_token_count = len(doc_tokens)
            remaining_tokens = max_token_count - current_token_count

            if doc_token_count > remaining_tokens:
                if remaining_tokens > 0:
                    truncated_doc = decode(doc_tokens[:remaining_tokens], skip_special_tokens=True)
                    return_documents.append(truncated_doc)
                break

            return_documents.append(doc)
            current_token_count += doc_token_count

        return return_documents

    def get_sorted_by_ids(self, search_strings: list[str], n_results: int, max_token_count: int) -> list[str]:
        """Return results ordered by numeric id, limited to ``max_token_count`` tokens."""
        with self.lock:
            documents, ids, _ = self._get_documents_ids_distances(search_strings, n_results)
            # Ids are decimal strings; compare them as integers so that "10"
            # sorts after "2".
            sorted_docs = [doc for _, doc in sorted(zip(ids, documents), key=lambda pair: int(pair[0]))]

            return self._get_documents_up_to_token_count(sorted_docs, max_token_count)

    def get_sorted_by_dist(self, search_strings: list[str], n_results: int, max_token_count: int) -> list[str]:
        """Return results within the token budget, farthest first and closest last.

        The budget is spent on the closest results first; the selection is
        then reversed.
        """
        with self.lock:
            documents, _, distances = self._get_documents_ids_distances(search_strings, n_results)
            sorted_docs = [doc for doc, _ in sorted(zip(documents, distances), key=lambda x: x[1])]

            # If a document is truncated or completely skipped, it is the one
            # with the highest distance among those considered.
            return_documents = self._get_documents_up_to_token_count(sorted_docs, max_token_count)
            return_documents.reverse()

            return return_documents

    def delete(self, ids_to_delete: list[str], where: dict):
        """Delete the records matching ``ids_to_delete`` and ``where``.

        The matching ids are resolved first so the local bookkeeping
        (``ids`` and ``id_to_info``) can be updated to match.

        Args:
            ids_to_delete: Ids to consider, passed to the collection's ``get``.
            where: Metadata filter, passed to the collection's ``get``/``delete``.
        """
        with self.lock:
            ids_to_delete = self.collection.get(ids=ids_to_delete, where=where)['ids']

            if ids_to_delete:
                # Nothing matched means nothing to delete. NOTE(review): this
                # also avoids depending on how the backend treats an empty id
                # list -- confirm against the chromadb version in use.
                self.collection.delete(ids=ids_to_delete, where=where)

                # Remove the deleted ids from self.ids and self.id_to_info.
                ids_set = set(ids_to_delete)
                self.ids = [id_ for id_ in self.ids if id_ not in ids_set]
                for id_ in ids_to_delete:
                    self.id_to_info.pop(id_, None)

            logger.info(f'Successfully deleted {len(ids_to_delete)} records from chromaDB.')

    def clear(self):
        """Reset the client, recreate the "context" collection and drop all ids.

        ``embeddings_cache`` is left as is, so texts added again later reuse
        their embeddings.
        """
        with self.lock:
            self.chroma_client.reset()
            self.collection = self.chroma_client.create_collection("context", embedding_function=self.embedder.embed)

            self.ids = []
            self.id_to_info = {}

            logger.info('Successfully cleared all records and reset chromaDB.')
class SentenceTransformerEmbedder(Embedder):
    """Embedder that delegates to a SentenceTransformer model's ``encode``."""

    def __init__(self) -> None:
        """Load the "sentence-transformers/all-mpnet-base-v2" model.

        ``embed`` is set as an instance attribute pointing straight at the
        model's bound ``encode`` method, so it replaces the base-class method
        and accepts whatever ``encode`` accepts.
        """
        logger.debug('Creating Sentence Embedder...')
        model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2")
        self.model = model
        self.embed = model.encode
def make_collector():
    """Build a ``ChromaCollector`` that embeds with a new ``SentenceTransformerEmbedder``."""
    embedder = SentenceTransformerEmbedder()
    return ChromaCollector(embedder)