| import streamlit as st |
| import os |
| import google.generativeai as genai |
| from dotenv import load_dotenv |
| import time |
| from typing import Any, List, Optional |
| import numpy as np |
| from sklearn.metrics.pairwise import cosine_similarity |
| import tensorflow as tf |
| from tensorflow.keras.models import Sequential |
| from tensorflow.keras.layers import Dense, Input |
| from tensorflow.keras.utils import to_categorical |
| from tensorflow.keras.optimizers import Adam |
|
|
| |
# Read settings from a local .env file into the process environment, then
# look up the Gemini API key.
load_dotenv()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

# Nothing further down can work without a key: report the problem and halt
# the script run; otherwise hand the key to the Gemini client library.
if not GOOGLE_API_KEY:
    st.error(
        "Google AI Studio API key not found. Please add it to your .env file. "
        "You can obtain an API key from https://makersuite.google.com/."
    )
    st.stop()
else:
    genai.configure(api_key=GOOGLE_API_KEY)
|
|
# Main page heading.
st.title("Embeddings and Vector Search Demo")
st.subheader("Explore Embeddings and Vector Databases")

# Sidebar: a short introduction followed by two reference lists.
st.sidebar.header("Embeddings and Vector Search")
st.sidebar.markdown(
    """
This app demonstrates how embeddings and vector databases can be used for various tasks.
"""
)

st.sidebar.subheader("Key Concepts:")
st.sidebar.markdown(
    """
- **Embeddings**: Numerical representations of text, capturing semantic meaning.
- **Vector Databases**: Databases optimized for storing and querying vectors (simulated here).
- **Retrieval Augmented Generation (RAG)**: Combining retrieval with LLM generation.
- **Cosine Similarity**: A measure of similarity between two vectors.
- **Neural Networks**: Using embeddings as input for classification.
"""
)

st.sidebar.subheader("Whitepaper Insights")
st.sidebar.markdown(
    """
- Efficient similarity search using vector indexes (e.g., ANN).
- Handling large datasets and scalability considerations.
- Applications of embeddings: search, recommendation, classification, etc.
"""
)
|
|
| |
def code_block(text: str, language: str = "text") -> None:
    """Display text as a fenced code block in Streamlit.

    Args:
        text: The content to show inside the code block.
        language: Language tag placed after the opening fence.
    """
    # Find the longest run of backticks in the text so the fence can be made
    # strictly longer; otherwise a ``` inside the text would end the block
    # early and the remainder would be rendered as ordinary markdown.
    longest_run = 0
    current_run = 0
    for char in text:
        current_run = current_run + 1 if char == "`" else 0
        longest_run = max(longest_run, current_run)
    fence = "`" * max(3, longest_run + 1)

    # Raw HTML rendering is deliberately left off: a code block never needs it.
    st.markdown(f"{fence}{language}\n{text}\n{fence}")
|
|
def display_response(response: Any) -> None:
    """Display the model's response text, or an error if none is available.

    Args:
        response: Object returned by the generation call. It is shown only
            when it is truthy and exposes a readable ``text`` attribute.
    """
    missing = object()
    text = missing
    if response:
        try:
            text = getattr(response, "text", missing)
        except ValueError:
            # The Gemini SDK's ``text`` accessor raises ValueError when the
            # response carries no usable content (e.g. it was blocked);
            # treat that the same as a missing attribute.
            text = missing

    if text is missing:
        st.error("Failed to generate a response.")
        return

    st.subheader("Generated Response:")
    st.markdown(text)
|
|
def generate_embeddings(
    texts: List[str],
    model_name: str = "models/embedding-001",
    task_type: str = "retrieval_document",
) -> Optional[List[List[float]]]:
    """Generate one embedding per input text using the given model.

    Args:
        texts: List of text strings; each is embedded with its own API call.
        model_name: Name of the embedding model.
        task_type: Task type forwarded to the embedding API. Defaults to
            "retrieval_document", the value this function always used.

    Returns:
        A list of embeddings in the same order as ``texts`` (an empty list
        for empty input), or None if any call fails. Failures are reported
        through ``st.error``.
    """
    try:
        return [
            genai.embed_content(
                model=model_name,
                content=text,
                task_type=task_type,
            )['embedding']
            for text in texts
        ]
    except Exception as e:
        # UI boundary: surface any API/client failure instead of crashing.
        st.error(f"Error generating embeddings with model '{model_name}': {e}")
        return None
|
|
def generate_with_retry(prompt: str, model_name: str, generation_config: genai.types.GenerationConfig, max_retries: int = 3, delay: int = 5) -> Any:
    """Ask a generative model for content, retrying failed attempts.

    Args:
        prompt: Prompt text sent to the model.
        model_name: Name of the generative model to instantiate.
        generation_config: Generation settings passed through to the model.
        max_retries: Maximum number of attempts.
        delay: Seconds to sleep between attempts.

    Returns:
        The model response on success. None when the error text indicates a
        404 / "not found" model, or when every attempt has failed.
    """
    attempt = 0
    while attempt < max_retries:
        attempt += 1
        try:
            return genai.GenerativeModel(model_name).generate_content(
                prompt, generation_config=generation_config
            )
        except Exception as exc:
            error_message = str(exc)
            st.warning(f"Error during generation (attempt {attempt}/{max_retries}): {error_message}")

            # An unknown model will not start working on a retry, so give up now.
            model_missing = "404" in error_message and "not found" in error_message
            if model_missing:
                st.error(
                    f"Model '{model_name}' is not available or not supported. Please select a different model."
                )
                return None

            # Last attempt used up: report and stop.
            if attempt >= max_retries:
                st.error(f"Failed to generate content after {max_retries} attempts. Please check your prompt and model.")
                return None

            st.info(f"Retrying in {delay} seconds...")
            time.sleep(delay)
    return None
|
|
def calculate_similarity(embedding1: List[float], embedding2: List[float]) -> float:
    """Return the cosine similarity of two embedding vectors."""
    # cosine_similarity works on 2-D inputs, so present each vector as one row.
    first_row = np.array(embedding1).reshape(1, -1)
    second_row = np.array(embedding2).reshape(1, -1)
    score_matrix = cosine_similarity(first_row, second_row)
    # The result is a 1x1 matrix; unwrap its only entry.
    return score_matrix[0][0]
|
|
def create_and_train_model(
    embeddings: List[List[float]],
    labels: List[int],
    num_classes: int,
    epochs: int,
    batch_size: int,
    learning_rate: float,
    optimizer_str: str
) -> tf.keras.Model:
    """Create and train a small dense neural network for classification.

    The network takes one embedding as input and has two hidden ReLU layers
    (64 and 32 units) followed by a softmax layer with ``num_classes`` units.

    Args:
        embeddings: Training embeddings; the input width is taken from the
            first one.
        labels: Integer class label for each embedding; they are one-hot
            encoded with ``num_classes`` columns.
        num_classes: Number of output classes.
        epochs: Number of training epochs.
        batch_size: Training batch size.
        learning_rate: Learning rate given to the optimizer.
        optimizer_str: 'adam', 'sgd' or 'rmsprop' (case-insensitive); any
            other value falls back to Adam.

    Returns:
        The trained Keras model.

    Raises:
        ValueError: If ``embeddings`` is empty.
    """
    if not embeddings:
        raise ValueError("Cannot train a classifier without any training embeddings.")

    embedding_dim = len(embeddings[0])
    model = Sequential([
        Input(shape=(embedding_dim,)),
        Dense(64, activation='relu'),
        Dense(32, activation='relu'),
        Dense(num_classes, activation='softmax'),
    ])

    # Map the optimizer name to its class; unknown names default to Adam.
    optimizer_classes = {
        'adam': Adam,
        'sgd': tf.keras.optimizers.SGD,
        'rmsprop': tf.keras.optimizers.RMSprop,
    }
    optimizer_class = optimizer_classes.get(optimizer_str.lower(), Adam)
    optimizer = optimizer_class(learning_rate=learning_rate)

    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    encoded_labels = to_categorical(labels, num_classes=num_classes)
    model.fit(np.array(embeddings), encoded_labels, epochs=epochs, batch_size=batch_size, verbose=0)
    return model
| |
# --- RAG Question Answering -------------------------------------------------
# Embeds each context line and the question, picks the single most similar
# line, and asks the generative model to answer using only that line.
st.header("RAG Question Answering")
rag_model_name = st.selectbox("Select model for RAG:", ["gemini-pro"], index=0)
rag_embedding_model = st.selectbox("Select embedding model for RAG:", ["models/embedding-001"], index=0)
rag_context = st.text_area(
    "Enter your context documents:",
    "Relevant information to answer the question. Separate documents with newlines.",
    height=150,
)
rag_question = st.text_area("Ask a question about the context:", "What is the main topic?", height=70)
rag_max_context_length = st.number_input("Maximum Context Length", min_value=100, max_value=2000, value=500, step=100)


if st.button("Answer with RAG"):
    # One document per non-blank line; blank lines are dropped so that empty
    # strings are never sent to the embedding model.
    rag_documents = [document for document in rag_context.split('\n') if document.strip()]

    if not rag_documents or not rag_question.strip():
        st.warning("Please provide both context and a question.")
    else:
        with st.spinner("Generating answer..."):
            try:
                context_embeddings = generate_embeddings(rag_documents, rag_embedding_model)
                # Only embed the question if the documents were embedded.
                question_embedding = (
                    generate_embeddings([rag_question], rag_embedding_model)
                    if context_embeddings
                    else None
                )

                # generate_embeddings already reported any failure; skip the
                # rest of this section without halting the whole page, so the
                # sections below are still rendered.
                if context_embeddings and question_embedding:
                    similarities = cosine_similarity(
                        np.array(question_embedding).reshape(1, -1),
                        np.array(context_embeddings),
                    )[0]

                    # Keep the best-matching document, truncated to the limit.
                    most_relevant_index = int(np.argmax(similarities))
                    relevant_context = rag_documents[most_relevant_index][:rag_max_context_length]

                    rag_prompt = f"Use the following context to answer the question: '{rag_question}'.\nContext: {relevant_context}"

                    response = generate_with_retry(rag_prompt, rag_model_name, generation_config=genai.types.GenerationConfig())
                    if response:
                        display_response(response)
            except Exception as e:
                st.error(f"An error occurred: {e}")
|
|
| |
# --- Text Similarity --------------------------------------------------------
# Embeds two texts and shows the cosine similarity of their embeddings.
st.header("Text Similarity")
similarity_embedding_model = st.selectbox("Select embedding model for similarity:", ["models/embedding-001"], index=0)
text1 = st.text_area("Enter text 1:", "This is the first sentence.", height=70)
text2 = st.text_area("Enter text 2:", "This is a similar sentence.", height=70)


if st.button("Calculate Similarity"):
    # Whitespace-only input counts as missing.
    if not text1.strip() or not text2.strip():
        st.warning("Please provide both texts.")
    else:
        with st.spinner("Calculating similarity..."):
            try:
                embeddings = generate_embeddings([text1, text2], similarity_embedding_model)
                # generate_embeddings already reported any failure; skip the
                # result display without halting the whole page, so the
                # sections below are still rendered.
                if embeddings:
                    similarity = calculate_similarity(embeddings[0], embeddings[1])
                    st.subheader("Cosine Similarity:")
                    st.write(similarity)
            except Exception as e:
                st.error(f"An error occurred: {e}")
|
|
| |
# --- Neural Classification with Embeddings: input widgets -------------------
st.header("Neural Classification with Embeddings")

# Embedding model used for both the training texts and the text to classify.
classification_embedding_model = st.selectbox(
    label="Select embedding model for classification:",
    options=["models/embedding-001"],
    index=0,
)

# Training examples, one "text,label" pair per line.
classification_data = st.text_area(
    label="Enter your training data (text, label pairs), separated by newlines. Example: text1,0\\ntext2,1",
    value="text1,0\ntext2,1\ntext3,0\ntext4,1",
    height=150,
)
classification_prompt = st.text_area(
    label="Enter text to classify:",
    value="This is a test text.",
    height=70,
)

# Training hyperparameters.
num_epochs = st.number_input(
    label="Number of Epochs", min_value=1, max_value=200, value=10, step=1
)
batch_size = st.number_input(
    label="Batch Size", min_value=1, max_value=128, value=32, step=1
)
learning_rate = st.number_input(
    label="Learning Rate",
    min_value=0.0001,
    max_value=0.1,
    value=0.0001,
    step=0.0001,
    format="%.4f",
)
optimizer_str = st.selectbox(
    label="Optimizer", options=['adam', 'sgd', 'rmsprop'], index=0
)
|
|
def process_classification_data(data: str) -> Optional[tuple[List[str], List[int]]]:
    """Parse 'text,label' lines into parallel lists of texts and labels.

    Each line is split at its LAST comma, so the text part may itself
    contain commas. Lines without any comma (including blank lines) are
    skipped. Texts and labels are stripped of surrounding whitespace.

    Args:
        data: Newline-separated 'text,label' pairs.

    Returns:
        A ``(texts, labels)`` tuple, or None after reporting the problem via
        ``st.error`` when no pair is found, a text is empty, or a label is
        not an integer.
    """
    texts: List[str] = []
    labels: List[int] = []

    # Number lines by their position in the input so error messages point at
    # the line the user actually sees.
    for line_number, line in enumerate(data.split('\n'), start=1):
        if ',' not in line:
            continue

        raw_text, raw_label = line.rsplit(',', 1)
        text = raw_text.strip()
        label_str = raw_label.strip()

        if not text:
            st.error(f"Invalid data format in line {line_number}: '{line.strip()}'. Expected 'text,label'.")
            return None

        try:
            label = int(label_str)
        except ValueError:
            st.error(f"Invalid label value in line {line_number}: '{label_str}'. Label must be an integer.")
            return None

        texts.append(text)
        labels.append(label)

    if not texts:
        st.error("No valid data pairs found. Please ensure each line contains 'text,label'.")
        return None
    return texts, labels
|
|
# Handle the "Classify" button: parse the training data, embed it, train a
# small classifier on the embeddings and classify the prompt text.
if st.button("Classify"):
    if not classification_data or not classification_prompt:
        st.warning("Please provide training data and text to classify.")
    else:
        with st.spinner("Classifying..."):
            try:
                processed_data = process_classification_data(classification_data)
                # On a parsing failure the helper has already shown an error.
                if processed_data:
                    train_texts, train_labels = processed_data

                    # Map the user's labels onto 0..k-1 for one-hot encoding,
                    # so label sets such as {1, 2} or {0, 5} also work.
                    class_labels = sorted(set(train_labels))
                    label_to_index = {label: index for index, label in enumerate(class_labels)}
                    num_classes = len(class_labels)

                    train_embeddings = generate_embeddings(train_texts, classification_embedding_model)
                    # Embed the prompt before training so an embedding failure
                    # is discovered before any time is spent on training.
                    predict_embedding = (
                        generate_embeddings([classification_prompt], classification_embedding_model)
                        if train_embeddings
                        else None
                    )

                    # generate_embeddings already reported any failure.
                    if train_embeddings and predict_embedding:
                        model = create_and_train_model(
                            train_embeddings,
                            [label_to_index[label] for label in train_labels],
                            num_classes,
                            num_epochs,
                            batch_size,
                            learning_rate,
                            optimizer_str,
                        )

                        # predict_embedding is already a list holding one
                        # embedding, i.e. a batch of one row for the model.
                        prediction = model.predict(np.array(predict_embedding), verbose=0)
                        # Translate the winning index back to the user's label.
                        predicted_class = class_labels[int(np.argmax(prediction[0]))]
                        st.subheader("Predicted Class:")
                        st.write(predicted_class)
                        st.subheader("Prediction Probabilities:")
                        st.write(prediction)
            except Exception as e:
                st.error(f"An error occurred: {e}")