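"""Concept-importance testing for the IBYDMT demo.

Embeds an image and a list of text concepts with an (Open)CLIP backbone, then
runs the sequential xSKIT test to decide, for each concept, whether it is
important for the class prediction.
"""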
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

import clip
import h5py
import ml_collections
import numpy as np
import open_clip
import streamlit as st
import torch
from huggingface_hub import hf_hub_download

from app_lib.ckde import cKDE
from app_lib.utils import SUPPORTED_MODELS
from ibydmt.test import xSKIT

rng = np.random.default_rng()


def _get_open_clip_model(model_name, device):
    """Load an OpenCLIP model with its preprocessing transform and tokenizer."""
    backbone = model_name.split(":")[-1]

    model, _, preprocess = open_clip.create_model_and_transforms(
        SUPPORTED_MODELS[model_name], device=device
    )
    model.eval()
    tokenizer = open_clip.get_tokenizer(backbone)
    return model, preprocess, tokenizer


def _get_clip_model(model_name, device):
    """Load an original CLIP model with its preprocessing transform and tokenizer."""
    backbone = model_name.split(":")[-1]

    model, preprocess = clip.load(backbone, device=device)
    tokenizer = clip.tokenize
    return model, preprocess, tokenizer


def _load_model(model_name, device):
    """Dispatch on the model name to load the right CLIP variant."""
    if "open_clip" in model_name:
        model, preprocess, tokenizer = _get_open_clip_model(model_name, device)
    elif "clip" in model_name:
        model, preprocess, tokenizer = _get_clip_model(model_name, device)
    else:
        raise ValueError(f"Unsupported model: {model_name}")
    return model, preprocess, tokenizer


@torch.no_grad()  # inference only: .numpy() below fails on tensors that require grad
def _encode_concepts(tokenizer, model, concepts, device):
    """Embed the concept texts and L2-normalize them."""
    concepts_text = tokenizer(concepts).to(device)

    concept_features = model.encode_text(concepts_text)
    concept_features /= torch.linalg.norm(concept_features, dim=-1, keepdim=True)
    return concept_features.cpu().numpy()


@torch.no_grad()
def _encode_image(model, preprocess, image, device):
    """Preprocess and embed an image, returning a unit-norm feature vector."""
    image = preprocess(image)
    image = image.unsqueeze(0)
    image = image.to(device)

    image_features = model.encode_image(image)
    image_features /= torch.linalg.norm(image_features, dim=-1, keepdim=True)
    return image_features.cpu().numpy()


@torch.no_grad()
def _encode_class_name(tokenizer, model, class_name, device):
    """Embed the prompt "A photo of a {class_name}" and L2-normalize it."""
    class_text = tokenizer([f"A photo of a {class_name}"]).to(device)

    class_features = model.encode_text(class_text)
    class_features /= torch.linalg.norm(class_features, dim=-1, keepdim=True)
    return class_features.cpu().numpy()


def _load_dataset(dataset_name, model_name):
    """Download the precomputed train-split embeddings from the Hugging Face Hub."""
    dataset_path = hf_hub_download(
        repo_id="jacopoteneggi/IBYDMT",
        filename=f"{dataset_name}_{model_name}_train.h5",
        repo_type="dataset",
    )
    with h5py.File(dataset_path, "r") as dataset:
        embedding = dataset["embedding"][:]
    return embedding


def _sample_random_subset(concept_idx, concepts, cardinality):
    """Sample `cardinality` concept indices uniformly, excluding `concept_idx`."""
    sample_idx = list(set(range(len(concepts))) - {concept_idx})
    return rng.permutation(sample_idx)[:cardinality].tolist()


def _test(testing_config, z, concept_idx, concepts, cardinality, sampler, classifier):
    """Run `r` xSKIT tests of one concept, each on a fresh random conditioning subset."""

    def cond_p(z, cond_idx, m):
        # Draw m embeddings from the conditional sampler given the conditioning set.
        _, sample_h = sampler.sample(z, cond_idx, m=m)
        return sample_h

    def f(h):
        # Class score: inner product of unit-norm embeddings with the class prompt.
        output = h @ classifier.T
        return output.squeeze()

    rejected_hist, tau_hist, wealth_hist, subset_hist = [], [], [], []
    for _ in range(testing_config.r):
        subset_idx = _sample_random_subset(concept_idx, concepts, cardinality)
        subset = [concepts[idx] for idx in subset_idx]

        tester = xSKIT(testing_config)
        rejected, tau = tester.test(
            z,
            concept_idx,
            subset_idx,
            cond_p,
            f,
            interrupt_on="max_wealth",
            max_wealth=3 / testing_config.significance_level,
        )
        # Pad the wealth trajectory to tau_max so repetitions can be stacked.
        wealth = tester.wealth._wealth
        wealth = wealth + [wealth[-1]] * (testing_config.tau_max - len(wealth))

        rejected_hist.append(rejected)
        tau_hist.append(tau)
        wealth_hist.append(wealth)
        subset_hist.append(subset)
    return {
        "concept": concepts[concept_idx],
        "rejected": rejected_hist,
        "tau": tau_hist,
        "wealth": wealth_hist,
        "subset": subset_hist,
    }


def get_testing_config(**kwargs):
    """Build the xSKIT testing configuration and cache it in the session state."""
    testing_config = st.session_state.testing_config = ml_collections.ConfigDict()
    testing_config.significance_level = kwargs.get("significance_level", 0.05)
    testing_config.wealth = kwargs.get("wealth", "ons")
    testing_config.bet = kwargs.get("bet", "tanh")
    testing_config.kernel = kwargs.get("kernel", "rbf")
    testing_config.kernel_scale_method = kwargs.get("kernel_scale_method", "quantile")
    testing_config.kernel_scale = kwargs.get("kernel_scale", 0.5)
    testing_config.tau_max = kwargs.get("tau_max", 200)
    testing_config.r = kwargs.get("r", 10)
    return testing_config
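

# The defaults above (ONS wealth, tanh bet, RBF kernel at the 0.5 kernel-scale
# quantile, tau_max=200, r=10) are the demo settings; any of them can be
# overridden by keyword, e.g. get_testing_config(r=5).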


def load_precomputed_results(image_name):
    """Load cached test results for one of the demo images."""
    results = np.load(
        os.path.join("assets", "results", f"{image_name.split('.')[0]}.npy"),
        allow_pickle=True,
    ).item()
    return results


def test(
    testing_config,
    image,
    class_name,
    concepts,
    cardinality,
    dataset_name,
    model_name,
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
    with_streamlit=True,
):
    """Test the importance of every concept for the class prediction on an image."""
    if with_streamlit:
        with st.spinner("Loading model"):
            model, preprocess, tokenizer = _load_model(model_name, device)
    else:
        model, preprocess, tokenizer = _load_model(model_name, device)

    if with_streamlit:
        with st.spinner("Encoding concepts"):
            cbm = _encode_concepts(tokenizer, model, concepts, device)
    else:
        cbm = _encode_concepts(tokenizer, model, concepts, device)

    if with_streamlit:
        with st.spinner("Encoding image"):
            h = _encode_image(model, preprocess, image, device)
    else:
        h = _encode_image(model, preprocess, image, device)

    # Project the image embedding onto the concept directions.
    z = h @ cbm.T
    z = z.squeeze()
    if with_streamlit:
        progress_bar = st.progress(
            0,
            text=(
                "Testing concepts (can take up to a minute) [0 /"
                f" {len(concepts)} completed]"
            ),
        )
        # Advance the bar one notch while the dataset downloads.
        progress_bar.progress(
            1 / (len(concepts) + 1),
            text=(
                "Testing concepts (can take up to a minute) [0 /"
                f" {len(concepts)} completed]"
            ),
        )

    embedding = _load_dataset(dataset_name, model_name)
    semantics = embedding @ cbm.T
    sampler = cKDE(embedding, semantics)

    classifier = _encode_class_name(tokenizer, model, class_name, device)
    # Test each concept in parallel, one worker per concept.
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(
                _test,
                testing_config,
                z,
                concept_idx,
                concepts,
                cardinality,
                sampler,
                classifier,
            )
            for concept_idx in range(len(concepts))
        ]

        results = []
        for idx, future in enumerate(as_completed(futures)):
            results.append(future.result())

            if with_streamlit:
                progress_bar.progress(
                    (idx + 2) / (len(concepts) + 1),
                    text=(
                        f"Testing concepts (can take up to a minute) [{idx + 1} /"
                        f" {len(concepts)} completed]"
                    ),
                )
    # Stack per-repetition results into arrays indexed by concept.
    rejected = np.empty((testing_config.r, len(concepts)))
    tau = np.empty((testing_config.r, len(concepts)))
    wealth = np.empty((testing_config.r, testing_config.tau_max, len(concepts)))
    for _results in results:
        concept_idx = concepts.index(_results["concept"])

        rejected[:, concept_idx] = np.array(_results["rejected"])
        tau[:, concept_idx] = np.array(_results["tau"])
        wealth[:, :, concept_idx] = np.array(_results["wealth"])
    # Normalize rejection times by the testing horizon.
    tau /= testing_config.tau_max

    results = {
        "significance_level": testing_config.significance_level,
        "concepts": concepts,
        "rejected": rejected,
        "tau": tau,
        "wealth": wealth,
    }
    return results
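

# Example usage (a minimal sketch: the image file, concept list, dataset and
# model names below are illustrative assumptions, not values shipped with this
# module):
#
#     from PIL import Image
#
#     config = get_testing_config(tau_max=100, r=5)
#     results = test(
#         config,
#         Image.open("example.jpg"),
#         class_name="dog",
#         concepts=["snout", "fur", "grass", "sky"],
#         cardinality=2,
#         dataset_name="imagenette",
#         model_name="open_clip:ViT-B-32",
#         with_streamlit=False,
#     )
#     print(results["rejected"].mean(axis=0))  # empirical rejection rate per concept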