|
import os |
|
|
|
import numpy as np |
|
from extensions.openai.errors import ServiceUnavailableError |
|
from extensions.openai.utils import debug_msg, float_list_to_base64 |
|
from transformers import AutoModel |
|
|
|
from modules import shared |
|
|
|
# Flipped to True once initialize_embedding_params() has completed its one-time setup.
embeddings_params_initialized = False


def initialize_embedding_params():
    """
    Resolve the embedding settings the first time they are needed.

    The extension's ``params`` dict is imported inside this function
    ('lazy loading') to avoid a circular import. Once a call has completed
    successfully, later calls return immediately.

    Sets the module-level globals ``st_model`` (model name),
    ``embeddings_model`` (reset to ``None``) and ``embeddings_device``.
    """
    global embeddings_params_initialized
    global st_model, embeddings_model, embeddings_device

    if embeddings_params_initialized:
        return

    from extensions.openai.script import params

    # Environment variables take precedence over the extension's params,
    # which in turn take precedence over the hard-coded defaults.
    fallback_model = params.get('embedding_model', 'all-mpnet-base-v2')
    st_model = os.environ.get("OPENEDAI_EMBEDDING_MODEL", fallback_model)
    embeddings_model = None

    fallback_device = params.get('embedding_device', 'cpu')
    embeddings_device = os.environ.get("OPENEDAI_EMBEDDING_DEVICE", fallback_device)
    # 'auto' (any letter case) is stored as None rather than as a device name.
    wants_auto = embeddings_device.lower() == 'auto'
    if wants_auto:
        embeddings_device = None

    embeddings_params_initialized = True
|
|
|
|
|
def load_embedding_model(model: str):
    """
    Load ``model`` into the module-level ``embeddings_model``.

    The model is created with ``AutoModel.from_pretrained`` (honouring
    ``shared.args.trust_remote_code``) and then placed on the configured
    ``embeddings_device``.

    Args:
        model: Name or path handed to ``AutoModel.from_pretrained``.

    Raises:
        ServiceUnavailableError: If loading or device placement fails. The
            original exception is chained and its ``repr`` is passed as
            ``internal_message``; ``embeddings_model`` is reset to ``None``.
    """
    initialize_embedding_params()
    global embeddings_device, embeddings_model
    try:
        print(f"Try embedding model: {model} on {embeddings_device}")
        trust = shared.args.trust_remote_code
        loaded = AutoModel.from_pretrained(model, trust_remote_code=trust)
        if embeddings_device == 'cpu':
            loaded = loaded.to("cpu", dtype=float)
        elif embeddings_device is not None:
            # An explicitly configured non-cpu device used to be ignored, so
            # the model stayed wherever from_pretrained put it. Move it to
            # the requested device. None ('auto') keeps the default placement.
            loaded = loaded.to(embeddings_device)
        embeddings_model = loaded
        print(f"\nLoaded embedding model: {model} on {embeddings_model.device}")
    except Exception as e:
        embeddings_model = None
        # Chain the cause so the original traceback is not lost.
        raise ServiceUnavailableError(f"Error: Failed to load embedding model: {model}", internal_message=repr(e)) from e
|
|
|
|
|
def get_embeddings_model() -> AutoModel:
    """
    Return the shared embedding model, loading it on first use.

    If no model name is configured (``st_model`` is falsy) nothing is
    loaded and the current value of ``embeddings_model`` is returned as-is,
    which may be ``None``.
    """
    initialize_embedding_params()

    # Nothing to do when there is no model name or a model is already cached.
    if not st_model or embeddings_model:
        return embeddings_model

    load_embedding_model(st_model)
    return embeddings_model
|
|
|
|
|
def get_embeddings_model_name() -> str:
    """Return the configured embedding model name, initializing the settings first if needed."""
    initialize_embedding_params()
    return st_model
|
|
|
|
|
def get_embeddings(input: list) -> np.ndarray:
    """
    Encode ``input`` with the shared embedding model.

    Args:
        input: Items passed unchanged to the model's ``encode`` method.

    Returns:
        The result of ``model.encode`` called with ``convert_to_numpy=True``,
        ``normalize_embeddings=True`` and ``convert_to_tensor=False``.

    Raises:
        ServiceUnavailableError: If no embedding model is available, or if
            loading the model fails.
    """
    model = get_embeddings_model()
    debug_msg(f"embedding model : {model}")

    # get_embeddings_model() returns None when no model name is configured.
    # Fail with a service error instead of an AttributeError on None.encode.
    if model is None:
        raise ServiceUnavailableError(
            "Error: No embedding model is loaded",
            internal_message="get_embeddings_model() returned None; no embedding model name is configured",
        )

    embedding = model.encode(input, convert_to_numpy=True, normalize_embeddings=True, convert_to_tensor=False)
    debug_msg(f"embedding result : {embedding}")
    return embedding
|
|
|
|
|
def embeddings(input: list, encoding_format: str) -> dict:
    """
    Compute embeddings for ``input`` and wrap them in a response dict.

    Args:
        input: Items to embed; forwarded to ``get_embeddings``.
        encoding_format: When equal to ``"base64"`` each vector is encoded
            with ``float_list_to_base64``; any other value yields plain
            lists via ``tolist()``.

    Returns:
        A dict with ``"object": "list"``, one ``"embedding"`` entry per
        input item under ``"data"`` (each carrying its ``"index"``), the
        model name under ``"model"``, and a ``"usage"`` section whose token
        counts are always reported as 0.
    """
    # Local name differs from the function name to avoid shadowing it.
    vectors = get_embeddings(input)

    if encoding_format == "base64":
        data = [{"object": "embedding", "embedding": float_list_to_base64(vec), "index": idx} for idx, vec in enumerate(vectors)]
    else:
        data = [{"object": "embedding", "embedding": vec.tolist(), "index": idx} for idx, vec in enumerate(vectors)]

    response = {
        "object": "list",
        "data": data,
        "model": st_model,
        "usage": {
            "prompt_tokens": 0,
            "total_tokens": 0,
        }
    }

    # Indexing vectors[0] on an empty result raised IndexError; report 0 instead.
    count = len(vectors)
    size = len(vectors[0]) if count > 0 else 0
    debug_msg(f"Embeddings return size: {size}, number: {count}")

    return response
|
|