Spaces:
Build error
Build error
import base64 | |
import io | |
import os | |
import sys | |
import traceback | |
import gradio as gr | |
import requests | |
from PIL import Image | |
import time | |
from langchain.agents import AgentExecutor, create_react_agent | |
from langchain.agents import Tool | |
from langchain.schema import ( | |
HumanMessage, | |
) | |
from langchain.tools import BaseTool | |
from langchain_community.tools.tavily_search import TavilySearchResults | |
from langchain_core.prompts import ChatPromptTemplate | |
from langchain_openai import ChatOpenAI | |
from serpapi import GoogleSearch | |
import re | |
from model import * | |
from gradio import ChatMessage, on | |
from config import * | |
from tools import * | |
from prompt import * | |
import sys | |
from gradio.components.chatbot import MessageDict | |
from langchain_community.llms import Tongyi | |
os.environ["OPENAI_API_KEY"] = "sb-6a683cb3bd63a9b72040aa2dd08feff8b68f08a0e1d959f5" | |
os.environ['OPENAI_BASE_URL'] = "https://api.openai-sb.com/v1/" | |
os.environ["SERPAPI_API_KEY"] = "dcc98b22d5f7d413979a175ff7d75b721c5992a3ee1e2363020b2bbdf4f82404" | |
os.environ['TAVILY_API_KEY'] = "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" # you | |
os.environ["REPLICATE_API_TOKEN"] = "r8_IYJpjwjrxegcUfBeBbyUxErJXXsnHDM4AlSQQ" | |
os.environ["DASHSCOPE_API_KEY"] = "sk-8159f0ed38994c3b96b4527404ea1cda" | |
# client = OpenAI( | |
# api_key="EMPTY", # 本地服务不需要 API 密钥 | |
# base_url="http://localhost:8005/v1", # 本地服务的 URL | |
# ) | |
# # 创建 LangChain 的 LLM 实例 | |
# llm = LangChainOpenAI( | |
# openai_api_key="EMPTY", # 本地服务不需要 API 密钥 | |
# openai_api_base="http://localhost:8005/v1", # 本地服务的 URL | |
# model_name="llama3", # 模型名称 | |
# temperature=0.1, # 控制生成文本的随机性 | |
# ) | |
# llm = ChatOpenAI(model="gpt-4o", temperature=0.1) | |
llm = Tongyi(model_name="qwen-plus", temperature=0.1) | |
dashscope.api_key = "sk-8159f0ed38994c3b96b4527404ea1cda" | |
def img_size(image): | |
width, height = image.size | |
while width >= 500 or height >= 400: | |
width = width * 0.8 | |
height = height * 0.8 | |
width = int(width) | |
height = int(height) | |
resized_img = image.resize((width, height)) | |
return resized_img | |
def encode_image(image): | |
buffered = io.BytesIO() | |
image.save(buffered, format="PNG") | |
return base64.b64encode(buffered.getvalue()).decode("utf-8") | |
def image_summarize(img_base64, prompt): | |
# chat = ChatOpenAI(model="gpt-4o", max_tokens=256) | |
client = OpenAI( | |
api_key=os.getenv('DASHSCOPE_API_KEY'), | |
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", | |
) | |
completion = client.chat.completions.create( | |
model="qwen-vl-plus", | |
messages=[ | |
{ | |
"role": "system", | |
"content": [{"type": "text", "text": f"{prompt}"}]}, | |
{ | |
"role": "user", | |
"content": [ | |
{ | |
"type": "image_url", | |
"image_url": {"url": f"data:image/png;base64,{img_base64}"}, | |
}, | |
{"type": "text", "text": "请分析该图片"}, | |
], | |
} | |
], | |
) | |
return completion.choices[0].message.content.replace('*','').replace('\n', ' ').strip() | |
def generate_img_summaries(image): | |
# img_size(path) | |
image_summaries = [] | |
prompt = """You are an assistant responsible for compiling images for retrieval\ | |
These abstracts will be embedded and used to retrieve the original images\ | |
Provide a detailed summary of the optimized images for retrieval.""" | |
base64_image = encode_image(image) | |
image_summaries.append(image_summarize(base64_image, prompt)) | |
return image_summaries | |
def SelectLanguage(option): | |
global selected_language | |
if option == "英文": | |
selected_language = "en" | |
else: | |
selected_language = "ch" | |
def SelectModel(option): | |
global selected_model | |
if option == "自研": | |
selected_model = "gpt" | |
elif option == "llama3-70b": | |
selected_model = "llama3-70b" | |
elif option == "llama3-8b": | |
selected_model = "llama3-8b" | |
else: | |
selected_model = "mistral" | |
def SelectConstract(option): | |
global selected_constract | |
if option == "qwen": | |
selected_constract = "qwen" | |
elif option == "llama": | |
selected_constract = "llama" | |
elif option == "glm": | |
selected_constract = "glm" | |
elif option == "doubao": | |
selected_constract = "doubao" | |
elif option == "deepseek": | |
selected_constract = "deepseek" | |
else: | |
selected_constract = "baichuan" | |
image_summaries = "" | |
flag = 8 | |
def update_image_summaries(): | |
global image_summaries | |
return image_summaries | |
# def update_flag(): | |
# global flag | |
# if flag == 1: | |
# img_path = "../flag_image/1.png" | |
# elif flag == 2: | |
# img_path = "../flag_image/2.png" | |
# elif flag == 3: | |
# img_path = "../flag_image/3.png" | |
# elif flag == 4: | |
# img_path = "../flag_image/4.png" | |
# elif flag == 5: | |
# img_path = "../flag_image/5.png" | |
# elif flag == 6: | |
# img_path = "../flag_image/6.png" | |
# elif flag == 7: | |
# img_path = "../flag_image/7.png" | |
# else: | |
# img_path = "../flag_image/8.png" | |
# img = Image.open(img_path) | |
# return img | |
def img_is_modify(image): | |
if image is None: | |
return "" | |
image_summaries = [] | |
prompt = """你是一个负责分析图像的助手,任务是判断图像中展示的内容是否真实,并与现实世界中可能发生的情况一致。\ | |
你的目标是识别图像中可能包含的不现实元素,例如:\ | |
1. 不自然的光线或阴影,与环境不匹配。\ | |
2. 纹理或反射不一致,与场景不对齐。\ | |
3. 出现位置不合适或物理上不可能存在的物体或人物。\ | |
4. 不寻常的比例、角度或视角,这在自然环境中不太可能出现。\ | |
5. 任何其他视觉线索,暗示图像呈现的是不现实或不可能的场景。\ | |
请提供图像的详细总结,说明内容是否真实可信,或者是否存在不现实或不可能的迹象。 | |
""" | |
# You should respond in Chinese | |
base64_image = encode_image(image) | |
image_summaries.append(image_summarize(base64_image, prompt)) | |
summary_text = image_summaries[0] | |
# 去除换行符 | |
cleaned_text = summary_text.replace('\n', ' ').strip() | |
# prompt_2 = "" | |
return cleaned_text | |
def DailyNews(): | |
url = "https://v3.alapi.cn/api/new/wbtop" | |
payload = { | |
"token": "jwgkxqrzqsdi6hzxzwbqngtypomthu", | |
"num": "10" | |
} | |
headers = {"Content-Type": "application/json"} | |
response = requests.post(url, json=payload, headers=headers).json() | |
result = [] | |
for i in range(5): | |
result.append({ | |
"index": i + 1, | |
"title": response['data'][i]['hot_word'], | |
"url": response['data'][i]['url'] | |
}) | |
container_html = '<div class="new-container2">' | |
for news in result: | |
card_html = f""" | |
<div class="title2"> | |
<span>{str(news['index']) + "、 " + news['title']}</span> | |
<a href="{news['url']}" target="_blank">跳转</a> | |
</div> | |
""" | |
container_html += card_html | |
container_html += '</div>' | |
return gr.update(value=container_html) | |
css = """ | |
.news-container { | |
display: flex; /* 使用 flex 布局 */ | |
flex-wrap: wrap; /* 如果内容过多,允许换行 */ | |
gap: 10px; /* 卡片之间的间距 */ | |
justify-content: flex-start; /* 卡片从左到右排列 */ | |
} | |
.news-card { | |
border: 1px solid #ccc; /* 边框 */ | |
border-radius: 5px; /* 圆角 */ | |
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1); /* 阴影 */ | |
padding: 10px; /* 内边距 */ | |
margin: 10px 0; /* 外边距 */ | |
width: 100%; /* 卡片宽度适应父容器 */ | |
max-width: 150px; /* 最大宽度为300px */ | |
max-height: 100px; | |
overflow: hidden; /* 确保内容不会溢出卡片 */ | |
} | |
.news-card img { | |
display: block; /* 将图片设为块级元素,避免底部空隙 */ | |
width: 20px; /* 固定宽度为 60px */ | |
height: 20px; /* 固定高度为 60px */ | |
border-radius: 5px; /* 图片圆角 */ | |
object-fit: cover; /* 裁剪图片以适应容器 */ | |
margin-bottom: 5px; /* 图片下方间距 */ | |
} | |
.news-card .title { | |
font-weight: bold; /* 标题加粗 */ | |
font-size: 10px; /* 标题字体大小 */ | |
} | |
.news-card .source { | |
font-size: 10px; | |
font-style: italic !important; /* 来源文字斜体 */ | |
color: #666 !important; /* 来源文字颜色 */ | |
} | |
.new-container2{ | |
width: 200px; /* 容器宽度 */ | |
height: 200px; /* 容器高度 */ | |
overflow-y: auto; /* 超出容器的内容显示滚动条 */ | |
border: 1px solid #ccc; /* 给容器添加边框 */ | |
padding: 10px; | |
box-sizing: border-box; /* 使得 padding 和 border 包含在容器大小内 */ | |
} | |
.title2 { | |
margin-bottom: 10px; /* 每条新闻之间的间距 */ | |
font-weight: bold; | |
font-size: 14px; | |
line-height: 1.6; | |
} | |
.title2 a { | |
text-decoration: none; | |
color: #007bff; | |
} | |
.title2 a:hover { | |
text-decoration: underline; | |
} | |
""" | |
def format_reply(reply): | |
formatted_text = "" | |
news_list = reply.get("news", []) | |
container_html = '<div class="news-container">' # 添加容器 | |
for news in news_list[:5]: | |
card_html = f""" | |
<div class="news-card"> | |
<img src="{news['image']}" alt="新闻图片"> | |
<div class="title"><a href="{news['url']}" target="_blank">{news['title']}</a></div> | |
<div class="source">{news['source']}</div> | |
</div> | |
""" | |
container_html += card_html | |
container_html += '</div>' # 关闭容器 | |
formatted_text += container_html | |
formatted_text += reply.get("text", "") | |
return formatted_text | |
def constract(): | |
global selected_constract | |
global query | |
tool = BoChaSearchTool() | |
web_information = tool._run(query) | |
information = [] | |
for step in web_information['data']['webPages']['value']: | |
information.append(step['snippet']) | |
query = query + "搜索到的相关新闻:" + str(information) | |
if selected_constract == "qwen": | |
result = qwen(query) | |
elif selected_constract == "llama": | |
result = llama(query) | |
elif selected_constract == "glm": | |
result = glm(query) | |
elif selected_constract == "doubao": | |
result = doubao(query) | |
elif selected_constract == "deepseek": | |
result = deepseek(query) | |
else: | |
result = baichuan(query) | |
return result | |
def SelectTheme(theme): | |
return gr.Chatbot(layout=theme, type="messages") | |
def generate_chat_title(conversation: list[MessageDict]) -> str: | |
title = "" | |
for message in conversation: | |
if message["role"] == "user": | |
if isinstance(message["content"], str): | |
title += message["content"] | |
break | |
else: | |
title += "📎 " | |
if len(title) > 40: | |
title = title[:40] + "..." | |
# print(title) | |
return title or "Conversation" | |
def load_chat_history(conversations): | |
# print(conversations) | |
return gr.Dataset( | |
samples=[ | |
[generate_chat_title(conv)] | |
for conv in conversations or [] | |
if conv | |
] | |
) | |
def dispaly_state(state): | |
print(state) | |
def save_conversation( | |
index: int | None, | |
conversation: list[MessageDict], | |
saved_conversations: list[list[MessageDict]], | |
): | |
if index is not None: | |
saved_conversations[index] = conversation | |
else: | |
saved_conversations.append(conversation) | |
index = len(saved_conversations) - 1 | |
return index, saved_conversations | |
def load_conversation( | |
index: int, | |
conversations: list[list[MessageDict]], | |
): | |
return ( | |
index, | |
gr.Chatbot( | |
value=conversations[index], # type: ignore | |
feedback_value=[], | |
type="messages" | |
), | |
) | |
def ModelPrompt(prompt): | |
global constract_model_prompt | |
constract_model_prompt = prompt | |
def react(dict, space): | |
global image_summaries | |
global flag | |
global query | |
topic = dict['text'] | |
if dict['files'] == []: # 没有图片 | |
image = None | |
else: | |
image_path = dict['files'][0] | |
image = Image.open(image_path) | |
image = img_size(image) # 调整图像尺寸 | |
image_summaries = img_is_modify(image) # 获得图像信息 | |
if topic == "": | |
result = img_is_modify(image) # 判断是否修改 | |
return result | |
query = topic | |
tools = [BoChaSearchTool(), TavilySearchResults(max_result=1), | |
ImageSearchTool(), WeatherCrossing(), GetHoliday(), GetLocation(), | |
CurrencyConversion(), SafeCodeExecutor(), SafeExpressionEvaluator(), | |
RegionInquiryTool(), HTMLTextExtractor()] | |
if selected_language == "en": | |
prompt = ChatPromptTemplate.from_template(en_prompt) | |
else: | |
prompt = ChatPromptTemplate.from_template(ch_prompt) | |
agent = create_react_agent(llm, tools, prompt) | |
captured_output = io.StringIO() | |
# 将 sys.stdout 重定向到 StringIO 对象 | |
sys.stdout = captured_output | |
cur_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) | |
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True, | |
return_intermediate_steps=True, include_run_info=True) | |
response = agent_executor.invoke( | |
{"current_time": cur_time, | |
"input": topic, | |
"image_information": image_summaries}) | |
sys.stdout = sys.__stdout__ | |
captured_content = str(captured_output.getvalue()) | |
captured_content = re.sub(r'\x1b\[[0-9;]*m', '', captured_content) | |
print("captured_content begin", captured_content, "\ncaptured_content end") | |
explain = "" | |
try: | |
match = re.search(r'\{.*\}(.*)', captured_content, re.DOTALL) | |
if match: | |
extracted_content = match.group(1).strip() | |
match2 = re.search( | |
r'Summary:(.*)Final Answer:', | |
extracted_content, | |
re.DOTALL) | |
if match2: | |
extracted_content2 = match2.group(1).strip() | |
explain = extracted_content2 | |
except Exception as e: | |
error_message = traceback.format_exc() | |
print(error_message) | |
text = "<b>思考结果:</b>\n" + re.sub(r'<.*?>', '', explain) + "\n\n<b>鉴定结果:</b>\n" + re.sub(r'<.*?>', '', response['output'].replace('*', '').strip()) | |
text = text.replace("\n", "<br>") | |
reply = { | |
'text': text, | |
'news': [] | |
} | |
# if selected_language == 'ch': | |
# if "完全不正确" in response['output']: | |
# flag = 1 | |
# elif "大部分不正确" in response['output']: | |
# flag = 2 | |
# elif "真假参半" in response['output']: | |
# flag = 3 | |
# elif "大部分正确" in response['output']: | |
# flag = 4 | |
# elif "完全正确" in response['output']: | |
# flag = 5 | |
# elif "可能错误" in response['output']: | |
# flag = 6 | |
# elif "可能正确" in response['output']: | |
# flag = 7 | |
# else: | |
# flag = 8 | |
# else: | |
# if "Completely_False" in response['output']: | |
# flag = 1 | |
# elif "Mostly_False" in response['output']: | |
# flag = 2 | |
# elif "Mixed" in response['output']: | |
# flag = 3 | |
# elif "Mostly_True" in response['output']: | |
# flag = 4 | |
# elif "Completely_True" in response['output']: | |
# flag = 5 | |
# elif "Likely_False" in response['output']: | |
# flag = 6 | |
# elif "Likely_True" in response['output']: | |
# flag = 7 | |
# else: | |
# flag = 8 | |
agent_thought = [] | |
pattern1 = r"Pre Thought:(.*?)Thought: (.*?)\nAction: (.*?)\nAction Input: (.*)" | |
pattern2 = r"Thought: (.*?)\nAction: (.*?)\nAction Input: (.*)" | |
for index in response['intermediate_steps']: | |
match = re.search(pattern1, index[0].log, re.S) | |
before_thought = match.group(1).strip() if match else "" | |
thought = match.group(2).strip() if match else "" | |
action = match.group(3).strip() if match else "" | |
action_input = match.group(4).strip() if match else "" | |
if not match: | |
match = re.search(pattern2, index[0].log, re.S) | |
before_thought = match.group(1).strip() if match else "" | |
thought = match.group(2).strip() if match else "" | |
action_input = match.group(3).strip() if match else "" | |
if len(before_thought) >= 3: | |
agent_thought.append( | |
ChatMessage( | |
role="assistant", | |
content=before_thought, | |
metadata={"title": "预思考:"} | |
) | |
) | |
if len(action_input) >= 3: | |
agent_thought.append( | |
ChatMessage( | |
role="assistant", | |
content=action_input, | |
metadata={"title": f"工具调用:{action}"} | |
) | |
) | |
if response['intermediate_steps'][0][0].tool == "tavily_search_results_json": | |
i = 0 | |
for step1 in response['intermediate_steps']: | |
for step2 in step1[1]: | |
reply['news'].append({ | |
"title": step2['content'][:28] + "...", | |
"url": step2['url'], | |
"source": "", | |
"image": "" | |
}) | |
i = i + 1 | |
if i == 3: | |
break | |
break | |
if response['intermediate_steps'][0][0].tool == "BoCha Webs Search": | |
for step1 in response['intermediate_steps']: | |
try: | |
for i in range(3): | |
reply['news'].append({ | |
"title": step1[1]['data']['webPages']['value'][i]['snippet'][:28] + "...", | |
"url": step1[1]['data']['webPages']['value'][i]['url'], | |
"source": step1[1]['data']['webPages']['value'][i]['siteName'], | |
"image": step1[1]['data']['images']['value'][i]['contentUrl'] | |
}) | |
except BaseException: | |
continue | |
if response['intermediate_steps'][0][0].tool == "Baidu News Search": | |
for step1 in response['intermediate_steps']: | |
try: | |
for i in range(3): | |
reply['news'].append({ | |
"title": step1[1][i]['title'][:28] + "...", | |
"url": step1[1][i]['link'], | |
"source": step1[1][i]['source'], | |
"image": "" | |
}) | |
except BaseException: | |
continue | |
formatted_reply = format_reply(reply) | |
# print(formatted_reply) | |
return agent_thought[:2] + [formatted_reply] | |
with gr.Blocks(css=css, theme='soft') as demo: | |
gr.HTML("<h1 style='font-size: 36px; text-align: center; color: #333333; margin-bottom: 20px;'>虚假信息检测系统</h1>") | |
with gr.Tab(label='Chat'): | |
with gr.Row(): | |
with gr.Sidebar(): | |
with gr.Column(scale=1): | |
gr.Textbox(visible=False) | |
with gr.Column(scale=1): | |
gr.Textbox(visible=False) | |
with gr.Column(scale=1): | |
new_chat_button = gr.Button( | |
"New chat", | |
variant="primary", | |
size="md", | |
icon="plus.svg", | |
) | |
chat_history_dataset = gr.Dataset( | |
components=[gr.Textbox(visible=False)], | |
show_label=False, | |
layout="table", | |
type="index", | |
) | |
with gr.Accordion("展示每日热搜", open=False): | |
daily_news = gr.HTML(label="每日热搜", show_label=True, container=True) | |
language_select = gr.Dropdown(["中文", "英文"], label="请选择要使用的语言", scale=1, value="中文") | |
model_select = gr.Dropdown(["自研", "llama3-70b", "llama3-8b", "mistral"], | |
label="请选择要使用的大模型", | |
scale=1, value="自研") | |
# flag = gr.Image(label="新闻标签", type="numpy", visible=False) | |
theme_select = gr.Dropdown(["气泡", "面板"], label="切换聊天样式", value="气泡") | |
daily_bn = gr.Button("查看每日热搜") | |
with gr.Column(scale=3): | |
bot = gr.ChatInterface( | |
fn=react, | |
examples=[ | |
{"text": "9月19日,马来西亚最高元首 Ibrahim 应邀对中国进行为期8天国事访问,亦是2024年1月上任以来首次访问东盟外国家。"}, | |
{"text": "据最新天文研究,火星的轨道将逐渐接近地球,最终成为地球的“第二月亮”。天文学家预测这一变化将在2025年发生,届时火星将在夜空中与月亮一样明亮,影响全球潮汐和生态平衡。"} | |
], | |
chatbot=gr.Chatbot(label='自研系统', | |
avatar_images=("/image/human.png", "/image/bot.png"), | |
type="messages", | |
height = 600, | |
layout="bubble", | |
show_copy_button=True, | |
show_copy_all_button=True, | |
), | |
multimodal=True, | |
show_progress='full', | |
type="messages", | |
flagging_mode='manual', | |
cache_examples = False, | |
example_icons=["/image/search.png", "/image/search.png"] | |
) | |
with gr.Column(scale=1): | |
with gr.Accordion("图片检测", open=False): | |
img_info = gr.Textbox(label="提取到的信息", lines=5) | |
# contrast_model = gr.Textbox(label="对比模型", lines=5) | |
# with gr.Accordion("展示对比模型prompt", open=False): | |
# model_prompt = gr.Textbox(label="对比模型prompt", lines=5, placeholder=constract_model_prompt, interactive=True) | |
# constract_select = gr.Dropdown(["qwen", "llama", "glm", "doubao", "deepseek", "baichuan"], | |
# label="请选择使用的对比模型", scale=1, value="qwen") | |
# constract_bn = gr.Button("展示对比模型") | |
# prompt_bn = gr.Button("确定更改提示词") | |
img_bn = gr.Button("显示检测结果") | |
img_bn.click(update_image_summaries, [], img_info) | |
# flag_bn.click(update_flag, [], flag) | |
daily_bn.click(DailyNews, [], [daily_news]) | |
language_select.change(SelectLanguage, language_select, []) | |
model_select.change(SelectModel, model_select, []) | |
theme_select.change(SelectTheme, theme_select, bot.chatbot) | |
# constract_select.change(SelectConstract, constract_select, []) | |
# constract_bn.click(constract, [], contrast_model) | |
# prompt_bn.click(ModelPrompt,model_prompt, []) | |
new_chat_button.click( | |
lambda x: x, | |
[bot.chatbot], | |
[bot.chatbot_state], | |
show_api=False, | |
queue=False, | |
).then( | |
save_conversation, | |
[bot.conversation_id, bot.chatbot_state, bot.saved_conversations], | |
[bot.conversation_id, bot.saved_conversations] | |
).then( | |
lambda: (None, []), | |
None, | |
[bot.conversation_id, bot.chatbot], | |
show_api=False, | |
queue=False, | |
).then( | |
lambda x: x, | |
[bot.chatbot], | |
[bot.chatbot_state], | |
show_api=False, | |
queue=False, | |
) | |
on( | |
triggers=[demo.load, bot.saved_conversations.change], | |
fn=load_chat_history, | |
inputs=bot.saved_conversations, | |
outputs=chat_history_dataset, | |
show_api=False, | |
queue=False, | |
) | |
chat_history_dataset.click( | |
lambda: [], | |
None, | |
[bot.chatbot], | |
show_api=False, | |
queue=False, | |
show_progress="hidden", | |
).then( | |
load_conversation, | |
[chat_history_dataset, bot.saved_conversations], | |
[bot.conversation_id, bot.chatbot], | |
show_api=False, | |
queue=False, | |
show_progress="hidden", | |
) | |
with gr.Tab(label='Para', scale=1): | |
gr.Textbox() | |
if __name__ == "__main__": | |
demo.launch() | |