import gradio as gr
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
import torch
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import io
import base64
from textblob import TextBlob
from collections import Counter, defaultdict
from tabulate import tabulate
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
from sentence_transformers import SentenceTransformer
from sklearn.decomposition import PCA

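# Load the fine-tuned sentiment classifier (3 classes: negative, neutral,
# positive) from the local checkpoint directory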
model_path = "./final_model"
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForSequenceClassification.from_pretrained(model_path)

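# Optional abstractive summarizer; device=-1 forces CPU, and the app falls
# back to plain template text if loading fails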
try:
    summarizer = pipeline(
        "summarization",
        model="sshleifer/distilbart-cnn-6-6",
        device=-1
    )
except Exception as e:
    print(f"Error loading summarizer: {str(e)}")
    summarizer = None

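# dataset.csv is expected alongside this script; the dotted column names
# (reviews.text, reviews.rating) suggest a Datafiniti-style product-review
# export, but any CSV with the four required columns will work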
def load_dataset():
    """Load dataset.csv and check that the required columns are present."""
    try:
        df = pd.read_csv("dataset.csv")
        required_columns = ['reviews.text', 'reviews.rating', 'name', 'categories']
        if not all(col in df.columns for col in required_columns):
            raise ValueError("Missing required columns in dataset.csv")
        return df
    except Exception as e:
        print(f"Error loading dataset: {str(e)}")
        return None

def get_initial_summary():
    """Build the HTML dataset overview: headline statistics followed by
    per-category product summary tables."""
    df = load_dataset()
    if df is None:
        return "Error: Could not load dataset.csv"

    try:
        # Cluster products into categories on first use
        if 'cluster_name' not in df.columns:
            df = create_clusters(df)

        summaries = generate_category_summaries(df)

        html_output = []

        # Headline statistics
        unique_count = df['name'].nunique()
        total_count = len(df)
        avg_rating = df['reviews.rating'].mean()

        html_output.append(f"""
        <h2>Dataset Statistics</h2>
        <ul>
            <li>Total Reviews: {total_count}</li>
            <li>Unique Products: {unique_count}</li>
            <li>Average Rating: {avg_rating:.2f} ⭐</li>
        </ul>
        """)

        # Shared table styling, emitted once instead of once per table
        html_output.append("""
        <style>
            table {
                border-collapse: collapse;
                margin: 15px 0;
                width: 100%;
                box-shadow: 0 1px 3px rgba(0,0,0,0.2);
            }
            th, td {
                padding: 12px;
                border: 1px solid #ddd;
                text-align: left;
            }
            th {
                background-color: #f5f5f5;
                font-weight: bold;
            }
            tr:nth-child(even) {
                background-color: #f9f9f9;
            }
            tr:hover {
                background-color: #f5f5f5;
            }
        </style>
        """)

        for category, tables in summaries.items():
            html_output.append(f"<h2>CATEGORY: {category}</h2>")

            for table in tables:
                html_output.append(f"<h3>{table['section']}</h3>")
                html_output.append(tabulate(
                    table['data'],
                    headers=table['headers'],
                    tablefmt="html",
                    stralign="left",
                    numalign="center"
                ))

            html_output.append("<hr>")

        return "\n".join(html_output)
    except Exception as e:
        import traceback
        print(traceback.format_exc())
        return f"Error generating initial summary: {str(e)}"

def predict_sentiment(text):
    """Classify a single review with the fine-tuned model and return the
    predicted label plus per-class probabilities."""
    text = text.lower()

    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)

    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
        probabilities = torch.nn.functional.softmax(logits, dim=-1)
        predicted_class = torch.argmax(probabilities, dim=-1).item()

    sentiment_map = {0: "Negative", 1: "Neutral", 2: "Positive"}
    sentiment = sentiment_map[predicted_class]

    # Format class probabilities as percentage strings
    probs = probabilities[0].tolist()
    prob_dict = {sentiment_map[i]: f"{prob*100:.2f}%" for i, prob in enumerate(probs)}

    return sentiment, prob_dict

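# Example call (output values are hypothetical):
#   predict_sentiment("Great battery life!")
#   -> ("Positive", {"Negative": "1.02%", "Neutral": "3.55%", "Positive": "95.43%"})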
def analyze_sentiment(reviews):
    """Perform sentiment analysis on reviews, mining frequently mentioned
    pros and cons from strongly polar sentences."""
    pros = defaultdict(int)
    cons = defaultdict(int)

    for review in reviews:
        blob = TextBlob(str(review))
        for sentence in blob.sentences:
            polarity = sentence.sentiment.polarity
            # Collect nouns and adjectives from this sentence
            words = [word for word, tag in sentence.tags
                     if tag in ('NN', 'NNS', 'JJ', 'JJR', 'JJS')]

            if polarity > 0.3:
                for word in words:
                    pros[word] += 1
            elif polarity < -0.3:
                for word in words:
                    cons[word] += 1

    # Rank terms by mention count, most frequent first
    pros_sorted = [k for k, _ in sorted(pros.items(), key=lambda x: -x[1])] if pros else []
    cons_sorted = [k for k, _ in sorted(cons.items(), key=lambda x: -x[1])] if cons else []

    return pros_sorted, cons_sorted

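# Free-text fallback summary; also used by analyze_reviews() when a batch is
# too small for the table-based category summaries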
def generate_category_summary(reviews_text):
    """Generate summary for a set of reviews"""
    reviews = [r.strip() for r in reviews_text.split('\n') if r.strip()]

    if not reviews:
        return "Please enter at least one review."

    pros, cons = analyze_sentiment(reviews)

    summary_text = f"""
    Review Analysis Summary:

    PROS:
    {', '.join(pros[:5]) if pros else 'No significant positive feedback'}

    CONS:
    {', '.join(cons[:5]) if cons else 'No major complaints'}

    Based on {len(reviews)} reviews analyzed.
    """

    # Optionally rewrite the template with the abstractive summarizer
    if summarizer and len(summary_text) > 100:
        try:
            generated_summary = summarizer(
                summary_text,
                max_length=150,
                min_length=50,
                do_sample=False,
                truncation=True
            )[0]['summary_text']
        except Exception as e:
            generated_summary = f"Error generating summary: {str(e)}"
    else:
        generated_summary = summary_text

    return generated_summary

def analyze_reviews(reviews_text):
    """Run sentiment analysis on pasted reviews and build the results table,
    distribution plot, and summary panel."""
    reviews = [r.strip() for r in reviews_text.split('\n') if r.strip()]
    if not reviews:
        return None, None, "Please enter at least one review."

    df, plot_html = analyze_reviews_sentiment(reviews_text)

    # Wrap the new reviews in the column layout generate_category_summaries
    # expects, with a neutral placeholder rating of 3
    temp_df = pd.DataFrame({
        'reviews.text': reviews,
        'reviews.rating': [3] * len(reviews),
        'name': ['New Review'] * len(reviews),
        'cluster_name': ['New Reviews'] * len(reviews)
    })

    summaries = generate_category_summaries(temp_df)

    html_output = []
    for category, tables in summaries.items():
        for table in tables:
            html_output.append(f"<h3>{table['section']}</h3>")
            html_output.append(tabulate(
                table['data'],
                headers=table['headers'],
                tablefmt="html",
                stralign="left",
                numalign="center"
            ))

    # Small batches fall below the review-count thresholds in
    # generate_category_summaries, so fall back to a plain pros/cons summary
    if not html_output:
        html_output.append(f"<p>{generate_category_summary(reviews_text)}</p>")

    summary_html = "\n".join(html_output)

    return df, plot_html, summary_html

def analyze_reviews_sentiment(reviews_text):
    """Classify each review and render a sentiment distribution bar chart."""
    reviews = [r.strip() for r in reviews_text.split('\n') if r.strip()]

    if not reviews:
        return "Please enter at least one review.", None

    results = []
    for review in reviews:
        sentiment, probs = predict_sentiment(review)
        results.append({
            'Review': review,
            'Sentiment': sentiment,
            'Confidence': probs
        })

    df = pd.DataFrame(results)

    # Bar chart of label counts, returned as an inline base64 <img>
    plt.figure(figsize=(10, 6))
    sentiment_counts = df['Sentiment'].value_counts()
    plt.bar(sentiment_counts.index, sentiment_counts.values)
    plt.title('Sentiment Distribution')
    plt.xlabel('Sentiment')
    plt.ylabel('Count')

    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    buf.seek(0)
    plot_base64 = base64.b64encode(buf.read()).decode('utf-8')
    plt.close()

    return df, f'<img src="data:image/png;base64,{plot_base64}" style="max-width:100%;">'

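# Single-tab Gradio UI: dataset overview up top, live review analysis below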
def create_interface():
    """Assemble the Gradio Blocks interface."""
    initial_summary = get_initial_summary()

    with gr.Blocks() as demo:
        gr.Markdown("# Review Analysis System")

        with gr.Tab("Review Analysis"):
            gr.Markdown("## Dataset Overview")
            gr.HTML(initial_summary)

            gr.Markdown("## Analyze New Reviews")
            reviews_input = gr.Textbox(
                label="Enter reviews (one per line)",
                placeholder="Enter product reviews here...",
                lines=5
            )
            analyze_button = gr.Button("Analyze Reviews")

            with gr.Row():
                with gr.Column():
                    sentiment_output = gr.Dataframe(
                        label="Sentiment Analysis Results"
                    )
                    plot_output = gr.HTML(label="Sentiment Distribution")

                with gr.Column():
                    summary_output = gr.HTML(
                        label="Review Summary"
                    )

            analyze_button.click(
                analyze_reviews,
                inputs=[reviews_input],
                outputs=[sentiment_output, plot_output, summary_output]
            )

    return demo

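# Note: unused alternative to create_clusters() that uses TF-IDF features
# and fixed category labels instead of sentence embeddings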
def add_clusters_to_df(df):
    """Add cluster names to the DataFrame if they don't exist"""
    vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
    text_features = vectorizer.fit_transform(df['reviews.text'])

    n_clusters = 4
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    df['cluster_name'] = kmeans.fit_predict(text_features)

    # Fixed labels for the four clusters
    cluster_names = {
        0: "Electronics",
        1: "Home & Kitchen",
        2: "Books & Media",
        3: "Other Products"
    }
    df['cluster_name'] = df['cluster_name'].map(cluster_names)

    return df

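# Thresholds below keep the tables meaningful: a category needs 10+ reviews
# and 3+ products that each have 5+ reviews before it gets a summary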
def generate_category_summaries(df):
    """Generate product summaries in table format"""
    summaries = {}

    for category in df['cluster_name'].unique():
        category_df = df[df['cluster_name'] == category]

        # Skip thin categories
        if len(category_df) < 10:
            continue

        # Per-product average rating, review count, and raw review texts
        product_stats = category_df.groupby('name').agg({
            'reviews.rating': ['mean', 'count'],
            'reviews.text': list
        }).reset_index()

        product_stats.columns = ['name', 'avg_rating', 'review_count', 'reviews']
        product_stats = product_stats[product_stats['review_count'] >= 5]

        if len(product_stats) < 3:
            continue

        top_3 = product_stats.nlargest(3, 'avg_rating')
        worst_product = product_stats.nsmallest(1, 'avg_rating')

        # Mine pros and cons for each of the top products
        product_details = []
        for _, product in top_3.iterrows():
            pros, cons = analyze_sentiment(product['reviews'])
            product_details.append({
                'name': product['name'],
                'rating': product['avg_rating'],
                'review_count': product['review_count'],
                'pros': pros[:3] or ["No significant positive feedback"],
                'cons': cons[:3] or ["No major complaints"]
            })

        tables = []

        top_table = []
        for product in product_details:
            top_table.append([
                product['name'],
                f"⭐ {product['rating']:.1f}",
                product['review_count'],
                "\n".join(product['pros']),
                "\n".join(product['cons'])
            ])

        tables.append({
            'section': f"TOP PRODUCTS IN {category.upper()}",
            'headers': ["Product", "Rating", "Reviews", "Pros", "Cons"],
            'data': top_table
        })

        # Lowest-rated product in the category
        if not worst_product.empty:
            worst = worst_product.iloc[0]
            pros, cons = analyze_sentiment(worst['reviews'])
            tables.append({
                'section': "PRODUCT TO AVOID",
                'headers': ["Product", "Rating", "Reasons to Avoid"],
                'data': [[
                    worst['name'],
                    f"⭐ {worst['avg_rating']:.1f}",
                    ", ".join(cons[:3]) if cons else "Consistently poor ratings"
                ]]
            })

        summaries[category] = tables

    return summaries

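# Cluster products (not individual reviews): embed "name + categories" text,
# group with K-means, then label each cluster by its most frequent name words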
def create_clusters(df):
    """Create clusters from product data"""
    # One row per product; fill missing values so concatenation is safe
    products = df[['name', 'categories']].drop_duplicates().copy()
    product_texts = (products['name'].fillna('') + " " + products['categories'].fillna('')).tolist()

    # Embed product name + category text with a sentence transformer
    embedder = SentenceTransformer('all-MiniLM-L6-v2')
    embeddings = embedder.encode(product_texts, show_progress_bar=True)

    num_clusters = 4
    kmeans = KMeans(n_clusters=num_clusters, random_state=42)
    clusters = kmeans.fit_predict(embeddings)
    products['cluster'] = clusters

    # Name each cluster after its three most common longer words
    cluster_names = {}
    for cluster_num in range(num_clusters):
        cluster_df = products[products['cluster'] == cluster_num]

        words = []
        for name in cluster_df['name']:
            words += str(name).lower().split()

        top_words = [word for word, count in Counter(words).most_common(10)
                     if len(word) > 3][:3]
        cluster_names[cluster_num] = ' '.join(top_words)

    # Map cluster labels back onto every review row
    product_to_cluster = dict(zip(products['name'], products['cluster']))
    df['cluster'] = df['name'].map(product_to_cluster)
    df['cluster_name'] = df['cluster'].map(cluster_names)

    return df

if __name__ == "__main__":
    demo = create_interface()
    demo.launch()
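    # Pass share=True to demo.launch() to expose a temporary public URL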