"""Streamlit app for blinded A/B evaluation of two retrieval models
(all-mpnet-base-v2 vs. OpenAI text-embedding-3-small) over an embedded
copy of al-Ghazali's "The Alchemy of Happiness". User reactions are
appended to a Hugging Face dataset."""

from openai import OpenAI
from sentence_transformers import SentenceTransformer
from time import perf_counter as timer
from huggingface_hub import login
from datasets import Dataset, load_dataset
import streamlit as st
import pandas as pd
import numpy as np
import torch as t
import os
import random
import streamlit.components.v1 as components


@st.cache_resource
def _get_search_results_component():
    """Declare the custom React component exactly once per server process."""
    return components.declare_component(
        "search_results",
        path="frontend/build"  # Path to your built React component
    )


def search_results_component(query, results, key=None):
    """Render the custom search-results React component.

    Fix: the original re-declared the component on every call; the
    declaration is now cached so Streamlit registers it a single time.
    """
    return _get_search_results_component()(query=query, results=results, key=key)


@st.cache_resource
def load_sentence_transformer():
    """Cache the SentenceTransformer model loading to avoid reloading on every rerun"""
    return SentenceTransformer(model_name_or_path="all-mpnet-base-v2", device="cpu")


@st.cache_data
def load_data(database_file):
    """Load (and cache) the pre-embedded corpus from a parquet file."""
    return pd.read_parquet(database_file)


def load_credentials():
    """Collect login_N / password_N pairs (N = 1..50) from the environment.

    Returns:
        dict mapping username -> password; pairs with either half missing
        are skipped.
    """
    credentials = {}
    for i in range(1, 51):
        username = os.environ.get(f"login_{i}")
        password = os.environ.get(f"password_{i}")
        if username and password:
            credentials[username] = password
    return credentials


def authenticate(username, password, credentials):
    """Return True iff `password` matches the stored one for `username`."""
    return credentials.get(username) == password


def debug_check_before_save(data_dict):
    """Print column lengths and tails; return True if all columns align.

    Args:
        data_dict: mapping of column name -> list of values.

    Returns:
        True when every column has the same length.
    """
    lengths = {k: len(v) for k, v in data_dict.items()}
    print("\nDebug Check Results:")
    print(f"All column lengths: {lengths}")
    print("\nLast 4 entries of each column:")
    for key, values in data_dict.items():
        print(f"\n{key}:")
        print(values[-4:])
    return len(set(lengths.values())) == 1  # Returns True if all lengths match


def save_reactions_to_dataset(user_type, username, query, results):
    """Append user reactions to the evaluation dataset on the HF Hub.

    Args:
        user_type: self-declared expertise level of the rater.
        username: authenticated user name.
        query: the free-text query that produced `results`.
        results: list of dicts with keys "text", "model", "reaction".

    Raises:
        ValueError: if column lengths diverge in the final check.
    """
    data = {
        "user_type": [],
        "username": [],
        "query": [],
        "retrieved_text": [],
        "model_type": [],
        "reaction": []
    }
    for result in results:
        data["user_type"].append(user_type)
        data["username"].append(username)
        data["query"].append(query)
        data["retrieved_text"].append(result["text"])
        data["model_type"].append(result["model"])
        data["reaction"].append(result["reaction"])

    try:
        # Load existing dataset
        dataset = load_dataset("HumbleBeeAI/al-ghazali-rag-retrieval-evaluation", split="train")
        existing_data = dataset.to_dict()
        # Backfill any column the remote dataset does not have yet so every
        # column ends up the same length before extending.
        for key in data:
            if key not in existing_data:
                filler = "" if key in ["username", "model_type"] else None
                existing_data[key] = [filler] * len(next(iter(existing_data.values())))
        # Now extend with new data
        for key in data:
            existing_data[key].extend(data[key])
    except Exception as e:
        # Best-effort: if the remote dataset cannot be loaded, start fresh
        # with just the new rows rather than losing the feedback.
        print(f"Error occurred: {str(e)}")
        existing_data = data

    if debug_check_before_save(existing_data):
        updated_dataset = Dataset.from_dict(existing_data)
        updated_dataset.push_to_hub("HumbleBeeAI/al-ghazali-rag-retrieval-evaluation")
    else:
        raise ValueError("Length mismatch detected in final check")


def update_reaction(model_type, idx):
    """Mirror a reaction widget's value into the persistent reactions dict."""
    key = f"reaction_{model_type}_{idx}"
    st.session_state.reactions[key] = st.session_state[key]


def cosine_similarity(embedding_0, embedding_1):
    """Cosine similarity between two 1-D embeddings.

    Vectorized with NumPy (the original was a per-element Python loop,
    which is slow when applied per-row over the whole corpus).
    """
    a = np.asarray(embedding_0, dtype=np.float64)
    b = np.asarray(embedding_1, dtype=np.float64)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


def generate_embedding(model, text, model_type="all-mpnet-base-v2"):
    """Embed `text` with either the local SBERT model or the OpenAI API.

    Args:
        model: a SentenceTransformer instance ("all-mpnet-base-v2") or an
            OpenAI client ("text-embedding-3-small"), matching `model_type`.
        text: the string to embed.
        model_type: which backend to use.

    Returns:
        A 1-D embedding (numpy array or list of floats); None for an
        unrecognized model_type (unchanged from the original contract).
    """
    if model_type == "all-mpnet-base-v2":
        chunk_embedding = model.encode(text, convert_to_tensor=True)
        # Move off any accelerator before converting to numpy.
        return chunk_embedding.cpu().numpy()
    elif model_type == "text-embedding-3-small":
        response = model.embeddings.create(input=text, model=model_type)
        return response.data[0].embedding


def search_query(model, query, df, model_type, n=3):
    """Rank corpus rows by cosine similarity to `query`; return the top `n`.

    NOTE: writes/overwrites a 'similarities' column on `df` in place, as
    the original did.
    """
    embedding = generate_embedding(model, query, model_type=model_type)
    if model_type == "all-mpnet-base-v2":
        df['similarities'] = df.all_mpnet_embedding.apply(
            lambda x: cosine_similarity(x, embedding)
        )
    elif model_type == "text-embedding-3-small":
        df['similarities'] = df.openai_embedding.apply(
            lambda x: cosine_similarity(x, embedding)
        )
    return df.sort_values('similarities', ascending=False).head(n)


def clear_search_state():
    """Clear search-related session state variables"""
    st.session_state.search_performed = False
    st.session_state.top_results_mpnet = []
    st.session_state.top_results_openai = []
    st.session_state.reactions = {}
    st.session_state.results_saved = False
    st.session_state.swap_results = False  # reset the blinded A/B ordering


def main():
    """Entry point: login gate, query input, blinded A/B display, save."""
    st.title("EnlightenQalb (Alchemy of Happiness)")

    # Initialize session state variables on first run
    if 'authenticated' not in st.session_state:
        st.session_state.authenticated = False
        st.session_state.username = None
        st.session_state.search_performed = False
        st.session_state.top_results_mpnet = []
        st.session_state.top_results_openai = []
        st.session_state.reactions = {}
        st.session_state.results_saved = False
        st.session_state.current_query = ""
        st.session_state.swap_results = False

    # Load the model at startup (will be cached)
    embedding_model = load_sentence_transformer()
    credentials = load_credentials()

    # Authentication handling
    if not st.session_state.authenticated:
        st.sidebar.title("Login")
        username = st.sidebar.text_input("Username")
        password = st.sidebar.text_input("Password", type="password")
        if st.sidebar.button("Login"):
            if authenticate(username, password, credentials):
                st.session_state.authenticated = True
                st.session_state.username = username
                st.sidebar.success("Logged in successfully!")
            else:
                st.sidebar.error("Invalid username or password")

    if not st.session_state.authenticated:
        st.warning("Please login to access the application.")
        return

    # Login to Hugging Face
    huggingface_token = os.environ.get("al_ghazali_rag_retrieval_evaluation")
    if huggingface_token:
        login(token=huggingface_token)
    else:
        st.error("Hugging Face API token not found in environment variables.")

    # Initialize OpenAI client (reads OPENAI_API_KEY from the environment)
    client = OpenAI()

    database_file = '[all_embedded] The Alchemy of Happiness (Ghazzālī, Claud Field) (Z-Library).parquet'
    try:
        df = load_data(database_file)
        st.success("Database loaded successfully!")

        user_type = st.radio(
            "Select your user type:",
            ["Layman", "Enthusiast", "Ustaz (Expert)"],
            horizontal=True
        )
        query = st.text_area("Enter your query:")

        # Clear search state if query changes
        if query != st.session_state.current_query:
            clear_search_state()
            st.session_state.current_query = query

        if st.button("Search") and query:
            clear_search_state()  # Clear previous search results

            start_time = timer()
            # MPNet search
            res_mpnet = search_query(embedding_model, query, df, "all-mpnet-base-v2", n=1)
            st.session_state.top_results_mpnet = res_mpnet.index.tolist()
            # OpenAI search
            res_openai = search_query(client, query, df, "text-embedding-3-small", n=1)
            st.session_state.top_results_openai = res_openai.index.tolist()
            end_time = timer()
            st.write(f"Time taken to compute scores: {end_time - start_time:.5f} seconds")

            # BUG FIX: decide the blinded A/B ordering ONCE per search and
            # persist it. The original randomized on every rerun, so the
            # order could flip between rating and saving, attributing the
            # reactions to the wrong model.
            st.session_state.swap_results = random.random() < 0.5
            st.session_state.search_performed = True

        if st.session_state.search_performed and not st.session_state.results_saved:
            st.subheader("Compare Results")

            # NOTE(review): column name "ext" looks like a truncated "text"
            # — confirm against the parquet schema.
            results = [
                {
                    "model": "all-mpnet-base-v2",
                    "text": df.iloc[int(st.session_state.top_results_mpnet[0])]["ext"]
                },
                {
                    "model": "text-embedding-3-small",
                    "text": df.iloc[int(st.session_state.top_results_openai[0])]["ext"]
                }
            ]
            # Apply the per-search ordering decided at search time
            if st.session_state.get("swap_results", False):
                results = results[::-1]

            # Side-by-side blinded display
            col1, col2 = st.columns(2)
            with col1:
                st.markdown("### Result A")
                st.write(results[0]["text"])
                reaction_a = st.radio(
                    "Rate Result A:",
                    ["👎", "🤷", "👍"],
                    key="reaction_a",
                    horizontal=True
                )
            with col2:
                st.markdown("### Result B")
                st.write(results[1]["text"])
                reaction_b = st.radio(
                    "Rate Result B:",
                    ["👎", "🤷", "👍"],
                    key="reaction_b",
                    horizontal=True
                )

            if st.button("Save Reactions"):
                results_to_save = [
                    {
                        "model": results[0]["model"],
                        "text": results[0]["text"],
                        "reaction": reaction_a
                    },
                    {
                        "model": results[1]["model"],
                        "text": results[1]["text"],
                        "reaction": reaction_b
                    }
                ]
                save_reactions_to_dataset(
                    user_type,
                    st.session_state.username,
                    query,
                    results_to_save
                )
                st.success("Reactions saved successfully!")
                clear_search_state()
    except Exception as e:
        st.error(f"Failed to load database: {str(e)}")


if __name__ == "__main__":
    main()