Spaces:
Runtime error
Runtime error
| import os | |
| import gradio as gr | |
| import torch | |
| from FlagEmbedding.visual.modeling import Visualized_BGE | |
| from torchvision import transforms | |
| from PIL import Image | |
| from torch.utils.data import DataLoader | |
| from tqdm import tqdm | |
| from pdf2image import convert_from_path | |
| import numpy as np | |
| import torch.nn.functional as F | |
| import io | |
| # Initialize the Visualized-BGE model | |
| def load_bge_model(model_name: str, model_weight_path: str): | |
| model = Visualized_BGE(model_name_bge=model_name, model_weight=model_weight_path) | |
| model.eval() | |
| return model | |
| # Load the BGE model (ensure you have downloaded the weights and provide the correct path) | |
| model_name = "BAAI/bge-base-en-v1.5" # or "BAAI/bge-m3" for multilingual | |
| model_weight_path ="./Visualized_base_en_v1.5.pth" | |
| model = load_bge_model(model_name, model_weight_path) | |
| device = "cuda:0" if torch.cuda.is_available() else "cpu" | |
| model.to(device) | |
| # Function to encode images | |
| import tempfile | |
| import os | |
| def encode_image(image_input): | |
| """ | |
| Encodes an image for retrieval. | |
| Args: | |
| image_input: Can be a file path (str), a NumPy array, or a PIL Image. | |
| Returns: | |
| torch.Tensor: The image embedding. | |
| """ | |
| delete_temp_file = False | |
| if isinstance(image_input, str): | |
| image_path = image_input | |
| else: | |
| with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp_file: | |
| if isinstance(image_input, np.ndarray): | |
| image = Image.fromarray(image_input) | |
| elif isinstance(image_input, Image.Image): | |
| image = image_input | |
| else: | |
| raise ValueError("Unsupported image input type for image encoding.") | |
| image.save(tmp_file.name) | |
| image_path = tmp_file.name | |
| delete_temp_file = True # Mark that we need to delete this temp file | |
| try: | |
| with torch.no_grad(): | |
| embed = model.encode(image=image_path) | |
| embed = embed.squeeze(0) | |
| finally: | |
| if delete_temp_file: | |
| # Remove the temporary file | |
| os.remove(image_path) | |
| return embed.cpu() | |
| # Function to encode text | |
| def encode_text(text): | |
| with torch.no_grad(): | |
| embed = model.encode(text=text) # Assuming encode returns [1, D] | |
| embed = embed.squeeze(0) # Remove the batch dimension if present | |
| return embed.cpu() | |
| # Function to index uploaded files (PDFs or images) | |
| def index_files(files, embeddings_state, metadata_state): | |
| print("Indexing files...") | |
| embeddings = [] | |
| metadata = [] | |
| for file in files: | |
| if file.name.lower().endswith('.pdf'): | |
| images = convert_from_path(file.name, thread_count=4) | |
| for idx, img in enumerate(images): | |
| img_path = f"{file.name}_page_{idx}.png" | |
| img.save(img_path) | |
| embed = encode_image(img_path) | |
| print(f"Embedding shape after encoding image: {embed.shape}") # Should be [768] | |
| embeddings.append(embed) | |
| metadata.append({"type": "image", "path": img_path, "info": f"Page {idx}"}) | |
| elif file.name.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')): | |
| img_path = file.name | |
| embed = encode_image(img_path) | |
| print(f"Embedding shape after encoding image: {embed.shape}") # Should be [768] | |
| embeddings.append(embed) | |
| metadata.append({"type": "image", "path": img_path, "info": "Uploaded Image"}) | |
| else: | |
| raise gr.Error("Unsupported file type. Please upload PDFs or image files.") | |
| embeddings = torch.stack(embeddings).to(device) # Should result in shape [N, 768] | |
| print(f"Stacked embeddings shape: {embeddings.shape}") | |
| embeddings_state = embeddings | |
| metadata_state = metadata | |
| return f"Indexed {len(embeddings)} items.", embeddings_state, metadata_state | |
| def search(query_text, query_image, k, embeddings_state, metadata_state): | |
| embeddings = embeddings_state | |
| metadata = metadata_state | |
| if embeddings is None or embeddings.size(0) == 0: | |
| return "No embeddings indexed. Please upload and index files first.", [] | |
| query_emb = None | |
| if query_text and query_image: | |
| gr.warning("Please provide either a text query or an image query, not both. Using text query by default.") | |
| # text_emb = encode_text(query_text) # [D] | |
| # image_emb = encode_image(query_image) # [D] | |
| # query_emb = (text_emb + image_emb) / 2 # [D] | |
| # print("Combined text and image embeddings for query.") | |
| query_emb = encode_text(query_text) # [D] | |
| if query_text: | |
| query_emb = encode_text(query_text) # [D] | |
| print("Encoded text query.") | |
| elif query_image is not None : | |
| print(query_image) | |
| query_emb = encode_image(query_image) # [D] | |
| print("Encoded image query.") | |
| else: | |
| return "Please provide at least a text query or an image query.", [] | |
| # Ensure query_emb has shape [1, D] | |
| if query_emb.dim() == 1: | |
| query_emb = query_emb.unsqueeze(0) # [1, D] | |
| # Normalize embeddings for cosine similarity | |
| query_emb = F.normalize(query_emb.to(device), p=2, dim=1) # [1, D] | |
| indexed_emb = F.normalize(embeddings.to(device), p=2, dim=1) # [N, D] | |
| print(f"Query embedding shape: {query_emb.shape}") # Should be [1, 768] | |
| print(f"Indexed embeddings shape: {indexed_emb.shape}") # Should be [N, 768] | |
| # Compute cosine similarities | |
| similarities = torch.matmul(query_emb, indexed_emb.T).squeeze(0) # [N] | |
| print(f"Similarities shape: {similarities.shape}") | |
| # Get top-k results | |
| topk = torch.topk(similarities, k) | |
| topk_indices = topk.indices.cpu().numpy() | |
| topk_scores = topk.values.cpu().numpy() | |
| print(f"Top-{k} indices: {topk_indices}") | |
| print(f"Top-{k} scores: {topk_scores}") | |
| results = [] | |
| for idx, score in zip(topk_indices, topk_scores): | |
| item = metadata[idx] | |
| if item["type"] == "image": | |
| # Load image from path | |
| img = Image.open(item["path"]).convert("RGB") | |
| results.append((img, f"Score: {score:.4f} | {item['info']}")) | |
| else: | |
| # Handle text data if applicable | |
| results.append((item["data"], f"Score: {score:.4f} | {item['info']}")) | |
| return results | |
| # Gradio Interface | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# Visualized-BGE: Multimodal Retrieval Demo π") | |
| gr.Markdown(""" | |
| Upload PDF or image files to index them. Then, perform searches using text, images, or both to retrieve the most relevant items. | |
| **Note:** Ensure that you have indexed the files before performing a search. | |
| """) | |
| # Initialize state variables | |
| embeddings_state = gr.State(None) | |
| metadata_state = gr.State(None) | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| gr.Markdown("## 1οΈβ£ Upload and Index Files") | |
| file_input = gr.File(file_types=["pdf", "png", "jpg", "jpeg", "bmp", "gif"], file_count="multiple", label="Upload Files") | |
| index_button = gr.Button("π Index Files") | |
| index_status = gr.Textbox("No files indexed yet.", label="Indexing Status") | |
| with gr.Column(scale=3): | |
| gr.Markdown("## 2οΈβ£ Perform Search") | |
| with gr.Row(): | |
| query_text = gr.Textbox(placeholder="Enter your text query here...", label="Text Query") | |
| query_image = gr.Image(label="Image Query (Optional)") | |
| k = gr.Slider(minimum=1, maximum=20, step=1, label="Number of Results", value=5) | |
| search_button = gr.Button("π Search") | |
| output_gallery = gr.Gallery(label="Retrieved Results", show_label=True, columns=2) | |
| # Define button actions | |
| index_button.click( | |
| index_files, | |
| inputs=[file_input, embeddings_state, metadata_state], | |
| outputs=[index_status, embeddings_state, metadata_state] | |
| ) | |
| search_button.click( | |
| search, | |
| inputs=[query_text, query_image, k, embeddings_state, metadata_state], | |
| outputs=output_gallery | |
| ) | |
| gr.Markdown(""" | |
| --- | |
| ## About | |
| This demo uses the **Visualized-BGE** model for efficient multimodal retrieval tasks. Upload your documents or images, index them, and perform searches using text, images, or a combination of both. | |
| **References:** | |
| - [Visualized-BGE Paper](https://arxiv.org/abs/2406.04292) | |
| - [FlagEmbedding GitHub](https://github.com/FlagOpen/FlagEmbedding) | |
| """) | |
| if __name__ == "__main__": | |
| demo.launch(debug=True, share=True) | |