Spaces:
Sleeping
Sleeping
import numpy as np | |
from sentence_transformers import SentenceTransformer, util | |
from open_clip import create_model_from_pretrained, get_tokenizer | |
import torch | |
from datasets import load_dataset | |
from sklearn.metrics.pairwise import cosine_similarity | |
import torch.nn as nn | |
import boto3 | |
import streamlit as st | |
from PIL import Image | |
from PIL import ImageDraw | |
from io import BytesIO | |
import pandas as pd | |
from typing import List, Union | |
import concurrent.futures | |
# Load the model, its image preprocessing transform and the tokenizer once at
# import time so that individual queries do not pay the loading cost again.
# All three are built from the same SigLIP checkpoint on the Hugging Face Hub.
_CHECKPOINT = 'hf-hub:timm/ViT-SO400M-14-SigLIP-384'
model, preprocess = create_model_from_pretrained(_CHECKPOINT)
tokenizer = get_tokenizer(_CHECKPOINT)
def encode_query(query: Union[str, Image.Image]) -> torch.Tensor:
    """
    Embed a text or image query with the module-level OpenCLIP model.

    Parameters
    ----------
    query : Union[str, Image.Image]
        Either a text string or a PIL image.

    Returns
    -------
    torch.Tensor
        The output of ``model.encode_image`` (image query) or
        ``model.encode_text`` (text query), computed without gradients.

    Raises
    ------
    ValueError
        If ``query`` is neither a string nor a PIL image.
    """
    # Pick the model inputs and the matching encoder first, then run a single
    # gradient-free forward pass below.
    if isinstance(query, Image.Image):
        # The transform yields one image; unsqueeze adds the batch dimension.
        inputs = preprocess(query).unsqueeze(0)
        encode = model.encode_image
    elif isinstance(query, str):
        inputs = tokenizer(query, context_length=model.context_length)
        encode = model.encode_text
    else:
        raise ValueError("Query must be either a string or an Image.")

    with torch.no_grad():
        return encode(inputs)
def load_dataset_with_limit(dataset_name, dataset_subset, search_in_small_objects,limit=1000):
    """
    Load one split of a ``quasara-io`` dataset from Hugging Face as a DataFrame.

    Parameters:
    - dataset_name: str, dataset name without the ``quasara-io/`` prefix
    - dataset_subset: str, suffix of the split name
    - search_in_small_objects: bool, selects the ``Splits_<subset>`` split when
      truthy and the ``Main_<subset>`` split otherwise
    - limit: int or None, maximum number of rows to keep; None keeps every row

    Returns:
    - (df, total_rows): the (possibly sampled) pandas DataFrame and the number
      of rows in the full split before sampling
    """
    split_prefix = 'Splits' if search_in_small_objects else 'Main'
    split = f'{split_prefix}_{dataset_subset}'
    dataset = load_dataset(f"quasara-io/{dataset_name}", split=split)
    total_rows = dataset.num_rows

    df = dataset.to_pandas()
    if limit is not None:
        # DataFrame.sample raises ValueError when asked for more rows than
        # exist (sampling without replacement), so clamp the request to the
        # size of the split. The fixed seed keeps the sample reproducible.
        df = df.sample(n=min(limit, len(df)), random_state=42)
    return df, total_rows
def get_image_vectors(df):
    """
    Stack the per-row vectors of ``df['Vector']`` into one float32 tensor.

    Parameters:
    - df: pandas DataFrame with a 'Vector' column holding one vector per row

    Returns:
    - torch.Tensor of dtype float32 with one row per DataFrame row
    """
    per_row_vectors = df['Vector'].to_numpy()
    stacked = np.vstack(per_row_vectors)
    return torch.tensor(stacked, dtype=torch.float32)
def search(query, df, limit, search_in_images = True):
    """
    Rank the rows of ``df`` by cosine similarity to ``query``.

    Parameters:
    - query: text string or PIL image, forwarded to encode_query
    - df: pandas DataFrame whose vectors are read through get_image_vectors
    - limit: int, number of best-matching row positions to keep
    - search_in_images: bool, when falsy no search is performed

    Returns:
    - numpy array of row positions into ``df``, most similar first, holding at
      most ``limit`` entries

    NOTE(review): the indentation of this function was lost in the copy being
    reviewed. This version assumes the final ``return`` belonged to the ``if``
    body, so a falsy ``search_in_images`` yields None -- confirm against the
    deployed file.
    """
    if not search_in_images:
        return None

    # encode_query returns a batch of one; keep its only row as a NumPy vector.
    query_row = encode_query(query)[0, :].detach().numpy()
    candidates = get_image_vectors(df).detach().numpy()

    scores = cosine_similarity([query_row], candidates)[0]
    # Negating the scores makes the ascending argsort list best matches first.
    return np.argsort(-scores)[:limit]
#Try Batch Search | |
def batch_search(query, df, batch_size=100000, limit=10):
    """
    Find the rows of ``df`` most similar to ``query``, scanning in batches.

    The vectors are compared against the query ``batch_size`` rows at a time.
    Each batch contributes its own best ``limit`` candidates, and the
    candidates of all batches are then re-ranked together.

    Parameters:
    - query: text string or PIL image, forwarded to encode_query
    - df: pandas DataFrame whose vectors are read through get_image_vectors
    - batch_size: int, number of vectors scored per cosine-similarity call
    - limit: int, number of results to return

    Returns:
    - list of int row positions into ``df`` (usable with ``df.iloc``), most
      similar first, holding at most ``limit`` entries
    """
    vectors = get_image_vectors(df).numpy()
    # encode_query returns a batch of one; keep its only row as a NumPy vector.
    query_vector = encode_query(query)[0].detach().numpy()

    candidate_indices = []
    candidate_scores = []
    for start in range(0, len(vectors), batch_size):
        batch_vectors = vectors[start:start + batch_size]
        batch_similarities = cosine_similarity([query_vector], batch_vectors)[0]
        local_top = np.argsort(-batch_similarities)[:limit]
        # argsort positions are relative to the batch; shift them by the batch
        # start so they address rows of the full DataFrame.
        candidate_indices.append(local_top + start)
        candidate_scores.append(batch_similarities[local_top])

    if not candidate_indices:
        return []

    merged_indices = np.concatenate(candidate_indices)
    merged_scores = np.concatenate(candidate_scores)
    # Global re-rank: the overall top `limit` must be among the per-batch top
    # `limit` candidates. A stable sort keeps the per-batch order on ties, so
    # a single-batch search returns exactly the per-batch ranking.
    best = np.argsort(-merged_scores, kind="stable")[:limit]
    return merged_indices[best].tolist()
def get_file_paths(df, top_k_indices, column_name = 'File_Path'):
    """
    Look up the values of one column at the given row positions.

    Parameters:
    - df: pandas DataFrame containing the data
    - top_k_indices: positional row indices (e.g. a numpy array from a search)
    - column_name: str, the column to read; defaults to 'File_Path'

    Returns:
    - list with the column's value for each requested position, in the order
      the positions were given
    """
    # Select rows by position first, then narrow to the requested column.
    selected_rows = df.iloc[top_k_indices]
    column_values = selected_rows[column_name]
    return column_values.tolist()
def get_cordinates(df, top_k_indices, column_name = 'Coordinate'):
    """
    Look up the coordinate entries at the given row positions.

    Parameters:
    - df: pandas DataFrame containing the data
    - top_k_indices: positional row indices (e.g. a numpy array from a search)
    - column_name: str, the column to read; defaults to 'Coordinate'

    Returns:
    - list with the column's value for each requested position, in the order
      the positions were given
    """
    rows_at_positions = df.iloc[top_k_indices]
    return rows_at_positions[column_name].tolist()
def get_images_from_s3_to_display(bucket_name, file_paths, AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY, folder_name):
    """
    Download images from AWS S3 and show each one in the Streamlit app.

    Parameters:
    - bucket_name: str, the name of the S3 bucket
    - file_paths: list, object paths to fetch; each is appended to folder_name
      to form the S3 key
    - AWS_ACCESS_KEY_ID: str, access key ID passed to the S3 client
    - AWS_SECRET_ACCESS_KEY: str, secret access key passed to the S3 client
    - folder_name: str, prefix prepended verbatim to every file path

    Returns:
    - None (images are rendered directly with Streamlit)
    """
    client = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )

    for file_path in file_paths:
        # Fetch the object and read its whole body into memory.
        response = client.get_object(Bucket=bucket_name, Key=f"{folder_name}{file_path}")
        raw_bytes = response['Body'].read()

        # Decode with PIL and render, using the file path as the caption.
        picture = Image.open(BytesIO(raw_bytes))
        st.image(picture, caption=file_path, use_column_width=True)
def get_images_with_bounding_boxes_from_s3(bucket_name, file_paths, bounding_boxes, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, folder_name):
    """
    Download images from AWS S3, outline their bounding boxes and show them in
    the Streamlit app.

    Parameters:
    - bucket_name: str, the name of the S3 bucket
    - file_paths: list, object paths to fetch; each is appended to folder_name
      to form the S3 key
    - bounding_boxes: list paired element-wise with file_paths; each entry is
      a list or numpy array holding either one box [x_min, y_min, x_max, y_max]
      or a sequence of such boxes
    - AWS_ACCESS_KEY_ID: str, access key ID passed to the S3 client
    - AWS_SECRET_ACCESS_KEY: str, secret access key passed to the S3 client
    - folder_name: str, prefix prepended verbatim to every file path

    Returns:
    - None (images are rendered directly with Streamlit)

    Raises:
    - ValueError: if a bounding-box entry is neither a list nor a numpy array
    """
    client = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    box_containers = (np.ndarray, list)

    for file_path, box_coords in zip(file_paths, bounding_boxes):
        response = client.get_object(Bucket=bucket_name, Key=f"{folder_name}{file_path}")
        img = Image.open(BytesIO(response['Body'].read()))
        draw = ImageDraw.Draw(img)

        if not isinstance(box_coords, box_containers):
            raise ValueError(f"Bounding box data for {file_path} is not in an iterable format.")

        # An entry whose first element is itself a list/array holds several
        # boxes; anything else is treated as one box and wrapped so that a
        # single drawing loop serves both shapes.
        holds_many = len(box_coords) > 0 and isinstance(box_coords[0], box_containers)
        boxes = box_coords if holds_many else [box_coords]

        for box in boxes:
            # Cast to int before drawing (coordinates may arrive as floats).
            x_min, y_min, x_max, y_max = map(int, box)
            draw.rectangle([x_min, y_min, x_max, y_max], outline="red", width=3)

        st.image(img, caption=file_path, use_column_width=True)