webplip / image2image.py
vinid's picture
img2img retrieval (#4)
9e74540
raw history blame
No virus
5.94 kB
import streamlit as st
import pandas as pd
from plip_support import embed_text
import numpy as np
from PIL import Image
import requests
import tokenizers
import os
from io import BytesIO
import pickle
import base64
import torch
from transformers import (
VisionTextDualEncoderModel,
AutoFeatureExtractor,
AutoTokenizer,
CLIPModel,
AutoProcessor
)
import streamlit.components.v1 as components
from st_clickable_images import clickable_images #pip install st-clickable-images
def embed_images(model, images, processor):
    """Return the image features *model* produces for *images*.

    The images are run through *processor*, its ``"pixel_values"`` entry is
    stacked into a single tensor, and that tensor is handed to
    ``model.get_image_features`` with gradient tracking disabled.
    """
    processed = processor(images=images)
    stacked = np.array(processed["pixel_values"])
    batch = torch.tensor(stacked)
    # Inference only: no autograd graph is needed for retrieval.
    with torch.no_grad():
        return model.get_image_features(pixel_values=batch)
@st.cache
def load_embeddings(embeddings_path):
    """Load the array stored at *embeddings_path* via ``np.load``.

    Wrapped in ``st.cache``; the log line below is printed whenever the
    function body actually runs.
    """
    print("loading embeddings")
    loaded = np.load(embeddings_path)
    return loaded
@st.cache(
    # Map each of these types to a constant hash so st.cache does not try to
    # hash their contents.
    hash_funcs=dict.fromkeys(
        (
            torch.nn.parameter.Parameter,
            tokenizers.Tokenizer,
            tokenizers.AddedToken,
        ),
        lambda _: None,
    )
)
def load_path_clip():
    """Load the ``vinid/plip`` CLIP model and its processor.

    Returns:
        A ``(model, processor)`` tuple obtained from
        ``CLIPModel.from_pretrained`` and ``AutoProcessor.from_pretrained``.
    """
    checkpoint = "vinid/plip"
    clip_model = CLIPModel.from_pretrained(checkpoint)
    clip_processor = AutoProcessor.from_pretrained(checkpoint)
    return clip_model, clip_processor
def init():
    """Load the retrieval asset from ``data/twitter.asset``.

    The pickled asset is read as a mapping with a ``'meta'`` table and an
    ``'embedding'`` array.

    Returns:
        A ``(meta, image_embedding, validation_subset_index)`` tuple: the
        metadata with its index reset, the embedding object, and the result
        of comparing the ``'source'`` column values to ``'Val_Tweets'``.
    """
    with open('data/twitter.asset', 'rb') as asset_file:
        asset = pickle.load(asset_file)

    # Reset the index so row positions line up with positional lookups.
    meta = asset['meta'].reset_index(drop=True)
    image_embedding = asset['embedding']
    print(meta.shape, image_embedding.shape)

    is_validation = meta['source'].values == 'Val_Tweets'
    return meta, image_embedding, is_validation
def _encode_example_images(paths):
    """Return each file in *paths* as a base64 ``data:image/jpeg`` URI string."""
    uris = []
    for path in paths:
        with open(path, "rb") as handle:
            encoded = base64.b64encode(handle.read()).decode()
        uris.append(f"data:image/jpeg;base64,{encoded}")
    return uris


def _top_matches(similarity_scores, weblinks, topn):
    """Return the scores and weblinks of the *topn* highest-scoring entries."""
    # Sort IDs by cosine-similarity from high to low
    best_ids = np.argsort(similarity_scores)[::-1][:topn]
    return similarity_scores[best_ids], weblinks[best_ids]


def app():
    """Render the image-to-image retrieval page.

    The query image is either one of the clickable examples under
    ``data/example_images`` or a file uploaded by the user (an upload takes
    precedence over a clicked example). The query is embedded with the model
    from ``load_path_clip``, L2-normalised, and scored against the embeddings
    returned by ``init`` with a dot product. The tweet behind the selected
    top-ranked result is embedded in the page.
    """
    st.title('Image to Image Retrieval')
    st.markdown('#### A pathology image search engine that correlate images with images.')

    meta, image_embedding, validation_subset_index = init()
    model, processor = load_path_clip()

    st.markdown('Click following examples:')
    example_path = 'data/example_images'
    # os.listdir returns entries in arbitrary order. Sort them so the index
    # reported by the clickable widget maps to the same file on every rerun.
    list_of_examples = [
        os.path.join(example_path, name)
        for name in sorted(os.listdir(example_path))
    ]
    example_imgs = _encode_example_images(list_of_examples)
    clicked = clickable_images(
        example_imgs,
        titles=[f"Image #{i}" for i in range(len(example_imgs))],
        div_style={"display": "flex", "justify-content": "center", "flex-wrap": "wrap"},
        img_style={"margin": "5px", "height": "70px"},
    )

    # `image` stays None until an example is clicked or a file is uploaded.
    image = None
    if clicked > -1:
        image = Image.open(list_of_examples[clicked])

    data_options = ["All twitter data (2006-03-21 — 2023-01-15)",
                    "Twitter validation data (2022-11-16 — 2023-01-15)"]
    st.radio(
        "Or choose dataset for image retrieval 👉",
        key="datapool",
        options=data_options,
    )

    col1, col2 = st.columns(2)
    with col1:
        query = st.file_uploader("Choose a file to upload")

    # An uploaded file overrides a clicked example.
    if query is not None:
        image = Image.open(query)

    if image is not None:
        with col2:
            st.image(image, caption='Your upload')

        single_image = embed_images(model, [image], processor)[0].detach().cpu().numpy()
        single_image = single_image/np.linalg.norm(single_image)

        similarity_scores = single_image.dot(image_embedding.T)
        weblinks = meta["weblink"].values
        if st.session_state.datapool != data_options[0]:
            # Use validation twitter data only
            similarity_scores = similarity_scores[validation_subset_index]
            weblinks = weblinks[validation_subset_index]

        topk_options = ['1st', '2nd', '3rd', '4th', '5th']
        # One retrieved result per selectable rank.
        topn = len(topk_options)
        best_scores, target_weblinks = _top_matches(similarity_scores, weblinks, topn)
        #TODO: Avoid duplicated ID

        # Only offer ranks that actually have a result behind them, so a small
        # pool cannot cause an out-of-range lookup below.
        available_options = topk_options[:len(best_scores)]
        if not available_options:
            st.warning('No images are available in the selected dataset.')
        else:
            st.radio(
                "Choose the most similar 👉",
                key="top_k",
                options=available_options,
                horizontal=True
            )
            topn_txt = st.session_state.top_k
            topn_value = topk_options.index(topn_txt)
            st.caption(f'The {topn_txt} relevant image (similarity = {best_scores[topn_value]:.4f})')
            components.html('''
                <blockquote class="twitter-tweet">
                <a href="%s"></a>
                </blockquote>
                <script async src="https://platform.twitter.com/widgets.js" charset="utf-8">
                </script>
                ''' % target_weblinks[topn_value],
                height=800)

    st.markdown('Disclaimer')
    st.caption('Please be advised that this function has been developed in compliance with the Twitter policy of data usage and sharing. It is important to note that the results obtained from this function are not intended to constitute medical advice or replace consultation with a qualified medical professional. The use of this function is solely at your own risk and should be consistent with applicable laws, regulations, and ethical considerations. We do not warrant or guarantee the accuracy, completeness, suitability, or usefulness of this function for any particular purpose, and we hereby disclaim any liability arising from any reliance placed on this function or any results obtained from its use. If you wish to review the original Twitter post, you should access the source page directly on Twitter.')