waka's picture
fix front msyh.ttc
06113ab verified
import gradio as gr
from gradio import SelectData
import torch
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification
import pandas as pd
from wordcloud import WordCloud
import io
import base64
from PIL import Image
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import LatentDirichletAllocation as LDA
import nltk
from nltk.corpus import stopwords
from langdetect import detect
import langdetect
import re
from collections import Counter
from nltk.util import ngrams
from googletrans import Translator
import asyncio
# 下载停用词
nltk.download('stopwords', quiet=True)
nltk.download('punkt', quiet=True)
# 支持的语言
SUPPORTED_LANGUAGES = ['english', 'spanish', 'french', 'german', 'italian', 'portuguese', 'russian', 'arabic', 'japanese']
# 创建语言停用词字典
LANGUAGE_STOPWORDS = {}
for lang in SUPPORTED_LANGUAGES:
if lang in stopwords.fileids():
LANGUAGE_STOPWORDS[lang] = set(stopwords.words(lang))
# 语言代码映射
LANG_CODE_MAP = {
'en': 'english',
'es': 'spanish',
'fr': 'french',
'de': 'german',
'it': 'italian',
'pt': 'portuguese',
'ru': 'russian',
'ar': 'arabic',
'ja': 'japanese'
}
def get_stopwords(text):
"""根据文本语言返回相应的停用词"""
try:
lang_code = detect(text)
lang = LANG_CODE_MAP.get(lang_code, 'english')
return LANGUAGE_STOPWORDS.get(lang, LANGUAGE_STOPWORDS['english'])
except langdetect.LangDetectException:
return LANGUAGE_STOPWORDS['english']
# 初始化模型和分词器
MODEL = "sohan-ai/sentiment-analysis-model-amazon-reviews"
tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")
model = DistilBertForSequenceClassification.from_pretrained(MODEL)
# 全局变量
current_bigram_samples = []
FULL_BIGRAM_DF = pd.DataFrame() # 存储完整的bigram数据
last_selected_reviews = [] # 存放最后一次选中的评论列表
translator = Translator() # 初始化翻译器
def filter_bigrams(search_text):
"""过滤关键词组"""
global FULL_BIGRAM_DF
if not search_text.strip():
return FULL_BIGRAM_DF
# 不区分大小写的搜索
mask = FULL_BIGRAM_DF["词组"].str.contains(search_text, case=False, na=False)
return FULL_BIGRAM_DF[mask]
def analyze_text(text):
"""分析单个文本的情感"""
inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
outputs = model(**inputs)
scores = torch.nn.functional.softmax(outputs.logits, dim=1)
scores = scores.detach().numpy()[0]
return {
"积极情感概率": float(scores[1]),
"消极情感概率": float(scores[0]),
"整体情感": "积极" if scores[1] > scores[0] else "消极"
}
def preprocess_text(text):
"""预处理文本"""
# 转换为小写
text = text.lower()
# 去除特殊字符,只保留字母和空格
text = re.sub(r'[^a-z\s]', ' ', text)
# 去除多余空格
text = re.sub(r'\s+', ' ', text).strip()
return text
def extract_bigrams(texts, min_freq=2, max_freq_ratio=0.9):
"""提取关键词组(两个单词)"""
# 预处理所有文本
processed_texts = [preprocess_text(text) for text in texts]
# 提取所有双词组及其对应的文本
all_bigrams = []
bigram_texts = {} # 存储词组对应的原始文本
for idx, (text, processed) in enumerate(zip(texts, processed_texts)):
words = processed.split()
text_bigrams = list(ngrams(words, 2))
text_bigram_strs = [' '.join(bigram) for bigram in text_bigrams]
all_bigrams.extend(text_bigram_strs)
# 记录每个词组对应的原始文本
for bigram in text_bigram_strs:
if bigram not in bigram_texts:
bigram_texts[bigram] = []
bigram_texts[bigram].append(text)
# 计算词组频率
bigram_freq = Counter(all_bigrams)
total_docs = len(texts) # 总评论数
# 过滤词组
filtered_bigrams = {
bigram: freq for bigram, freq in bigram_freq.items()
if min_freq <= freq <= total_docs * max_freq_ratio # 保留在频率范围内的词组
}
# 创建词组统计DataFrame
bigram_stats = []
# 准备Dataset数据
dataset_samples = []
for bigram, freq in sorted(filtered_bigrams.items(), key=lambda x: x[1], reverse=True):
# 计算占总评论数的百分比
percentage = freq / total_docs * 100
# 获取该词组对应的所有文本
related_texts = bigram_texts[bigram]
# 统计DataFrame数据
bigram_stats.append({
"词组": bigram,
"出现次数": freq,
"占比": f"{percentage:.2f}%" # 占总评论数的百分比
})
# Dataset数据
formatted_texts = "\n\n".join(f"{i+1}. {text}" for i, text in enumerate(related_texts))
dataset_samples.append([bigram, [formatted_texts]])
return pd.DataFrame(bigram_stats), dataset_samples
def perform_lda_analysis(texts, n_topics=15):
"""执行LDA主题分析"""
# 获取动态停用词
stop_words = list(get_stopwords(' '.join(texts)))
# 创建TF-IDF向量化器
vectorizer = TfidfVectorizer(
max_df=0.9, # 忽略在90%以上文档中出现的词
min_df=2, # 忽略在少于2个文档中出现的词
stop_words=stop_words, # 使用动态停用词
ngram_range=(2, 2) # 使用双词组(bigrams)
)
# 预处理文本
processed_texts = [preprocess_text(text) for text in texts]
# 转换文本数据
try:
tfidf = vectorizer.fit_transform(processed_texts)
# 创建并训练LDA模型
lda_model = LDA(
n_components=n_topics,
random_state=0
)
lda_output = lda_model.fit_transform(tfidf)
# 获取特征词
feature_names = vectorizer.get_feature_names_out()
# 整理主题词
topics = []
for topic_idx, topic in enumerate(lda_model.components_):
top_words_idx = topic.argsort()[:-15:-1] # 获取前15个词组
top_words = [feature_names[i] for i in top_words_idx]
topics.append({
"主题": f"主题 {topic_idx + 1}",
"关键词": ", ".join(top_words)
})
# 获取每个文档的主题分布
doc_topics = []
for doc_idx, doc_topics_dist in enumerate(lda_output):
dominant_topic = doc_topics_dist.argmax()
doc_topics.append({
"文本": texts[doc_idx], # 显示完整文本
"主导主题": f"主题 {dominant_topic + 1}",
"主题概率": f"{doc_topics_dist[dominant_topic]:.2%}"
})
return pd.DataFrame(topics), pd.DataFrame(doc_topics)
except ValueError as e:
# 如果没有足够的词组进行分析,返回空的DataFrame
empty_topics = pd.DataFrame(columns=["主题", "关键词"])
empty_docs = pd.DataFrame(columns=["文本", "主导主题", "主题概率"])
return empty_topics, empty_docs
def create_pie_chart(positive_count, negative_count):
"""创建情感分布饼图"""
fig = go.Figure(data=[go.Pie(
labels=['积极评价', '消极评价'],
values=[positive_count, negative_count],
hole=.3,
marker_colors=['#2ecc71', '#e74c3c']
)])
fig.update_layout(
title="情感分布",
showlegend=True,
width=400,
height=400
)
return fig
def create_score_histogram(df):
"""创建情感得分直方图"""
fig = go.Figure()
fig.add_trace(go.Histogram(
x=df["积极情感概率"],
name="积极情感",
nbinsx=20,
marker_color='#2ecc71'
))
fig.add_trace(go.Histogram(
x=df["消极情感概率"],
name="消极情感",
nbinsx=20,
marker_color='#e74c3c'
))
fig.update_layout(
title="情感得分分布",
xaxis_title="情感得分",
yaxis_title="评论数量",
barmode='overlay',
width=600,
height=400
)
return fig
def analyze_file(file, progress=gr.Progress()):
"""分析文件中的多个文本"""
global current_bigram_samples, FULL_BIGRAM_DF
results = []
try:
# 读取文件内容
if file is None:
return "请上传文件", None, None, None, None, None, None, None, None, "", None
# 读取上传的文件内容
text_content = file.name
with open(text_content, 'r', encoding='utf-8') as f:
content = f.readlines()
progress(0, desc="正在预处理文本...")
# 处理每一行评论
texts = [] # 存储所有文本用于LDA分析
total_lines = len([line for line in content if line.strip()])
# 检测语言
all_text = ' '.join([line.strip() for line in content if line.strip()])
try:
lang_code = detect(all_text)
detected_lang = LANG_CODE_MAP.get(lang_code, 'english')
lang_info = f"检测到语言:{detected_lang},将使用对应的停用词列表"
except:
detected_lang = 'english'
lang_info = "语言检测失败,将使用英语停用词列表"
progress(0.1, desc="正在进行情感分析...")
for i, line in enumerate(content):
if line.strip():
result = analyze_text(line.strip())
results.append({
"文本": line.strip(),
**result
})
texts.append(line.strip())
progress((i + 1) / total_lines * 0.3) # 情感分析占30%进度
# 创建DataFrame
df = pd.DataFrame(results)
# 生成统计信息
total = len(df)
if total == 0:
return "没有找到有效的评论文本", None, None, None, None, None, None, None, None, "", None
positive = len(df[df["整体情感"] == "积极"])
negative = len(df[df["整体情感"] == "消极"])
# 生成分析统计信息
analysis_info = (
f"{lang_info}\n"
f"分析完成!共分析{total}条文本\n"
f"积极:{positive}条 ({positive/total*100:.1f}%)\n"
f"消极:{negative}条 ({negative/total*100:.1f}%)"
)
progress(0.4, desc="正在生成词云...")
# 生成词云
positive_text = " ".join(df[df["整体情感"] == "积极"]["文本"])
negative_text = " ".join(df[df["整体情感"] == "消极"]["文本"])
pos_wordcloud = None
neg_wordcloud = None
if positive_text:
pos_wordcloud = WordCloud(width=400, height=200, background_color='white').generate(positive_text)
pos_wordcloud = pos_wordcloud.to_image()
if negative_text:
neg_wordcloud = WordCloud(width=400, height=200, background_color='white').generate(negative_text)
neg_wordcloud = neg_wordcloud.to_image()
progress(0.5, desc="正在生成可视化图表...")
# 创建可视化图表
pie_chart = create_pie_chart(positive, negative)
score_hist = create_score_histogram(df)
progress(0.6, desc="正在提取关键词组...")
# 提取关键词组
bigrams_df, bigram_samples = extract_bigrams(texts)
current_bigram_samples = bigram_samples # 更新全局变量
FULL_BIGRAM_DF = bigrams_df.copy() # 保存完整的bigram数据
progress(0.7, desc="正在进行主题分析...")
# 执行LDA主题分析
topics_df, doc_topics_df = perform_lda_analysis(texts)
progress(0.9, desc="正在保存结果...")
# 准备显示用的DataFrame
display_df = df.copy()
display_df["积极情感概率"] = display_df["积极情感概率"].apply(lambda x: f"{x:.2%}")
display_df["消极情感概率"] = display_df["消极情感概率"].apply(lambda x: f"{x:.2%}")
# 保存结果到Excel文件,包含多个sheet
excel_path = "sentiment_analysis_results.xlsx"
with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
# 保存情感分析结果
df.to_excel(writer, sheet_name='情感分析结果', index=False)
# 保存LDA主题关键词
topics_df.to_excel(writer, sheet_name='主题关键词', index=False)
# 保存文档主题分布
doc_topics_df.to_excel(writer, sheet_name='文档主题分布', index=False)
# 保存关键词组统计
bigrams_df.to_excel(writer, sheet_name='关键词组统计', index=False)
progress(1.0, desc="分析完成!")
return (
analysis_info,
pos_wordcloud,
neg_wordcloud,
display_df,
pie_chart,
score_hist,
topics_df,
doc_topics_df,
bigrams_df,
'<div style="color: #666; padding: 10px;">请点击左侧词组查看相关评论</div>', # 初始HTML提示
excel_path
)
except Exception as e:
import traceback
error_msg = f"处理文件时出错:{str(e)}\n{traceback.format_exc()}"
return error_msg, None, None, None, None, None, None, None, None, "", None
def single_text_interface(text):
"""单文本分析界面的处理函数"""
if not text.strip():
return "请输入要分析的文本"
result = analyze_text(text)
return (
f"积极情感概率:{result['积极情感概率']:.2%}\n"
f"消极情感概率:{result['消极情感概率']:.2%}\n"
f"整体情感:{result['整体情感']}"
)
def highlight_keyword(text, keyword):
"""用 <mark> 给 keyword 做简单的大小写不敏感高亮"""
pattern = re.compile(re.escape(keyword), re.IGNORECASE)
return pattern.sub(r'<mark style="background-color: #ffd700; padding: 0 2px; border-radius: 2px;">\g<0></mark>', text)
def show_bigram_reviews(evt: gr.SelectData, df):
"""显示选中词组的相关评论"""
global current_bigram_samples, last_selected_reviews
selected_bigram = df.iloc[evt.index[0]]["词组"] # 获取选中行的词组
# 清空上一次的评论列表
last_selected_reviews = []
for sample in current_bigram_samples:
if sample[0] == selected_bigram:
# 将评论转换为HTML格式
reviews = sample[1][0].split("\n\n")
highlighted_reviews = []
for i, review in enumerate(reviews, start=1):
# 保存原文评论(含序号)到全局变量
last_selected_reviews.append(review)
# 提取评论内容(去除序号前缀)
review_content = review.split(". ", 1)[1] if ". " in review else review
# 高亮关键词
highlighted_review = highlight_keyword(review_content, selected_bigram)
# 添加序号和样式
highlighted_reviews.append(
f'<div style="margin-bottom: 10px; padding: 10px; background-color: #f5f5f5; border-radius: 5px;">'
f'<span style="font-weight: bold; color: #666;">#{i}</span> {highlighted_review}'
f'</div>'
)
# 拼接成完整的HTML
html_content = (
'<div style="max-height: 500px; overflow-y: auto; padding: 10px;">'
f'<div style="margin-bottom: 10px; color: #333;">找到 {len(reviews)} 条包含 "<b>{selected_bigram}</b>" 的评论:</div>'
f'{"".join(highlighted_reviews)}'
'</div>'
)
return html_content
return '<div style="color: #666; padding: 10px;">未找到相关评论</div>'
def translate_single_comment(comment_index):
"""翻译单条评论"""
global last_selected_reviews
if not last_selected_reviews:
return "请先选择一个词组查看相关评论。"
try:
comment_index = int(comment_index)
except:
return "请输入有效的评论序号(数字)"
if comment_index < 1 or comment_index > len(last_selected_reviews):
return f"评论序号超出范围!可选范围: 1~{len(last_selected_reviews)}"
# 获取原文并去除序号前缀
original_text = last_selected_reviews[comment_index - 1]
parts = original_text.split(". ", 1)
if len(parts) == 2:
original_text = parts[1]
else:
original_text = parts[0]
try:
# 创建异步事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def translate_async():
async with Translator() as translator:
result = await translator.translate(original_text, dest='zh-cn')
return result
# 运行异步翻译
result = loop.run_until_complete(translate_async())
loop.close()
return f"原文:\n{original_text}\n\n中文翻译:\n{result.text}"
except Exception as e:
# 如果是网络错误,提示用户
if "HTTPSConnectionPool" in str(e):
return "网络连接错误,请检查网络连接并重试"
return f"翻译出错: {str(e)}"
# 创建Gradio界面
with gr.Blocks(title="亚马逊评论文本情感分析系统", theme=gr.themes.Soft()) as demo:
gr.Markdown("# 亚马逊评论文本情感分析系统")
with gr.Tabs():
with gr.TabItem("单文本分析"):
with gr.Row():
with gr.Column():
text_input = gr.Textbox(
label="输入文本",
lines=3,
placeholder="请输入要分析的文本...",
value=""
)
analyze_btn = gr.Button("分析", variant="primary")
with gr.Column():
text_output = gr.Textbox(label="分析结果", lines=3)
analyze_btn.click(
single_text_interface,
inputs=[text_input],
outputs=[text_output]
)
with gr.TabItem("批量文件分析"):
with gr.Row():
file_input = gr.File(
label="上传文本文件(UTF-8编码的txt文件,每行一条评论)",
file_types=[".txt"]
)
analyze_file_btn = gr.Button("开始分析", variant="primary")
with gr.Row():
file_output = gr.Textbox(label="分析统计", lines=4)
with gr.Row():
with gr.Column():
gr.Markdown("### 评论情感分布")
pie_chart = gr.Plot()
with gr.Column():
gr.Markdown("### 情感得分分布")
score_hist = gr.Plot()
with gr.Row():
with gr.Column():
gr.Markdown("### 积极评论词云")
pos_wordcloud = gr.Image()
with gr.Column():
gr.Markdown("### 消极评论词云")
neg_wordcloud = gr.Image()
gr.Markdown("### 关键词组统计")
with gr.Row():
with gr.Column(scale=1):
# 添加搜索框
search_box = gr.Textbox(
label="搜索关键词组",
placeholder="输入关键词以过滤词组...",
show_label=True
)
bigrams_df = gr.Dataframe(
headers=["词组", "出现次数", "占比"],
datatype=["str", "number", "str"],
wrap=True,
interactive=True
)
# 添加搜索事件
search_box.change(
fn=filter_bigrams,
inputs=[search_box],
outputs=[bigrams_df]
)
with gr.Column(scale=1):
gr.Markdown("#### 选中词组的相关评论")
bigram_reviews = gr.HTML()
# 添加翻译功能组件
with gr.Row():
comment_index = gr.Number(
label="要翻译的评论序号",
value=1,
precision=0
)
translate_btn = gr.Button("翻译")
translate_output = gr.Textbox(
label="翻译结果",
lines=6
)
# 添加词组选择事件
bigrams_df.select(
fn=show_bigram_reviews,
inputs=[bigrams_df],
outputs=bigram_reviews
)
# 添加翻译按钮事件
translate_btn.click(
fn=translate_single_comment,
inputs=[comment_index],
outputs=[translate_output]
)
gr.Markdown("### 主题分析结果")
with gr.Row():
with gr.Column():
gr.Markdown("#### 主题关键词(越靠前,主题越重要,提到次数越多)")
topics_df = gr.Dataframe(
headers=["主题", "关键词"],
datatype=["str", "str"],
wrap=True
)
with gr.Column():
gr.Markdown("#### 文档-主题分布")
doc_topics_df = gr.Dataframe(
headers=["文本", "主导主题", "主题概率"],
datatype=["str", "str", "str"],
wrap=True
)
gr.Markdown("### 详细分析结果")
results_df = gr.Dataframe(
headers=["文本", "积极情感概率", "消极情感概率", "整体情感"],
datatype=["str", "str", "str", "str"],
wrap=True
)
file_download = gr.File(label="下载完整分析结果(Excel)")
analyze_file_btn.click(
analyze_file,
inputs=[file_input],
outputs=[file_output, pos_wordcloud, neg_wordcloud, results_df, pie_chart, score_hist, topics_df, doc_topics_df, bigrams_df, bigram_reviews, file_download]
)
if __name__ == "__main__":
demo.launch(share=False)