import os
import time
from functools import wraps
from typing import List

import numpy as np

# Clearing the curl CA bundle disables TLS certificate verification for
# requests-based clients; a common workaround behind intercepting proxies.
os.environ['CURL_CA_BUNDLE'] = ''

from dotenv import load_dotenv, find_dotenv

# Load API keys and endpoints from a local .env file into the environment.
_ = load_dotenv(find_dotenv())
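
# The classes below read their credentials from the environment. A minimal
# .env file might look like this (illustrative values, not real keys):
#
#   OPENAI_API_KEY=sk-...
#   OPENAI_BASE_URL=https://api.openai.com/v1
#   ZHIPUAI_API_KEY=...
#   DASHSCOPE_API_KEY=...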

class BaseEmbeddings:
    """
    Base class for embeddings.
    """
    def __init__(self, path: str, is_api: bool) -> None:
        self.path = path
        self.is_api = is_api

    def get_embedding(self, text: str, model: str) -> List[float]:
        raise NotImplementedError
    @classmethod
    def cosine_similarity(cls, vector1: List[float], vector2: List[float]) -> float:
        """
        Calculate the cosine similarity between two vectors.
        """
        dot_product = np.dot(vector1, vector2)
        magnitude = np.linalg.norm(vector1) * np.linalg.norm(vector2)
        if not magnitude:
            # At least one vector is all zeros, so the similarity is undefined.
            return 0.0
        return dot_product / magnitude
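
# Quick check of the helper above; the values follow directly from the formula:
#
#   BaseEmbeddings.cosine_similarity([1.0, 0.0], [0.0, 1.0])  # 0.0, orthogonal
#   BaseEmbeddings.cosine_similarity([1.0, 0.0], [0.0, 0.0])  # 0.0, zero-vector guard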

class OpenAIEmbedding(BaseEmbeddings):
    """
    Class for OpenAI embeddings.
    """
    def __init__(self, path: str = '', is_api: bool = True) -> None:
        super().__init__(path, is_api)
        if self.is_api:
            from openai import OpenAI
            # Pass the credentials at construction time instead of mutating
            # the client afterwards.
            self.client = OpenAI(
                api_key=os.getenv("OPENAI_API_KEY"),
                base_url=os.getenv("OPENAI_BASE_URL"),
            )

    def get_embedding(self, text: str, model: str = "text-embedding-3-large") -> List[float]:
        if self.is_api:
            # Replace newlines with spaces, as recommended for OpenAI
            # embedding inputs.
            text = text.replace("\n", " ")
            return self.client.embeddings.create(input=[text], model=model).data[0].embedding
        else:
            raise NotImplementedError
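
# Usage sketch (assumes OPENAI_API_KEY, and optionally OPENAI_BASE_URL, are set
# in the environment; this performs a network call):
#
#   emb = OpenAIEmbedding()
#   vec = emb.get_embedding("hello world")
#   print(len(vec))  # 3072 dimensions for text-embedding-3-large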

class JinaEmbedding(BaseEmbeddings):
    """
    Class for Jina embeddings.
    """
    def __init__(self, path: str = 'jinaai/jina-embeddings-v2-base-zh', is_api: bool = False) -> None:
        super().__init__(path, is_api)
        self._model = self.load_model()

    def get_embedding(self, text: str) -> List[float]:
        return self._model.encode([text])[0].tolist()

    def load_model(self):
        import torch
        from transformers import AutoModel
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # trust_remote_code is required: the Jina model ships its own encode()
        # implementation as remote code on the Hugging Face Hub.
        model = AutoModel.from_pretrained(self.path, trust_remote_code=True).to(device)
        return model

class ZhipuEmbedding(BaseEmbeddings):
    """
    Class for Zhipu embeddings.
    """
    def __init__(self, path: str = '', is_api: bool = True) -> None:
        super().__init__(path, is_api)
        if self.is_api:
            from zhipuai import ZhipuAI
            self.client = ZhipuAI(api_key=os.getenv("ZHIPUAI_API_KEY"))

    def get_embedding(self, text: str) -> List[float]:
        response = self.client.embeddings.create(
            model="embedding-2",
            input=text,
        )
        return response.data[0].embedding

class DashscopeEmbedding(BaseEmbeddings):
    """
    Class for Dashscope embeddings.
    """
    def __init__(self, path: str = '', is_api: bool = True) -> None:
        super().__init__(path, is_api)
        if self.is_api:
            import dashscope
            dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
            self.client = dashscope.TextEmbedding

    def get_embedding(self, text: str, model: str = 'text-embedding-v1') -> List[float]:
        response = self.client.call(
            model=model,
            input=text,
        )
        return response.output['embeddings'][0]['embedding']

class BgeEmbedding(BaseEmbeddings):
    """
    Class for BGE embeddings.
    """
    def __init__(self, path: str = 'BAAI/bge-en-icl', is_api: bool = False) -> None:
        super().__init__(path, is_api)
        self._model, self._tokenizer = self.load_model(path)

    def get_embedding(self, text: str) -> List[float]:
        import torch
        encoded_input = self._tokenizer([text], padding=True, truncation=True, return_tensors='pt')
        encoded_input = {k: v.to(self._model.device) for k, v in encoded_input.items()}
        with torch.no_grad():
            model_output = self._model(**encoded_input)
            # CLS pooling: use the hidden state of the first token as the
            # sentence embedding.
            sentence_embeddings = model_output[0][:, 0]
        # L2-normalize so that dot products equal cosine similarities.
        sentence_embeddings = torch.nn.functional.normalize(sentence_embeddings, p=2, dim=1)
        return sentence_embeddings[0].tolist()

    def load_model(self, path: str):
        import torch
        from transformers import AutoModel, AutoTokenizer
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        tokenizer = AutoTokenizer.from_pretrained(path)
        model = AutoModel.from_pretrained(path).to(device)
        model.eval()
        return model, tokenizer
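
# Usage sketch (downloads the model from the Hugging Face Hub on first use;
# no API key required):
#
#   emb = BgeEmbedding()
#   v1 = emb.get_embedding("the cat sat on the mat")
#   v2 = emb.get_embedding("a cat is sitting on a mat")
#   print(BaseEmbeddings.cosine_similarity(v1, v2))  # high for paraphrases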

def rate_limiter():
    """
    Decorator factory that throttles a bound method to at most
    `self.max_qpm` calls per minute, based on the time of the last call.
    """
    def rate_limiter_decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            max_calls_per_minute = self.max_qpm
            interval = 60 / max_calls_per_minute
            current_time = time.time()
            # If there is no record of the last call yet, initialise it to 0.
            if not hasattr(self, '_last_called'):
                self._last_called = 0
            elapsed_time = current_time - self._last_called
            if elapsed_time < interval:
                time_to_wait = interval - elapsed_time
                if self.silent is False:
                    print(f"## Rate limit reached. Waiting for {time_to_wait:.2f} seconds.")
                time.sleep(time_to_wait)
            result = func(self, *args, **kwargs)
            self._last_called = time.time()
            return result
        return wrapper
    return rate_limiter_decorator
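
# Usage sketch: any class whose instances expose `max_qpm` and `silent`
# attributes can throttle a method with the decorator, e.g.
#
#   class Client:
#       def __init__(self):
#           self.max_qpm = 30   # at most 30 calls per minute
#           self.silent = True  # suppress the waiting message
#
#       @rate_limiter()
#       def call_api(self):
#           ...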

class TextEmb3LargeEmbedding(BaseEmbeddings):
    """
    Class for text-embedding-3-large embeddings served through an
    Azure-compatible endpoint.
    """
    def __init__(self, max_qpm, is_silent=False):
        super().__init__(path='', is_api=True)
        from langchain_openai import AzureOpenAIEmbeddings
        ## https://gpt.bytedance.net/gpt_openapi/
        base_url = "https://search-va.byteintl.net/gpt/openapi/online/v2/crawl"
        api_version = "2024-03-01-preview"
        # Read the access key from the environment rather than hardcoding it;
        # the variable name GPT_OPENAPI_AK is illustrative.
        ak = os.getenv("GPT_OPENAPI_AK", "")
        model_name = "text-embedding-3-large"
        api_type = "azure"
        self.llm = AzureOpenAIEmbeddings(
            azure_endpoint=base_url,
            openai_api_version=api_version,
            deployment=model_name,
            openai_api_key=ak,
            openai_api_type=api_type,
        )
        self.max_qpm = max_qpm
        self.silent = is_silent

    @rate_limiter()
    def get_embedding(self, text: str) -> List[float]:
        return self.llm.embed_query(text)
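
if __name__ == "__main__":
    # Dependency-light smoke test of the shared similarity helper; the vectors
    # below are toy values, not real embeddings.
    a = [1.0, 2.0, 3.0]
    b = [2.0, 4.0, 6.0]
    c = [-3.0, 0.0, 1.0]
    print(BaseEmbeddings.cosine_similarity(a, b))  # ~1.0, parallel vectors
    print(BaseEmbeddings.cosine_similarity(a, c))  # 0.0, orthogonal vectors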