# NLL_Interface / util / Embeddings.py
# bytedancerneat's picture
# Upload folder using huggingface_hub
# 929938f verified
import os
from copy import copy
from typing import Dict, List, Optional, Tuple, Union
import numpy as np
import time
from functools import wraps
# Overwrite CURL_CA_BUNDLE with an empty string for this process.
# NOTE(review): presumably a workaround so that HTTP clients honouring this
# variable skip the custom CA bundle / certificate verification -- confirm,
# because that weakens transport security for every request made afterwards.
os.environ['CURL_CA_BUNDLE'] = ''
from dotenv import load_dotenv, find_dotenv
# Load variables from the .env file located by find_dotenv(); the return value
# is discarded. The API keys read via os.getenv() further down are expected to
# come from the environment populated here.
_ = load_dotenv(find_dotenv())
class BaseEmbeddings:
    """
    Base class for embeddings.

    Subclasses wrap one concrete embedding backend and override
    ``get_embedding``.

    Args:
        path: Stored unchanged on the instance; the locally loaded subclasses
            in this file pass a model identifier here.
        is_api: Stored unchanged on the instance; subclasses use it to decide
            whether to create a remote API client.
    """

    def __init__(self, path: str, is_api: bool) -> None:
        self.path = path
        self.is_api = is_api

    def get_embedding(self, text: str, model: str) -> List[float]:
        """Return the embedding of ``text``. Must be overridden by subclasses."""
        raise NotImplementedError

    @classmethod
    def cosine_similarity(cls, vector1: List[float], vector2: List[float]) -> float:
        """
        Calculate the cosine similarity between two vectors.

        Args:
            vector1: First vector.
            vector2: Second vector, same length as ``vector1``.

        Returns:
            The cosine similarity as a built-in ``float``. When the product of
            the two norms is zero (at least one zero vector) the result is
            ``0.0`` instead of dividing by zero.
        """
        dot_product = np.dot(vector1, vector2)
        magnitude = np.linalg.norm(vector1) * np.linalg.norm(vector2)
        if not magnitude:
            return 0.0
        # Convert the NumPy scalar so the declared ``float`` return type holds
        # regardless of the input dtype (e.g. float32 arrays).
        return float(dot_product / magnitude)
class OpenAIEmbedding(BaseEmbeddings):
    """
    class for OpenAI embeddings

    In API mode an ``openai.OpenAI`` client is created from the
    ``OPENAI_API_KEY`` and ``OPENAI_BASE_URL`` environment variables.
    No local (non-API) mode is implemented.
    """

    def __init__(self, path: str = '', is_api: bool = True) -> None:
        super().__init__(path, is_api)
        if self.is_api:
            from openai import OpenAI
            # Hand the settings to the constructor instead of assigning them
            # afterwards: when OPENAI_BASE_URL is unset os.getenv() yields
            # None, which the constructor accepts (it falls back to the
            # default endpoint) but which is not a valid value to assign to
            # ``client.base_url``.
            self.client = OpenAI(
                api_key=os.getenv("OPENAI_API_KEY"),
                base_url=os.getenv("OPENAI_BASE_URL") or None,
            )

    def get_embedding(self, text: str, model: str = "text-embedding-3-large") -> List[float]:
        """
        Return the embedding of ``text`` computed by ``model``.

        Newlines in ``text`` are replaced by spaces before the request.

        Raises:
            NotImplementedError: if the instance was created with
                ``is_api=False``.
        """
        if not self.is_api:
            raise NotImplementedError
        text = text.replace("\n", " ")
        return self.client.embeddings.create(input=[text], model=model).data[0].embedding
class JinaEmbedding(BaseEmbeddings):
    """
    Embeddings produced by a Jina model loaded through ``transformers``.
    """

    def __init__(self, path: str = 'jinaai/jina-embeddings-v2-base-zh', is_api: bool = False) -> None:
        super().__init__(path, is_api)
        # The model is loaded eagerly, once per instance.
        self._model = self.load_model()

    def get_embedding(self, text: str) -> List[float]:
        """Encode ``text`` as a one-element batch and return its vector as a list."""
        batch = self._model.encode([text])
        return batch[0].tolist()

    def load_model(self):
        """Load the model named by ``self.path`` onto CUDA when available, else CPU."""
        import torch
        from transformers import AutoModel

        target = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # trust_remote_code=True lets the checkpoint's own modelling code run.
        loaded = AutoModel.from_pretrained(self.path, trust_remote_code=True)
        return loaded.to(target)
class ZhipuEmbedding(BaseEmbeddings):
    """
    Embeddings served by the Zhipu AI API.
    """

    def __init__(self, path: str = '', is_api: bool = True) -> None:
        super().__init__(path, is_api)
        if not self.is_api:
            # No client is created outside API mode.
            return
        from zhipuai import ZhipuAI
        key = os.getenv("ZHIPUAI_API_KEY")
        self.client = ZhipuAI(api_key=key)

    def get_embedding(self, text: str) -> List[float]:
        """Request an "embedding-2" embedding for ``text`` and return the vector."""
        request = {"model": "embedding-2", "input": text}
        result = self.client.embeddings.create(**request)
        return result.data[0].embedding
class DashscopeEmbedding(BaseEmbeddings):
    """
    Embeddings served by the Dashscope text-embedding API.
    """

    def __init__(self, path: str = '', is_api: bool = True) -> None:
        super().__init__(path, is_api)
        if not self.is_api:
            # No client is configured outside API mode.
            return
        import dashscope
        # The key is set on the dashscope module itself, not on an instance.
        dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
        self.client = dashscope.TextEmbedding

    def get_embedding(self, text: str, model: str='text-embedding-v1') -> List[float]:
        """Call ``model`` on ``text`` and return the first embedding of the response."""
        reply = self.client.call(model=model, input=text)
        first = reply.output['embeddings'][0]
        return first['embedding']
class BgeEmbedding(BaseEmbeddings):
    """
    Embeddings produced by a BGE model loaded through ``transformers``.
    """

    def __init__(self, path: str = 'BAAI/bge-en-icl', is_api: bool = False) -> None:
        super().__init__(path, is_api)
        loaded = self.load_model(path)
        self._model, self._tokenizer = loaded

    def get_embedding(self, text: str) -> List[float]:
        """
        Return the L2-normalised embedding of ``text``.

        The vector is taken from position 0 along the second axis of the
        model's first output, then normalised.
        """
        import torch

        batch = self._tokenizer([text], padding=True, truncation=True, return_tensors='pt')
        # Move every tokenizer output onto the model's device.
        on_device = {}
        for name, tensor in batch.items():
            on_device[name] = tensor.to(self._model.device)
        with torch.no_grad():
            outputs = self._model(**on_device)
        first_position = outputs[0][:, 0]
        unit = torch.nn.functional.normalize(first_position, p=2, dim=1)
        return unit[0].tolist()

    def load_model(self, path: str):
        """
        Load tokenizer and model from ``path``.

        The model is moved to CUDA when available (CPU otherwise) and switched
        to eval mode. Returns ``(model, tokenizer)``.
        """
        import torch
        from transformers import AutoModel, AutoTokenizer

        device_name = "cuda" if torch.cuda.is_available() else "cpu"
        tokenizer = AutoTokenizer.from_pretrained(path)
        model = AutoModel.from_pretrained(path).to(torch.device(device_name))
        model.eval()
        return model, tokenizer
def rate_limiter():
    """
    Build a decorator that throttles a method to ``self.max_qpm`` calls per minute.

    The decorated method's instance must provide:
        max_qpm: maximum calls per minute; ``60 / max_qpm`` is the minimum
            spacing in seconds between two calls.
        silent: when exactly ``False``, a message is printed before sleeping.

    The time of the previous call is kept on the instance as ``_last_called``,
    so all decorated methods of one instance share the same budget.
    """
    def rate_limiter_decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            interval = 60 / self.max_qpm
            # None means "never called": the first call is never delayed.
            last_called = getattr(self, '_last_called', None)
            if last_called is not None:
                # A monotonic clock keeps the interval correct even if the
                # system wall clock is adjusted between calls.
                time_to_wait = interval - (time.monotonic() - last_called)
                if time_to_wait > 0:
                    if self.silent is False:
                        print(f"## Rate limit reached. Waiting for {time_to_wait:.2f} seconds.")
                    time.sleep(time_to_wait)
            try:
                return func(self, *args, **kwargs)
            finally:
                # Record the call even when it raised: a failed request should
                # still be spaced from the next one.
                self._last_called = time.monotonic()
        return wrapper
    return rate_limiter_decorator
class TextEmb3LargeEmbedding(BaseEmbeddings):
    """
    class for text-embedding-3-large embeddings

    Wraps ``langchain_openai.AzureOpenAIEmbeddings`` pointed at a fixed
    gateway and throttles ``get_embedding`` through ``rate_limiter``.

    Args:
        max_qpm: Maximum calls per minute. The rate limiter divides 60 by
            this value, so it must not be zero.
        is_silent: When ``False`` (default) the rate limiter prints a message
            each time it has to wait.
    """

    def __init__(self, max_qpm, is_silent=False):
        from langchain_openai import AzureOpenAIEmbeddings
        # Initialise the inherited ``path`` / ``is_api`` attributes, using the
        # same values as the other API-backed classes in this file.
        super().__init__(path='', is_api=True)
        ## https://gpt.bytedance.net/gpt_openapi/
        base_url = "https://search-va.byteintl.net/gpt/openapi/online/v2/crawl"
        api_version = "2024-03-01-preview"
        # SECURITY(review): a credential is committed in source. It is kept
        # only as a fallback so existing callers keep working; set
        # GPT_OPENAPI_AK in the environment to override it, then rotate the
        # embedded key and remove the literal.
        ak = os.getenv("GPT_OPENAPI_AK") or "5dXdIKxZc8JWVVgvX0DN92HWIYb9NfEb_GPT_AK"
        model_name = "text-embedding-3-large"
        api_type = "azure"
        self.llm = AzureOpenAIEmbeddings(
            azure_endpoint=base_url,
            openai_api_version=api_version,
            deployment=model_name,
            openai_api_key=ak,
            openai_api_type=api_type,
        )
        # Read by the rate_limiter decorator on every call.
        self.max_qpm = max_qpm
        self.silent = is_silent

    @rate_limiter()
    def get_embedding(self, text: str):
        """Return the embedding of ``text`` via ``embed_query``, rate limited."""
        return self.llm.embed_query(text)