import streamlit as st
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from typing import List
import warnings
import os
from sentence_transformers import SentenceTransformer

warnings.filterwarnings("ignore")
# Configuration for Sentence Transformers
EMBEDDING_CONFIG = {
    "model": "google/embeddinggemma-300m",
    "similarity_threshold": 0.70,
    "high_similarity_threshold": 0.85,
    "normalize_embeddings": True,
}
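# Note: only "similarity_threshold" feeds the deduplication pass below (as the
# default of 0.7); "high_similarity_threshold" is currently unused and kept as
# a stricter near-duplicate cut-off for callers that want one. Both values are
# heuristics worth tuning on your own corpus.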

class BlogTitleAnalyzer:
    def __init__(self, config=EMBEDDING_CONFIG):
        self.config = config
        self.model_name = config["model"]
        self.normalize = config.get("normalize_embeddings", True)
        self.existing_titles = []
        self.existing_metadata = []
        self.existing_embeddings = None

        # Initialize the SentenceTransformer model, authenticating for gated models.
        if self.model_name.startswith("google/"):
            # Gated Google models need a Hugging Face token, read from
            # Streamlit secrets or the environment.
            hf_token = st.secrets.get(
                "HUGGINGFACE_TOKEN", os.getenv("HUGGINGFACE_TOKEN")
            )
            if not hf_token:
                raise ValueError(
                    "Hugging Face token required for gated model. "
                    "Please add it to Streamlit secrets."
                )
            self.model = SentenceTransformer(
                self.model_name, use_auth_token=hf_token
            )
        else:
            self.model = SentenceTransformer(self.model_name)
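    # To supply the token, add it to .streamlit/secrets.toml (the key name
    # matches what __init__ reads above), e.g.:
    #
    #   HUGGINGFACE_TOKEN = "hf_..."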
    # ---- Embedding helpers ----
    def _embed(self, texts):
        # Accept str or list[str]; always return list[list[float]].
        if isinstance(texts, str):
            inputs = [texts]
        else:
            inputs = list(texts)
        embeddings = self.model.encode(inputs, convert_to_numpy=True)
        if self.normalize:
            embeddings = self._l2_normalize_rows(embeddings)
        return embeddings.tolist()
    @staticmethod
    def _l2_normalize_rows(arr: np.ndarray) -> np.ndarray:
        norms = np.linalg.norm(arr, axis=1, keepdims=True)
        norms[norms == 0] = 1.0  # Avoid division by zero for all-zero rows.
        return arr / norms
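    # With unit-length rows, cosine similarity reduces to a plain dot product:
    #   cos(a, b) = (a . b) / (||a|| * ||b||) = a . b  when ||a|| = ||b|| = 1,
    # so the cosine_similarity calls below could equivalently be A @ B.T.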
    # ---- Public API ----
    def generate_embedding(self, title: str) -> np.ndarray:
        vec = np.array(self._embed(title)[0], dtype=np.float32)
        return vec

    def generate_embeddings_batch(self, titles) -> np.ndarray:
        vecs = np.array(self._embed(titles), dtype=np.float32)
        return vecs

    def load_existing_titles(self, titles, metadata=None):
        self.existing_titles = list(titles)
        self.existing_metadata = (
            list(metadata) if metadata is not None else [{} for _ in titles]
        )
        self.existing_embeddings = self.generate_embeddings_batch(self.existing_titles)
    def check_similarity_against_existing(self, new_title: str, top_k: int = 5):
        if self.existing_embeddings is None or len(self.existing_titles) == 0:
            return []
        new_vec = self.generate_embedding(new_title).reshape(1, -1)
        sims = cosine_similarity(new_vec, self.existing_embeddings)[0]
        idx_sorted = np.argsort(-sims)[:top_k]
        results = []
        for idx in idx_sorted:
            results.append(
                {
                    "title": self.existing_titles[idx],
                    "metadata": self.existing_metadata[idx],
                    "similarity": float(sims[idx]),
                }
            )
        return results
    def batch_check_similarity_against_existing(self, new_titles, top_k: int = 5):
        if self.existing_embeddings is None or len(self.existing_titles) == 0:
            return {t: [] for t in new_titles}
        new_vecs = self.generate_embeddings_batch(new_titles)
        sims = cosine_similarity(new_vecs, self.existing_embeddings)  # [M, N]
        results = {}
        for i, t in enumerate(new_titles):
            row = sims[i]
            idx_sorted = np.argsort(-row)[:top_k]
            hits = []
            for idx in idx_sorted:
                hits.append(
                    {
                        "title": self.existing_titles[idx],
                        "metadata": self.existing_metadata[idx],
                        "similarity": float(row[idx]),
                    }
                )
            results[t] = hits
        return results
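
# Usage sketch (illustrative; the titles are hypothetical and the similarity
# value depends on the embedding model):
#   analyzer = BlogTitleAnalyzer()
#   analyzer.load_existing_titles(["How to Insulate an Attic"])
#   analyzer.check_similarity_against_existing("Attic Insulation Guide", top_k=1)
#   -> [{"title": "How to Insulate an Attic", "metadata": {}, "similarity": <float>}]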

def simple_deduplication(analyzer, new_titles, threshold=0.7):
    """
    Simple greedy deduplication of new titles via pairwise cosine similarity.
    Of any pair scoring above `threshold`, the title that is more similar to
    the existing corpus is the one removed.
    """
    embeddings = analyzer.generate_embeddings_batch(new_titles)
    similarity_matrix = cosine_similarity(embeddings)
    n_titles = len(new_titles)
    to_remove = set()
    decisions = []
    for i in range(n_titles):
        for j in range(i + 1, n_titles):
            if i in to_remove or j in to_remove:
                continue
            if similarity_matrix[i][j] > threshold:
                title_A = new_titles[i]
                title_B = new_titles[j]
                # Break the tie by checking each title against existing content.
                sims_A = analyzer.check_similarity_against_existing(title_A)
                sims_B = analyzer.check_similarity_against_existing(title_B)
                max_sim_A = max([s["similarity"] for s in sims_A], default=0.0)
                max_sim_B = max([s["similarity"] for s in sims_B], default=0.0)
                if max_sim_A > max_sim_B:
                    to_remove.add(i)
                    decisions.append(
                        {
                            "remove": title_A,
                            "keep": title_B,
                            "reason": "Title A is more similar to existing content",
                            "pairwise_similarity": float(similarity_matrix[i][j]),
                        }
                    )
                else:
                    to_remove.add(j)
                    decisions.append(
                        {
                            "remove": title_B,
                            "keep": title_A,
                            "reason": "Title B is more similar to existing content",
                            "pairwise_similarity": float(similarity_matrix[i][j]),
                        }
                    )
    # Final report
    report_rows = []
    for d in decisions:
        report_rows.append(
            {
                "Action": "REMOVE",
                "Title": d["remove"],
                "Reason": d["reason"],
                "Pairwise_Similarity": round(d["pairwise_similarity"], 3),
                "Keep_Instead": d["keep"],
            }
        )
    for i, t in enumerate(new_titles):
        if i not in to_remove:
            report_rows.append(
                {
                    "Action": "KEEP",
                    "Title": t,
                    "Reason": "No high similarity conflicts",
                    "Pairwise_Similarity": "N/A",
                    "Keep_Instead": "N/A",
                }
            )
    return pd.DataFrame(report_rows)
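
# Note: simple_deduplication is greedy and order-dependent: it walks O(n^2)
# pairs, and earlier removals suppress later comparisons. That is adequate for
# the short title lists this app targets; clustering the embeddings would be
# the sturdier choice at larger scale.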

def create_new_vs_existing_table(
    analyzer: BlogTitleAnalyzer, new_titles: List[str]
) -> pd.DataFrame:
    """Build a full comparison table of every new title against every existing title."""
    table_data = []
    for new_title in new_titles:
        # Embed the new title and score it against all existing titles at once.
        new_embedding = analyzer.generate_embedding(new_title)
        similarities = cosine_similarity(
            [new_embedding], analyzer.existing_embeddings
        )[0]
        for idx, existing_title in enumerate(analyzer.existing_titles):
            table_data.append(
                {
                    "New Title": new_title,
                    "Existing Title": existing_title,
                    "Similarity Score": round(float(similarities[idx]), 3),
                }
            )
    return pd.DataFrame(table_data)
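
# Vectorized alternative (sketch): embedding all new titles in one batch and
# scoring the whole [M, N] matrix at once avoids one encode call per title:
#   new_vecs = analyzer.generate_embeddings_batch(new_titles)
#   sims = cosine_similarity(new_vecs, analyzer.existing_embeddings)  # [M, N]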

def create_new_vs_new_table(
    analyzer: BlogTitleAnalyzer, new_titles: List[str]
) -> pd.DataFrame:
    """Build a pairwise comparison table among the new titles themselves."""
    table_data = []
    new_embeddings = analyzer.generate_embeddings_batch(new_titles)
    # One similarity matrix for all pairs, rather than a cosine call per pair.
    sim_matrix = cosine_similarity(new_embeddings)
    for i, title1 in enumerate(new_titles):
        for j, title2 in enumerate(new_titles):
            if i != j:  # Skip self-comparisons.
                table_data.append(
                    {
                        "Title 1": title1,
                        "Title 2": title2,
                        "Similarity Score": round(float(sim_matrix[i][j]), 3),
                    }
                )
    return pd.DataFrame(table_data)
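
# Note: the table above lists every pair twice ((A, B) and (B, A)). Iterating
# the inner loop over enumerate(new_titles[i + 1:], start=i + 1) would halve
# the rows if the mirrored entries are not wanted.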

def run_title_similarity_analysis(existing_titles: List[str], new_titles: List[str]):
    """Run the full title similarity analysis and return all report tables."""
    analyzer = BlogTitleAnalyzer()
    analyzer.load_existing_titles(existing_titles)
    new_vs_existing_table = create_new_vs_existing_table(analyzer, new_titles)
    new_vs_new_table = create_new_vs_new_table(analyzer, new_titles)
    dedup_report = simple_deduplication(analyzer, new_titles)
    return analyzer, new_vs_existing_table, new_vs_new_table, dedup_report
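
# Headless usage sketch (e.g. a quick smoke test outside Streamlit; the titles
# are hypothetical, and the gated default model still needs a token):
#   _, nve, nvn, report = run_title_similarity_analysis(
#       ["Old Post About Attic Insulation"], ["Attic Insulation Basics"]
#   )
#   print(report)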

# Streamlit UI
st.set_page_config(
    page_title="Blog Title Checker",
    page_icon="📝",
    layout="wide",
    initial_sidebar_state="expanded",
)

st.title("🔍 Blog Title Similarity Checker")
st.markdown(
    "Analyze and deduplicate blog titles using AI-powered similarity detection."
)

# Sidebar with instructions
st.sidebar.header("Instructions")
st.sidebar.markdown(
    """
    1. **Existing Titles**: Enter your current blog titles (one per line)
    2. **New Titles**: Enter new title ideas to check (one per line)
    3. **Analysis**: Click "Analyze Titles" to run the similarity check
    4. **Results**: View deduplication recommendations and download detailed reports

    **Note**: The app embeds titles with Sentence Transformers and compares them by cosine similarity.
    """
)

# Create two columns for input
col1, col2 = st.columns(2)
with col1:
    st.subheader("Existing Blog Titles")
    st.markdown("Enter your current blog titles (one per line):")
    existing_titles_input = st.text_area(
        "Existing Titles",
        height=300,
        placeholder="The True Cost of Blown-In Insulation in 2025: A Detailed Breakdown\nCalculating the ROI of Your Attic Insulation Upgrade: A Step-by-Step Guide\nUnlocking Savings: Are There Government Rebates or Tax Credits for Blown-In Insulation?",
    )
with col2:
    st.subheader("New Titles to Check")
    st.markdown("Enter new title ideas to analyze (one per line):")
    new_titles_input = st.text_area(
        "New Titles",
        height=300,
        placeholder="Can Your Walls Be Insulated for Better Indoor Air Quality?\nHow Does Your Home's Insulation Impact Seasonal Allergies?\nIs Cold Air Leaking into Your Living Spaces, and What Can You Do?",
    )

# Process input: one title per line, skipping blank lines and stray whitespace.
existing_titles = [
    title.strip() for title in existing_titles_input.split("\n") if title.strip()
]
new_titles = [title.strip() for title in new_titles_input.split("\n") if title.strip()]

# Analysis button
if st.button("🔍 Analyze Titles", type="primary", use_container_width=True):
    if not existing_titles:
        st.error("Please enter at least one existing blog title.")
    elif not new_titles:
        st.error("Please enter at least one new title to analyze.")
    else:
        with st.spinner("Analyzing titles... This may take a moment."):
            try:
                analyzer, new_vs_existing_df, new_vs_new_df, dedup_report = (
                    run_title_similarity_analysis(existing_titles, new_titles)
                )
                # Store results (and the analyzed counts, so the summary below
                # stays consistent if the text areas change before a re-run).
                st.session_state["analysis_results"] = {
                    "analyzer": analyzer,
                    "new_vs_existing_df": new_vs_existing_df,
                    "new_vs_new_df": new_vs_new_df,
                    "dedup_report": dedup_report,
                    "existing_count": len(existing_titles),
                    "new_count": len(new_titles),
                }
                st.success("✅ Analysis completed successfully!")
            except Exception as e:
                st.error(f"An error occurred during analysis: {str(e)}")

# Display results if available
if "analysis_results" in st.session_state:
    results = st.session_state["analysis_results"]

    # Summary statistics
    st.header("📊 Summary Statistics")
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Total Existing Titles", results["existing_count"])
    with col2:
        st.metric("Total New Titles", results["new_count"])
    with col3:
        duplicates_found = len(
            results["dedup_report"][results["dedup_report"]["Action"] == "REMOVE"]
        )
        st.metric("Duplicates Found", duplicates_found)
    with col4:
        unique_titles = len(
            results["dedup_report"][results["dedup_report"]["Action"] == "KEEP"]
        )
        st.metric("Unique Titles to Keep", unique_titles)
    # Deduplication results
    st.header("🎯 Deduplication Recommendations")
    # Filter to show only duplicates
    duplicates_df = results["dedup_report"][
        results["dedup_report"]["Action"] == "REMOVE"
    ]
    keep_df = results["dedup_report"][results["dedup_report"]["Action"] == "KEEP"]

    if not duplicates_df.empty:
        st.subheader("🗑️ Titles to Remove (Duplicates)")
        st.dataframe(
            duplicates_df[["Title", "Reason", "Pairwise_Similarity", "Keep_Instead"]],
            use_container_width=True,
            hide_index=True,
        )

    st.subheader("✅ Titles to Keep")
    st.dataframe(
        keep_df[["Title", "Reason"]], use_container_width=True, hide_index=True
    )
    # Download section
    st.header("💾 Download Analysis Reports")
    download_col1, download_col2, download_col3 = st.columns(3)

    with download_col1:
        # New vs. existing comparison
        csv1 = results["new_vs_existing_df"].to_csv(index=False).encode("utf-8")
        st.download_button(
            label="📥 New vs Existing Titles",
            data=csv1,
            file_name="new_vs_existing_titles.csv",
            mime="text/csv",
            use_container_width=True,
        )
    with download_col2:
        # New vs. new comparison
        csv2 = results["new_vs_new_df"].to_csv(index=False).encode("utf-8")
        st.download_button(
            label="📥 New vs New Titles",
            data=csv2,
            file_name="new_vs_new_titles.csv",
            mime="text/csv",
            use_container_width=True,
        )
    with download_col3:
        # Deduplication report
        csv3 = results["dedup_report"].to_csv(index=False).encode("utf-8")
        st.download_button(
            label="📥 Deduplication Report",
            data=csv3,
            file_name="deduplication_recommendations.csv",
            mime="text/csv",
            use_container_width=True,
        )
    # Detailed analysis section (expandable)
    with st.expander("🔬 Detailed Analysis"):
        tab1, tab2 = st.tabs(["New vs Existing Comparisons", "New vs New Comparisons"])
        with tab1:
            st.dataframe(
                results["new_vs_existing_df"], use_container_width=True, hide_index=True
            )
        with tab2:
            st.dataframe(
                results["new_vs_new_df"], use_container_width=True, hide_index=True
            )

# Footer
st.markdown("---")
st.markdown(
    "💡 **Tip**: For best results, ensure your titles are well-written and descriptive. The similarity analysis works best with titles that have clear semantic meaning."
)