Spaces:
Sleeping
Sleeping
import gradio as gr | |
from gradio import SelectData | |
import torch | |
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification | |
import pandas as pd | |
from wordcloud import WordCloud | |
import io | |
import base64 | |
from PIL import Image | |
import numpy as np | |
import plotly.express as px | |
import plotly.graph_objects as go | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from sklearn.decomposition import LatentDirichletAllocation as LDA | |
import nltk | |
from nltk.corpus import stopwords | |
from langdetect import detect | |
import langdetect | |
import re | |
from collections import Counter | |
from nltk.util import ngrams | |
from googletrans import Translator | |
import asyncio | |
# Make sure the NLTK resources requested by this module are available locally.
nltk.download('stopwords', quiet=True)
nltk.download('punkt', quiet=True)

# Languages for which a stop-word list is looked up.
SUPPORTED_LANGUAGES = [
    'english',
    'spanish',
    'french',
    'german',
    'italian',
    'portuguese',
    'russian',
    'arabic',
    'japanese',
]

# Stop-word sets keyed by language name. Languages that the NLTK stopwords
# corpus does not ship are skipped rather than treated as an error.
LANGUAGE_STOPWORDS = {}
_available_stopword_langs = stopwords.fileids()
for lang in SUPPORTED_LANGUAGES:
    if lang not in _available_stopword_langs:
        continue
    LANGUAGE_STOPWORDS[lang] = set(stopwords.words(lang))

# Maps the codes returned by langdetect's detect() to the language names above.
LANG_CODE_MAP = dict(
    en='english',
    es='spanish',
    fr='french',
    de='german',
    it='italian',
    pt='portuguese',
    ru='russian',
    ar='arabic',
    ja='japanese',
)
def get_stopwords(text):
    """Return the stop-word set for the language detected in ``text``.

    The English set is used when the detected language has no entry in
    ``LANGUAGE_STOPWORDS`` or when langdetect raises ``LangDetectException``.
    """
    try:
        detected_code = detect(text)
    except langdetect.LangDetectException:
        return LANGUAGE_STOPWORDS['english']
    language = LANG_CODE_MAP.get(detected_code, 'english')
    return LANGUAGE_STOPWORDS.get(language, LANGUAGE_STOPWORDS['english'])
# Shared module state, rewritten by the Gradio callbacks below.
current_bigram_samples = []      # [bigram, [numbered reviews]] rows from the latest file analysis
FULL_BIGRAM_DF = pd.DataFrame()  # unfiltered bigram table from the latest file analysis
last_selected_reviews = []       # numbered reviews of the most recently selected bigram
translator = Translator()        # module-level googletrans client

# Sentiment classifier: weights come from MODEL, while the tokenizer is the
# stock distilbert-base-uncased one.
MODEL = "sohan-ai/sentiment-analysis-model-amazon-reviews"
tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")
model = DistilBertForSequenceClassification.from_pretrained(MODEL)
def filter_bigrams(search_text):
    """Filter the stored bigram table by a case-insensitive substring.

    Args:
        search_text: Text typed into the search box. ``None``, an empty string
            or whitespace-only input means "no filter".

    Returns:
        ``FULL_BIGRAM_DF`` itself when there is nothing to filter (or nothing
        has been analysed yet), otherwise the rows whose "词组" column contains
        ``search_text``.
    """
    global FULL_BIGRAM_DF
    if not search_text or not search_text.strip():
        return FULL_BIGRAM_DF
    # Until a file has been analysed the table is an empty frame without a
    # "词组" column; indexing it would raise KeyError.
    if "词组" not in FULL_BIGRAM_DF.columns:
        return FULL_BIGRAM_DF
    # regex=False: the search box holds plain text, so characters such as "("
    # or "+" must be matched literally instead of being compiled as a pattern.
    mask = FULL_BIGRAM_DF["词组"].str.contains(
        search_text, case=False, na=False, regex=False
    )
    return FULL_BIGRAM_DF[mask]
def analyze_text(text):
    """Classify the sentiment of a single text.

    The text is tokenized (truncated to 512 tokens), run through the
    module-level model, and the logits are turned into probabilities with a
    softmax. Class index 1 is reported as positive and index 0 as negative.

    Returns:
        dict with the keys "积极情感概率" (float), "消极情感概率" (float) and
        "整体情感" ("积极" or "消极").
    """
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    # Inference only: no_grad avoids building an autograd graph per request.
    with torch.no_grad():
        logits = model(**inputs).logits
    probabilities = torch.nn.functional.softmax(logits, dim=1)[0].tolist()
    negative_prob = probabilities[0]
    positive_prob = probabilities[1]
    return {
        "积极情感概率": float(positive_prob),
        "消极情感概率": float(negative_prob),
        "整体情感": "积极" if positive_prob > negative_prob else "消极"
    }
def preprocess_text(text):
    """Normalize ``text`` to lowercase words separated by single spaces.

    Every run of characters that are not letters (punctuation, digits,
    underscores, whitespace) becomes one space, and the result is stripped.
    Letters are matched Unicode-aware, so accented and non-Latin words survive;
    an ASCII-only class would delete them. For pure-ASCII input the result is
    the same as with ``[^a-z\\s]``.

    Returns:
        The normalized string, possibly empty.
    """
    lowered = text.lower()
    # [\W\d_] is "anything that is not a letter": \w minus digits and "_".
    # Whitespace is part of \W, so this also collapses repeated spaces.
    letters_only = re.sub(r'[\W\d_]+', ' ', lowered)
    return letters_only.strip()
def extract_bigrams(texts, min_freq=2, max_freq_ratio=0.9):
    """Collect frequent two-word phrases and the reviews that contain them.

    Each bigram is counted once per review (document frequency). That keeps
    the count, the ``max_freq_ratio`` filter and the percentage on the same
    scale, since the latter two are both relative to the number of reviews.
    It also ensures a review is listed only once for a bigram even when the
    phrase is repeated inside it.

    Args:
        texts: Sequence of raw review strings.
        min_freq: Minimum number of reviews a bigram must appear in.
        max_freq_ratio: Bigrams found in more than this share of the reviews
            are dropped.

    Returns:
        A tuple ``(stats_df, samples)``. ``stats_df`` always has the columns
        "词组", "出现次数" and "占比", sorted by count in descending order.
        ``samples`` is a list of ``[bigram, [numbered_reviews]]`` rows, where
        ``numbered_reviews`` is one string of "N. review" entries joined by
        blank lines.
    """
    total_docs = len(texts)
    doc_freq = Counter()
    bigram_texts = {}  # bigram -> original reviews containing it

    for text in texts:
        words = preprocess_text(text).split()
        # dict.fromkeys removes repeats within one review while keeping the
        # order of first appearance.
        doc_bigrams = list(dict.fromkeys(
            f"{first} {second}" for first, second in zip(words, words[1:])
        ))
        doc_freq.update(doc_bigrams)
        for bigram in doc_bigrams:
            bigram_texts.setdefault(bigram, []).append(text)

    max_freq = total_docs * max_freq_ratio
    ranked = sorted(
        ((bigram, freq) for bigram, freq in doc_freq.items()
         if min_freq <= freq <= max_freq),
        key=lambda item: item[1],
        reverse=True,
    )

    bigram_stats = []
    dataset_samples = []
    for bigram, freq in ranked:
        percentage = freq / total_docs * 100
        bigram_stats.append({
            "词组": bigram,
            "出现次数": freq,
            "占比": f"{percentage:.2f}%"
        })
        formatted_texts = "\n\n".join(
            f"{number}. {review}"
            for number, review in enumerate(bigram_texts[bigram], start=1)
        )
        dataset_samples.append([bigram, [formatted_texts]])

    # Explicit columns keep the schema stable when no bigram qualifies.
    stats_df = pd.DataFrame(bigram_stats, columns=["词组", "出现次数", "占比"])
    return stats_df, dataset_samples
def perform_lda_analysis(texts, n_topics=15, n_top_words=15):
    """Run an LDA topic analysis over bigram TF-IDF features.

    Args:
        texts: Sequence of raw review strings.
        n_topics: Number of LDA components to fit.
        n_top_words: Number of highest-weighted bigrams listed per topic.

    Returns:
        A tuple ``(topics_df, doc_topics_df)``. ``topics_df`` has the columns
        "主题" and "关键词"; ``doc_topics_df`` has "文本", "主导主题" and
        "主题概率" with one row per input text. Both are empty (columns only)
        when vectorizing or fitting raises ``ValueError``, for example when
        no bigram survives the ``min_df`` / ``max_df`` pruning.
    """
    # Stop words are chosen from the language detected over all texts joined.
    stop_words = list(get_stopwords(' '.join(texts)))

    vectorizer = TfidfVectorizer(
        max_df=0.9,             # drop terms present in more than 90% of documents
        min_df=2,               # drop terms present in fewer than 2 documents
        stop_words=stop_words,
        ngram_range=(2, 2)      # bigrams only
    )

    processed_texts = [preprocess_text(text) for text in texts]

    try:
        tfidf = vectorizer.fit_transform(processed_texts)
        lda_model = LDA(
            n_components=n_topics,
            random_state=0
        )
        lda_output = lda_model.fit_transform(tfidf)
    except ValueError:
        # Not enough usable bigrams to analyse.
        empty_topics = pd.DataFrame(columns=["主题", "关键词"])
        empty_docs = pd.DataFrame(columns=["文本", "主导主题", "主题概率"])
        return empty_topics, empty_docs

    feature_names = vectorizer.get_feature_names_out()

    topics = []
    for topic_idx, topic in enumerate(lda_model.components_):
        # Reverse first, then cut: the slice [:-15:-1] stops one short and
        # returns only 14 entries.
        top_words_idx = topic.argsort()[::-1][:n_top_words]
        top_words = [feature_names[i] for i in top_words_idx]
        topics.append({
            "主题": f"主题 {topic_idx + 1}",
            "关键词": ", ".join(top_words)
        })

    doc_topics = []
    for doc_idx, doc_topics_dist in enumerate(lda_output):
        dominant_topic = doc_topics_dist.argmax()
        doc_topics.append({
            "文本": texts[doc_idx],  # original text, not the preprocessed one
            "主导主题": f"主题 {dominant_topic + 1}",
            "主题概率": f"{doc_topics_dist[dominant_topic]:.2%}"
        })

    return pd.DataFrame(topics), pd.DataFrame(doc_topics)
def create_pie_chart(positive_count, negative_count):
    """Build a donut chart comparing positive and negative review counts."""
    sentiment_slices = go.Pie(
        labels=['积极评价', '消极评价'],
        values=[positive_count, negative_count],
        hole=.3,
        marker_colors=['#2ecc71', '#e74c3c'],
    )
    chart = go.Figure(data=[sentiment_slices])
    chart.update_layout(title="情感分布", showlegend=True, width=400, height=400)
    return chart
def create_score_histogram(df):
    """Overlay histograms of the positive and negative probability columns.

    ``df`` must provide the columns "积极情感概率" and "消极情感概率".
    """
    # (column, legend name, bar colour); traces are added in this order.
    series_specs = (
        ("积极情感概率", "积极情感", '#2ecc71'),
        ("消极情感概率", "消极情感", '#e74c3c'),
    )
    chart = go.Figure()
    for column, legend_name, colour in series_specs:
        chart.add_trace(go.Histogram(
            x=df[column],
            name=legend_name,
            nbinsx=20,
            marker_color=colour,
        ))
    layout_options = {
        "title": "情感得分分布",
        "xaxis_title": "情感得分",
        "yaxis_title": "评论数量",
        "barmode": 'overlay',
        "width": 600,
        "height": 400,
    }
    chart.update_layout(**layout_options)
    return chart
def analyze_file(file, progress=gr.Progress()):
    """Analyse every non-blank line of an uploaded text file.

    Each non-blank line is treated as one review. The function runs sentiment
    analysis, builds word clouds and charts, extracts bigrams, runs the LDA
    topic analysis and writes all tables to an Excel workbook. As a side
    effect it replaces the module-level ``current_bigram_samples`` and
    ``FULL_BIGRAM_DF``.

    Args:
        file: Upload handed over by ``gr.File``; either an object exposing the
            path as ``.name`` or the path string itself. ``None`` when nothing
            was uploaded.
        progress: Gradio progress tracker.

    Returns:
        An 11-tuple matching the outputs wired to the analyse button: summary
        text, positive word cloud, negative word cloud, per-review table, pie
        chart, score histogram, topic table, document-topic table, bigram
        table, HTML hint, Excel path. On failure the first slot carries the
        message, the HTML slot is "" and every other slot is ``None``.
    """
    global current_bigram_samples, FULL_BIGRAM_DF

    def _failure(message):
        # Same shape as the success tuple so Gradio can clear every output.
        return message, None, None, None, None, None, None, None, None, "", None

    try:
        if file is None:
            return _failure("请上传文件")

        file_path = getattr(file, "name", file)
        # utf-8-sig also reads plain UTF-8 and strips a leading BOM, which
        # would otherwise end up inside the first review.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            texts = [stripped for stripped in (line.strip() for line in f) if stripped]

        progress(0, desc="正在预处理文本...")
        if not texts:
            return _failure("没有找到有效的评论文本")

        # Language detection is best effort: any failure falls back to English.
        try:
            lang_code = detect(' '.join(texts))
            detected_lang = LANG_CODE_MAP.get(lang_code, 'english')
            lang_info = f"检测到语言:{detected_lang},将使用对应的停用词列表"
        except Exception:
            lang_info = "语言检测失败,将使用英语停用词列表"

        progress(0.1, desc="正在进行情感分析...")
        results = []
        total = len(texts)
        for done, text in enumerate(texts, start=1):
            results.append({"文本": text, **analyze_text(text)})
            # Sentiment analysis covers the 10%-40% span of the progress bar.
            progress(0.1 + done / total * 0.3)

        df = pd.DataFrame(results)
        positive_mask = df["整体情感"] == "积极"
        negative_mask = df["整体情感"] == "消极"
        positive = int(positive_mask.sum())
        negative = int(negative_mask.sum())

        analysis_info = (
            f"{lang_info}\n"
            f"分析完成!共分析{total}条文本\n"
            f"积极:{positive}条 ({positive/total*100:.1f}%)\n"
            f"消极:{negative}条 ({negative/total*100:.1f}%)"
        )

        progress(0.4, desc="正在生成词云...")
        positive_text = " ".join(df[positive_mask]["文本"])
        negative_text = " ".join(df[negative_mask]["文本"])
        pos_wordcloud = None
        neg_wordcloud = None
        if positive_text:
            pos_wordcloud = WordCloud(width=400, height=200, background_color='white').generate(positive_text)
            pos_wordcloud = pos_wordcloud.to_image()
        if negative_text:
            neg_wordcloud = WordCloud(width=400, height=200, background_color='white').generate(negative_text)
            neg_wordcloud = neg_wordcloud.to_image()

        progress(0.5, desc="正在生成可视化图表...")
        pie_chart = create_pie_chart(positive, negative)
        score_hist = create_score_histogram(df)

        progress(0.6, desc="正在提取关键词组...")
        bigrams_df, bigram_samples = extract_bigrams(texts)
        current_bigram_samples = bigram_samples
        FULL_BIGRAM_DF = bigrams_df.copy()  # unfiltered copy used by the search box

        progress(0.7, desc="正在进行主题分析...")
        topics_df, doc_topics_df = perform_lda_analysis(texts)

        progress(0.9, desc="正在保存结果...")
        # The on-screen table shows probabilities as percentages; the Excel
        # export keeps the raw floats.
        display_df = df.copy()
        display_df["积极情感概率"] = display_df["积极情感概率"].apply(lambda x: f"{x:.2%}")
        display_df["消极情感概率"] = display_df["消极情感概率"].apply(lambda x: f"{x:.2%}")

        excel_path = "sentiment_analysis_results.xlsx"
        with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name='情感分析结果', index=False)
            topics_df.to_excel(writer, sheet_name='主题关键词', index=False)
            doc_topics_df.to_excel(writer, sheet_name='文档主题分布', index=False)
            bigrams_df.to_excel(writer, sheet_name='关键词组统计', index=False)

        progress(1.0, desc="分析完成!")
        return (
            analysis_info,
            pos_wordcloud,
            neg_wordcloud,
            display_df,
            pie_chart,
            score_hist,
            topics_df,
            doc_topics_df,
            bigrams_df,
            '<div style="color: #666; padding: 10px;">请点击左侧词组查看相关评论</div>',  # initial hint
            excel_path
        )
    except Exception as e:
        # UI boundary: report the failure in the summary box instead of raising.
        import traceback
        error_msg = f"处理文件时出错:{str(e)}\n{traceback.format_exc()}"
        return _failure(error_msg)
def single_text_interface(text):
    """Callback of the single-text tab: format the sentiment of ``text``.

    Returns a prompt message for blank input, otherwise three lines with the
    positive probability, the negative probability and the overall label.
    """
    if text.strip() == "":
        return "请输入要分析的文本"

    scores = analyze_text(text)
    report_lines = [
        f"积极情感概率:{scores['积极情感概率']:.2%}",
        f"消极情感概率:{scores['消极情感概率']:.2%}",
        f"整体情感:{scores['整体情感']}",
    ]
    return "\n".join(report_lines)
def highlight_keyword(text, keyword):
    """Return ``text`` as HTML with occurrences of ``keyword`` wrapped in <mark>.

    Matching is case-insensitive and works word by word: the words of
    ``keyword`` may be separated in ``text`` by any run of non-letter
    characters, and a match must not start or end in the middle of a longer
    word. All text, matched or not, is HTML-escaped, so the result can be
    placed into markup directly and must not be escaped again.

    Args:
        text: Raw, unescaped text.
        keyword: Space-separated words to highlight. When it contains no
            words, the escaped text is returned without highlighting.
    """
    from html import escape

    words = keyword.split()
    if not words:
        return escape(text)

    letter = r'[^\W\d_]'    # any Unicode letter
    separator = r'[\W\d_]+'  # punctuation, digits, whitespace between words
    pattern = re.compile(
        f'(?<!{letter})' + separator.join(re.escape(word) for word in words) + f'(?!{letter})',
        re.IGNORECASE,
    )

    mark_open = '<mark style="background-color: #ffd700; padding: 0 2px; border-radius: 2px;">'
    pieces = []
    cursor = 0
    # Escape the plain and matched segments separately, so the pattern never
    # runs over entities such as "&amp;" produced by the escaping itself.
    for match in pattern.finditer(text):
        pieces.append(escape(text[cursor:match.start()]))
        pieces.append(f'{mark_open}{escape(match.group(0))}</mark>')
        cursor = match.end()
    pieces.append(escape(text[cursor:]))
    return "".join(pieces)
def show_bigram_reviews(evt: gr.SelectData, df):
    """Render the reviews that belong to the bigram row selected in the table.

    The row is taken from ``evt.index[0]`` in ``df``. The module-level
    ``last_selected_reviews`` is reset and then filled with the numbered
    reviews of the selected bigram.

    Returns:
        An HTML snippet with one highlighted card per review, or a
        "not found" notice when no stored sample matches the bigram.
    """
    global current_bigram_samples, last_selected_reviews

    selected_bigram = df.iloc[evt.index[0]]["词组"]

    # Forget the reviews of any previous selection.
    last_selected_reviews = []

    matching_sample = next(
        (entry for entry in current_bigram_samples if entry[0] == selected_bigram),
        None,
    )
    if matching_sample is None:
        return '<div style="color: #666; padding: 10px;">未找到相关评论</div>'

    # Each sample stores its reviews as one string of "N. text" entries
    # separated by blank lines.
    reviews = matching_sample[1][0].split("\n\n")
    # Keep the numbered originals in the module-level list.
    last_selected_reviews.extend(reviews)

    cards = []
    for number, numbered_review in enumerate(reviews, start=1):
        # Drop the "N. " prefix before highlighting.
        _prefix, found_separator, remainder = numbered_review.partition(". ")
        review_content = remainder if found_separator else numbered_review
        highlighted_review = highlight_keyword(review_content, selected_bigram)
        cards.append(
            f'<div style="margin-bottom: 10px; padding: 10px; background-color: #f5f5f5; border-radius: 5px;">'
            f'<span style="font-weight: bold; color: #666;">#{number}</span> {highlighted_review}'
            f'</div>'
        )

    return (
        '<div style="max-height: 500px; overflow-y: auto; padding: 10px;">'
        f'<div style="margin-bottom: 10px; color: #333;">找到 {len(reviews)} 条包含 "<b>{selected_bigram}</b>" 的评论:</div>'
        f'{"".join(cards)}'
        '</div>'
    )
def translate_single_comment(comment_index):
    """Translate one of the currently listed reviews into Chinese.

    Args:
        comment_index: 1-based position of the review in
            ``last_selected_reviews``; anything ``int()`` accepts.

    Returns:
        The original text followed by its translation, or a message
        explaining why nothing was translated (no bigram selected yet,
        invalid index, index out of range, translation failure).
    """
    global last_selected_reviews
    if not last_selected_reviews:
        return "请先选择一个词组查看相关评论。"

    try:
        comment_index = int(comment_index)
    except (TypeError, ValueError, OverflowError):
        # None (cleared field), non-numeric text, NaN or infinity.
        return "请输入有效的评论序号(数字)"

    if not 1 <= comment_index <= len(last_selected_reviews):
        return f"评论序号超出范围!可选范围: 1~{len(last_selected_reviews)}"

    # Stored reviews look like "N. text"; strip the numbering.
    numbered_review = last_selected_reviews[comment_index - 1]
    _prefix, found_separator, remainder = numbered_review.partition(". ")
    original_text = remainder if found_separator else numbered_review

    async def translate_async():
        async with Translator() as client:
            return await client.translate(original_text, dest='zh-cn')

    try:
        # asyncio.run creates a fresh event loop and always closes it, also
        # when the translation raises.
        result = asyncio.run(translate_async())
        return f"原文:\n{original_text}\n\n中文翻译:\n{result.text}"
    except Exception as e:
        # Give connection failures a friendlier message.
        if "HTTPSConnectionPool" in str(e):
            return "网络连接错误,请检查网络连接并重试"
        return f"翻译出错: {str(e)}"
# Build the Gradio interface.
# NOTE(review): the nesting of the layout blocks below was reconstructed from a
# copy of this file whose indentation was lost — confirm it against the
# rendered page.
with gr.Blocks(title="亚马逊评论文本情感分析系统", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 亚马逊评论文本情感分析系统")
    with gr.Tabs():
        # Tab 1: sentiment of a single piece of text.
        with gr.TabItem("单文本分析"):
            with gr.Row():
                with gr.Column():
                    text_input = gr.Textbox(
                        label="输入文本",
                        lines=3,
                        placeholder="请输入要分析的文本...",
                        value=""
                    )
                    analyze_btn = gr.Button("分析", variant="primary")
                with gr.Column():
                    text_output = gr.Textbox(label="分析结果", lines=3)
            analyze_btn.click(
                single_text_interface,
                inputs=[text_input],
                outputs=[text_output]
            )
        # Tab 2: batch analysis of an uploaded file, one review per line.
        with gr.TabItem("批量文件分析"):
            with gr.Row():
                file_input = gr.File(
                    label="上传文本文件(UTF-8编码的txt文件,每行一条评论)",
                    file_types=[".txt"]
                )
                analyze_file_btn = gr.Button("开始分析", variant="primary")
            with gr.Row():
                file_output = gr.Textbox(label="分析统计", lines=4)
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### 评论情感分布")
                    pie_chart = gr.Plot()
                with gr.Column():
                    gr.Markdown("### 情感得分分布")
                    score_hist = gr.Plot()
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### 积极评论词云")
                    pos_wordcloud = gr.Image()
                with gr.Column():
                    gr.Markdown("### 消极评论词云")
                    neg_wordcloud = gr.Image()
            gr.Markdown("### 关键词组统计")
            with gr.Row():
                with gr.Column(scale=1):
                    # Search box that filters the bigram table.
                    search_box = gr.Textbox(
                        label="搜索关键词组",
                        placeholder="输入关键词以过滤词组...",
                        show_label=True
                    )
                    bigrams_df = gr.Dataframe(
                        headers=["词组", "出现次数", "占比"],
                        datatype=["str", "number", "str"],
                        wrap=True,
                        interactive=True
                    )
                    # Re-filter the table whenever the search text changes.
                    search_box.change(
                        fn=filter_bigrams,
                        inputs=[search_box],
                        outputs=[bigrams_df]
                    )
                with gr.Column(scale=1):
                    gr.Markdown("#### 选中词组的相关评论")
                    bigram_reviews = gr.HTML()
                    # Controls for translating one of the listed reviews.
                    with gr.Row():
                        comment_index = gr.Number(
                            label="要翻译的评论序号",
                            value=1,
                            precision=0
                        )
                        translate_btn = gr.Button("翻译")
                    translate_output = gr.Textbox(
                        label="翻译结果",
                        lines=6
                    )
            # Selecting a row in the bigram table shows its reviews.
            bigrams_df.select(
                fn=show_bigram_reviews,
                inputs=[bigrams_df],
                outputs=bigram_reviews
            )
            # Translate the review whose number is entered above.
            translate_btn.click(
                fn=translate_single_comment,
                inputs=[comment_index],
                outputs=[translate_output]
            )
            gr.Markdown("### 主题分析结果")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("#### 主题关键词(越靠前,主题越重要,提到次数越多)")
                    topics_df = gr.Dataframe(
                        headers=["主题", "关键词"],
                        datatype=["str", "str"],
                        wrap=True
                    )
                with gr.Column():
                    gr.Markdown("#### 文档-主题分布")
                    doc_topics_df = gr.Dataframe(
                        headers=["文本", "主导主题", "主题概率"],
                        datatype=["str", "str", "str"],
                        wrap=True
                    )
            gr.Markdown("### 详细分析结果")
            results_df = gr.Dataframe(
                headers=["文本", "积极情感概率", "消极情感概率", "整体情感"],
                datatype=["str", "str", "str", "str"],
                wrap=True
            )
            file_download = gr.File(label="下载完整分析结果(Excel)")
            # The order of outputs must match the tuple returned by analyze_file.
            analyze_file_btn.click(
                analyze_file,
                inputs=[file_input],
                outputs=[file_output, pos_wordcloud, neg_wordcloud, results_df, pie_chart, score_hist, topics_df, doc_topics_df, bigrams_df, bigram_reviews, file_download]
            )
# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch(share=False)