# MisDetectorV2 / app.py
# You-shen's picture
# Update app.py
# 07f72cf verified
import base64
import io
import os
import sys
import traceback
import gradio as gr
import requests
from PIL import Image
import time
from langchain.agents import AgentExecutor, create_react_agent
from langchain.agents import Tool
from langchain.schema import (
HumanMessage,
)
from langchain.tools import BaseTool
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from serpapi import GoogleSearch
import re
from model import *
from gradio import ChatMessage, on
from config import *
from tools import *
from prompt import *
import sys
from gradio.components.chatbot import MessageDict
from langchain_community.llms import Tongyi
# NOTE(security): these credentials are committed in plain text. They should be
# rotated and supplied through the deployment environment (e.g. host secrets);
# the literals remain only as fallbacks so existing deployments keep working.
# setdefault() lets a value that is already present in the environment win
# instead of being silently overwritten at import time.
os.environ.setdefault("OPENAI_API_KEY", "sb-6a683cb3bd63a9b72040aa2dd08feff8b68f08a0e1d959f5")
os.environ.setdefault('OPENAI_BASE_URL', "https://api.openai-sb.com/v1/")
os.environ.setdefault("SERPAPI_API_KEY", "dcc98b22d5f7d413979a175ff7d75b721c5992a3ee1e2363020b2bbdf4f82404")
os.environ.setdefault('TAVILY_API_KEY', "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")
os.environ.setdefault("REPLICATE_API_TOKEN", "r8_IYJpjwjrxegcUfBeBbyUxErJXXsnHDM4AlSQQ")
os.environ.setdefault("DASHSCOPE_API_KEY", "sk-8159f0ed38994c3b96b4527404ea1cda")
# client = OpenAI(
# api_key="EMPTY", # the local service does not need an API key
# base_url="http://localhost:8005/v1", # URL of the local service
# )
# # Create the LangChain LLM instance
# llm = LangChainOpenAI(
# openai_api_key="EMPTY", # the local service does not need an API key
# openai_api_base="http://localhost:8005/v1", # URL of the local service
# model_name="llama3", # model name
# temperature=0.1, # controls the randomness of the generated text
# )
# llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
# LLM handed to create_react_agent() in react().
llm = Tongyi(model_name="qwen-plus", temperature=0.1)
# Take the DashScope key from the environment (populated earlier in this file)
# rather than repeating the secret as a second literal, so the credential is
# defined in exactly one place.
dashscope.api_key = os.environ["DASHSCOPE_API_KEY"]
def img_size(image):
    """Return a copy of *image* scaled down until it is smaller than 500x400.

    The size is reduced in 20% steps while the width is at least 500 or the
    height is at least 400; images already below both limits keep their size.

    Args:
        image: An object exposing ``size`` as ``(width, height)`` and a
            ``resize((width, height))`` method (a PIL image in this file).

    Returns:
        Whatever ``image.resize`` returns for the computed target size.
    """
    width, height = image.size
    while width >= 500 or height >= 400:
        width *= 0.8
        height *= 0.8
    # A very elongated image can shrink below one pixel on its short side;
    # clamp to 1 so the resize call never receives a zero dimension.
    target_size = (max(1, int(width)), max(1, int(height)))
    return image.resize(target_size)
def encode_image(image):
    """Serialize *image* as PNG and return the bytes as a base64 text string.

    Args:
        image: An object with a ``save(fileobj, format=...)`` method.

    Returns:
        The base64 encoding of the PNG data, decoded as UTF-8 text.
    """
    with io.BytesIO() as png_buffer:
        image.save(png_buffer, format="PNG")
        png_bytes = png_buffer.getvalue()
    encoded = base64.b64encode(png_bytes)
    return encoded.decode("utf-8")
def image_summarize(img_base64, prompt):
    """Send a base64 PNG to the ``qwen-vl-plus`` chat model and return its reply.

    Args:
        img_base64: Base64 text of a PNG image; embedded in a ``data:`` URL.
        prompt: Instruction text sent as the system message.

    Returns:
        The first choice's message content with ``*`` removed, newlines
        replaced by spaces, and surrounding whitespace stripped.
    """
    # Previously: chat = ChatOpenAI(model="gpt-4o", max_tokens=256)
    vision_client = OpenAI(
        api_key=os.getenv('DASHSCOPE_API_KEY'),
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    )
    system_message = {
        "role": "system",
        "content": [{"type": "text", "text": f"{prompt}"}],
    }
    user_message = {
        "role": "user",
        "content": [
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/png;base64,{img_base64}"},
            },
            {"type": "text", "text": "请分析该图片"},
        ],
    }
    completion = vision_client.chat.completions.create(
        model="qwen-vl-plus",
        messages=[system_message, user_message],
    )
    answer = completion.choices[0].message.content
    answer = answer.replace('*', '')
    answer = answer.replace('\n', ' ')
    return answer.strip()
def generate_img_summaries(image):
    """Return a one-element list holding a retrieval-oriented summary of *image*.

    The image is PNG/base64 encoded with ``encode_image`` and described by
    ``image_summarize`` using a fixed English prompt.
    """
    # NOTE(review): the three sentences are joined without separators, exactly
    # as in the original literal; kept unchanged so the model input is the same.
    prompt = (
        "You are an assistant responsible for compiling images for retrieval"
        "These abstracts will be embedded and used to retrieve the original images"
        "Provide a detailed summary of the optimized images for retrieval."
    )
    return [image_summarize(encode_image(image), prompt)]
def SelectLanguage(option):
    """Store the prompt language in the global ``selected_language``.

    ``"英文"`` selects ``"en"``; every other value selects ``"ch"``.
    """
    global selected_language
    selected_language = "en" if option == "英文" else "ch"
def SelectModel(option):
    """Store the chosen model key in the global ``selected_model``.

    ``"自研"`` maps to ``"gpt"``; the two llama3 options map to themselves;
    anything else falls back to ``"mistral"``.
    """
    global selected_model
    if option == "自研":
        chosen = "gpt"
    elif option == "llama3-70b":
        chosen = "llama3-70b"
    elif option == "llama3-8b":
        chosen = "llama3-8b"
    else:
        chosen = "mistral"
    selected_model = chosen
def SelectConstract(option):
    """Store the comparison-model key in the global ``selected_constract``.

    The five named options map to themselves; any other value selects
    ``"baichuan"``.
    """
    global selected_constract
    chosen = "baichuan"
    for known in ("qwen", "llama", "glm", "doubao", "deepseek"):
        if option == known:
            chosen = known
            break
    selected_constract = chosen
# Latest image-analysis text: react() stores the result of img_is_modify()
# here and update_image_summaries() returns it.
image_summaries = ""
# Verdict flag; in this file it is only declared global in react() and used by
# commented-out code.
flag = 8
def update_image_summaries():
    """Return the current value of the module-level ``image_summaries``."""
    # Reading a module global needs no ``global`` declaration.
    latest_summary = image_summaries
    return latest_summary
# def update_flag():
# global flag
# if flag == 1:
# img_path = "../flag_image/1.png"
# elif flag == 2:
# img_path = "../flag_image/2.png"
# elif flag == 3:
# img_path = "../flag_image/3.png"
# elif flag == 4:
# img_path = "../flag_image/4.png"
# elif flag == 5:
# img_path = "../flag_image/5.png"
# elif flag == 6:
# img_path = "../flag_image/6.png"
# elif flag == 7:
# img_path = "../flag_image/7.png"
# else:
# img_path = "../flag_image/8.png"
# img = Image.open(img_path)
# return img
def img_is_modify(image):
    """Ask the vision model whether *image* shows a realistic scene.

    Args:
        image: Image to analyse, or ``None``.

    Returns:
        ``""`` when *image* is ``None``; otherwise the reply from
        ``image_summarize`` with newlines replaced by spaces and stripped.
    """
    if image is None:
        return ""
    # The continuation lines stay at column 0 so that the prompt text sent to
    # the model is byte-identical to the original literal.
    prompt = """你是一个负责分析图像的助手,任务是判断图像中展示的内容是否真实,并与现实世界中可能发生的情况一致。\
你的目标是识别图像中可能包含的不现实元素,例如:\
1. 不自然的光线或阴影,与环境不匹配。\
2. 纹理或反射不一致,与场景不对齐。\
3. 出现位置不合适或物理上不可能存在的物体或人物。\
4. 不寻常的比例、角度或视角,这在自然环境中不太可能出现。\
5. 任何其他视觉线索,暗示图像呈现的是不现实或不可能的场景。\
请提供图像的详细总结,说明内容是否真实可信,或者是否存在不现实或不可能的迹象。
"""
    # You should respond in Chinese
    analysis = image_summarize(encode_image(image), prompt)
    # Strip newlines so the text renders on a single line.
    return analysis.replace('\n', ' ').strip()
def DailyNews():
    """Fetch the hot-search list from alapi.cn and render up to five entries.

    Returns:
        A ``gr.update`` whose value is an HTML fragment with one titled link
        per entry, wrapped in a ``new-container2`` div.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an error status.
    """
    import html

    url = "https://v3.alapi.cn/api/new/wbtop"
    payload = {
        # NOTE(security): the literal token is only a fallback; prefer setting
        # ALAPI_TOKEN in the environment and rotating the committed value.
        "token": os.getenv("ALAPI_TOKEN", "jwgkxqrzqsdi6hzxzwbqngtypomthu"),
        "num": "10",
    }
    headers = {"Content-Type": "application/json"}
    # A timeout keeps a stalled API from blocking the UI handler forever.
    response = requests.post(url, json=payload, headers=headers, timeout=10)
    response.raise_for_status()

    entries = response.json().get("data") or []
    if not isinstance(entries, list):
        entries = []

    cards = []
    # Slice instead of indexing 0..4 so a short list cannot raise IndexError.
    for position, entry in enumerate(entries[:5], start=1):
        # Titles and links come from a remote service: escape before embedding.
        title = html.escape(str(entry.get('hot_word', '')))
        link = html.escape(str(entry.get('url', '')), quote=True)
        cards.append(
            '\n<div class="title2">\n'
            f'<span>{position}、 {title}</span>\n'
            f'<a href="{link}" target="_blank">跳转</a>\n'
            '</div>\n'
        )
    container_html = '<div class="new-container2">' + "".join(cards) + '</div>'
    return gr.update(value=container_html)
# Stylesheet passed to gr.Blocks(css=...). The .news-* rules style the cards
# built by format_reply(); .new-container2 / .title2 style the list built by
# DailyNews().
css = """
.news-container {
display: flex; /* 使用 flex 布局 */
flex-wrap: wrap; /* 如果内容过多,允许换行 */
gap: 10px; /* 卡片之间的间距 */
justify-content: flex-start; /* 卡片从左到右排列 */
}
.news-card {
border: 1px solid #ccc; /* 边框 */
border-radius: 5px; /* 圆角 */
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1); /* 阴影 */
padding: 10px; /* 内边距 */
margin: 10px 0; /* 外边距 */
width: 100%; /* 卡片宽度适应父容器 */
max-width: 150px; /* 最大宽度为300px */
max-height: 100px;
overflow: hidden; /* 确保内容不会溢出卡片 */
}
.news-card img {
display: block; /* 将图片设为块级元素,避免底部空隙 */
width: 20px; /* 固定宽度为 60px */
height: 20px; /* 固定高度为 60px */
border-radius: 5px; /* 图片圆角 */
object-fit: cover; /* 裁剪图片以适应容器 */
margin-bottom: 5px; /* 图片下方间距 */
}
.news-card .title {
font-weight: bold; /* 标题加粗 */
font-size: 10px; /* 标题字体大小 */
}
.news-card .source {
font-size: 10px;
font-style: italic !important; /* 来源文字斜体 */
color: #666 !important; /* 来源文字颜色 */
}
.new-container2{
width: 200px; /* 容器宽度 */
height: 200px; /* 容器高度 */
overflow-y: auto; /* 超出容器的内容显示滚动条 */
border: 1px solid #ccc; /* 给容器添加边框 */
padding: 10px;
box-sizing: border-box; /* 使得 padding 和 border 包含在容器大小内 */
}
.title2 {
margin-bottom: 10px; /* 每条新闻之间的间距 */
font-weight: bold;
font-size: 14px;
line-height: 1.6;
}
.title2 a {
text-decoration: none;
color: #007bff;
}
.title2 a:hover {
text-decoration: underline;
}
"""
def format_reply(reply):
    """Render a reply dict as HTML: up to five news cards followed by the text.

    Args:
        reply: Mapping with an optional ``"news"`` list (dicts with
            ``image``, ``url``, ``title`` and ``source``) and an optional
            ``"text"`` string that is appended as-is.

    Returns:
        The ``news-container`` div with one ``news-card`` per item (first five
        only), followed by ``reply["text"]``.
    """
    from html import escape

    cards = []
    for news in (reply.get("news") or [])[:5]:
        # News fields originate from web search results, so escape them before
        # placing them into markup. Missing fields render as empty strings
        # instead of raising KeyError.
        image = escape(str(news.get('image', '')), quote=True)
        url = escape(str(news.get('url', '')), quote=True)
        title = escape(str(news.get('title', '')))
        source = escape(str(news.get('source', '')))
        cards.append(
            '\n<div class="news-card">\n'
            f'<img src="{image}" alt="新闻图片">\n'
            f'<div class="title"><a href="{url}" target="_blank">{title}</a></div>\n'
            f'<div class="source">{source}</div>\n'
            '</div>\n'
        )
    container_html = '<div class="news-container">' + "".join(cards) + '</div>'
    # The text part already contains intentional markup (<b>, <br>) built by
    # the caller, so it is appended unescaped.
    return container_html + reply.get("text", "")
def constract():
    """Answer the current global ``query`` with the selected comparison model.

    The query is first enriched with snippets returned by ``BoChaSearchTool``
    and then passed to the model function chosen by ``selected_constract``.

    Returns:
        Whatever the selected model function returns.
    """
    tool = BoChaSearchTool()
    web_information = tool._run(query)
    # The search payload is not guaranteed to have the expected shape; fall
    # back to "no snippets" rather than failing the whole comparison.
    try:
        pages = web_information['data']['webPages']['value']
    except (KeyError, TypeError):
        pages = []
    information = [page['snippet'] for page in pages if 'snippet' in page]

    # Build the enriched prompt in a local variable. Writing it back to the
    # global ``query`` made every repeated call append the search results to
    # an already-enriched query.
    enriched_query = query + "搜索到的相关新闻:" + str(information)

    if selected_constract == "qwen":
        result = qwen(enriched_query)
    elif selected_constract == "llama":
        result = llama(enriched_query)
    elif selected_constract == "glm":
        result = glm(enriched_query)
    elif selected_constract == "doubao":
        result = doubao(enriched_query)
    elif selected_constract == "deepseek":
        result = deepseek(enriched_query)
    else:
        result = baichuan(enriched_query)
    return result
def SelectTheme(theme):
    """Return a Chatbot update that switches the chat layout.

    Args:
        theme: Dropdown label (``"气泡"`` or ``"面板"``) or a layout name.

    Returns:
        A ``gr.Chatbot`` configured with the matching layout.
    """
    # The dropdown offers Chinese labels, while the Chatbot ``layout`` argument
    # is documented as "bubble" or "panel"; translate labels and pass any
    # other value through unchanged.
    layout = {"气泡": "bubble", "面板": "panel"}.get(theme, theme)
    return gr.Chatbot(layout=layout, type="messages")
def generate_chat_title(conversation: list[MessageDict]) -> str:
    """Build a short title for a conversation from its user messages.

    Each non-text user message contributes a paperclip marker; the first
    text user message is appended and ends the scan. Titles longer than 40
    characters are cut and suffixed with ``...``. An empty result becomes
    ``"Conversation"``.
    """
    pieces = []
    for entry in conversation:
        if entry["role"] != "user":
            continue
        body = entry["content"]
        if isinstance(body, str):
            pieces.append(body)
            break
        pieces.append("📎 ")
    title = "".join(pieces)
    if len(title) > 40:
        title = title[:40] + "..."
    return title if title else "Conversation"
def load_chat_history(conversations):
    """Return a ``gr.Dataset`` listing one title per non-empty conversation.

    ``None`` is treated like an empty list; empty conversations are skipped.
    """
    title_rows = []
    for conv in conversations or []:
        if conv:
            title_rows.append([generate_chat_title(conv)])
    return gr.Dataset(samples=title_rows)
def dispaly_state(state):
    """Print *state* to standard output (debugging helper)."""
    print(state)
def save_conversation(
index: int | None,
conversation: list[MessageDict],
saved_conversations: list[list[MessageDict]],
):
if index is not None:
saved_conversations[index] = conversation
else:
saved_conversations.append(conversation)
index = len(saved_conversations) - 1
return index, saved_conversations
def load_conversation(
    index: int,
    conversations: "list[list[MessageDict]]",
):
    """Load the conversation selected in the history list into the chatbot.

    Args:
        index: Position of the clicked row in the history list built by
            ``load_chat_history``.
        conversations: All stored conversations, including empty ones.

    Returns:
        ``(slot, chatbot)`` where ``slot`` is the conversation's position in
        *conversations* and ``chatbot`` is a ``gr.Chatbot`` showing it.

    Raises:
        IndexError: If *index* does not refer to a listed conversation.
    """
    # load_chat_history() lists only non-empty conversations, so the clicked
    # row counts positions among those; map it back to the real slot.
    listed_slots = [slot for slot, conv in enumerate(conversations or []) if conv]
    slot = listed_slots[index]
    return (
        slot,
        gr.Chatbot(
            value=conversations[slot],  # type: ignore
            feedback_value=[],
            type="messages"
        ),
    )
def ModelPrompt(prompt):
    """Store *prompt* in the global ``constract_model_prompt``."""
    global constract_model_prompt
    constract_model_prompt = prompt
# Patterns that split one agent step log into its parts. The first also
# captures a leading "Pre Thought:" section; the second is the fallback for
# logs without one.
_PRE_THOUGHT_STEP = re.compile(
    r"Pre Thought:(.*?)Thought: (.*?)\nAction: (.*?)\nAction Input: (.*)", re.S)
_THOUGHT_STEP = re.compile(
    r"Thought: (.*?)\nAction: (.*?)\nAction Input: (.*)", re.S)
# Exceptions expected when a tool observation lacks the anticipated structure.
_SHAPE_ERRORS = (KeyError, IndexError, TypeError)


def _extract_summary(trace):
    """Return the text between 'Summary:' and 'Final Answer:' in *trace*, or ''."""
    tail = re.search(r'\{.*\}(.*)', trace, re.DOTALL)
    if not tail:
        return ""
    summary = re.search(r'Summary:(.*)Final Answer:', tail.group(1).strip(), re.DOTALL)
    return summary.group(1).strip() if summary else ""


def _describe_steps(steps):
    """Turn agent intermediate steps into titled assistant chat messages."""
    messages = []
    for step in steps:
        log = step[0].log
        match = _PRE_THOUGHT_STEP.search(log)
        if match:
            shown_thought = match.group(1).strip()
            action = match.group(3).strip()
            action_input = match.group(4).strip()
        else:
            match = _THOUGHT_STEP.search(log)
            if match:
                # Groups here are (thought, action, action input). The thought
                # is shown in place of the missing pre-thought, and the action
                # name is kept so the tool title is not left blank.
                shown_thought = match.group(1).strip()
                action = match.group(2).strip()
                action_input = match.group(3).strip()
            else:
                shown_thought = action = action_input = ""
        if len(shown_thought) >= 3:
            messages.append(
                ChatMessage(
                    role="assistant",
                    content=shown_thought,
                    metadata={"title": "预思考:"}
                )
            )
        if len(action_input) >= 3:
            messages.append(
                ChatMessage(
                    role="assistant",
                    content=action_input,
                    metadata={"title": f"工具调用:{action}"}
                )
            )
    return messages


def _collect_news(steps):
    """Build news-card dicts from search-tool observations (best effort)."""
    news = []
    if not steps:
        # The agent answered without calling any tool.
        return news
    first_tool = steps[0][0].tool
    if first_tool == "tavily_search_results_json":
        # Only the first step is used, limited to three results.
        results = steps[0][1]
        if isinstance(results, (list, tuple)):
            for item in results[:3]:
                try:
                    news.append({
                        "title": item['content'][:28] + "...",
                        "url": item['url'],
                        "source": "",
                        "image": ""
                    })
                except _SHAPE_ERRORS:
                    continue
    elif first_tool == "BoCha Webs Search":
        for step in steps:
            try:
                pages = step[1]['data']['webPages']['value']
                images = step[1]['data']['images']['value']
                for i in range(3):
                    news.append({
                        "title": pages[i]['snippet'][:28] + "...",
                        "url": pages[i]['url'],
                        "source": pages[i]['siteName'],
                        "image": images[i]['contentUrl']
                    })
            except _SHAPE_ERRORS:
                continue
    elif first_tool == "Baidu News Search":
        for step in steps:
            try:
                for i in range(3):
                    news.append({
                        "title": step[1][i]['title'][:28] + "...",
                        "url": step[1][i]['link'],
                        "source": step[1][i]['source'],
                        "image": ""
                    })
            except _SHAPE_ERRORS:
                continue
    return news


def react(dict, space):
    """Chat handler: fact-check the user's message with a ReAct agent.

    Args:
        dict: Multimodal message with a ``'text'`` string and a ``'files'``
            list of image paths. The name shadows the builtin but is kept for
            compatibility.
        space: Second argument supplied by the chat interface (presumably the
            chat history); unused.

    Returns:
        For an image-only message, the image-analysis text. Otherwise a list
        with at most two thought/tool messages followed by the formatted HTML
        reply.

    Side effects:
        Updates the globals ``image_summaries`` and ``query``.
    """
    global image_summaries
    global query
    from contextlib import redirect_stdout

    message = dict
    topic = message['text']
    files = message['files']
    if files:
        # Only the first attachment is analysed; it is downsized first.
        image = img_size(Image.open(files[0]))
    else:
        image = None
    image_summaries = img_is_modify(image)
    if topic == "":
        # Image-only message: reuse the analysis just computed instead of
        # querying the vision model a second time.
        return image_summaries
    query = topic

    # NOTE(review): TavilySearchResults documents ``max_results``; the
    # ``max_result`` spelling below is kept as found — confirm the intent.
    tools = [BoChaSearchTool(), TavilySearchResults(max_result=1),
             ImageSearchTool(), WeatherCrossing(), GetHoliday(), GetLocation(),
             CurrencyConversion(), SafeCodeExecutor(), SafeExpressionEvaluator(),
             RegionInquiryTool(), HTMLTextExtractor()]

    # Nothing in this file initialises selected_language before the dropdown
    # changes, so default to Chinese (the dropdown's initial value).
    language = globals().get("selected_language", "ch")
    template = en_prompt if language == "en" else ch_prompt
    prompt = ChatPromptTemplate.from_template(template)

    agent = create_react_agent(llm, tools, prompt)
    agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True,
                                   return_intermediate_steps=True, include_run_info=True)
    cur_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    # Capture the verbose agent trace. The context manager restores stdout
    # even when the agent raises, which a manual reassignment did not.
    # NOTE(review): redirecting stdout is process-wide; concurrent requests
    # would share the capture.
    captured_output = io.StringIO()
    with redirect_stdout(captured_output):
        response = agent_executor.invoke(
            {"current_time": cur_time,
             "input": topic,
             "image_information": image_summaries})

    # Remove ANSI colour codes emitted by the verbose output.
    captured_content = re.sub(r'\x1b\[[0-9;]*m', '', str(captured_output.getvalue()))
    print("captured_content begin", captured_content, "\ncaptured_content end")

    explain = _extract_summary(captured_content)
    verdict = response['output'].replace('*', '').strip()
    text = ("<b>思考结果:</b>\n" + re.sub(r'<.*?>', '', explain)
            + "\n\n<b>鉴定结果:</b>\n" + re.sub(r'<.*?>', '', verdict))
    text = text.replace("\n", "<br>")

    # A verdict-to-``flag`` mapping used to be computed here from the agent
    # output; it was already disabled together with update_flag().
    steps = response['intermediate_steps']
    agent_thought = _describe_steps(steps)
    reply = {
        'text': text,
        'news': _collect_news(steps)
    }
    formatted_reply = format_reply(reply)
    return agent_thought[:2] + [formatted_reply]
# Gradio UI definition.
# NOTE(review): the source had lost its indentation; the container nesting
# below was reconstructed from context — confirm the intended layout.
with gr.Blocks(css=css, theme='soft') as demo:
    gr.HTML("<h1 style='font-size: 36px; text-align: center; color: #333333; margin-bottom: 20px;'>虚假信息检测系统</h1>")
    with gr.Tab(label='Chat'):
        with gr.Row():
            # Sidebar: chat history, daily hot-search list and settings.
            with gr.Sidebar():
                with gr.Column(scale=1):
                    gr.Textbox(visible=False)
                with gr.Column(scale=1):
                    gr.Textbox(visible=False)
                with gr.Column(scale=1):
                    new_chat_button = gr.Button(
                        "New chat",
                        variant="primary",
                        size="md",
                        icon="plus.svg",
                    )
                    chat_history_dataset = gr.Dataset(
                        components=[gr.Textbox(visible=False)],
                        show_label=False,
                        layout="table",
                        type="index",
                    )
                with gr.Accordion("展示每日热搜", open=False):
                    daily_news = gr.HTML(label="每日热搜", show_label=True, container=True)
                language_select = gr.Dropdown(["中文", "英文"], label="请选择要使用的语言", scale=1, value="中文")
                model_select = gr.Dropdown(["自研", "llama3-70b", "llama3-8b", "mistral"],
                                           label="请选择要使用的大模型",
                                           scale=1, value="自研")
                # flag = gr.Image(label="新闻标签", type="numpy", visible=False)
                theme_select = gr.Dropdown(["气泡", "面板"], label="切换聊天样式", value="气泡")
                daily_bn = gr.Button("查看每日热搜")
            # Main column: multimodal chat interface backed by react().
            with gr.Column(scale=3):
                bot = gr.ChatInterface(
                    fn=react,
                    examples=[
                        {"text": "9月19日,马来西亚最高元首 Ibrahim 应邀对中国进行为期8天国事访问,亦是2024年1月上任以来首次访问东盟外国家。"},
                        {"text": "据最新天文研究,火星的轨道将逐渐接近地球,最终成为地球的“第二月亮”。天文学家预测这一变化将在2025年发生,届时火星将在夜空中与月亮一样明亮,影响全球潮汐和生态平衡。"}
                    ],
                    chatbot=gr.Chatbot(label='自研系统',
                                       avatar_images=("/image/human.png", "/image/bot.png"),
                                       type="messages",
                                       height = 600,
                                       layout="bubble",
                                       show_copy_button=True,
                                       show_copy_all_button=True,
                                       ),
                    multimodal=True,
                    show_progress='full',
                    type="messages",
                    flagging_mode='manual',
                    cache_examples = False,
                    example_icons=["/image/search.png", "/image/search.png"]
                )
            # Right column: shows the text stored in image_summaries.
            with gr.Column(scale=1):
                with gr.Accordion("图片检测", open=False):
                    img_info = gr.Textbox(label="提取到的信息", lines=5)
                # contrast_model = gr.Textbox(label="对比模型", lines=5)
                # with gr.Accordion("展示对比模型prompt", open=False):
                # model_prompt = gr.Textbox(label="对比模型prompt", lines=5, placeholder=constract_model_prompt, interactive=True)
                # constract_select = gr.Dropdown(["qwen", "llama", "glm", "doubao", "deepseek", "baichuan"],
                # label="请选择使用的对比模型", scale=1, value="qwen")
                # constract_bn = gr.Button("展示对比模型")
                # prompt_bn = gr.Button("确定更改提示词")
                img_bn = gr.Button("显示检测结果")
        # Event wiring for buttons and dropdowns.
        img_bn.click(update_image_summaries, [], img_info)
        # flag_bn.click(update_flag, [], flag)
        daily_bn.click(DailyNews, [], [daily_news])
        language_select.change(SelectLanguage, language_select, [])
        model_select.change(SelectModel, model_select, [])
        theme_select.change(SelectTheme, theme_select, bot.chatbot)
        # constract_select.change(SelectConstract, constract_select, [])
        # constract_bn.click(constract, [], contrast_model)
        # prompt_bn.click(ModelPrompt,model_prompt, [])
        # "New chat": copy the chatbot into the state, save it, then clear the
        # conversation id and chatbot and copy the cleared value to the state.
        new_chat_button.click(
            lambda x: x,
            [bot.chatbot],
            [bot.chatbot_state],
            show_api=False,
            queue=False,
        ).then(
            save_conversation,
            [bot.conversation_id, bot.chatbot_state, bot.saved_conversations],
            [bot.conversation_id, bot.saved_conversations]
        ).then(
            lambda: (None, []),
            None,
            [bot.conversation_id, bot.chatbot],
            show_api=False,
            queue=False,
        ).then(
            lambda x: x,
            [bot.chatbot],
            [bot.chatbot_state],
            show_api=False,
            queue=False,
        )
        # Rebuild the history list on page load and whenever the saved
        # conversations change.
        on(
            triggers=[demo.load, bot.saved_conversations.change],
            fn=load_chat_history,
            inputs=bot.saved_conversations,
            outputs=chat_history_dataset,
            show_api=False,
            queue=False,
        )
        # Clicking a history row clears the chatbot, then loads that
        # conversation.
        chat_history_dataset.click(
            lambda: [],
            None,
            [bot.chatbot],
            show_api=False,
            queue=False,
            show_progress="hidden",
        ).then(
            load_conversation,
            [chat_history_dataset, bot.saved_conversations],
            [bot.conversation_id, bot.chatbot],
            show_api=False,
            queue=False,
            show_progress="hidden",
        )
    with gr.Tab(label='Para', scale=1):
        gr.Textbox()
# Start the Gradio app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()