"""Embedding back-ends (remote APIs and local models) behind one small interface."""

import os
import time
from copy import copy
from functools import wraps
from typing import Dict, List, Optional, Tuple, Union

import numpy as np

try:
    from dotenv import find_dotenv, load_dotenv
except ImportError:
    # python-dotenv is treated as optional: without it, configuration must
    # already be present in the process environment.
    find_dotenv = None
    load_dotenv = None

# NOTE(review): presumably a workaround that stops HTTP clients from using a
# CA bundle (i.e. weakens TLS verification) -- confirm it is still required.
# It is set before the .env file is loaded, as in the original ordering.
os.environ['CURL_CA_BUNDLE'] = ''

_ = load_dotenv(find_dotenv()) if load_dotenv is not None else False


class BaseEmbeddings:
    """
    Base class for embeddings.

    Attributes:
        path: Model path / identifier used by locally loaded models.
        is_api: True when the embedding is produced by a remote API.
    """

    def __init__(self, path: str, is_api: bool) -> None:
        self.path = path
        self.is_api = is_api

    def get_embedding(self, text: str, model: str) -> List[float]:
        """Return the embedding vector of ``text``; implemented by subclasses."""
        raise NotImplementedError

    @classmethod
    def cosine_similarity(cls, vector1: List[float], vector2: List[float]) -> float:
        """
        Calculate the cosine similarity between two vectors.

        Args:
            vector1: First vector.
            vector2: Second vector, same length as ``vector1``.

        Returns:
            The cosine of the angle between the vectors as a builtin float,
            or ``0.0`` when either vector has zero magnitude.
        """
        dot_product = np.dot(vector1, vector2)
        magnitude = np.linalg.norm(vector1) * np.linalg.norm(vector2)
        if not magnitude:
            # Avoid dividing by zero for all-zero vectors.
            return 0.0
        # Convert from np.float64 so every code path returns the same type.
        return float(dot_product / magnitude)


class OpenAIEmbedding(BaseEmbeddings):
    """
    Class for OpenAI embeddings (API only).
    """

    def __init__(self, path: str = '', is_api: bool = True) -> None:
        super().__init__(path, is_api)
        if self.is_api:
            from openai import OpenAI

            # Pass the settings to the constructor instead of assigning them
            # afterwards: assigning ``base_url = None`` (OPENAI_BASE_URL
            # unset) is not a valid URL, whereas a ``None`` constructor
            # argument lets the client fall back to its default endpoint.
            self.client = OpenAI(
                api_key=os.getenv("OPENAI_API_KEY"),
                base_url=os.getenv("OPENAI_BASE_URL"),
            )

    def get_embedding(self, text: str, model: str = "text-embedding-3-large") -> List[float]:
        """
        Embed ``text`` with the given OpenAI embedding model.

        Raises:
            NotImplementedError: If the instance was created with ``is_api=False``.
        """
        if not self.is_api:
            raise NotImplementedError
        # Newlines are replaced by spaces before the text is sent.
        text = text.replace("\n", " ")
        response = self.client.embeddings.create(input=[text], model=model)
        return response.data[0].embedding


class JinaEmbedding(BaseEmbeddings):
    """
    Class for Jina embeddings (model loaded locally through transformers).
    """

    def __init__(self, path: str = 'jinaai/jina-embeddings-v2-base-zh', is_api: bool = False) -> None:
        super().__init__(path, is_api)
        self._model = self.load_model()

    def get_embedding(self, text: str) -> List[float]:
        """Encode ``text`` with the loaded model and return the vector as a list."""
        return self._model.encode([text])[0].tolist()

    def load_model(self):
        """Load the model at ``self.path`` onto CUDA when available, else CPU."""
        import torch
        from transformers import AutoModel

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # SECURITY(review): trust_remote_code=True executes code shipped with
        # the checkpoint; only use it with model repositories you trust.
        model = AutoModel.from_pretrained(self.path, trust_remote_code=True).to(device)
        return model


class ZhipuEmbedding(BaseEmbeddings):
    """
    Class for Zhipu embeddings (API only).
    """

    def __init__(self, path: str = '', is_api: bool = True) -> None:
        super().__init__(path, is_api)
        if self.is_api:
            from zhipuai import ZhipuAI

            self.client = ZhipuAI(api_key=os.getenv("ZHIPUAI_API_KEY"))

    def get_embedding(self, text: str, model: str = "embedding-2") -> List[float]:
        """
        Embed ``text`` with the given Zhipu embedding model.

        Raises:
            NotImplementedError: If the instance was created with
                ``is_api=False`` (no client exists in that case).
        """
        if not self.is_api:
            raise NotImplementedError
        response = self.client.embeddings.create(
            model=model,
            input=text,
        )
        return response.data[0].embedding


class DashscopeEmbedding(BaseEmbeddings):
    """
    Class for Dashscope embeddings (API only).
    """

    def __init__(self, path: str = '', is_api: bool = True) -> None:
        super().__init__(path, is_api)
        if self.is_api:
            import dashscope

            dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
            self.client = dashscope.TextEmbedding

    def get_embedding(self, text: str, model: str = 'text-embedding-v1') -> List[float]:
        """
        Embed ``text`` with the given Dashscope embedding model.

        Raises:
            NotImplementedError: If the instance was created with
                ``is_api=False`` (no client exists in that case).
        """
        if not self.is_api:
            raise NotImplementedError
        response = self.client.call(
            model=model,
            input=text
        )
        return response.output['embeddings'][0]['embedding']


class BgeEmbedding(BaseEmbeddings):
    """
    Class for BGE embeddings (model loaded locally through transformers).
    """

    def __init__(self, path: str = 'BAAI/bge-en-icl', is_api: bool = False) -> None:
        super().__init__(path, is_api)
        self._model, self._tokenizer = self.load_model(path)

    def get_embedding(self, text: str) -> List[float]:
        """
        Embed ``text`` and return an L2-normalised vector.

        The vector is the first-token output of the model's first returned
        tensor, normalised to unit length.
        """
        import torch

        encoded_input = self._tokenizer([text], padding=True, truncation=True, return_tensors='pt')
        # Move every input tensor to the device the model lives on.
        encoded_input = {k: v.to(self._model.device) for k, v in encoded_input.items()}
        with torch.no_grad():
            model_output = self._model(**encoded_input)
            # NOTE(review): first-token pooling is the usual choice for
            # BERT-style BGE checkpoints -- confirm it suits the default one.
            sentence_embeddings = model_output[0][:, 0]
        sentence_embeddings = torch.nn.functional.normalize(sentence_embeddings, p=2, dim=1)
        return sentence_embeddings[0].tolist()

    def load_model(self, path: str):
        """
        Load the tokenizer and model from ``path``.

        Returns:
            A ``(model, tokenizer)`` tuple; the model is in eval mode and
            placed on CUDA when available, else CPU.
        """
        import torch
        from transformers import AutoModel, AutoTokenizer

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        tokenizer = AutoTokenizer.from_pretrained(path)
        model = AutoModel.from_pretrained(path).to(device)
        model.eval()
        return model, tokenizer


def rate_limiter():
    """
    Build a decorator that throttles a method to ``self.max_qpm`` calls per minute.

    The decorated method's instance must expose ``max_qpm`` (calls per
    minute) and ``silent`` (``False`` to print a notice before waiting).
    Successive calls on the same instance are spaced at least
    ``60 / max_qpm`` seconds apart, measured from the end of the previous
    call. A non-positive ``max_qpm`` disables throttling instead of raising
    ``ZeroDivisionError``.
    """
    def rate_limiter_decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            max_calls_per_minute = self.max_qpm
            interval = 60 / max_calls_per_minute if max_calls_per_minute > 0 else 0.0

            # ``None`` means "never called": the first call never waits.
            last_called = getattr(self, '_last_called', None)
            if last_called is not None:
                # A monotonic clock is immune to wall-clock adjustments.
                time_to_wait = interval - (time.monotonic() - last_called)
                if time_to_wait > 0:
                    if self.silent is False:
                        print(f"## Rate limit reached. Waiting for {time_to_wait:.2f} seconds.")
                    time.sleep(time_to_wait)

            result = func(self, *args, **kwargs)
            # Recorded only after a successful call, as before.
            self._last_called = time.monotonic()
            return result
        return wrapper
    return rate_limiter_decorator


class TextEmb3LargeEmbedding(BaseEmbeddings):
    """
    Class for text-embedding-3-large embeddings served through an
    Azure-style OpenAI endpoint, rate limited to ``max_qpm`` calls per minute.
    """

    # Legacy defaults, kept so existing callers continue to work unchanged.
    ## https://gpt.bytedance.net/gpt_openapi/
    _DEFAULT_BASE_URL = "https://search-va.byteintl.net/gpt/openapi/online/v2/crawl"
    _DEFAULT_API_VERSION = "2024-03-01-preview"
    # SECURITY(review): a credential committed to source control should be
    # considered leaked -- rotate it and pass ``api_key`` explicitly instead.
    _DEFAULT_API_KEY = "5dXdIKxZc8JWVVgvX0DN92HWIYb9NfEb_GPT_AK"
    _DEFAULT_MODEL_NAME = "text-embedding-3-large"

    def __init__(
        self,
        max_qpm,
        is_silent=False,
        *,
        base_url: Optional[str] = None,
        api_version: Optional[str] = None,
        api_key: Optional[str] = None,
        model_name: Optional[str] = None,
    ):
        """
        Args:
            max_qpm: Maximum number of ``get_embedding`` calls per minute.
            is_silent: When ``False``, a notice is printed before each wait.
            base_url: Endpoint override; defaults to the legacy endpoint.
            api_version: API version override.
            api_key: API key override; defaults to the legacy key.
            model_name: Deployment name override.
        """
        # Initialise ``path`` / ``is_api`` like every other back-end.
        super().__init__(path='', is_api=True)

        from langchain_openai import AzureOpenAIEmbeddings

        self.llm = AzureOpenAIEmbeddings(
            azure_endpoint=base_url if base_url is not None else self._DEFAULT_BASE_URL,
            openai_api_version=api_version if api_version is not None else self._DEFAULT_API_VERSION,
            deployment=model_name if model_name is not None else self._DEFAULT_MODEL_NAME,
            openai_api_key=api_key if api_key is not None else self._DEFAULT_API_KEY,
            openai_api_type="azure",
        )
        self.max_qpm = max_qpm
        self.silent = is_silent

    @rate_limiter()
    def get_embedding(self, text: str) -> List[float]:
        """Embed ``text`` through the configured endpoint (rate limited)."""
        return self.llm.embed_query(text)