from sentence_transformers import SentenceTransformer, util from PIL import Image import io import pickle import os import time import numpy as np import pprint metadata_path = 'conceptarium/metadata.pickle' def init(): if not os.path.exists(metadata_path): os.mkdir('conceptarium') pickle.dump(list(), open(metadata_path, 'wb')) def save(thought): conceptarium = pickle.load(open(metadata_path, 'rb')) if len(conceptarium) > 0: modality_match = [e.modality == thought.modality for e in conceptarium] corpus_embeddings = [e.embedding for e in conceptarium] results = util.semantic_search( [thought.embedding], corpus_embeddings, top_k=len(corpus_embeddings), score_function=util.dot_score)[0] results = [e if modality_match[e['corpus_id']] else compensate_modality_mismatch(e) for e in results] for result in results: conceptarium[result['corpus_id'] ].interest += result['score'] if len(list(filter(lambda x: open(x.filename, 'rb').read() == open(thought.filename, 'rb').read(), conceptarium))) == 0: conceptarium += [thought] pickle.dump(conceptarium, open(metadata_path, 'wb')) def find(query, model, relatedness, serendipity, noise, silent, top_k): conceptarium = pickle.load(open(metadata_path, 'rb')) query_embedding = embed(query, model) query_modality = get_modality(query) modality_match = [e.modality == query_modality for e in conceptarium] corpus_embeddings = [e.embedding for e in conceptarium] results = util.semantic_search( [query_embedding], corpus_embeddings, top_k=len(corpus_embeddings), score_function=util.dot_score)[0] results = [e if modality_match[e['corpus_id']] else compensate_modality_mismatch(e) for e in results] if not silent: for result in results: conceptarium[result['corpus_id'] ].interest += result['score'] pickle.dump(conceptarium, open(metadata_path, 'wb')) for idx, result in enumerate(results): results[idx]['score'] = (relatedness * result['score'] - serendipity * (np.log(conceptarium[result['corpus_id']].interest / (1 - 0.9)) - 0.9 * np.log((time.time() - conceptarium[result['corpus_id']].timestamp) / (3600 * 24) + 0.1))) \ * np.random.normal(1, noise) results = sorted( results, key=lambda result: result['score'], reverse=True) memories = [conceptarium[e['corpus_id']] for e in results][:top_k] return memories def get_doc_paths(directory): paths = [] for root, directories, files in os.walk(directory): for filename in files: path = os.path.join(root, filename) paths.append(path) return paths def load_model(): return SentenceTransformer('clip-ViT-B-32') def embed(content, model): if get_modality(content) == 'language': return model.encode(content, convert_to_tensor=True, normalize_embeddings=True) else: return model.encode(Image.open(io.BytesIO(content)), convert_to_tensor=True, normalize_embeddings=True) def reset_embeddings(model): conceptarium = pickle.load(open(metadata_path, 'rb')) for thought_idx, thought in enumerate(conceptarium): if thought.modality == 'language': content = open(thought.filename, 'r').read() else: content = open(thought.filename, 'rb').read() conceptarium[thought_idx].embedding = embed(content, model) pickle.dump(conceptarium, open(metadata_path, 'wb')) def get_modality(content): if isinstance(content, str): return 'language' else: return 'imagery' def compensate_modality_mismatch(result): result['score'] *= 2.5 return result class Thought: def __init__(self, filename, content, model): self.filename = filename self.modality = get_modality(content) self.timestamp = time.time() self.interest = 1 self.embedding = embed(content, model) def get_content(self): if self.modality == 'language': return open(self.filename).read() elif self.modality == 'imagery': return open(self.filename, 'rb').read() ''' import json thoughts = json.load(open('knowledge/base/metadata.json', 'rb')) from datetime import datetime new_thoughts = [] for thought in thoughts: new_thought = {} new_thought['filename'] = thought.filename new_thought['modality'] = thought.modality new_thought['timestamp'] = thought.timestamp new_thought['interest'] = thought.interest new_thought['embedding'] = thought.embedding new_thoughts += [new_thought] for e_idx, e in enumerate(new_thoughts): if e['modality'] == 'language': new_thoughts[e_idx]['modality'] = 'text' elif e['modality'] == 'imagery': new_thoughts[e_idx]['modality'] = 'image' else: print(e['modality']) for e_idx, e in enumerate(new_thoughts): new_thoughts[e_idx]['embedding'] = e['embedding'].tolist() for e_idx, e in enumerate(new_thoughts): new_thoughts[e_idx]['embedding'] = [round(f, 6) for f in e['embedding']] for e_idx, e in enumerate(new_thoughts): new_thoughts[e_idx]['filename'] = e['filename'].split('/')[-1] new_thoughts[0] json.dump(new_thoughts, open('knowledge/base/metadata.json', 'wb')) '''