import gradio as gr import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline import torch from sklearn.metrics import classification_report, confusion_matrix, accuracy_score import io import base64 from textblob import TextBlob from collections import defaultdict from tabulate import tabulate from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.cluster import KMeans from sentence_transformers import SentenceTransformer from sklearn.decomposition import PCA from collections import Counter # Load models and initialize components model_path = "./final_model" tokenizer = AutoTokenizer.from_pretrained(model_path) model = AutoModelForSequenceClassification.from_pretrained(model_path) # Initialize summarizer with a smaller model and TF weights try: summarizer = pipeline( "summarization", model="sshleifer/distilbart-cnn-6-6", device=-1 # Use CPU ) except Exception as e: print(f"Error loading summarizer: {str(e)}") summarizer = None # Load dataset def load_dataset(): try: df = pd.read_csv("dataset.csv") # Ensure required columns exist required_columns = ['reviews.text', 'reviews.rating', 'name', 'categories'] if not all(col in df.columns for col in required_columns): raise ValueError("Missing required columns in dataset.csv") return df except Exception as e: print(f"Error loading dataset: {str(e)}") return None # Get initial summary def get_initial_summary(): df = load_dataset() if df is None: return "Error: Could not load dataset.csv" try: # First, create clusters if they don't exist if 'cluster_name' not in df.columns: df = create_clusters(df) # Generate summaries for all categories summaries = generate_category_summaries(df) # Convert summaries to HTML format for Gradio html_output = [] # Add dataset statistics unique_count = df['name'].nunique() total_count = len(df) avg_rating = df['reviews.rating'].mean() html_output.append(f"""

Dataset Statistics

""") # Add category summaries for category, tables in summaries.items(): html_output.append(f"

CATEGORY: {category}

") for table in tables: html_output.append(f"

{table['section']}

") # Convert table to HTML using tabulate table_html = tabulate( table['data'], headers=table['headers'], tablefmt="html", stralign="left", numalign="center" ) # Add some CSS styling styled_table = f""" {table_html} """ html_output.append(styled_table) html_output.append("
") # Add separator between categories return "\n".join(html_output) except Exception as e: import traceback print(traceback.format_exc()) # Print full error trace for debugging return f"Error generating initial summary: {str(e)}" def predict_sentiment(text): # Preprocess text text = text.lower() # Tokenize inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True) # Get prediction with torch.no_grad(): outputs = model(**inputs) logits = outputs.logits probabilities = torch.nn.functional.softmax(logits, dim=-1) predicted_class = torch.argmax(probabilities, dim=-1).item() # Map class to sentiment sentiment_map = {0: "Negative", 1: "Neutral", 2: "Positive"} sentiment = sentiment_map[predicted_class] # Get probabilities probs = probabilities[0].tolist() prob_dict = {sentiment_map[i]: f"{prob*100:.2f}%" for i, prob in enumerate(probs)} return sentiment, prob_dict def analyze_sentiment(reviews): """Perform sentiment analysis on reviews""" pros = defaultdict(int) cons = defaultdict(int) for review in reviews: blob = TextBlob(str(review)) for sentence in blob.sentences: polarity = sentence.sentiment.polarity words = [word for word, tag in blob.tags if tag in ('NN', 'NNS', 'JJ', 'JJR', 'JJS')] if polarity > 0.3: for word in words: pros[word] += 1 elif polarity < -0.3: for word in words: cons[word] += 1 pros_sorted = [k for k, _ in sorted(pros.items(), key=lambda x: -x[1])] if pros else [] cons_sorted = [k for k, _ in sorted(cons.items(), key=lambda x: -x[1])] if cons else [] return pros_sorted, cons_sorted def generate_category_summary(reviews_text): """Generate summary for a set of reviews""" reviews = [r.strip() for r in reviews_text.split('\n') if r.strip()] if not reviews: return "Please enter at least one review." # Analyze sentiment and get pros/cons pros, cons = analyze_sentiment(reviews) # Create summary text summary_text = f""" Review Analysis Summary: PROS: {', '.join(pros[:5]) if pros else 'No significant positive feedback'} CONS: {', '.join(cons[:5]) if cons else 'No major complaints'} Based on {len(reviews)} reviews analyzed. """ # Generate concise summary using BART if available if summarizer and len(summary_text) > 100: try: generated_summary = summarizer( summary_text, max_length=150, min_length=50, do_sample=False, truncation=True )[0]['summary_text'] except Exception as e: generated_summary = f"Error generating summary: {str(e)}" else: generated_summary = summary_text return generated_summary def analyze_reviews(reviews_text): # Original sentiment analysis df, plot_html = analyze_reviews_sentiment(reviews_text) # Create a temporary DataFrame with the new reviews temp_df = pd.DataFrame({ 'text': reviews_text.split('\n'), 'rating': [3] * len(reviews_text.split('\n')), # Default neutral rating 'name': ['New Review'] * len(reviews_text.split('\n')), 'cluster_name': ['New Reviews'] * len(reviews_text.split('\n')) }) # Generate summary tables summaries = generate_category_summaries(temp_df) # Convert summaries to HTML html_output = [] for category, tables in summaries.items(): for table in tables: html_output.append(f"

{table['section']}

") table_html = tabulate( table['data'], headers=table['headers'], tablefmt="html", stralign="left", numalign="center" ) html_output.append(table_html) summary_html = "\n".join(html_output) return df, plot_html, summary_html def analyze_reviews_sentiment(reviews_text): reviews = [r.strip() for r in reviews_text.split('\n') if r.strip()] if not reviews: return "Please enter at least one review.", None results = [] for review in reviews: sentiment, probs = predict_sentiment(review) results.append({ 'Review': review, 'Sentiment': sentiment, 'Confidence': probs }) df = pd.DataFrame(results) plt.figure(figsize=(10, 6)) sentiment_counts = df['Sentiment'].value_counts() plt.bar(sentiment_counts.index, sentiment_counts.values) plt.title('Sentiment Distribution') plt.xlabel('Sentiment') plt.ylabel('Count') buf = io.BytesIO() plt.savefig(buf, format='png') buf.seek(0) plot_base64 = base64.b64encode(buf.read()).decode('utf-8') plt.close() return df, f'' def create_interface(): # Get initial summary initial_summary = get_initial_summary() with gr.Blocks() as demo: gr.Markdown("# Review Analysis System") with gr.Tab("Review Analysis"): # Add initial dataset summary gr.Markdown("## Dataset Overview") gr.HTML(initial_summary) # Changed from gr.Markdown to gr.HTML gr.Markdown("## Analyze New Reviews") reviews_input = gr.Textbox( label="Enter reviews (one per line)", placeholder="Enter product reviews here...", lines=5 ) analyze_button = gr.Button("Analyze Reviews") with gr.Row(): with gr.Column(): sentiment_output = gr.Dataframe( label="Sentiment Analysis Results" ) plot_output = gr.HTML(label="Sentiment Distribution") with gr.Column(): summary_output = gr.HTML( # Changed from gr.Textbox to gr.HTML label="Review Summary" ) analyze_button.click( analyze_reviews, inputs=[reviews_input], outputs=[sentiment_output, plot_output, summary_output] ) return demo def add_clusters_to_df(df): """Add cluster names to the DataFrame if they don't exist""" # Create text features vectorizer = TfidfVectorizer(max_features=1000, stop_words='english') text_features = vectorizer.fit_transform(df['text']) # Perform clustering n_clusters = 4 # You can adjust this kmeans = KMeans(n_clusters=n_clusters, random_state=42) df['cluster_name'] = kmeans.fit_predict(text_features) # Map cluster numbers to names cluster_names = { 0: "Electronics", 1: "Home & Kitchen", 2: "Books & Media", 3: "Other Products" } df['cluster_name'] = df['cluster_name'].map(cluster_names) return df def generate_category_summaries(df): """Generate product summaries in table format""" summaries = {} for category in df['cluster_name'].unique(): category_df = df[df['cluster_name'] == category] if len(category_df) < 10: continue # Get product statistics product_stats = category_df.groupby('name').agg({ 'reviews.rating': ['mean', 'count'], 'reviews.text': list }).reset_index() product_stats.columns = ['name', 'avg_rating', 'review_count', 'reviews'] product_stats = product_stats[product_stats['review_count'] >= 5] if len(product_stats) < 3: continue # Get top 3 and worst products top_3 = product_stats.nlargest(3, 'avg_rating') worst_product = product_stats.nsmallest(1, 'avg_rating') # Analyze reviews for each product product_details = [] for _, product in top_3.iterrows(): pros, cons = analyze_sentiment(product['reviews']) product_details.append({ 'name': product['name'], 'rating': product['avg_rating'], 'review_count': product['review_count'], 'pros': pros[:3] or ["No significant positive feedback"], 'cons': cons[:3] or ["No major complaints"] }) # Format tables tables = [] # Top Products Table top_table = [] for product in product_details: top_table.append([ product['name'], f"★{product['rating']:.1f}", product['review_count'], "\n".join(product['pros']), "\n".join(product['cons']) ]) tables.append({ 'section': f"TOP PRODUCTS IN {category.upper()}", 'headers': ["Product", "Rating", "Reviews", "Pros", "Cons"], 'data': top_table }) # Worst Product Table if not worst_product.empty: worst = worst_product.iloc[0] pros, cons = analyze_sentiment(worst['reviews']) tables.append({ 'section': "PRODUCT TO AVOID", 'headers': ["Product", "Rating", "Reasons to Avoid"], 'data': [[ worst['name'], f"★{worst['avg_rating']:.1f}", ", ".join(cons[:3]) if cons else "Consistently poor ratings" ]] }) summaries[category] = tables return summaries def create_clusters(df): """Create clusters from product data""" # Prepare product data products = df[['name', 'categories']].drop_duplicates() product_texts = (products['name'] + " " + products['categories']).tolist() # Create embeddings model = SentenceTransformer('all-MiniLM-L6-v2') embeddings = model.encode(product_texts, show_progress_bar=True) # Perform clustering num_clusters = 4 kmeans = KMeans(n_clusters=num_clusters, random_state=42) clusters = kmeans.fit_predict(embeddings) products['cluster'] = clusters # Generate cluster names cluster_names = {} for cluster_num in range(num_clusters): cluster_df = products[products['cluster'] == cluster_num] # Get descriptive words from product names words = [] for name in cluster_df['name']: words += name.lower().split() # Get top words for cluster name top_words = [word for word, count in Counter(words).most_common(10) if len(word) > 3][:3] label = ' '.join(top_words) cluster_names[cluster_num] = label # Map clusters to original dataframe product_to_cluster = dict(zip(products['name'], products['cluster'])) df['cluster'] = df['name'].map(product_to_cluster) df['cluster_name'] = df['cluster'].map(cluster_names) return df # Create and launch the interface if __name__ == "__main__": demo = create_interface() demo.launch()