import os
import time
from functools import wraps
from typing import List

import numpy as np

# Disable the custom CA bundle so HTTPS requests behind some proxies do not
# fail on certificate verification.
os.environ['CURL_CA_BUNDLE'] = ''

from dotenv import load_dotenv, find_dotenv

_ = load_dotenv(find_dotenv())

class BaseEmbeddings:
    """
    Base class for embeddings.
    """
    def __init__(self, path: str, is_api: bool) -> None:
        self.path = path
        self.is_api = is_api

    def get_embedding(self, text: str, model: str) -> List[float]:
        raise NotImplementedError

    @classmethod
    def cosine_similarity(cls, vector1: List[float], vector2: List[float]) -> float:
        """
        Calculate the cosine similarity between two vectors.
        """
        dot_product = np.dot(vector1, vector2)
        magnitude = np.linalg.norm(vector1) * np.linalg.norm(vector2)
        if not magnitude:
            return 0.0
        return dot_product / magnitude

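# A minimal usage sketch for cosine_similarity (the vectors are illustrative,
# not real embeddings):
#
#     v1 = [1.0, 0.0, 0.0]
#     v2 = [0.5, 0.5, 0.0]
#     BaseEmbeddings.cosine_similarity(v1, v2)  # -> ~0.707
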
class OpenAIEmbedding(BaseEmbeddings):
    """
    Class for OpenAI embeddings.
    """
    def __init__(self, path: str = '', is_api: bool = True) -> None:
        super().__init__(path, is_api)
        if self.is_api:
            from openai import OpenAI
            # Pass credentials to the constructor instead of mutating the
            # client afterwards; base_url=None falls back to the default
            # endpoint.
            self.client = OpenAI(
                api_key=os.getenv("OPENAI_API_KEY"),
                base_url=os.getenv("OPENAI_BASE_URL"),
            )

    def get_embedding(self, text: str, model: str = "text-embedding-3-large") -> List[float]:
        if self.is_api:
            # Newlines can degrade embedding quality, so collapse them.
            text = text.replace("\n", " ")
            return self.client.embeddings.create(input=[text], model=model).data[0].embedding
        else:
            raise NotImplementedError

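# Usage sketch (assumes OPENAI_API_KEY, and optionally OPENAI_BASE_URL, are set
# in the environment or in a .env file picked up by load_dotenv above):
#
#     emb = OpenAIEmbedding()
#     vec = emb.get_embedding("hello world")
#     len(vec)  # -> 3072 for text-embedding-3-large
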
class JinaEmbedding(BaseEmbeddings):
    """
    Class for Jina embeddings.
    """
    def __init__(self, path: str = 'jinaai/jina-embeddings-v2-base-zh', is_api: bool = False) -> None:
        super().__init__(path, is_api)
        self._model = self.load_model()

    def get_embedding(self, text: str) -> List[float]:
        return self._model.encode([text])[0].tolist()

    def load_model(self):
        import torch
        from transformers import AutoModel
        device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
        model = AutoModel.from_pretrained(self.path, trust_remote_code=True).to(device)
        return model

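# Usage sketch (the weights are downloaded from the Hugging Face Hub on first
# use; trust_remote_code=True is what provides the model's .encode() method):
#
#     emb = JinaEmbedding()
#     vec = emb.get_embedding("你好，世界")  # list of floats
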
class ZhipuEmbedding(BaseEmbeddings):
    """
    Class for Zhipu embeddings.
    """
    def __init__(self, path: str = '', is_api: bool = True) -> None:
        super().__init__(path, is_api)
        if self.is_api:
            from zhipuai import ZhipuAI
            self.client = ZhipuAI(api_key=os.getenv("ZHIPUAI_API_KEY"))

    def get_embedding(self, text: str) -> List[float]:
        response = self.client.embeddings.create(
            model="embedding-2",
            input=text,
        )
        return response.data[0].embedding

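# Usage sketch (assumes ZHIPUAI_API_KEY is set in the environment):
#
#     emb = ZhipuEmbedding()
#     vec = emb.get_embedding("今天天气真好")
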
class DashscopeEmbedding(BaseEmbeddings):
    """
    Class for DashScope embeddings.
    """
    def __init__(self, path: str = '', is_api: bool = True) -> None:
        super().__init__(path, is_api)
        if self.is_api:
            import dashscope
            dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
            self.client = dashscope.TextEmbedding

    def get_embedding(self, text: str, model: str = 'text-embedding-v1') -> List[float]:
        response = self.client.call(
            model=model,
            input=text,
        )
        return response.output['embeddings'][0]['embedding']

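# Usage sketch (assumes DASHSCOPE_API_KEY is set in the environment):
#
#     emb = DashscopeEmbedding()
#     vec = emb.get_embedding("hello world", model='text-embedding-v1')
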
class BgeEmbedding(BaseEmbeddings):
    """
    Class for BGE embeddings.
    """
    def __init__(self, path: str = 'BAAI/bge-en-icl', is_api: bool = False) -> None:
        super().__init__(path, is_api)
        self._model, self._tokenizer = self.load_model(path)

    def get_embedding(self, text: str) -> List[float]:
        import torch
        encoded_input = self._tokenizer([text], padding=True, truncation=True, return_tensors='pt')
        encoded_input = {k: v.to(self._model.device) for k, v in encoded_input.items()}
        with torch.no_grad():
            model_output = self._model(**encoded_input)
            # Use the [CLS] token's last hidden state as the sentence embedding.
            sentence_embeddings = model_output[0][:, 0]
        # L2-normalize so that dot products equal cosine similarities.
        sentence_embeddings = torch.nn.functional.normalize(sentence_embeddings, p=2, dim=1)
        return sentence_embeddings[0].tolist()

    def load_model(self, path: str):
        import torch
        from transformers import AutoModel, AutoTokenizer
        device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
        tokenizer = AutoTokenizer.from_pretrained(path)
        model = AutoModel.from_pretrained(path).to(device)
        model.eval()
        return model, tokenizer

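# Usage sketch (runs fully locally; since the embedding is the L2-normalized
# [CLS] vector, cosine similarity reduces to a dot product):
#
#     emb = BgeEmbedding()
#     score = BaseEmbeddings.cosine_similarity(
#         emb.get_embedding("a cat"), emb.get_embedding("a kitten"))
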
def rate_limiter():
    def rate_limiter_decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            max_calls_per_minute = self.max_qpm
            interval = 60 / max_calls_per_minute
            current_time = time.time()
            # If there is no record of the last call yet, initialize it to 0.
            if not hasattr(self, '_last_called'):
                self._last_called = 0
            elapsed_time = current_time - self._last_called
            if elapsed_time < interval:
                time_to_wait = interval - elapsed_time
                if self.silent is False:
                    print(f"## Rate limit reached. Waiting for {time_to_wait:.2f} seconds.")
                time.sleep(time_to_wait)
            result = func(self, *args, **kwargs)
            self._last_called = time.time()
            return result
        return wrapper
    return rate_limiter_decorator

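# Usage sketch: the decorator reads `max_qpm` and `silent` from the instance it
# wraps, so any class defining those attributes can reuse it (MyClient is a
# hypothetical example):
#
#     class MyClient:
#         def __init__(self):
#             self.max_qpm = 60   # at most one call per second
#             self.silent = True
#
#         @rate_limiter()
#         def call_api(self):
#             ...
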
class TextEmb3LargeEmbedding(BaseEmbeddings):
    """
    Class for text-embedding-3-large embeddings served through Azure OpenAI.
    """
    def __init__(self, max_qpm, is_silent=False):
        super().__init__(path='', is_api=True)
        from langchain_openai import AzureOpenAIEmbeddings
        # https://gpt.bytedance.net/gpt_openapi/
        base_url = "https://search-va.byteintl.net/gpt/openapi/online/v2/crawl"
        api_version = "2024-03-01-preview"
        ak = "5dXdIKxZc8JWVVgvX0DN92HWIYb9NfEb_GPT_AK"
        model_name = "text-embedding-3-large"
        api_type = "azure"
        self.llm = AzureOpenAIEmbeddings(
            azure_endpoint=base_url,
            openai_api_version=api_version,
            deployment=model_name,
            openai_api_key=ak,
            openai_api_type=api_type,
        )
        self.max_qpm = max_qpm
        self.silent = is_silent

    @rate_limiter()
    def get_embedding(self, text: str):
        return self.llm.embed_query(text)

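if __name__ == "__main__":
    # A quick self-check; swap in whichever backend you have credentials for
    # (ZhipuEmbedding assumes ZHIPUAI_API_KEY is set in the environment).
    emb = ZhipuEmbedding()
    v1 = emb.get_embedding("天气真好")
    v2 = emb.get_embedding("今天阳光明媚")
    print(BaseEmbeddings.cosine_similarity(v1, v2))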