| | import os
|
| | import torch
|
| | from functional import seq
|
| | import numpy as np
|
| | import torch.nn.functional as F
|
| | from torch import cosine_similarity
|
| | from config import Config
|
| | from openai import OpenAI
|
| |
|
| | os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
|
| |
|
class TextVector:
    """Turn text into embedding vectors and compare them by cosine similarity.

    Two back ends are supported, selected by ``cfg.use_api``:

    * API mode (default): embeddings are requested from an OpenAI-compatible
      endpoint through the ``OpenAI`` client.
    * Local mode: a tokenizer/model pair is loaded from ``cfg.bert_path`` and
      sentence vectors are obtained by attention-masked mean pooling.
    """

    # SECURITY NOTE(review): this credential was hard-coded in the original
    # source and is kept ONLY as a last-resort fallback so existing
    # deployments keep working. It should be rotated and removed; supply the
    # key through ``cfg.api_key`` or the environment variable below instead.
    _LEGACY_API_KEY = "sk-5b45aa67249a44d38abca3c02cc78a70"
    _API_KEY_ENV_VAR = "DASHSCOPE_API_KEY"

    # Number of additional attempts made for a batch whose API call failed.
    _MAX_RETRIES = 2

    def __init__(self, cfg):
        """Read settings from *cfg* and load the local model if required.

        Args:
            cfg: Configuration object. ``bert_path`` is mandatory; the
                optional attributes ``use_api``, ``api_key``, ``base_url``,
                ``model_name``, ``dimensions`` and ``batch_size`` fall back
                to built-in defaults when absent.
        """
        self.bert_path = cfg.bert_path

        self.use_api = getattr(cfg, 'use_api', True)

        # Precedence: cfg.api_key > environment variable > legacy literal.
        fallback_key = os.environ.get(self._API_KEY_ENV_VAR) or self._LEGACY_API_KEY
        self.api_key = getattr(cfg, 'api_key', fallback_key)

        self.base_url = getattr(
            cfg, 'base_url', "https://dashscope.aliyuncs.com/compatible-mode/v1"
        )
        self.model_name = getattr(cfg, 'model_name', "text-embedding-v3")
        self.dimensions = getattr(cfg, 'dimensions', 1024)
        self.batch_size = getattr(cfg, 'batch_size', 10)

        # The local model is only needed when the API back end is disabled.
        if not self.use_api:
            self.load_model()

    def load_model(self):
        """Load the tokenizer and model from ``self.bert_path``."""
        # Imported lazily: the module never imported these names, so local
        # mode used to fail with NameError. Importing here also keeps API-only
        # deployments free of the dependency at import time.
        # NOTE(review): assumes the names come from Hugging Face
        # ``transformers`` (the usual home of AutoModel/AutoTokenizer).
        from transformers import AutoModel, AutoTokenizer

        self.tokenizer = AutoTokenizer.from_pretrained(self.bert_path)
        self.model = AutoModel.from_pretrained(self.bert_path)

    def mean_pooling(self, model_output, attention_mask):
        """Return one vector per sequence by mean-pooling its token embeddings.

        Args:
            model_output: Model output whose first element holds the
                per-token embeddings.
            attention_mask: Mask tensor; positions with 0 are excluded from
                the average.

        Returns:
            Tensor of masked sums divided by the number of unmasked tokens.
        """
        token_embeddings = model_output[0]
        input_mask_expanded = (
            attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        )
        summed = torch.sum(token_embeddings * input_mask_expanded, 1)
        # Clamp the token count so a fully masked row cannot divide by zero.
        counts = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
        return summed / counts

    def get_vec(self, sentences):
        """Return embedding vectors for *sentences*.

        Delegates to :meth:`get_vec_api` in API mode; otherwise runs the
        local model and returns a nested list of floats.
        """
        if self.use_api:
            return self.get_vec_api(sentences)

        encoded_input = self.tokenizer(
            sentences, padding=True, truncation=True, return_tensors='pt'
        )
        with torch.no_grad():
            model_output = self.model(**encoded_input)
        sentence_embeddings = self.mean_pooling(
            model_output, encoded_input['attention_mask']
        )
        return sentence_embeddings.data.cpu().numpy().tolist()

    def _embed_batch(self, client, batch, batch_no):
        """Embed one batch through the API, retrying on failure.

        Returns:
            The list of embedding vectors, or ``None`` when every attempt
            (the first call plus ``_MAX_RETRIES`` retries) failed.
        """
        attempts = self._MAX_RETRIES + 1
        for attempt in range(1, attempts + 1):
            try:
                completion = client.embeddings.create(
                    model=self.model_name,
                    input=batch,
                    dimensions=self.dimensions,
                    encoding_format="float",
                )
                return [item.embedding for item in completion.data]
            except Exception as e:  # best-effort boundary: log and retry
                print(f"向量化批次 {batch_no} 失败:{str(e)}")
                if attempt < attempts:
                    print(f"第 {attempt} 次重试...")
        return None

    def get_vec_api(self, query, batch_size=None):
        """Return embedding vectors for *query* using the embeddings API.

        Args:
            query: A single string or a sequence of strings. Entries that are
                not strings, or are empty/whitespace-only, are dropped before
                the request is made.
            batch_size: Number of texts per API call; defaults to
                ``self.batch_size``. Values below 1 are treated as 1.

        Returns:
            ``[]`` when there is nothing to embed; a list of vectors (one per
            successfully embedded text, in input order) otherwise; or an
            empty ``(0, dimensions)`` NumPy array when every batch failed.
            A batch that still fails after all retries is skipped with a
            warning, so the result may be shorter than the input.
        """
        if batch_size is None:
            batch_size = self.batch_size

        if not query:
            print("Warning: Empty query provided to get_vec_api")
            return []

        if isinstance(query, str):
            query = [query]

        query = [q for q in query if q and isinstance(q, str) and q.strip()]
        if not query:
            print("Warning: No valid text to vectorize after filtering")
            return []

        # A non-positive step would make range() raise and waste every retry.
        batch_size = max(1, batch_size)

        # Built only once there is real work to do.
        client = OpenAI(
            api_key=self.api_key,
            base_url=self.base_url,
        )

        all_vectors = []
        skipped_batches = 0
        for start in range(0, len(query), batch_size):
            batch = query[start:start + batch_size]
            vectors = self._embed_batch(client, batch, start // batch_size + 1)
            if vectors is None:
                skipped_batches += 1
                continue
            all_vectors.extend(vectors)

        if skipped_batches and all_vectors:
            # Make the partial result visible instead of failing silently.
            print(
                f"Warning: {skipped_batches} batch(es) failed after retries; "
                f"returning {len(all_vectors)} vectors for {len(query)} texts"
            )

        if not all_vectors and self.dimensions > 0:
            print("Warning: 返回一个空的向量数组,形状为 [0, dimensions]")
            return np.zeros((0, self.dimensions))

        return all_vectors

    def get_vec_batch(self, data, bs=None):
        """Embed *data* in batches and return the vectors as a tensor.

        Args:
            data: Sequence of texts.
            bs: Batch size; defaults to ``self.batch_size``.

        Returns:
            A tensor built from the stacked vectors; an empty tensor when no
            vector was produced in API mode.
        """
        if bs is None:
            bs = self.batch_size

        if self.use_api:
            vectors = self.get_vec_api(data, bs)
            if len(vectors) > 0:
                return torch.tensor(np.array(vectors))
            return torch.tensor(np.array([]))

        all_vectors = []
        for batch in seq(data).grouped(bs):
            all_vectors.extend(self.get_vec(batch))
        return torch.tensor(np.array(all_vectors))

    def vector_similarity(self, vectors):
        """Cosine similarity between the first row and every other row.

        Args:
            vectors: 2-D tensor (or array-like convertible to one) laid out
                as ``[query, text1, text2, ...]``.

        Returns:
            List of similarities of ``query`` to ``text1, text2, ...``; an
            empty list when fewer than two rows are given or the input is
            not at least 2-dimensional.
        """
        # Accept lists / ndarrays too, e.g. the raw output of get_vec_api.
        vectors = torch.as_tensor(vectors)

        # dim() is checked first so a 0-dim tensor cannot make size(0) raise.
        if vectors.dim() == 0 or vectors.size(0) <= 1:
            print("Warning: Not enough vectors for similarity calculation")
            return []

        if vectors.dim() < 2:
            print("Warning: Vectors must be 2-dimensional")
            return []

        if not vectors.is_floating_point():
            vectors = vectors.float()

        vectors = F.normalize(vectors, p=2, dim=1)
        # Keep the query 2-D so the comparison along dim=1 never relies on
        # implicit broadcasting of a 1-D tensor.
        q_vec = vectors[:1]
        o_vec = vectors[1:]
        sim = cosine_similarity(q_vec, o_vec, dim=1)
        return sim.data.cpu().numpy().tolist()
|
| |
|
| |
|
# Module-level singleton: built once at import time from the project config.
cfg = Config()
tv = TextVector(cfg)

# Function-style aliases bound to the shared instance:
#   get_vector -> batch embedding, get_sim -> cosine similarity to the first row.
get_vector, get_sim = tv.get_vec_batch, tv.vector_similarity