|
import gc |
|
import uuid |
|
|
|
import chromadb |
|
import numpy as np |
|
import torch |
|
import torch.nn.functional as F |
|
from PIL import Image |
|
from transformers import AutoModel, AutoImageProcessor |
|
|
|
from src.utils.utils import extract_images_from_file |
|
|
|
|
|
|
|
|
|
|
|
|
|
class is_conf_image:
    """Embed images with a vision model and match them against a Chroma store.

    The instance loads the ``nomic-ai/nomic-embed-vision-v1.5`` processor and
    model through ``transformers`` and opens (or creates) the persistent
    Chroma collection ``image_embedding`` under ``../db/image``.

    Attributes:
        device: ``"cuda"`` when CUDA is available, otherwise ``"cpu"``.
        feature_extractor: Image processor that prepares model inputs.
        model: Embedding model, in eval mode, moved to ``device``.
        client: Persistent Chroma client.
        collection: Chroma collection holding the stored embeddings.
        max_size: Height/width limit (pixels) above which ``infer_image``
            hands the file to ``extract_images_from_file``.
        cnt: Number of completed ``making_embedding_vector`` calls.
        cnt_infer: Number of completed ``infer_image`` calls.
    """

    _MODEL_NAME = "nomic-ai/nomic-embed-vision-v1.5"
    # gc / CUDA cache cleanup runs once every this many calls.
    _CLEANUP_INTERVAL = 200

    def __init__(self):
        """Load the processor and model and open the Chroma collection."""
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.feature_extractor = AutoImageProcessor.from_pretrained(
            self._MODEL_NAME,
            cache_dir="../weights",
            use_fast=True,
            trust_remote_code=True,
        )
        self.model = (
            AutoModel.from_pretrained(
                self._MODEL_NAME,
                cache_dir="../weights",
                trust_remote_code=True,
            )
            .eval()
            .to(self.device)
        )

        self.client = chromadb.PersistentClient(path="../db/image")
        # Chroma reads the distance metric from the "hnsw:space" metadata key;
        # the previous key ("hnsw") is not a recognised setting, so the
        # requested cosine metric was never applied.
        # NOTE(review): a collection already persisted under the old key was
        # presumably built with Chroma's default metric - confirm, and
        # re-index if cosine distances are required for existing data.
        self.collection = self.client.get_or_create_collection(
            name="image_embedding",
            metadata={"hnsw:space": "cosine"},
        )
        self.max_size: int = 800
        self.cnt: int = 0
        self.cnt_infer: int = 0

    async def making_embedding_vector(self, image_path: str, category: int, ):
        """Embed one image file and store the vector in the collection.

        Args:
            image_path: Path of the image to embed; also stored as the
                ``"image"`` metadata value.
            category: Value stored as the ``"category"`` metadata value.

        Returns:
            The embedding exactly as returned by ``inference``.
        """
        embedding_vector = self.inference(self._load_rgb(image_path))
        self.add_vectors(
            embedding_vector,
            {"image": image_path, "category": category},
        )

        self.cnt += 1
        if self.cnt % self._CLEANUP_INTERVAL == 0:
            self._release_memory()
        return embedding_vector

    async def infer_image(self, image_path: str, threshold: float = 0.45, top_k: int = 2):
        """Look up stored images similar to the file at ``image_path``.

        A ``.pdf`` path, or an image whose height or width exceeds
        ``max_size``, is passed to ``extract_images_from_file`` and every
        item it yields is embedded and queried separately.

        Args:
            image_path: Path of the image or PDF to look up.
            threshold: Largest distance still reported as a match.
            top_k: Number of nearest neighbours requested per query.

        Returns:
            A single result dict (see ``finding_from_db``) for an image that
            is queried directly, or a list of such dicts - one per extracted
            item - when the file went through ``extract_images_from_file``.
        """
        # Pillow cannot decode PDFs, so the extension has to be checked
        # before any attempt to open the file as an image.
        is_pdf = image_path.lower().endswith(".pdf")
        pixels = None if is_pdf else self._load_rgb(image_path)

        if is_pdf or pixels.shape[0] > self.max_size or pixels.shape[1] > self.max_size:
            results = [
                self.finding_from_db(
                    self.inference(self._load_rgb(part)), threshold, top_k
                )
                for part in extract_images_from_file(image_path, max_size=self.max_size)
            ]
        else:
            results = self.finding_from_db(self.inference(pixels), threshold, top_k)

        # Both paths are counted, so the periodic cleanup also runs for
        # workloads made up of PDFs / oversized images.
        self.cnt_infer += 1
        if self.cnt_infer % self._CLEANUP_INTERVAL == 0:
            self._release_memory()
        return results

    def finding_from_db(self, embedding_vector, threshold: float, top_k: int, ) -> dict:
        """Query the collection and keep the neighbours within ``threshold``.

        Args:
            embedding_vector: Query embedding(s) forwarded unchanged as
                ``query_embeddings``; only the hits of the first query are
                read from the response.
            threshold: Largest distance still reported as a match.
            top_k: Number of nearest neighbours requested.

        Returns:
            A flat dict with, for the i-th retained match, the keys
            ``"similar_image{i}"``, ``"category{i}"`` and
            ``"cosine distance{i}"``. Empty when nothing is within
            ``threshold``.
        """
        # Stored embeddings are not part of the output, so they are not
        # requested from the database.
        response = self.collection.query(
            query_embeddings=embedding_vector,
            n_results=top_k,
            include=["metadatas", "distances"],
        )

        result_out = {}
        idx = 0
        for distance, metadata in zip(response["distances"][0], response["metadatas"][0]):
            if distance > threshold:
                continue
            result_out[f"similar_image{idx}"] = metadata["image"]
            result_out[f"category{idx}"] = metadata["category"]
            result_out[f"cosine distance{idx}"] = distance
            # Advance the suffix so a later match does not overwrite this one.
            idx += 1
        return result_out

    @torch.inference_mode()
    def inference(self, image: np.ndarray):
        """Run the model on ``image`` and return L2-normalised embeddings.

        Args:
            image: Image data handed to the feature extractor as ``images``.

        Returns:
            Nested list produced by ``.tolist()`` on the normalised
            position-0 slice of ``last_hidden_state`` (presumably one row
            per input image - confirm against the model's output layout).
        """
        inputs = self.feature_extractor(images=image, return_tensors="pt").to(self.device)
        hidden_states = self.model(**inputs).last_hidden_state
        # Position 0 along the second axis is used as the image embedding.
        embedding = F.normalize(hidden_states[:, 0], p=2, dim=1)
        return embedding.detach().cpu().numpy().tolist()

    def add_vectors(self, vectors, metadatas):
        """Store the first vector of ``vectors`` under a fresh UUID.

        Args:
            vectors: Sequence of embeddings; only ``vectors[0]`` is stored.
            metadatas: Metadata attached to the stored embedding.
        """
        self.collection.add(
            embeddings=vectors[0],
            metadatas=metadatas,
            ids=str(uuid.uuid4()),
        )

    @staticmethod
    def _load_rgb(source) -> np.ndarray:
        """Open ``source`` with Pillow and return its RGB pixels as an array."""
        # The context manager closes the underlying file handle promptly.
        with Image.open(source) as img:
            return np.array(img.convert("RGB"))

    @staticmethod
    def _release_memory() -> None:
        """Run the garbage collector and, if CUDA is present, empty its cache."""
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
|
|