import streamlit as st import open_clip import torch import requests from PIL import Image from io import BytesIO import time import json import numpy as np import cv2 import chromadb from transformers import pipeline import torch.nn as nn import matplotlib.pyplot as plt # Load CLIP model and tokenizer @st.cache_resource def load_clip_model(): model, preprocess_train, preprocess_val = open_clip.create_model_and_transforms('hf-hub:Marqo/marqo-fashionSigLIP') tokenizer = open_clip.get_tokenizer('hf-hub:Marqo/marqo-fashionSigLIP') device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model.to(device) return model, preprocess_val, tokenizer, device clip_model, preprocess_val, tokenizer, device = load_clip_model() # Load Clothing Segmentation model @st.cache_resource def load_segmentation_model(): return pipeline(model="mattmdjaga/segformer_b2_clothes") segmenter = load_segmentation_model() # Load ChromaDB @st.cache_resource def load_chromadb(): client = chromadb.PersistentClient(path="./clothesDB") collection = client.get_collection(name="clothes_items_ver3") return collection collection = load_chromadb() # Helper functions def load_image_from_url(url, max_retries=3): for attempt in range(max_retries): try: response = requests.get(url, timeout=10) response.raise_for_status() img = Image.open(BytesIO(response.content)).convert('RGB') return img except (requests.RequestException, Image.UnidentifiedImageError) as e: if attempt < max_retries - 1: time.sleep(1) else: return None def get_image_embedding(image): image_tensor = preprocess_val(image).unsqueeze(0).to(device) with torch.no_grad(): image_features = clip_model.encode_image(image_tensor) image_features /= image_features.norm(dim=-1, keepdim=True) return image_features.cpu().numpy() def get_text_embedding(text): text_tokens = tokenizer([text]).to(device) with torch.no_grad(): text_features = clip_model.encode_text(text_tokens) text_features /= text_features.norm(dim=-1, keepdim=True) return text_features.cpu().numpy() def find_similar_images(query_embedding, collection, top_k=5): database_embeddings = np.array(collection.get(include=['embeddings'])['embeddings']) similarities = np.dot(database_embeddings, query_embedding.T).squeeze() top_indices = np.argsort(similarities)[::-1][:top_k] all_data = collection.get(include=['metadatas'])['metadatas'] results = [ {'info': all_data[idx], 'similarity': similarities[idx]} for idx in top_indices ] return results def segment_clothing(img, clothes=["Hat", "Upper-clothes", "Skirt", "Pants", "Dress", "Belt", "Left-shoe", "Right-shoe", "Scarf"]): segments = segmenter(img) mask_list = [] for s in segments: if s['label'] in clothes: mask_list.append(s['mask']) if mask_list: final_mask = np.array(mask_list[0]) for mask in mask_list[1:]: current_mask = np.array(mask) final_mask = final_mask + current_mask final_mask = Image.fromarray(final_mask.astype('uint8') * 255) img = img.convert("RGBA") img.putalpha(final_mask) return img, segments # Streamlit app st.title("Advanced Fashion Search App") # Initialize session state if 'step' not in st.session_state: st.session_state.step = 'input' if 'query_image_url' not in st.session_state: st.session_state.query_image_url = '' if 'segmentations' not in st.session_state: st.session_state.segmentations = [] if 'selected_category' not in st.session_state: st.session_state.selected_category = None # Step-by-step processing if st.session_state.step == 'input': st.session_state.query_image_url = st.text_input("Enter image URL:", st.session_state.query_image_url) if st.button("Segment Clothing"): if st.session_state.query_image_url: query_image = load_image_from_url(st.session_state.query_image_url) if query_image is not None: st.session_state.query_image = query_image segmented_image, st.session_state.segmentations = segment_clothing(query_image) st.session_state.segmented_image = segmented_image if st.session_state.segmentations: st.session_state.step = 'select_category' else: st.warning("No clothing items segmented in the image.") else: st.error("Failed to load the image. Please try another URL.") else: st.warning("Please enter an image URL.") elif st.session_state.step == 'select_category': col1, col2 = st.columns(2) with col1: st.image(st.session_state.query_image, caption="Original Image", use_column_width=True) with col2: st.image(st.session_state.segmented_image, caption="Segmented Image", use_column_width=True) st.subheader("Segmented Clothing Items:") options = list(set(s['label'] for s in st.session_state.segmentations)) selected_option = st.selectbox("Select a category to search:", options) if st.button("Search Similar Items"): st.session_state.selected_category = selected_option st.session_state.step = 'show_results' elif st.session_state.step == 'show_results': st.image(st.session_state.query_image, caption="Query Image", use_column_width=True) st.image(st.session_state.segmented_image, caption="Segmented Image", use_column_width=True) selected_segment = next(s for s in st.session_state.segmentations if s['label'] == st.session_state.selected_category) mask = np.array(selected_segment['mask']) masked_image = Image.fromarray((np.array(st.session_state.query_image) * mask[:,:,None]).astype('uint8')) st.image(masked_image, caption=f"Selected Category: {st.session_state.selected_category}", use_column_width=True) query_embedding = get_image_embedding(masked_image) similar_images = find_similar_images(query_embedding, collection) st.subheader("Similar Items:") for img in similar_images: col1, col2 = st.columns(2) with col1: st.image(img['info']['image_url'], use_column_width=True) with col2: st.write(f"Name: {img['info']['name']}") st.write(f"Brand: {img['info']['brand']}") category = img['info'].get('category') if category: st.write(f"Category: {category}") st.write(f"Price: {img['info']['price']}") st.write(f"Discount: {img['info']['discount']}%") st.write(f"Similarity: {img['similarity']:.2f}") if st.button("Start New Search"): st.session_state.step = 'input' st.session_state.query_image_url = '' st.session_state.segmentations = [] st.session_state.selected_category = None # Text search (optional, you can keep or remove this part) st.sidebar.title("Text Search") query_text = st.sidebar.text_input("Enter search text:") if st.sidebar.button("Search by Text"): if query_text: text_embedding = get_text_embedding(query_text) similar_images = find_similar_images(text_embedding, collection) st.sidebar.subheader("Similar Items:") for img in similar_images: st.sidebar.image(img['info']['image_url'], use_column_width=True) st.sidebar.write(f"Name: {img['info']['name']}") st.sidebar.write(f"Brand: {img['info']['brand']}") category = img['info'].get('category') if category: st.sidebar.write(f"Category: {category}") st.sidebar.write(f"Price: {img['info']['price']}") st.sidebar.write(f"Discount: {img['info']['discount']}%") st.sidebar.write(f"Similarity: {img['similarity']:.2f}") else: st.sidebar.warning("Please enter a search text.") # Display ChromaDB vacuum message st.sidebar.warning("If you've upgraded ChromaDB from a version below 0.6, you may benefit from vacuuming your database")