from typing import Dict, List
import importlib
import re

import openai
import pinecone
def can_import(module_name):
    try:
        importlib.import_module(module_name)
        return True
    except ImportError:
        return False
assert can_import("pinecone"), (
    "\033[91m\033[1m"
    + "Pinecone storage requires package pinecone-client.\nInstall: pip install -r extensions/requirements.txt"
)
class PineconeResultsStorage:
    def __init__(
        self,
        openai_api_key: str,
        pinecone_api_key: str,
        pinecone_environment: str,
        llm_model: str,
        llama_model_path: str,
        results_store_name: str,
        objective: str,
    ):
        openai.api_key = openai_api_key
        pinecone.init(api_key=pinecone_api_key, environment=pinecone_environment)

        # Pinecone namespaces only accept ASCII characters (used in query and upsert)
        self.namespace = re.sub(re.compile('[^\x00-\x7F]+'), '', objective)

        self.llm_model = llm_model
        self.llama_model_path = llama_model_path

        # OpenAI's text-embedding-ada-002 produces 1536-dimensional vectors;
        # the llama embedding path produces 5120-dimensional vectors.
        dimension = 1536 if not self.llm_model.startswith("llama") else 5120
        metric = "cosine"
        pod_type = "p1"
        if results_store_name not in pinecone.list_indexes():
            pinecone.create_index(
                results_store_name, dimension=dimension, metric=metric, pod_type=pod_type
            )

        self.index = pinecone.Index(results_store_name)
        index_stats_response = self.index.describe_index_stats()
        assert (
            dimension == index_stats_response["dimension"]
        ), "Dimension of the index does not match the dimension of the LLM embedding"
    def add(self, task: Dict, result: str, result_id: str):
        # Pinecone vector IDs must be strings, hence result_id: str
        vector = self.get_embedding(result)
        self.index.upsert(
            [(result_id, vector, {"task": task["task_name"], "result": result})],
            namespace=self.namespace,
        )
    def query(self, query: str, top_results_num: int) -> List[str]:
        query_embedding = self.get_embedding(query)
        results = self.index.query(
            query_embedding, top_k=top_results_num, include_metadata=True, namespace=self.namespace
        )
        # Return task names ordered by descending similarity score
        sorted_results = sorted(results.matches, key=lambda x: x.score, reverse=True)
        return [str(item.metadata["task"]) for item in sorted_results]
    # Get the embedding for the text
    def get_embedding(self, text: str) -> List[float]:
        text = text.replace("\n", " ")

        if self.llm_model.startswith("llama"):
            from llama_cpp import Llama

            # Note: this loads the llama model on every call; acceptable for
            # occasional use, but expensive if embeddings are requested often.
            llm_embed = Llama(
                model_path=self.llama_model_path,
                n_ctx=2048,
                n_threads=4,
                embedding=True,
                use_mlock=True,
            )
            return llm_embed.embed(text)

        return openai.Embedding.create(input=[text], model="text-embedding-ada-002")["data"][0]["embedding"]
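
# A minimal usage sketch, not part of the original module: every key, the
# environment name, the index name, and the objective below are placeholder
# assumptions, and the call pattern mirrors how a BabyAGI-style agent would
# drive this storage class (add results as tasks complete, query for context).
if __name__ == "__main__":
    storage = PineconeResultsStorage(
        openai_api_key="sk-...",              # placeholder
        pinecone_api_key="...",               # placeholder
        pinecone_environment="us-east1-gcp",  # placeholder
        llm_model="gpt-3.5-turbo",
        llama_model_path="",                  # unused unless llm_model starts with "llama"
        results_store_name="baby-agi-test-table",
        objective="Write a weekly status report",
    )
    storage.add(
        task={"task_name": "Develop a task list"},
        result="1. Gather updates from each team ...",
        result_id="result_1",
    )
    print(storage.query("What tasks have been completed?", top_results_num=3))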