Spaces:
Runtime error
Runtime error
import glob | |
import gradio as gr | |
import pandas as pd | |
import faiss | |
import clip | |
import torch | |
title = r""" | |
<h1 align="center" id="space-title"> π Search Similar Text/Image in the Dataset</h1> | |
""" | |
description = r""" | |
In this demo, we use subset of [danbooru22](https://huggingface.co/datasets/animelover/danbooru2022) or [DiffusionDB](https://huggingface.co/datasets/poloclub/diffusiondb) instead of [LAION](https://laion.ai/blog/laion-400-open-dataset/) instead of [LAION](https://laion.ai/blog/laion-400-open-dataset/) because LAION is currently not available. | |
<br> | |
This demo currently supports text search only. | |
<br> | |
The content will be updated to include image search once LAION is available. | |
The code is based on [clip-retrieval](https://github.com/rom1504/clip-retrieval) and [autofaiss](https://github.com/criteo/autofaiss) | |
""" | |
#In this demo, we use because LAION is currently not available. | |
# From local file | |
# INDEX_DIR = "dataset/diffusiondb/text_index_folder" | |
# IND = faiss.read_index(f"{INDEX_DIR}/text.index") | |
# TEXT_LIST = pd.concat( | |
# pd.read_parquet(file) for file in glob.glob(f"{INDEX_DIR}/metadata/*.parquet") | |
# )['caption'].tolist() | |
# From huggingface dataset | |
from huggingface_hub import hf_hub_download, snapshot_download | |
DATASET_NAME = { | |
"danbooru22": "booru22_000-300", | |
"DiffusionDB": "diffusiondb", | |
} | |
def load_faiss_index(dataset): | |
index_dir = "data/faiss_index" | |
dataset = DATASET_NAME[dataset] | |
hf_hub_download( | |
repo_id="Eun02/text_image_faiss_index", | |
subfolder=dataset, | |
filename="text.index", | |
repo_type="dataset", | |
local_dir=index_dir, | |
) | |
# Download text file | |
snapshot_download( | |
repo_id="Eun02/text_image_faiss_index", | |
allow_patterns=f"{dataset}/*.parquet", | |
repo_type="dataset", | |
local_dir=index_dir, | |
) | |
index = faiss.read_index(f"{index_dir}/{dataset}/text.index") | |
text_list = pd.concat( | |
pd.read_parquet(file) for file in sorted(glob.glob(f"{index_dir}/{dataset}/metadata/*.parquet")) | |
)['caption'].tolist() | |
return index, text_list | |
# Load CLIP model | |
device = "cpu" | |
CLIP_MODEL, _ = clip.load("ViT-B/32", device=device) | |
def get_emb(text, device="cpu"): | |
text_tokens = clip.tokenize([text], truncate=True) | |
text_features = CLIP_MODEL.encode_text(text_tokens.to(device)) | |
text_features /= text_features.norm(dim=-1, keepdim=True) | |
text_embeddings = text_features.cpu().numpy().astype('float32') | |
return text_embeddings | |
def search_text(dataset, top_k, show_score, query_text, device): | |
ind, text_list = load_faiss_index(dataset) | |
if query_text is None or query_text == "": | |
raise gr.Error("Query text is missing") | |
text_embeddings = get_emb(query_text, device) | |
scores, retrieved_texts = ind.search(text_embeddings, top_k) | |
scores, retrieved_texts = scores[0], retrieved_texts[0] | |
result_str = "" | |
for score, ind in zip(scores, retrieved_texts): | |
item_str = text_list[ind].strip() | |
if item_str == "": | |
continue | |
result_str += f"{item_str}" | |
if show_score: | |
result_str += f", {score:0.2f}" | |
result_str += "\n" | |
# file_name = query_text.replace(" ", "_") | |
# if show_score: | |
# file_name += "_score" | |
file_name = "output" | |
output_path = f"./{file_name}.txt" | |
with open(output_path, "w") as f: | |
f.writelines(result_str) | |
return result_str, output_path | |
with gr.Blocks() as demo: | |
gr.Markdown(title) | |
gr.Markdown(description) | |
with gr.Row(): | |
dataset = gr.Dropdown(label="dataset", choices=["danbooru22", "DiffusionDB"], value="DiffusionDB") | |
top_k = gr.Slider(label="top k", minimum=1, maximum=20, value=8) | |
show_score = gr.Checkbox(label="Show score", value=True) | |
query_text = gr.Textbox(label="query text") | |
btn = gr.Button() | |
with gr.Row(): | |
result_text = gr.Textbox(label="retrieved text", interactive=False) | |
result_file = gr.File(label="output file") | |
btn.click( | |
fn=search_text, | |
inputs=[dataset, top_k, show_score, query_text], | |
outputs=[result_text, result_file], | |
) | |
demo.launch() | |