Spaces:
Sleeping
Sleeping
| import os | |
| from copy import copy | |
| from typing import Dict, List, Optional, Tuple, Union | |
| import numpy as np | |
| import time | |
| from functools import wraps | |
| os.environ['CURL_CA_BUNDLE'] = '' | |
| from dotenv import load_dotenv, find_dotenv | |
| _ = load_dotenv(find_dotenv()) | |
class BaseEmbeddings:
    """
    Base class for embeddings.

    Stores the two settings every backend in this module receives (``path``
    and ``is_api``) and provides a shared cosine-similarity helper.
    Subclasses are expected to override ``get_embedding``.
    """

    def __init__(self, path: str, is_api: bool) -> None:
        """
        Record the backend configuration.

        Args:
            path: Model path or identifier; how it is used is up to the subclass.
            is_api: Flag subclasses consult to decide whether to build a
                remote client.
        """
        self.path = path
        self.is_api = is_api

    def get_embedding(self, text: str, model: str) -> List[float]:
        """
        Return the embedding vector for ``text``.

        The base implementation is a placeholder and always raises.

        Raises:
            NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError

    @classmethod
    def cosine_similarity(cls, vector1: List[float], vector2: List[float]) -> float:
        """
        Calculate the cosine similarity between two vectors.

        Declared as a classmethod so it can be called either on the class
        (``BaseEmbeddings.cosine_similarity(a, b)``) or on an instance.

        Args:
            vector1: First vector.
            vector2: Second vector, same length as ``vector1``.

        Returns:
            The dot product divided by the product of the two norms, or
            ``0.0`` when either vector has zero magnitude.
        """
        dot_product = np.dot(vector1, vector2)
        magnitude = np.linalg.norm(vector1) * np.linalg.norm(vector2)
        # A zero-length vector has no direction; report "no similarity"
        # instead of dividing by zero.
        if not magnitude:
            return 0.0
        return dot_product / magnitude
class OpenAIEmbedding(BaseEmbeddings):
    """
    Embedding backend that calls the OpenAI embeddings API.
    """

    def __init__(self, path: str = '', is_api: bool = True) -> None:
        """
        Create the OpenAI client when running in API mode.

        Args:
            path: Passed through to the base class; not used by this backend.
            is_api: When true, an ``openai.OpenAI`` client is built from the
                ``OPENAI_API_KEY`` and ``OPENAI_BASE_URL`` environment
                variables. When false, no client is created.
        """
        super().__init__(path, is_api)
        if self.is_api:
            from openai import OpenAI
            # Hand the settings to the constructor rather than assigning
            # them afterwards: an unset OPENAI_BASE_URL would otherwise push
            # None into the client's base_url setter. `or None` also maps an
            # empty variable to "use the default endpoint".
            self.client = OpenAI(
                api_key=os.getenv("OPENAI_API_KEY"),
                base_url=os.getenv("OPENAI_BASE_URL") or None,
            )

    def get_embedding(self, text: str, model: str = "text-embedding-3-large") -> List[float]:
        """
        Return the embedding of ``text`` from the OpenAI API.

        Args:
            text: Input text; newlines are replaced by spaces before sending.
            model: Name of the embedding model to request.

        Returns:
            The embedding of the first (only) item in the response.

        Raises:
            NotImplementedError: If the instance was created with
                ``is_api=False``.
        """
        if not self.is_api:
            raise NotImplementedError
        text = text.replace("\n", " ")
        return self.client.embeddings.create(input=[text], model=model).data[0].embedding
class JinaEmbedding(BaseEmbeddings):
    """
    Embedding backend for a Jina model loaded through ``transformers``.
    """

    def __init__(self, path: str = 'jinaai/jina-embeddings-v2-base-zh', is_api: bool = False) -> None:
        """
        Remember the configuration and load the model immediately.

        Args:
            path: Model identifier handed to ``AutoModel.from_pretrained``.
            is_api: Stored on the instance; not consulted by this backend.
        """
        super().__init__(path, is_api)
        self._model = self.load_model()

    def get_embedding(self, text: str) -> List[float]:
        """
        Encode ``text`` and return its vector as a plain Python list.
        """
        # The model encodes a batch; send a one-item batch and take row 0.
        encoded_batch = self._model.encode([text])
        return encoded_batch[0].tolist()

    def load_model(self):
        """
        Load the model at ``self.path`` onto CUDA when available, else CPU.

        Returns:
            The loaded model, already moved to the chosen device.
        """
        import torch
        from transformers import AutoModel

        target = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # trust_remote_code lets the checkpoint's own modelling code run.
        loaded = AutoModel.from_pretrained(self.path, trust_remote_code=True)
        return loaded.to(target)
class ZhipuEmbedding(BaseEmbeddings):
    """
    Embedding backend that calls the Zhipu AI embeddings API.
    """

    def __init__(self, path: str = '', is_api: bool = True) -> None:
        """
        Create the Zhipu client when running in API mode.

        Args:
            path: Passed through to the base class; not used by this backend.
            is_api: When true, a ``ZhipuAI`` client is built with the key
                from the ``ZHIPUAI_API_KEY`` environment variable. When
                false, no client is created.
        """
        super().__init__(path, is_api)
        if self.is_api:
            from zhipuai import ZhipuAI
            self.client = ZhipuAI(api_key=os.getenv("ZHIPUAI_API_KEY"))

    def get_embedding(self, text: str, model: str = "embedding-2") -> List[float]:
        """
        Return the embedding of ``text`` from the Zhipu API.

        Args:
            text: Input text to embed.
            model: Name of the embedding model to request. Defaults to the
                previously hard-coded ``"embedding-2"``.

        Returns:
            The embedding of the first item in the response.

        Raises:
            NotImplementedError: If the instance was created with
                ``is_api=False`` (no client exists in that mode).
        """
        # Without this guard a non-API instance would fail with an
        # AttributeError on the missing client.
        if not self.is_api:
            raise NotImplementedError
        response = self.client.embeddings.create(
            model=model,
            input=text,
        )
        return response.data[0].embedding
class DashscopeEmbedding(BaseEmbeddings):
    """
    Embedding backend that calls the Dashscope text-embedding API.
    """

    def __init__(self, path: str = '', is_api: bool = True) -> None:
        """
        Configure Dashscope when running in API mode.

        Args:
            path: Passed through to the base class; not used by this backend.
            is_api: When true, the module-level ``dashscope.api_key`` is set
                from the ``DASHSCOPE_API_KEY`` environment variable and
                ``dashscope.TextEmbedding`` is kept as the client. When
                false, no client is created.
        """
        super().__init__(path, is_api)
        if self.is_api:
            import dashscope
            dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
            self.client = dashscope.TextEmbedding

    def get_embedding(self, text: str, model: str='text-embedding-v1') -> List[float]:
        """
        Return the embedding of ``text`` from the Dashscope API.

        Args:
            text: Input text to embed.
            model: Name of the embedding model to request.

        Returns:
            The ``'embedding'`` field of the first entry in the response's
            ``'embeddings'`` list.

        Raises:
            NotImplementedError: If the instance was created with
                ``is_api=False`` (no client exists in that mode).
            RuntimeError: If the response carries no ``output``.
        """
        if not self.is_api:
            raise NotImplementedError
        response = self.client.call(
            model=model,
            input=text
        )
        # NOTE(review): presumably `output` is None when the service rejects
        # the request - confirm against the Dashscope docs. Fail with a
        # readable error instead of a TypeError from subscripting None.
        if response.output is None:
            detail = getattr(response, 'message', None) or response
            raise RuntimeError(f"Dashscope embedding request failed: {detail}")
        return response.output['embeddings'][0]['embedding']
class BgeEmbedding(BaseEmbeddings):
    """
    Embedding backend for a BGE model loaded through ``transformers``.
    """

    def __init__(self, path: str = 'BAAI/bge-en-icl', is_api: bool = False) -> None:
        """
        Remember the configuration and load the model and tokenizer.

        Args:
            path: Model identifier handed to ``from_pretrained``.
            is_api: Stored on the instance; not consulted by this backend.
        """
        super().__init__(path, is_api)
        self._model, self._tokenizer = self.load_model(path)

    def get_embedding(self, text: str) -> List[float]:
        """
        Embed ``text`` and return an L2-normalised vector as a Python list.

        The vector is taken from position 0 along the second axis of the
        model's first output, then normalised with ``p=2``.
        """
        import torch

        tokens = self._tokenizer([text], padding=True, truncation=True, return_tensors='pt')
        # Move every tokenizer tensor to the device the model lives on.
        model_inputs = {name: tensor.to(self._model.device) for name, tensor in tokens.items()}
        with torch.no_grad():
            outputs = self._model(**model_inputs)
            # Position 0 of the first output - presumably the [CLS] token's
            # hidden state; verify for the chosen checkpoint.
            first_position = outputs[0][:, 0]
        unit_vectors = torch.nn.functional.normalize(first_position, p=2, dim=1)
        return unit_vectors[0].tolist()

    def load_model(self, path: str):
        """
        Load the tokenizer and model at ``path``.

        The model is moved to CUDA when available (CPU otherwise) and
        switched to eval mode.

        Returns:
            A ``(model, tokenizer)`` tuple.
        """
        import torch
        from transformers import AutoModel, AutoTokenizer

        target = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        tok = AutoTokenizer.from_pretrained(path)
        net = AutoModel.from_pretrained(path).to(target)
        net.eval()
        return net, tok
def rate_limiter():
    """
    Build a decorator that throttles a method to ``self.max_qpm`` calls per minute.

    The decorated method's instance must expose:
      * ``max_qpm``: allowed calls per minute. A value of ``None`` or any
        value ``<= 0`` disables throttling.
      * ``silent``: when exactly ``False``, a message is printed before
        each wait.

    The time of the last completed call is stored on the instance as
    ``_last_called``, so the limit is tracked per instance.

    Returns:
        A decorator for instance methods.
    """
    def rate_limiter_decorator(func):
        # Keep the wrapped method's name and docstring visible.
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            max_calls_per_minute = self.max_qpm
            # Only throttle for a positive limit; 0 would otherwise raise
            # ZeroDivisionError and None a TypeError.
            if max_calls_per_minute and max_calls_per_minute > 0:
                interval = 60 / max_calls_per_minute
                # No record yet means "never called": treat it as time 0 so
                # the first call always goes straight through.
                elapsed_time = time.time() - getattr(self, '_last_called', 0)
                if elapsed_time < interval:
                    time_to_wait = interval - elapsed_time
                    # Only an explicit False enables the message.
                    if self.silent is False:
                        print(f"## Rate limit reached. Waiting for {time_to_wait:.2f} seconds.")
                    time.sleep(time_to_wait)
            result = func(self, *args, **kwargs)
            # Stamp after the call so the interval is measured from completion.
            self._last_called = time.time()
            return result
        return wrapper
    return rate_limiter_decorator
class TextEmb3LargeEmbedding(BaseEmbeddings):
    """
    Rate-limited embedding backend for ``text-embedding-3-large`` served
    through an Azure-style OpenAI endpoint (via ``langchain_openai``).
    """

    def __init__(self, max_qpm, is_silent=False, api_key: Optional[str] = None):
        """
        Build the Azure OpenAI embeddings client.

        Args:
            max_qpm: Maximum number of ``get_embedding`` calls per minute.
            is_silent: When ``False``, the rate limiter prints a message
                each time it waits.
            api_key: Credential for the endpoint. When omitted it is read
                from the ``AZURE_OPENAI_API_KEY`` environment variable.

        Raises:
            ValueError: If no API key is supplied or found in the environment.
        """
        from langchain_openai import AzureOpenAIEmbeddings

        # Populate `path` / `is_api` so the instance honours the base contract.
        super().__init__(path='', is_api=True)

        # SECURITY: the key used to be hard-coded here. It now comes from
        # the caller or the environment; the previously committed key
        # should be considered leaked and rotated.
        ak = api_key or os.getenv("AZURE_OPENAI_API_KEY")
        if not ak:
            raise ValueError(
                "No API key provided: pass api_key or set AZURE_OPENAI_API_KEY."
            )

        ## https://gpt.bytedance.net/gpt_openapi/
        base_url = os.getenv(
            "AZURE_OPENAI_ENDPOINT",
            "https://search-va.byteintl.net/gpt/openapi/online/v2/crawl",
        )
        api_version = "2024-03-01-preview"
        model_name = "text-embedding-3-large"
        api_type = "azure"
        self.llm = AzureOpenAIEmbeddings(
            azure_endpoint=base_url,
            openai_api_version=api_version,
            deployment=model_name,
            openai_api_key=ak,
            openai_api_type=api_type,
        )
        # Read by the rate limiter wrapped around get_embedding.
        self.max_qpm = max_qpm
        self.silent = is_silent

    @rate_limiter()
    def get_embedding(self, text: str):
        """
        Return the embedding of ``text``, throttled to ``max_qpm`` calls per minute.
        """
        return self.llm.embed_query(text)