ChienChung's picture
Update app.py
fdf059e verified
#!/usr/bin/env python
import os
import shutil
import tempfile
import json
import torch
import re
import requests
import transformers
import chardet
import deepeval
import difflib
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.models.llama.configuration_llama import LlamaConfig
from huggingface_hub import hf_hub_download
from typing import List, Dict, Any
import gradio as gr
from pathlib import Path
# Solve permission issues: point every cache/config location at /tmp.
# NOTE(review): transformers / huggingface_hub are imported above this point;
# if they resolve their cache locations at import time these assignments come
# too late — confirm, and consider moving this section above those imports.
os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib"
os.environ["HOME"] = "/tmp"
os.environ["XDG_CACHE_HOME"] = "/tmp/.cache"
os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["TRANSFORMERS_CACHE"] = "/tmp/huggingface/transformers"
os.environ["HF_DATASETS_CACHE"] = "/tmp/huggingface/datasets"
os.environ["HF_METRICS_CACHE"] = "/tmp/huggingface/metrics"
os.environ["GRADIO_FLAGGING_DIR"] = "/tmp/flagged"
os.environ["SENTENCE_TRANSFORMERS_HOME"] = "/tmp/sentence_transformers"
os.environ["HF_HUB_CACHE"] = "/tmp/huggingface/hf_cache"
os.environ["HF_HUB_DOWNLOAD_TIMEOUT"] = "60"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
# LANGCHAIN_API_KEY / LANGCHAIN_PROJECT are taken from the environment as-is.
# Re-assigning os.getenv(...) back into os.environ is a no-op when the variable
# is set and raises TypeError (environment values must be str) when it is not,
# which would abort the import. Only report the missing ones.
for _langchain_var in ("LANGCHAIN_API_KEY", "LANGCHAIN_PROJECT"):
    if os.getenv(_langchain_var) is None:
        print(f"[WARNING] {_langchain_var} is not set; LangSmith tracing may be unavailable.")
# Set the environment variable so AutoGen can write to a temporary directory
os.environ["AUTOGEN_WORKSPACE"] = "/tmp/autogen_workspace"
os.makedirs("/tmp/autogen_workspace", exist_ok=True)
os.chmod("/tmp/autogen_workspace", 0o777)  # make sure the directory is writable
# OpenAI API related environment variables
os.environ["OPENAI_API_TYPE"] = "open_ai"  # if you are using the OpenAI API
# Create a safe temp area for DeepEval
os.environ["DEEPEVAL_TELEMETRY_OPT_OUT"] = "YES"
os.environ["DEEPEVAL_RESULTS_FOLDER"] = "/tmp/deepeval_results"
os.makedirs("/tmp/deepeval_results", exist_ok=True)
# Fix the Python tempdir base (so it does not write to the home directory)
import tempfile
tempfile.tempdir = "/tmp"
# Monkey-patch DeepEval here so that evaluate() runs from /tmp without
# changing the working directory globally.
original_evaluate = deepeval.evaluate


def patched_evaluate(*args, **kwargs):
    """Call the original ``deepeval.evaluate`` with ``/tmp`` as the cwd.

    All arguments are forwarded unchanged and the original return value is
    returned. The previous working directory is restored afterwards, whether
    the call returns or raises.
    """
    previous_cwd = os.getcwd()
    try:
        os.chdir("/tmp")
        return original_evaluate(*args, **kwargs)
    finally:
        # Always switch back, even when evaluate() raised.
        os.chdir(previous_cwd)


deepeval.evaluate = patched_evaluate
# Feature flag: DeepEval scoring runs only when SHOW_EVAL is "true" (any case).
SHOW_EVAL = os.environ.get("SHOW_EVAL", "false").lower() == "true"
# Load Required Modules
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma, FAISS
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.llms import HuggingFacePipeline
from langchain.chat_models import ChatOpenAI
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain_community.document_loaders import PyPDFLoader, TextLoader, UnstructuredWordDocumentLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains.summarize import load_summarize_chain
from tempfile import mkdtemp
from langchain.schema import AIMessage
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from dateutil import parser as date_parser
import numexpr as ne
import pandas as pd
# Multi-Agent Imports
from serpapi import GoogleSearch
# CrewAI Section: completely use CrewAI's Agent, Task, Crew and @tool decorator
from crewai import Crew, Agent, Task, Process
from crewai.tools import tool
from geopy.geocoders import Nominatim
from timezonefinder import TimezoneFinder
from langchain_experimental.agents import create_pandas_dataframe_agent
from langsmith import traceable
from deepeval import evaluate
from deepeval.metrics import AnswerRelevancyMetric
from deepeval.test_case import LLMTestCase
# from langgraph.graph import Graph
from langgraph.graph import StateGraph
from langchain_core.runnables import RunnableLambda
from langchain.chains import LLMChain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from sentence_transformers import SentenceTransformer
# === AutoGen for multi-intent collaboration ===
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
# Optional Phoenix tracing: instrument LangChain when the package can be
# imported, otherwise print a warning and carry on without it.
try:
    from phoenix.trace.langchain import LangChainInstrumentor
    LangChainInstrumentor().instrument()
except Exception as exc:
    print(f"[WARNING] Failed to load Phoenix LangChain trace: {exc}")

# Module-level session state read by the tool functions further down.
session_retriever = session_qa_chain = None
csv_dataframe = None  # CSV tool will use this
# Safe Result Formatter
def safe_format_result(result) -> str:
    """Render an agent/tool result as display text without raising.

    Args:
        result: Any value. Objects exposing both ``agent_name`` and ``output``
            are rendered as ``"[Agent: <name>]\\n<output>"``; strings are
            returned unchanged; dicts are pretty-printed as JSON; lists are
            joined one item per line; anything else goes through ``str()``.

    Returns:
        The formatted text, or ``"Error formatting result: ..."`` if
        formatting itself fails.
    """
    try:
        if hasattr(result, "agent_name") and hasattr(result, "output"):
            return f"[Agent: {result.agent_name}]\n{result.output}"
        if isinstance(result, str):
            return result
        if isinstance(result, dict):
            # ensure_ascii=False keeps non-ASCII text (e.g. Chinese) readable
            # instead of \uXXXX escapes; default=str lets values JSON cannot
            # encode (datetimes, custom objects) be shown rather than turning
            # the whole result into an error message.
            return json.dumps(result, indent=2, ensure_ascii=False, default=str)
        if isinstance(result, list):
            return "\n".join(str(item) for item in result)
        return str(result)
    except Exception as e:
        return f"Error formatting result: {e}"
# Model and Device Setup
# Preference order: Apple MPS first, then CUDA, otherwise CPU.
device = (
    "mps" if torch.backends.mps.is_available()
    else "cuda" if torch.cuda.is_available()
    else "cpu"
)
print(f"Using device => {device}")
# Credentials are read from the environment (None when unset).
hf_token = os.environ.get("HF_TOKEN")
openai_api_key = os.environ.get("OPENAI_API_KEY")
model_id = "ChienChung/my-llama-1b"

# Download config.json on its own so that rope_scaling can be rewritten before
# the LlamaConfig object is built from it.
# NOTE(review): `use_auth_token` is reported as deprecated in favour of `token`
# in newer huggingface_hub / transformers releases — confirm against the
# pinned versions before changing it.
config_path = hf_hub_download(
    repo_id=model_id,
    filename="config.json",
    cache_dir="/tmp/huggingface",
    use_auth_token=hf_token,
)
with open(config_path, "r", encoding="utf-8") as f:
    config_dict = json.load(f)
if "rope_scaling" in config_dict:
    # Keep the stored factor (32.0 when absent) but force the "dynamic" type.
    config_dict["rope_scaling"] = dict(
        type="dynamic",
        factor=config_dict["rope_scaling"].get("factor", 32.0),
    )
model_config = LlamaConfig.from_dict(config_dict)
model_config.trust_remote_code = True

print("Loading Llama model...")
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    config=model_config,
    cache_dir="/tmp/huggingface",
    use_auth_token=hf_token,
    trust_remote_code=True,
)
model.to(device)
print("Model loaded!")

print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(
    model_id,
    cache_dir="/tmp/huggingface",
    use_auth_token=hf_token,
    trust_remote_code=True,
)
# Fall back to the EOS token when the tokenizer defines no padding token.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
print("Tokenizer loaded!")

# Text-generation pipeline around the local model: sampling disabled, at most
# 200 new tokens, and only the generated continuation is returned.
query_pipeline = transformers.pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    torch_dtype=torch.float16 if device == "cuda" else torch.float32,
    device_map="auto" if device != "cpu" else None,
    max_new_tokens=200,
    do_sample=False,
    temperature=0.0,
    return_full_text=False,
)
# Chroma DB and Document Retrieval Setup
print("Loading Chroma DB for Biden Speech...")
# The bundled ./chroma_db is copied to /tmp only when no copy exists there yet.
if not os.path.exists("/tmp/chroma_db"):
    shutil.copytree("./chroma_db", "/tmp/chroma_db")
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
vectordb = Chroma(embedding_function=embeddings, persist_directory="/tmp/chroma_db")
retriever = vectordb.as_retriever()

# Prompt shared by the retrieval QA chains: answer from the context only.
custom_prompt = PromptTemplate(
    template="""You are a helpful AI assistant. Use only the text from the context below to answer the user's question.
If the answer is not in the context, say "No relevant info found."
If the question is not in the context, say "No relevant info found."
Return only the final answer in one to three sentences.
Do not restate the question or context.
Do not include these instructions in your final output.
Context:
{context}
Question: {question}
Answer:
""",
    input_variables=["context", "question"],
)

# Local LLaMA pipeline plus two identically configured OpenAI chat clients.
llm_local = HuggingFacePipeline(pipeline=query_pipeline)
llm_gpt4 = ChatOpenAI(openai_api_key=openai_api_key, model_name="gpt-4o-mini", temperature=0.2)
crew_llm = ChatOpenAI(
    openai_api_key=openai_api_key,
    model_name="gpt-4o-mini",
    temperature=0.2,
)

# Conversational chain over the Chroma retriever, with buffered chat history.
memory = ConversationBufferMemory(return_messages=True, memory_key="chat_history")
qa_gpt = ConversationalRetrievalChain.from_llm(
    llm=llm_gpt4,
    retriever=retriever,
    memory=memory,
    combine_docs_chain_kwargs={"prompt": custom_prompt},
)
# Helper Function: Extract file path from uploaded file
def get_file_path(file):
    """Return a filesystem path for an uploaded file, or ``None``.

    Args:
        file: One of
            * a ``str`` - returned unchanged;
            * a ``dict`` - its ``"data"`` value, else ``"name"``, else ``None``;
            * an object with a ``save(path)`` method and a ``name`` attribute -
              saved into a fresh temporary directory.

    Returns:
        The path as a string, or ``None`` for any other input.
    """
    if isinstance(file, str):
        return file
    if isinstance(file, dict):
        # Prefer using the "data" key, then "name"
        return file.get("data", file.get("name", None))
    if hasattr(file, "save"):
        temp_dir = mkdtemp()
        # Use only the final path component: os.path.join() throws temp_dir
        # away when the second argument is absolute, and "../" segments in the
        # client-supplied name would write outside the temporary directory.
        safe_name = os.path.basename(file.name) or "upload"
        file_path = os.path.join(temp_dir, safe_name)
        file.save(file_path)
        return file_path
    return None
# Original functionalities (Tabs 1-4) functions
@traceable(name="Biden LLaMA QA")
def rag_llama_qa(query):
    """Answer ``query`` with the local LLaMA model over the Chroma retriever.

    A "stuff" RetrievalQA chain is built for each call. If the model output
    contains "answer:" (any case), only the text after its first occurrence is
    returned, stripped; otherwise the raw output is returned.
    """
    chain = RetrievalQA.from_chain_type(
        llm=llm_local,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=False,
        chain_type_kwargs={"prompt": custom_prompt},
    )
    output = chain.run(query)
    marker = "answer:"
    position = output.lower().find(marker)
    if position == -1:
        return output
    return output[position + len(marker):].strip()
@traceable(name="GPT-4 Document QA")
def rag_gpt4_qa(query):
    """Answer ``query`` with the conversational GPT chain (``qa_gpt``).

    When ``SHOW_EVAL`` is enabled the answer is also scored with DeepEval's
    answer-relevancy metric and the outcome is printed; evaluation failures
    are printed and never affect the returned answer.
    """
    raw_answer = qa_gpt.run(query)
    if not SHOW_EVAL:
        return raw_answer
    try:
        top_docs = retriever.get_relevant_documents(query)
        # Only the first three retrieved documents are passed as context.
        context_texts = [doc.page_content for doc in top_docs[:3]]
        test_case = LLMTestCase(
            input=query,
            actual_output=raw_answer,
            expected_output=raw_answer,
            context=context_texts,
        )
        metric = AnswerRelevancyMetric(model="gpt-4o-mini")
        result = evaluate([test_case], [metric])[0]
        print(f"[DeepEval Tab4] Input: {query}")
        print(f"[DeepEval Tab4] Passed: {result.passed}, Score: {result.score:.2f}, Reason: {result.reason}")
    except Exception as e:
        print(f"[DeepEval Tab4] Evaluation failed: {e}")
    return raw_answer
@traceable(name="Upload Document QA")
def upload_and_chat(file, query):
    """Answer ``query`` from an uploaded document.

    The file is loaded with a loader chosen by extension (.pdf, .docx, anything
    else as plain text), split into 500-character chunks with a 50-character
    overlap, indexed in an in-memory FAISS store and queried through a "stuff"
    RetrievalQA chain. When ``SHOW_EVAL`` is enabled the answer is also scored
    with DeepEval and the outcome printed.

    Returns the answer text, or an error message when no path can be obtained
    for the upload.
    """
    file_path = get_file_path(file)
    if file_path is None:
        return "Unable to obtain the uploaded file path."

    lowered_path = file_path.lower()
    if lowered_path.endswith(".pdf"):
        loader = PyPDFLoader(file_path)
    elif lowered_path.endswith(".docx"):
        loader = UnstructuredWordDocumentLoader(file_path)
    else:
        loader = TextLoader(file_path)

    docs = loader.load()
    splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    chunks = splitter.split_documents(docs)
    temp_retriever = FAISS.from_documents(chunks, embeddings).as_retriever()
    qa_temp = RetrievalQA.from_chain_type(
        llm=llm_gpt4,
        chain_type="stuff",
        retriever=temp_retriever,
        return_source_documents=False,
        chain_type_kwargs={"prompt": custom_prompt},
    )
    raw_answer = qa_temp.run(query)
    if not SHOW_EVAL:
        return raw_answer
    try:
        # The first three chunks of the document are used as context.
        test_case = LLMTestCase(
            input=query,
            actual_output=raw_answer,
            expected_output=raw_answer,
            context=[chunk.page_content for chunk in chunks[:3]],
        )
        metric = AnswerRelevancyMetric(model="gpt-4o-mini")  # default is GPT-4o
        result = evaluate([test_case], [metric])[0]
        print(f"[DeepEval QA] Input: {query}")
        print(f"[DeepEval QA] Passed: {result.passed}, Score: {result.score:.2f}, Reason: {result.reason}")
    except Exception as e:
        print(f"[DeepEval QA] Evaluation failed: {e}")
    return raw_answer
# Prompts for the "refine" summarisation chain: the first produces an initial
# summary, the second folds further content into an existing summary.
initial_prompt = PromptTemplate(
    template="""Write a concise and structured summary of the following content. Focus on capturing the main ideas and key details:
{text}
--- Summary ---
""",
    input_variables=["text"],
)
refine_prompt = PromptTemplate(
    template="""You already have an existing summary:
{existing_answer}
Refine the summary based on the new content below. Add or update information only if it's relevant. Keep it concise:
{text}
--- Refined Summary ---
""",
    input_variables=["existing_answer", "text"],
)
@traceable(name="Document Summarise")
def document_summarize(file):
    """Summarise an uploaded document with the "refine" summarisation chain.

    The loader is chosen by extension (.pdf, .docx, anything else as plain
    text). Returns the chain's ``output_text``, or an error message when no
    path can be obtained for the upload.
    """
    file_path = get_file_path(file)
    if file_path is None:
        return "Unable to obtain the uploaded file."

    lowered_path = file_path.lower()
    if lowered_path.endswith(".pdf"):
        loader = PyPDFLoader(file_path)
    elif lowered_path.endswith(".docx"):
        loader = UnstructuredWordDocumentLoader(file_path)
    else:
        loader = TextLoader(file_path)

    docs = loader.load()
    summarize_chain = load_summarize_chain(
        llm_gpt4,
        chain_type="refine",
        question_prompt=initial_prompt,
        refine_prompt=refine_prompt,
    )
    return summarize_chain.invoke(docs)['output_text']
def csv_agent(file, query):
    """Load an uploaded CSV and evaluate ``query`` against it with numexpr.

    The file encoding is detected with chardet before reading. The expression
    is evaluated with the DataFrame available under the name ``df``.

    Returns the stringified result, or an error message when the file cannot
    be located or read, or when the expression fails.
    """
    file_path = get_file_path(file)
    if file_path is None:
        return "Unable to obtain the uploaded CSV file."
    try:
        with open(file_path, 'rb') as handle:
            detected = chardet.detect(handle.read())
        df = pd.read_csv(file_path, encoding=detected['encoding'])
    except Exception as e:
        return f"Error reading CSV: {e}"
    try:
        return str(ne.evaluate(query, local_dict={"df": df}))
    except Exception as e:
        return f"Query error: {e}"
def search_web(query):
    """Run a Google search through SerpAPI and summarise the results.

    Args:
        query: The search text, or a dict whose ``"query"`` entry holds it.

    Returns:
        An LLM-written summary of the organic results; the raw
        title/link/snippet listing if summarisation fails or yields nothing;
        ``"No results found."`` when the response has no organic results; or a
        configuration message when ``SERPAPI_API_KEY`` is not set.
    """
    if isinstance(query, dict):
        query = query.get("query", "")
    api_key = os.environ.get("SERPAPI_API_KEY")
    if not api_key:
        return "SERPAPI_API_KEY not set. Please set the environment variable."
    params = {"engine": "google", "q": query, "api_key": api_key, "num": 10}
    results = GoogleSearch(params).get_dict()
    if "organic_results" not in results:
        return "No results found."

    raw_output = "".join(
        f"Title: {result.get('title', 'No Title')}\n"
        f"Link: {result.get('link', 'No Link')}\n"
        f"Snippet: {result.get('snippet', 'No Snippet')}\n\n"
        for result in results["organic_results"]
    )
    prompt = f"""
You are a helpful assistant. Given the following web search results and the user's question:
1. First, identify **only** the most relevant entries.
2. Then, summarise the key insights using fluent, **British English**.
3. If the user's question asks for trends, categories, or multiple items, you may present key points in a non-markdown bullet-point style (e.g. use "•" instead of "-", avoid using "**" for bold).
4. Otherwise, reply in a short, natural paragraph.
5. Do **not** use any markdown formatting such as `**`, `##`, or list syntax.
6. Keep your answer concise and professional, as if explaining to a colleague.
7. If no relevant info is found, say: "Sorry, I couldn't find a reliable answer from the current results."
--- Web Search Results ---
{raw_output}
--- User's Question ---
"{query}"
Answer:
"""
    # Summarise with the LLM directly rather than through _general_chat():
    # that helper falls back to a web search when the reply sounds unsure,
    # which would call this function again with the whole prompt as the search
    # query and could recurse without bound.
    try:
        response = llm_gpt4.invoke(prompt)
        summarized = response.content if isinstance(response, AIMessage) else str(response)
    except Exception as e:
        print(f"[WARNING] Search summarisation failed: {e}")
        summarized = ""
    return summarized if summarized else raw_output.strip()
def uploaded_qa(file, query):
    """Answer ``query`` from an uploaded document (no evaluation step).

    The file is loaded by extension (.pdf, .docx, anything else as plain
    text), split into 500-character chunks with a 50-character overlap,
    indexed in an in-memory FAISS store and queried through a "stuff"
    RetrievalQA chain.

    Returns the answer text, or an error message when no path can be obtained
    for the upload.
    """
    file_path = get_file_path(file)
    if file_path is None:
        return "Unable to obtain the uploaded file path."

    lowered_path = file_path.lower()
    if lowered_path.endswith(".pdf"):
        loader = PyPDFLoader(file_path)
    elif lowered_path.endswith(".docx"):
        loader = UnstructuredWordDocumentLoader(file_path)
    else:
        loader = TextLoader(file_path)

    docs = loader.load()
    splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    chunks = splitter.split_documents(docs)
    temp_retriever = FAISS.from_documents(chunks, embeddings).as_retriever()
    qa_temp = RetrievalQA.from_chain_type(
        llm=llm_gpt4,
        chain_type="stuff",
        retriever=temp_retriever,
        return_source_documents=False,
        chain_type_kwargs={"prompt": custom_prompt},
    )
    return qa_temp.run(query)
# CrewAI Multi-Agent System (Tab 5)
# Completely abandon langchain.agents.Tool and use CrewAI's @tool decorator to define tools
from pydantic import BaseModel
class SimpleQuery(BaseModel):
    # Pydantic model with a single required string field, ``query``.
    # Documented with a comment rather than a docstring so the model's
    # generated schema is left unchanged.
    query: str
def _general_chat(query: str) -> str:
    """Answer ``query`` with the GPT model, falling back to a web search.

    If the reply contains one of a few "don't know" phrases (matched
    case-insensitively), the result of ``_search_web_tool(query)`` is returned
    instead. Any exception is reported as a ``"General chat error: ..."``
    string rather than raised.
    """
    uncertainty_markers = ("i'm not sure", "i don't know", "no information", "can't find")
    try:
        response = llm_gpt4.invoke(query)
        if isinstance(response, AIMessage):
            response = response.content  # Extract the actual string
        lowered = response.lower()
        for marker in uncertainty_markers:
            if marker in lowered:
                return _search_web_tool(query)
        return response
    except Exception as e:
        return f"General chat error: {e}"
@tool("general_chat")
def general_chat_tool(query: str) -> str:
    """General assistant: Answer general questions without relying on documents."""
    # This tool previously carried a line-for-line copy of _general_chat()
    # (LLM call, "don't know" detection, web-search fallback, error string).
    # Delegating keeps the two entry points from drifting apart; the
    # web-search fallback still ends in search_web() via _search_web_tool().
    return _general_chat(query)
def location_to_timezone(location: str) -> str:
    """Map a place name to a timezone name, defaulting to Europe/London.

    The place is geocoded with Nominatim and the coordinates are looked up
    with TimezoneFinder. ``"Europe/London"`` is returned when the place cannot
    be geocoded, when no timezone is found, or when any step raises.
    """
    fallback = "Europe/London"
    try:
        place = Nominatim(user_agent="time_agent_demo").geocode(location)
        if not place:
            return fallback
        zone = TimezoneFinder().timezone_at(lng=place.longitude, lat=place.latitude)
        return zone or fallback
    except Exception:
        return fallback
def get_time_tool(query: str) -> str:
    """Answer a time or date question for the location named in ``query``.

    The location is extracted with the GPT model ("London" when extraction
    fails or returns nothing), converted to a timezone with
    ``location_to_timezone`` and the current time there is formatted.

    Returns:
        A sentence with the date when the query mentions a date keyword, the
        time when it mentions a time keyword, and both otherwise.
    """
    # use GPT to find location keyword
    try:
        location_prompt = f"""
You are a location extractor. Given a user's query about time or date, return the location mentioned in it. If not found, return "London".
Examples:
- "What's the time in Tokyo now?" → Tokyo
- "今天台北幾點?" → Taipei
- "現在在紐約幾點?" → New York
- "今天幾號?" → London
- "What date is today?" → London
Now process this query: "{query}"
"""
        location_response = llm_gpt4.invoke(location_prompt)
        if isinstance(location_response, AIMessage):
            location = location_response.content.strip()
        else:
            location = str(location_response).strip()
    except Exception:
        location = "London"
    # An empty reply would otherwise produce "The time in  is ...".
    location = location or "London"
    tz_str = location_to_timezone(location)
    now = datetime.now(ZoneInfo(tz_str))
    place = location.title()
    # The place name is kept out of the strftime() format string: it comes
    # from the LLM, and any "%" in it would be read as a format directive.
    # return time or date
    q_lower = query.lower()
    if any(k in q_lower for k in ["date", "幾號", "today", "day"]):
        return f"The date in {place} is {now.strftime('%B %d, %Y (%A)')}."
    if any(k in q_lower for k in ["time", "幾點", "現在"]):
        return f"The time in {place} is {now.strftime('%I:%M %p')}."
    return f"The local time in {place} is {now.strftime('%I:%M %p on %B %d, %Y')}."
@tool("time_tl")
def time_tool(query: str) -> str:
    """Time Agent: Answer time or date queries worldwide using LLM + GeoLocator + TimezoneFinder."""
    # The body used to be a line-for-line copy of get_time_tool(); delegate to
    # it so the two entry points cannot drift apart.
    return get_time_tool(query)
# Module-level WeatherAPI key (None when the variable is unset).
weather_api_key = os.getenv("WEATHER_API_KEY")
def get_time_tool2(query: str) -> "datetime | str":
    """Resolve the point in time a query refers to, in the location's timezone.

    The location is extracted with the GPT model, its current local time is
    computed, and the model is then asked (with few-shot examples anchored on
    that local time) to return the target time as ``YYYY-MM-DD HH:MM:SS``.

    Returns:
        A timezone-aware ``datetime`` on success. On failure a ``str``
        describing the problem is returned instead, so callers must check the
        type of the result.
    """
    try:
        # Step 1: extract the location
        location_prompt = f"""
You are a location extractor. Given a user's query about time or date, return the location mentioned in it.
If not found, return "London".
Query: "{query}"
"""
        location_response = llm_gpt4.invoke(location_prompt)
        location = location_response.content.strip() if isinstance(location_response, AIMessage) else str(location_response).strip()
        # Step 2: current local time there (with DEBUG output)
        print(f"[DEBUG] Extracted Location: {location}")
        tz_str = location_to_timezone(location)
        print(f"[DEBUG] Timezone: {tz_str}")
        now = datetime.now(ZoneInfo(tz_str))
        print(f"[DEBUG] Local Time at {location}: {now}")
        # Step 3: dynamic few-shot examples, recomputed from `now` on every call
        weekday = now.weekday()
        # Days back to the previous / forward to the next Friday, always 1..7
        # so that neither resolves to today when today is a Friday.
        days_since_friday = ((weekday - 4 - 1) % 7) + 1
        days_until_friday = ((4 - weekday - 1) % 7) + 1
        examples = [
            ("five hours later", now + timedelta(hours=5)),
            ("later", now + timedelta(hours=2)),
            ("soon", now + timedelta(minutes=30)),
            ("shortly", now + timedelta(minutes=15)),
            ("after a while", now + timedelta(hours=1)),
            ("tomorrow at 3pm", now.replace(hour=15, minute=0, second=0) + timedelta(days=1)),
            ("the day after tomorrow at 10am", now.replace(hour=10, minute=0, second=0) + timedelta(days=2)),
            ("last Monday 9am", (now - timedelta(days=(weekday + 7))).replace(hour=9, minute=0, second=0)),
            ("next Monday", (now + timedelta(days=(7 - weekday))).replace(hour=12, minute=0, second=0)),
            ("last Friday", (now - timedelta(days=days_since_friday)).replace(hour=12, minute=0, second=0)),
            ("next Friday", (now + timedelta(days=days_until_friday)).replace(hour=12, minute=0, second=0)),
            ("in 10 hours", now + timedelta(hours=10)),
            ("this weekend", (now + timedelta(days=(5 - weekday) % 7)).replace(hour=10, minute=0, second=0)),
            ("next weekend", (now + timedelta(days=((5 - weekday) % 7) + 7)).replace(hour=10, minute=0, second=0)),
            # "next Monday, 3pm": same day offset as the English "next Monday"
            # example above, so the two never contradict each other on Mondays.
            ("下週一下午三點", (now + timedelta(days=(7 - weekday))).replace(hour=15, minute=0, second=0)),
            ("昨天下午五點", (now - timedelta(days=1)).replace(hour=17, minute=0, second=0)),
            ("昨天早上八點", (now - timedelta(days=1)).replace(hour=8, minute=0, second=0)),
            ("later this evening", now.replace(hour=20, minute=0, second=0)),
            ("現在", now),
            ("last month", (now - timedelta(days=30)).replace(hour=12, minute=0, second=0)),
            ("early tomorrow morning", now.replace(hour=6, minute=0, second=0) + timedelta(days=1)),
            ("in 2 hours", now + timedelta(hours=2)),
            ("in one hour", now + timedelta(hours=1)),
            ("in 30 minutes", now + timedelta(minutes=30)),
            ("in a few minutes", now + timedelta(minutes=10)),
        ]
        # State the local time explicitly at the top of the examples section
        examples_header = f"""Assume the current local time in {location} is exactly:
**{now.strftime('%Y-%m-%d %H:%M:%S')}** (timezone: {tz_str})
Use this exact time to reason all examples below.
"""
        examples_str = "\n".join([f'User Query: "{q}" → {dt.strftime("%Y-%m-%d %H:%M:%S")}' for q, dt in examples])
        # Step 4: build the full prompt
        time_query_prompt = f"""
You are a timezone-aware time reasoner. Based on the user's query, calculate the **exact target time** they are referring to.
Remember: all relative expressions like "later", "in 2 hours", "tomorrow" must be strictly calculated based on the current local time above.
{examples_header}
Please return the result in this **exact format**: `YYYY-MM-DD HH:MM:SS` (24-hour clock, no timezone info).
Only return the time string — no explanation, no extra words.
### Examples:
{examples_str}
### Now process:
User Query: "{query}"
"""
        time_response = llm_gpt4.invoke(time_query_prompt)
        time_str = time_response.content.strip() if isinstance(time_response, AIMessage) else str(time_response).strip()
        # Step 5: parse the time. The timestamp is searched for inside the
        # reply instead of requiring the whole reply to match, because the
        # model may wrap it in backticks (as the prompt itself does) or quotes.
        found = re.search(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}", time_str)
        if found is None:
            return f"Failed to parse time string from LLM: '{time_str}'"
        try:
            target_time = datetime.strptime(found.group(0), "%Y-%m-%d %H:%M:%S")
        except ValueError:
            # Right shape but not a real date/time (e.g. month 13).
            return f"Failed to parse time string from LLM: '{time_str}'"
        return target_time.replace(tzinfo=ZoneInfo(tz_str))
    except Exception as e:
        return f"Error in retrieving location or time information: {e}"
def weather_agent_tool(query: str) -> str:
    """Weather Agent: Return current, hourly, or historical weather info using WeatherAPI.

    The location and target time are extracted from ``query`` with the GPT
    model, the matching WeatherAPI endpoint (history for past times, forecast
    otherwise) is queried, the hourly record closest to the target time is
    selected and the model writes the final answer from that record.

    Returns:
        The answer text, or a message explaining why no answer could be
        produced (missing API key, unparseable time, time outside the
        supported window, no data, or any unexpected error).
    """
    try:
        weather_api_key = os.environ.get("WEATHER_API_KEY")
        if not weather_api_key:
            return "Weather API key not found. Please set WEATHER_API_KEY env variable."
        # Step 1: Extract location
        location_prompt = f"""
You are a location extractor. Given a user's query about weather, extract the location mentioned in it.
If not found, return "London".
Examples:
- "Is it gonna rain in Tokyo?" → Tokyo
- "Will it be hot in New York later?" → New York
- "明天下午高雄會不會下雨?" → Kaohsiung
- "How’s the weather?" → London
Query: "{query}"
"""
        location_resp = llm_gpt4.invoke(location_prompt)
        location = location_resp.content.strip() if isinstance(location_resp, AIMessage) else str(location_resp).strip()
        # Step 2: Get timezone and time
        target_dt = get_time_tool2(query)
        if not isinstance(target_dt, datetime):
            return f"Failed to parse the target time from your query. Got: {target_dt}"
        tz_str = location_to_timezone(location)
        tz = ZoneInfo(tz_str)
        target_dt = target_dt.replace(tzinfo=tz)
        now = datetime.now(tz)  # compare against "now" in the same timezone
        # Step 3: Check limits and decide API
        if target_dt < now - timedelta(days=7):
            return "Only supports up to 7 days of historical data."
        elif target_dt > now + timedelta(days=2):
            return "Only supports up to 3 days of forecast."
        # HTTPS keeps the API key out of clear text; passing `params` lets
        # requests URL-encode the location (spaces, "&", non-ASCII); the
        # timeout stops a stalled connection from hanging the tool forever.
        base_url = "https://api.weatherapi.com/v1"
        if target_dt < now:
            endpoint = "history.json"
            params = {"key": weather_api_key, "q": location, "dt": target_dt.strftime('%Y-%m-%d')}
        else:
            endpoint = "forecast.json"
            params = {"key": weather_api_key, "q": location, "days": 3, "aqi": "no", "alerts": "no"}
        data = requests.get(f"{base_url}/{endpoint}", params=params, timeout=15).json()
        forecast_hours = []
        if "forecast" in data:
            for day in data["forecast"]["forecastday"]:
                forecast_hours.extend(day["hour"])
        elif "forecastday" in data:
            forecast_hours = data["forecastday"][0]["hour"]
        else:
            # NOTE(review): presumably WeatherAPI reports failures as
            # {"error": {"message": ...}} — confirm. Pass the reason on when
            # it is present instead of dropping it.
            api_error = data.get("error") if isinstance(data, dict) else None
            if isinstance(api_error, dict) and api_error.get("message"):
                return f"No forecast data available. ({api_error['message']})"
            return "No forecast data available."
        # Step 4: Find closest hour (the first record wins on ties)
        closest_hour = min(
            forecast_hours,
            key=lambda hour_data: abs(
                (date_parser.parse(hour_data["time"]).replace(tzinfo=tz) - target_dt).total_seconds()
            ),
            default=None,
        )
        if not closest_hour:
            return f"No hourly data found for {target_dt.strftime('%Y-%m-%d %H:%M')}."
        # Step 5: Generate summary
        condition = closest_hour["condition"]["text"]
        temp = closest_hour["temp_c"]
        feels = closest_hour["feelslike_c"]
        humidity = closest_hour["humidity"]
        chance_rain = closest_hour.get("chance_of_rain", 0)
        hour_str = closest_hour["time"].split(" ")[1]
        summary_prompt = f"""
You are a helpful weather reasoning assistant.
The user wants to know about the weather conditions at a specific time: {target_dt.strftime('%Y-%m-%d %H:%M')} in {location}.
Use the data below to answer their question. This may refer to the past, present, or future — do not assume it is the current weather.
Based on the following weather data and the user's question, think step-by-step to extract the most relevant information, and give a natural, friendly, and cautious answer in British English.
Avoid being overly confident — never say "Yes, it will..." or "Definitely." Instead, use expressions like:
- "It is very likely that..."
- "There is a high chance of..."
- "Based on the available data, it seems that..."
- "There may be..."
Also, after answering the question, include a short weather summary and a useful suggestion (e.g., bring an umbrella, wear sunscreen, avoid outdoor activities).
**Do not use markdown formatting such as `*`, `**`, or list symbols.**
--- Weather Data ---
Location: {location}
Time: {target_dt.strftime('%Y-%m-%d')} at {hour_str}
Condition: {condition}
Temperature: {temp}°C (Feels like {feels}°C)
Humidity: {humidity}%
Chance of rain: {chance_rain}%
Chance of snow: {closest_hour.get("chance_of_snow", "N/A")}%
Wind speed: {closest_hour.get("wind_kph", "N/A")} kph
UV index: {closest_hour.get("uv", "N/A")}
Cloud cover: {closest_hour.get("cloud", "N/A")}%
Visibility: {closest_hour.get("vis_km", "N/A")} km
--- User Question ---
{query}
--- Final Answer ---
"""
        response = llm_gpt4.invoke(summary_prompt)
        return response.content.strip() if isinstance(response, AIMessage) else str(response)
    except Exception as e:
        return f"Weather Agent Error: {e}"
@tool("weather")
def weather_tool(query: str) -> str:
    """Weather Agent: Return current, hourly, or historical weather info using WeatherAPI."""
    # The body used to be a line-for-line copy of weather_agent_tool()
    # (location extraction, time resolution, WeatherAPI lookup, LLM summary).
    # Delegate so that fixes to the weather logic only have to be made once.
    return weather_agent_tool(query)
@tool("summarise")
def summarise_tool(query: str) -> str:
    """Summarise: Use document summarisation functionality."""
    # Uses the module-level session retriever; without one there is nothing
    # to summarise.
    global session_retriever, session_qa_chain
    if session_retriever is None:
        return "No document uploaded."
    try:
        # A blank query falls back to the generic search text "summary".
        search_text = query if query.strip() else "summary"
        docs = session_retriever.get_relevant_documents(search_text)
        if not docs:
            return "No relevant content found in the document."
        summarize_chain = load_summarize_chain(
            llm_gpt4,
            chain_type="refine",
            question_prompt=initial_prompt,
            refine_prompt=refine_prompt,
        )
        return summarize_chain.invoke(docs)['output_text']
    except Exception as e:
        return f"Summarisation error: {e}"
def _calc_tool(query: str) -> str:
import math
import re
try:
# Handle pure arithmetic expressions (only numbers and symbols)
if re.fullmatch(r"[0-9\.\+\-\*/%\^\(\)\s]+", query.strip()):
cleaned = query.strip().replace("^", "**")
result = ne.evaluate(cleaned)
return f"The result is: {result}"
# For expressions containing sin/cos/log etc., automatically apply math + radians
expr = query.lower()
expr = re.sub(r'sin\(([^)]+)\)', r'sin(math.radians(\1))', expr)
expr = re.sub(r'cos\(([^)]+)\)', r'cos(math.radians(\1))', expr)
expr = re.sub(r'tan\(([^)]+)\)', r'tan(math.radians(\1))', expr)
expr = expr.replace("^", "**")
result = eval(expr, {"__builtins__": None}, {
"math": math, "sin": math.sin, "cos": math.cos, "tan": math.tan,
"log": math.log10, "sqrt": math.sqrt, "exp": math.exp,
"pi": math.pi, "e": math.e
})
return f"The result is: {result}"
except Exception:
try:
# Fallback: ask GPT to calculate and explain briefly in plain English (avoid messy symbols)
response = llm_gpt4.invoke(f"Please calculate this and explain briefly in plain English: {query}. Avoid math symbols like $ or \\n or \\(.")
result = response.content if isinstance(response, AIMessage) else response
result = re.sub(r"\\\[.*?\\\]", "", result) # Remove \[...\]
result = re.sub(r"\\\(.*?\\\)", "", result) # Remove \(...\)
return result.strip()
except Exception as e:
return f"Natural language fallback error: {e}"
@tool("python_calc")
def python_calc_tool(query: str) -> str:
    """Python Calculation: Perform basic arithmetic or logical operations."""
    # The query is passed unchanged to ne.evaluate; the value is returned
    # as text, and any exception becomes a "Calculation error" message.
    try:
        return str(ne.evaluate(query))
    except Exception as err:
        return f"Calculation error: {err}"
def _search_web_tool(query: str) -> str:
    """Forward *query* to ``search_web`` and return its result unchanged."""
    web_answer = search_web(query)
    return web_answer
@tool("search_tool")
def search_tool_func(query: str) -> str:
    """Search: Perform web searches using external search engines."""
    # Tool-registered wrapper: delegates straight to search_web.
    web_answer = search_web(query)
    return web_answer
@tool("uploaded_qa")
def uploaded_qa_tool_func(query: str) -> str:
    """Document QA: Answer questions based on the uploaded document content."""
    global session_qa_chain
    # Guard: without a session QA chain there is nothing to query.
    if session_qa_chain is None:
        return "No document uploaded."
    try:
        return session_qa_chain.run(query)
    except Exception as err:
        return f"Document QA error: {err}"
@tool("csv_agent")
def csv_tool_func(query: str) -> str:
    """CSV Agent: Use natural language to analyse uploaded CSV files."""
    global csv_dataframe
    if csv_dataframe is None:
        return "No CSV file uploaded."
    try:
        pandas_agent = create_pandas_dataframe_agent(
            llm=llm_gpt4,
            df=csv_dataframe,
            verbose=True,
        )
        # The first rows of the table are shown to the agent ahead of the question.
        table_preview = csv_dataframe.head().to_string(index=False)
        return pandas_agent.run(f"Here is the table:\n{table_preview}\n\n{query}")
    except Exception as err:
        return f"CSV Agent error: {err}"
# Establish CrewAI agents (for Tab 5 only)
# Each specialist agent below is given exactly one tool; the router agent is
# given every tool so that it can act on whichever route it selects.
general_agent = Agent(
    role="General Assistant",
    goal="Respond to any general query that is not related to documents or CSV files.",
    backstory="You're an intelligent assistant who answers questions about anything general, such as math, dates, or general knowledge.",
    tools=[general_chat_tool],
    verbose=True
)
summarizer_agent = Agent(
    role="Document Summarizer",
    goal="Summarise the content of the uploaded document.",
    backstory="You are a professional summarisation expert who can identify key points in long documents.",
    tools=[summarise_tool],
    verbose=True
)
document_qa_agent = Agent(
    role="Document QA Specialist",
    goal="Answer questions based on the uploaded document.",
    backstory="You are an expert in document understanding and can accurately extract answers.",
    tools=[uploaded_qa_tool_func],
    verbose=True
)
search_agent = Agent(
    role="Search Expert",
    goal="Search the web and provide relevant information.",
    backstory="You are an expert at finding relevant information from the internet.",
    tools=[search_tool_func],
    verbose=True
)
time_agent = Agent(
    role="Time Assistant",
    goal="Answer current time or date related questions across different time zones.",
    backstory="You're a time-aware agent who can tell time or date in any major city.",
    tools=[time_tool],
    verbose=True
)
weather_agent = Agent(
    role="Weather Expert",
    goal="Answer global weather queries.",
    backstory="You are a weather analyst who provides accurate and real-time weather information for any location.",
    tools=[weather_tool],
    verbose=True
)
math_agent = Agent(
    role="Math Assistant",
    goal="Perform accurate arithmetic or logical calculations.",
    backstory="You are a calculator expert skilled at quick computations.",
    tools=[python_calc_tool],
    verbose=True
)
csv_agent = Agent(
    role="CSV Analyst",
    goal="Analyse tabular data and answer questions about the uploaded CSV file.",
    backstory="You are skilled in interpreting tabular datasets and can extract numerical or logical insights.",
    tools=[csv_tool_func],
    verbose=True
)
# Router: holds all eight tools and is the agent that executes router_task.
router_agent = Agent(
    role="Query Router",
    goal="Determine the most suitable agent or tool to handle the user query.",
    backstory="You are an intelligent query dispatcher that analyses the user's intent and chooses the best AI agent to answer.",
    tools=[python_calc_tool, search_tool_func, csv_tool_func, uploaded_qa_tool_func, summarise_tool, general_chat_tool, time_tool, weather_tool],
    verbose=True
)
# The routing rules live entirely in this prompt text; the string is runtime
# input for the LLM, so edits to it change routing behaviour.
# NOTE(review): the math rule ("starts with 'what is'") and the search rule
# (example 'What is LangChain?') overlap — confirm the intended precedence.
router_task = Task(
    description="""
Based on the user's query, decide which agent or tool is best suited to handle it:
- If the query is related to the content of an uploaded file (e.g., 'what is this document about?'), send it to the **Document QA Agent**.
- If the query contains words like 'summarize', 'summary', or 'main points', use the **Summarizer Agent**.
- If the query **includes any numbers or symbols** (like +, -, *, /, %, ^), or **mentions math terms** (like 'calculate', 'how much', 'percent', 'square root', 'log', 'cos', 'sin', etc.), or starts with 'what is', 'what’s', 'how much is', assume it is a **math question** and send it to the **Math Agent**.
- If the user uploaded a CSV file and asks about table content, data trends, or uses words like 'data', 'table', 'csv', 'column', or 'row', send it to the **CSV Agent**.
- If the user asks about current events, trending topics, or online information (e.g., 'What is LangChain?', 'latest news'), send it to the **Search Agent**.
- If the query is about current date, time, or day of week (e.g., 'what is today's date?', 'what time is it?', 'what day is it?', '現在幾點', '今天幾號', '禮拜幾'), send it to the **Time Agent**.
- If the query is about weather, rain, temperature, or forecasts (e.g., "What's the weather in Paris?", "Will it rain tomorrow in London?"), send it to the **Weather Agent**.
- If the question is general and not related to documents, calculations, CSVs, or the internet (e.g., 'Who are you?', 'Tell me a fun fact'), send it to the **General Agent**.
- If none of these apply, use your best judgment to choose the most relevant agent.
""",
    expected_output="The final answer from the selected agent or tool.",
    agent=router_agent,
    input_variables=["query"]
)
# The crew runs the single router task sequentially. router_agent executes
# that task but is not itself listed in `agents`.
crew = Crew(
    agents=[general_agent, summarizer_agent, document_qa_agent, search_agent, math_agent, time_agent, csv_agent, weather_agent],
    tasks=[router_task],
    process=Process.sequential,
    verbose=True,
    llm=crew_llm
)
# test qa
def build_langgraph_doc_qa_chain(llm, retriever, memory, prompt):
    """Build a two-node graph (Retrieve -> Answer) and return a runner for it.

    ``memory`` is accepted but not used here. The returned callable takes a
    query string and returns the result of invoking the compiled graph on an
    initial state holding the query, retriever, llm and prompt.
    """
    def _retrieve(state):
        # Fetch documents for the query; keys already present in `state`
        # take precedence over the freshly added "docs" entry.
        found = state['retriever'].get_relevant_documents(state['query'])
        return {"docs": found, **state}

    def _answer(state):
        qa_prompt = state["prompt"]
        model = state["llm"]
        retrieved = state["docs"]
        stuff_chain = StuffDocumentsChain(
            llm_chain=LLMChain(llm=model, prompt=qa_prompt),
            document_variable_name="context",
        )
        # Run once, passing in every input the chain needs.
        chain_inputs = {
            "input_documents": retrieved,
            "question": state["query"],
        }
        reply = stuff_chain.run(chain_inputs)
        return {"answer": reply, **state}

    graph_builder = StateGraph(dict)
    graph_builder.add_node("Retrieve", _retrieve)
    graph_builder.add_node("Answer", _answer)
    graph_builder.set_entry_point("Retrieve")
    graph_builder.add_edge("Retrieve", "Answer")
    graph_builder.set_finish_point("Answer")
    compiled = graph_builder.compile()

    def run(query):
        initial_state = {
            "query": query,
            "retriever": retriever,
            "llm": llm,
            "prompt": prompt,
        }
        return compiled.invoke(initial_state)

    return run
@traceable(name="Multi-Agent Chat")
def multi_agent_chat_advanced(query: str, file=None) -> str:
    """Route a query to a keyword shortcut, a document chain, or the crew.

    Keyword shortcuts (math, date/time, weather, search, general chat) are
    tried first, in that order, before any uploaded file is looked at. If
    none matches and a file was supplied, CSV files are loaded into the
    global dataframe and handed to the crew, while PDF/TXT/DOCX files are
    indexed for summarisation, document QA or crew reasoning. Without a file
    the query goes straight to the crew. Failures are returned as text.
    """
    global session_retriever, session_qa_chain, csv_dataframe
    # Smart routing without needing uploaded files
    lower_query = query.lower()

    def mentions(keywords):
        # True when any keyword occurs as a substring of the lower-cased query.
        return any(word in lower_query for word in keywords)

    if mentions(["how much", "calculate", "what is", "what’s", "%", "sin", "cos", "log", "sqrt", "^", "*", "/", "+", "-", "="]):
        return _calc_tool(query)
    if mentions(["what date", "today", "what time", "what day", "current time", "date", "現在幾點", "今天幾號", "禮拜幾"]):
        return get_time_tool(query)
    if mentions(["weather", "rain", "snow", "cold", "hot", "sunscreen", "sunglasses", "umbrella", "windy", "cloudy", "sunny", "temperature", "forecast", "天氣", "會不會下雨", "冷嗎", "熱嗎", "氣溫"]):
        return weather_agent_tool(query)
    if mentions(["latest", "news", "startup", "startups", "company", "companies", "top", "trending", "in 2025", "in 2024", "tell me"]):
        return search_web(query)
    if mentions(["who are you", "what is your name", "what can you do", "fun fact"]):
        return _general_chat(query)

    # Check if file exists and determine its format
    file_path = None if file is None else get_file_path(file)
    # Queries with any of these words are not answered from the document chain
    use_file_chain = not mentions(["calculate", "sum", "date", "time", "how many", "how much", "weather", "temperature"])
    # `query` may be rewritten below, so remember now whether it refers to a file.
    refers_to_upload = "file" in lower_query or "upload" in lower_query

    # Step 3: If a file is uploaded
    if file_path:
        file_lower = file_path.lower()
        # Process CSV
        if file_lower.endswith(".csv"):
            try:
                with open(file_path, 'rb') as raw_file:
                    result = chardet.detect(raw_file.read())
                csv_dataframe = pd.read_csv(file_path, encoding=result['encoding'])  # Ensure global assignment
                # If query mentions file, add context
                if refers_to_upload:
                    query = f"The user uploaded the following CSV file:\n\n{query}"
                result = crew.kickoff(inputs={"query": query})
                return safe_format_result(result)
            except Exception as e:
                return f"CSV Parsing Error: {e}"
        # 3-2: Process PDF / DOCX / TXT
        elif file_lower.endswith((".pdf", ".txt", ".docx")):
            try:
                if file_lower.endswith(".pdf"):
                    loader = PyPDFLoader(file_path)
                elif file_lower.endswith(".docx"):
                    loader = UnstructuredWordDocumentLoader(file_path)
                else:
                    loader = TextLoader(file_path)
                docs = loader.load()
                splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
                db = FAISS.from_documents(splitter.split_documents(docs), embeddings)
                session_retriever = db.as_retriever()
                session_qa_chain = ConversationalRetrievalChain.from_llm(
                    llm=llm_gpt4,
                    retriever=session_retriever,
                    memory=ConversationBufferMemory(memory_key="chat_history", return_messages=True),
                )
                # If the query is summary-related, use Summarize Chain
                if mentions(["summarize", "summary", "summarise", "summarisation", "summarization", "摘要", "總結"]):
                    return document_summarize(file_path)
                # If using QA Chain is appropriate
                if use_file_chain:
                    try:
                        answer = session_qa_chain.run(query)
                        #session_graph_chain = build_langgraph_doc_qa_chain(llm_gpt4, session_retriever, memory, custom_prompt)
                        #answer = session_graph_chain(query)["answer"]
                        # DeepEval evaluation is only triggered for the Tab1 document-QA case
                        if SHOW_EVAL:
                            try:
                                top_docs = session_retriever.get_relevant_documents(query)[:3]
                                test_case = LLMTestCase(
                                    input=query,
                                    actual_output=answer,
                                    expected_output=answer,
                                    context=[d.page_content for d in top_docs]
                                )
                                metric = AnswerRelevancyMetric(model="gpt-4o-mini")
                                results = evaluate([test_case], [metric])
                                result = results[0]
                                print(f"[DeepEval Tab1] Input: {query}")
                                print(f"[DeepEval Tab1] Passed: {result.passed}, Score: {result.score:.2f}, Reason: {result.reason}")
                            except Exception as e:
                                # Evaluation is best-effort; the answer is still returned.
                                print(f"[DeepEval Tab1] Evaluation failed: {e}")
                        return answer
                    except Exception as e:
                        return f"Document QA Error: {e}"
                # Otherwise, proceed with Multi-Agent reasoning
                if refers_to_upload:
                    query = f"The user uploaded the following document:\n\n{query}"
                result = crew.kickoff(inputs={"query": query})
                return safe_format_result(result)
            except Exception as e:
                return f"Document Processing Error: {e}"
        else:
            return "Unsupported file format."

    # Step 4: If no file is uploaded, directly use CrewAI reasoning
    try:
        result = crew.kickoff(inputs={"query": query})
        return safe_format_result(result)
    except Exception as e:
        return f"Multi-Agent Error: {e}"
# Node functions used by LangGraph (these continue on from the Crew agents)
# Initialise the embedding model
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
# Intent classification by embedding (file names are supported).
# Each key is an intent label; its list holds example phrases that
# detect_intent_embedding scores against the query embedding.
INTENT_LABELS = {
    "DocQA": ["document", "file", "paper", "cb", "proposal", "project"],
    "Summarise": ["summarise", "summary", "abstract", "key points", "overview", "main points"],
    "General": ["who are you", "tell me something", "what can you do", "fun fact"],
}
def parse_query(query: str) -> dict:
    """Ask the LLM to break *query* into document sub-tasks.

    Args:
        query: The user's request.

    Returns:
        A dict with the keys ``summarize_files``, ``qa_pairs``,
        ``compare_files`` and ``find_relations``. When the reply cannot be
        parsed as a JSON object, a fallback is returned that treats the
        whole query as a single question about document 0.
    """
    prompt = """Analyze the following query and determine required subtasks. Return a JSON object containing:
- summarize_files: list of document indices to summarize
- qa_pairs: list of QA objects [{"question": "question", "doc_indices": [relevant doc indices]}]
- compare_files: list of document index pairs to compare [[doc1_idx, doc2_idx]]
- find_relations: boolean, whether to analyze document relationships
For example, query "What are the differences between document A and B, and summarize A" should return:
{
"summarize_files": [0],
"qa_pairs": [],
"compare_files": [[0, 1]],
"find_relations": false
}
Query: """ + query
    fallback = {
        "summarize_files": [],
        "qa_pairs": [{"question": query, "doc_indices": [0]}],
        "compare_files": [],
        "find_relations": False
    }
    response = llm_gpt4.invoke(prompt)
    try:
        raw = response.content.strip()
        # Replies are often wrapped in Markdown code fences or prose, so
        # parse only the outermost {...} span when one is present.
        first, last = raw.find("{"), raw.rfind("}")
        if first != -1 and last > first:
            raw = raw[first:last + 1]
        parsed = json.loads(raw)
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: reply has no usable text content.
        # ValueError covers json.JSONDecodeError.
        return fallback
    if not isinstance(parsed, dict):
        # Valid JSON that is not an object (list, string, ...) is unusable.
        return fallback
    # Fill in any key the model left out so callers can index all four.
    for key, default in (
        ("summarize_files", []),
        ("qa_pairs", []),
        ("compare_files", []),
        ("find_relations", False),
    ):
        parsed.setdefault(key, default)
    return parsed
def autogen_multi_document_analysis(query: str, docs: list, file_names: list) -> str:
    """Answer *query* across several documents with an AutoGen group chat.

    Args:
        query: The user's question.
        docs: Document texts; only the first 2000 characters of each are
            placed in the prompt.
        file_names: Names paired positionally with ``docs``.

    Returns:
        The content of the user proxy's last message, or
        ``"Error analyzing documents: ..."`` if anything raises.
    """
    try:
        # Create a temporary working directory under /tmp
        temp_dir = tempfile.mkdtemp(dir="/tmp")
        # NOTE(review): this directory is never removed and the variables
        # below keep pointing at it after the call. Clean-up was not added
        # because it is unclear whether later code relies on these paths.
        os.environ["OPENAI_CACHE_DIR"] = temp_dir
        # Point AutoGen's cache locations at the temporary directory
        os.environ["AUTOGEN_CACHE_PATH"] = temp_dir
        os.environ["AUTOGEN_CACHEDIR"] = temp_dir
        os.environ["OPENAI_CACHE_PATH"] = temp_dir
        # Force AutoGen to use our temporary directory instead of ./.cache
        import autogen
        if hasattr(autogen, "set_cache_dir"):
            autogen.set_cache_dir(temp_dir)
        # Prepare the document context
        context = "\n\n".join(
            f"Document {name}:\n{doc[:2000]}..."
            for name, doc in zip(file_names, docs)
        )
        # Configure the LLM
        config_list = [{
            "model": "gpt-4o-mini",
            "api_key": openai_api_key
        }]
        # Base configuration - contains no cache-related parameters
        llm_config = {
            "config_list": config_list,
            "temperature": 0
        }
        # Switch into the temporary directory before running AutoGen.
        # os.chdir affects the whole process, so the original directory is
        # restored in the `finally` below.
        original_dir = os.getcwd()
        os.chdir(temp_dir)
        try:
            user_proxy = UserProxyAgent(
                name="User",
                system_message="A user seeking information from multiple documents.",
                human_input_mode="NEVER",
                code_execution_config={"use_docker": False},
                llm_config=llm_config
            )
            # Document analysis expert
            doc_analyzer = AssistantAgent(
                name="DocumentAnalyzer",
                system_message="""You are an expert at analyzing and comparing documents. Focus on:
1. Key similarities and differences
2. Main themes and topics
3. Relationships between documents
4. Evidence-based analysis""",
                llm_config=llm_config
            )
            # Question-answering expert
            qa_expert = AssistantAgent(
                name="QAExpert",
                system_message="""You are an expert at extracting specific information. Focus on:
1. Finding relevant details
2. Answering specific questions
3. Cross-referencing information
4. Providing evidence""",
                llm_config=llm_config
            )
            # Summarisation expert
            summarizer = AssistantAgent(
                name="Summarizer",
                system_message="""You are an expert at summarizing content. Focus on:
1. Key points and findings
2. Important relationships
3. Critical conclusions
4. Comprehensive overview""",
                llm_config=llm_config
            )
            # Create the group chat
            groupchat = GroupChat(
                agents=[user_proxy, doc_analyzer, qa_expert, summarizer],
                messages=[],
                max_round=5
            )
            # Create the manager
            manager = GroupChatManager(
                groupchat=groupchat,
                llm_config=llm_config
            )
            # Prepare the task prompt
            task_prompt = f"""Analyze these documents and answer the query:
Query: {query}
Documents Context:
{context}
Requirements:
1. Provide a direct and clear answer
2. Support all claims with evidence from the documents
3. Consider relationships between all documents
4. If comparing, analyze all relevant aspects
5. If summarizing, cover all important points
6. If looking for specific content, search thoroughly
7. If analyzing relationships, consider all connections
Please provide a comprehensive and well-structured answer."""
            # Run the group discussion
            user_proxy.initiate_chat(manager, message=task_prompt)
            return user_proxy.last_message()["content"]
        finally:
            # Switch back to the original directory when done
            os.chdir(original_dir)
    except Exception as e:
        print(f"ERROR in AutoGen processing: {str(e)}")
        return f"Error analyzing documents: {str(e)}"
# === Embedding-based intent detection ===
def detect_intent_embedding(query, file_names=None):
    """Return the intent label whose example phrase best matches *query*.

    Every example phrase in ``INTENT_LABELS`` is embedded and scored against
    the query embedding by dot product; the label of the highest-scoring
    phrase wins.

    Args:
        query: The user's query text.
        file_names: Optional uploaded file names. Their lower-cased forms
            are added as extra "DocQA" examples for this call only.

    Returns:
        The best-matching label, or "General" when no label was selected.
    """
    query_emb = embedding_model.encode(query, normalize_embeddings=True)
    best_label = None
    best_score = -1
    # Copy each phrase list: a shallow dict copy would share the lists, and
    # extending one in place would permanently grow the global
    # INTENT_LABELS["DocQA"] with every call's file names.
    all_phrases = {label: list(examples) for label, examples in INTENT_LABELS.items()}
    if file_names:
        all_phrases.setdefault("DocQA", []).extend(name.lower() for name in file_names)
    for label, examples in all_phrases.items():
        for example in examples:
            example_emb = embedding_model.encode(example, normalize_embeddings=True)
            score = float(query_emb @ example_emb.T)
            if score > best_score:
                best_score = score
                best_label = label
    return best_label if best_label else "General"
def decide_next(state):
    """Return the intent label for the query held in *state*.

    Missing "query" / "file_names" entries default to "" and [].
    """
    return detect_intent_embedding(
        state.get("query", ""),
        state.get("file_names", []),
    )
# === Define the Task objects ===
# One task per specialist agent; each declares "query" as its input variable.
docqa_task = Task(
    description="Document QA Task: Answer questions based on the uploaded document.",
    expected_output="Answer from Document QA Agent.",
    agent=document_qa_agent,
    input_variables=["query"]
)
general_task = Task(
    description="General Chat Task: Answer general queries.",
    expected_output="Answer from General Agent.",
    agent=general_agent,
    input_variables=["query"]
)
summariser_task = Task(
    description="Summarisation Task: Summarise document content.",
    expected_output="Summary output.",
    agent=summarizer_agent,  # Note: the name must match the definition (spelled with a "z")
    input_variables=["query"]
)
search_task = Task(
    description="Search Task: Retrieve information from the web.",
    expected_output="Answer from Search Agent.",
    agent=search_agent,
    input_variables=["query"]
)
# === LangGraph node functions ===
def general_run(state):
    """Answer the query directly with the LLM instead of the General Agent.

    Returns ``{"answer": <text>}``; on any failure the error is printed and
    an apology is returned as the answer.
    """
    try:
        question_prompt = f"""You are a helpful AI assistant. Please answer the following question:
{state["query"]}
Provide a clear and informative answer."""
        llm_reply = llm_gpt4.invoke(question_prompt)
        if hasattr(llm_reply, 'content'):
            reply_text = llm_reply.content
        else:
            reply_text = str(llm_reply)
        return {"answer": reply_text}
    except Exception as e:
        print(f"ERROR in general_run: {str(e)}")
        return {"answer": "I apologize, but I'm having trouble processing your request."}
def docqa_run(state):
    """Answer the query in *state* from document context.

    Context comes from ``state["retriever"]`` when a retriever is available,
    otherwise from the raw texts in ``state["docs"]``. Only the first 3000
    characters of the context are sent to the LLM.

    Returns:
        ``{"answer": <text>}``. On any failure the error is printed and the
        result of ``general_run(state)`` is returned instead.
    """
    try:
        # A caller may pass retriever=None after a failed index build, so
        # test the value rather than just the presence of the key;
        # otherwise the raw-docs path below would never be reached.
        retriever = state.get("retriever")
        if retriever is not None:
            relevant_docs = retriever.get_relevant_documents(state["query"])
            context = "\n".join(d.page_content for d in relevant_docs)
        else:
            context = "\n".join(state["docs"])
        prompt = f"""Based on the following context, please answer the question:
Question: {state["query"]}
Context:
{context[:3000]}
Provide a detailed and accurate answer based on the context."""
        response = llm_gpt4.invoke(prompt)
        return {"answer": response.content if hasattr(response, 'content') else str(response)}
    except Exception as e:
        print(f"ERROR in docqa_run: {str(e)}")
        return general_run(state)
def summariser_run(state):
    """Summarise the document texts held in ``state["docs"]``.

    The texts are joined with newlines and only the first 3000 characters
    are sent to the LLM. Returns ``{"summary": <text>}``; on any failure the
    error is printed and a fixed error message is returned as the summary.
    """
    try:
        joined_text = "\n".join(state["docs"])
        summary_prompt = f"""Please provide a comprehensive summary of the following document:
{joined_text[:3000]}
Focus on:
1. Main topics and key points
2. Important findings or conclusions
3. Significant details"""
        llm_reply = llm_gpt4.invoke(summary_prompt)
        if hasattr(llm_reply, 'content'):
            summary_text = llm_reply.content
        else:
            summary_text = str(llm_reply)
        return {"summary": summary_text}
    except Exception as e:
        print(f"ERROR in summariser_run: {str(e)}")
        return {"summary": "Error generating summary."}
# === LangGraph definition ===
def build_langgraph_pipeline():
    """Compile a graph that routes from "Router" to one of three handlers.

    "Router" passes the state through unchanged; ``decide_next`` then picks
    "DocQA", "General" or "Summarise", each of which is a finish point.
    """
    workflow = StateGraph(dict)
    workflow.add_node("Router", lambda state: state)  # Router only passes the state along
    handlers = {
        "DocQA": docqa_run,
        "General": general_run,
        "Summarise": summariser_run,
    }
    for node_name, handler in handlers.items():
        workflow.add_node(node_name, handler)
    workflow.set_entry_point("Router")
    # Each label returned by decide_next maps to the node of the same name.
    workflow.add_conditional_edges("Router", decide_next, {name: name for name in handlers})
    for node_name in handlers:
        workflow.set_finish_point(node_name)
    return workflow.compile()
from tempfile import mkdtemp
def get_file_path_tab6(file):
    """Resolve an uploaded-file value to a path on disk.

    Accepted forms:

    * ``str`` - returned as-is when the path exists.
    * ``dict`` with ``"data"`` (and optionally ``"name"``) - when ``data``
      is an existing path it is returned; otherwise ``data`` is written to
      a new temporary directory (``str`` data is UTF-8 encoded).
    * an object with ``save(path)`` - saved into a new temporary directory.
    * any other object with a ``name`` attribute naming an existing path.

    Upload-supplied names are reduced to their final path component before
    being joined to the temporary directory, so a name such as
    ``"../x.txt"`` or ``"/etc/x"`` cannot place the file elsewhere.

    Returns:
        The file path, or ``None`` when no usable file could be resolved.
    """
    def _safe_name(raw_name):
        # Treat both separator styles as directories, keep the last part,
        # and fall back to a fixed name for empty or dot-only results.
        candidate = os.path.basename(str(raw_name).replace("\\", "/")) if raw_name else ""
        return candidate if candidate not in ("", ".", "..") else "uploaded_file"

    if isinstance(file, str):
        print("DEBUG: File is a string:", file)
        if os.path.exists(file):
            print("DEBUG: File exists:", file)
            return file
        print("DEBUG: File does not exist:", file)
        return None

    if isinstance(file, dict):
        data = file.get("data")
        name = file.get("name")
        # Log only the payload type: the payload may be large or sensitive.
        print("DEBUG: File is a dict. Data type:", type(data).__name__, "Name:", name)
        if not data:
            print("DEBUG: No data in dict, returning None")
            return None
        if isinstance(data, str) and os.path.exists(data):
            print("DEBUG: Data is a valid file path:", data)
            return data
        file_path = os.path.join(mkdtemp(), _safe_name(name))
        print("DEBUG: Writing data to temporary file:", file_path)
        with open(file_path, "wb") as f:
            f.write(data.encode("utf-8") if isinstance(data, str) else data)
        if os.path.exists(file_path):
            print("DEBUG: Temporary file created:", file_path)
            return file_path
        print("ERROR: Temporary file not created:", file_path)
        return None

    if hasattr(file, "save"):
        print("DEBUG: File has save attribute")
        file_path = os.path.join(mkdtemp(), _safe_name(getattr(file, "name", None)))
        file.save(file_path)
        if os.path.exists(file_path):
            print("DEBUG: File saved to:", file_path)
            return file_path
        print("ERROR: File not saved properly:", file_path)
        return None

    print("DEBUG: File type unrecognized")
    if hasattr(file, "name") and os.path.exists(file.name):
        return file.name
    return None
def langgraph_tab6_main(query: str, file=None):
    """Entry point for the multi-document tab.

    Without usable files the query is answered by ``general_run``. With
    files, each is loaded (PDF, DOCX, otherwise plain text) and indexed into
    a FAISS retriever, which is also stored in the session globals. Queries
    over more than one document, or mentioning "compare"/"relation", go to
    ``autogen_multi_document_analysis``; single-document queries are routed
    by embedding-detected intent to the summariser, document QA or general
    handler.

    Args:
        query: The user's question.
        file: A single upload, a list of uploads, or ``None``.

    Returns:
        The answer text, or an apology containing the error message.
    """
    global session_retriever, session_qa_chain
    try:
        print(f"DEBUG: Starting processing with query: {query}")
        # With no file, go straight to general_run
        if not file:
            return general_run({"query": query})["answer"]
        # Normalise to a list of files
        files = file if isinstance(file, list) else [file]
        all_docs = []
        file_names = []
        docs_by_file = []
        # Process the uploaded files
        for f in files:
            try:
                path = get_file_path_tab6(f)
                if not path:
                    continue
                # Choose the loader by file type
                lower_path = path.lower()
                if lower_path.endswith('.pdf'):
                    loader = PyPDFLoader(path)
                elif lower_path.endswith('.docx'):
                    loader = UnstructuredWordDocumentLoader(path)
                else:
                    loader = TextLoader(path)
                docs = loader.load()
                if docs:
                    text = "\n".join(doc.page_content for doc in docs if hasattr(doc, 'page_content'))
                    # Record the name only once the text exists, so that
                    # file_names[i] always describes docs_by_file[i]; a file
                    # that fails to load must not shift the pairing.
                    file_names.append(os.path.basename(path))
                    docs_by_file.append(text)
                    all_docs.extend(docs)
            except Exception as e:
                print(f"ERROR processing file: {str(e)}")
                continue
        if not docs_by_file:
            return general_run({"query": query})["answer"]
        # Build the retriever
        retriever = None
        try:
            chunks = RecursiveCharacterTextSplitter(
                chunk_size=500,
                chunk_overlap=50
            ).split_documents(all_docs)
            db = FAISS.from_documents(chunks, embeddings)
            retriever = db.as_retriever(search_kwargs={"k": 5})
            session_retriever = retriever
            session_qa_chain = ConversationalRetrievalChain.from_llm(
                llm=llm_gpt4,
                retriever=retriever,
                memory=ConversationBufferMemory(
                    memory_key="chat_history",
                    return_messages=True
                ),
            )
        except Exception as e:
            print(f"ERROR setting up retriever: {str(e)}")
            retriever = None
        # Multi-file or comparison-style queries go to AutoGen
        if len(docs_by_file) > 1 or "compare" in query.lower() or "relation" in query.lower():
            return autogen_multi_document_analysis(query, docs_by_file, file_names)
        # Single-file query: build the state for the handlers
        state = {
            "query": query,
            "file_names": file_names,
            "docs": docs_by_file,
        }
        # Only expose a working retriever; docqa_run checks for the key's
        # presence, so a None value would hide the raw-docs path.
        if retriever is not None:
            state["retriever"] = retriever
        # Choose the handler from the detected intent
        intent = detect_intent_embedding(query, file_names)
        if intent == "Summarise":
            return summariser_run(state)["summary"]
        elif intent == "DocQA":
            return docqa_run(state)["answer"]
        else:
            return general_run(state)["answer"]
    except Exception as e:
        print(f"ERROR in main function: {str(e)}")
        return f"I apologize, but I encountered an error: {str(e)}"
# Gradio Interface Settings
# Markdown descriptions passed as `description=` to the gr.Interface tabs below.
# Description for the "Biden Q&A (LLaMA RAG)" tab.
demo_description = """
**Context**:
This demo uses a **Retrieval-Augmented Generation (RAG)** system based on
Biden’s 2023 State of the Union Address.
All responses are grounded in this document.
If no relevant information is found in the document, the system will say "No relevant info found."
**Sample Questions**:
1. What were the main topics regarding infrastructure in this speech?
2. How does the speech address the competition with China?
3. What does Biden say about job growth in the past two years?
4. Does the speech mention anything about Social Security or Medicare?
5. What does the speech propose regarding Big Tech or online privacy?
*Note: The LLaMA module generates responses based solely on the current query without follow-up memory or chat history management.*
> This is a CPU-only demo running a **quantised 1B LLaMA model**, built to show that full RAG + multi-agent systems can run even without a GPU. In production, the model can be replaced with larger ones (3B, 7B, etc.) and served using vLLM, 4-bit quantisation, or TensorRT for better speed. The design focuses on portability, deployment, and modularity.
Feel free to ask any question related to Biden’s 2023 State of the Union Address.
"""
# Description for the "Biden Q&A (GPT-4 RAG)" tab.
demo_description2 = """
**Context**:
This demo uses a **Retrieval-Augmented Generation (RAG)** system based on
Biden’s 2023 State of the Union Address.
All responses are grounded in this document.
If no relevant information is found in the document, the system will say "No relevant info found."
**Sample Questions**:
1. What were the main topics regarding infrastructure in this speech?
2. How does the speech address the competition with China?
3. What does Biden say about job growth in the past two years?
4. Does the speech mention anything about Social Security or Medicare?
5. What does the speech propose regarding Big Tech or online privacy?
*Note: The GPT module supports follow-up questions with conversation history management, enabling more interactive and context-aware discussions.*
Feel free to ask any question related to Biden’s 2023 State of the Union Address.
"""
# Description for the "Your Docs Q&A (Upload + GPT-4 RAG)" tab.
# NOTE(review): the text refers to "Tab 1 (Multi-Agent Assistant)", but in
# tab_names below that tab is listed second — confirm the numbering.
demo_description3 = """
**Context**:
Upload a PDF, TXT, or DOCX file and ask a question about its content.
This demo uses **GPT-4o-Mini** to answer questions based on the content of your uploaded document.
Note: This is a **strict RAG-based QA** system. It will only answer questions if the answer is explicitly found in the uploaded document.
For more flexible or general-purpose responses, please try Tab 1 (Multi-Agent Assistant).
Typical Use Cases:
- Legal, technical, or academic documents where factual precision is critical
- Internal company reports where hallucination must be avoided
- Medical papers where only referenced content should be discussed
Feel free to ask any question directly related to your document.
"""
# Description for the "Document Summarisation" tab.
demo_description4 = """
**Context**:
This demo uses a **refinement-based document summarisation chain**.
Upload a PDF, TXT, or DOCX file to get a concise, structured summary of its contents.
"""
# Description for the "Multi-Agent AI Assistant" tab.
demo_description5 = """
**Context**:
This demo presents a GPT-style Multi-Agent AI Assistant, built with **LangChain, CrewAI**, and **RAG (Retrieval-Augmented Generation)**. The system automatically understands your intent and routes the query to the best expert agent, enabling dynamic **multi-agent orchestration**.
**Supported features**:
- 📄 **Document Summarisation** (PDF, DOCX, TXT)
- ❓ **FAQ-style Q&A based on uploaded documents** (RAG-based)
- 🌐 **Live Web Search** (Online RAG + GPT post-processing summary)
- 📅 **Real-time Worldwide Date & Time** (LLM + GeoLocator + TimezoneFinder, supports any city globally)
- 🌦️ **Global Weather** (LLM Time Reasoning + Timezone + Few-Shot, supports fuzzy queries, 3-day forecast, 7-day history, hourly precision)
- ➗ **Math and Logic Calculations** (from scientific equations to financial or tax-related use cases)
- 💬 **General Chatting / Reasoning**
**Sample Questions**:
1. Summarise the document. *(→ Summarisation Agent)*
2. What are the key ideas mentioned in this file? *(→ RAG QA Agent)*
3. What is LangChain used for? | What are the latest trends in AI startups in 2025? | Tell me the most recent breakthrough in quantum computing. *(→ Online Rag Agent)*
4. What's the current time in London? | What’s today’s date in New York? | What time is it in Taipei right now? *(→ Time Agent)*
5. Will it rain or snow in Sapporo tomorrow night? | Is it too windy for cycling in Amsterdam at 6am? | Do I need to bring an umbrella later this evening in Edinburgh? | Should I wear sunscreen in Bangkok around noon tomorrow? | Is it gonna rain later? | What was the weather like in Paris on last Sunday? | Will the weather be suitable for hiking at 3pm in Lake District? *(→ Weather Agent)*
6. If I earn $15 per hour and work 8 hours a day for 5 days, how much will I earn? | What is 5 * 22.5 / sin(45) | 3^3 + 4^2 | Calculate 25 * log(1000) | What is the square root of 144 *(→ Math Agent)*
7. Who are you? | What can you do? | What is the meaning of life? *(→ General Chat Agent)*
Feel free to upload a document and ask related questions, or just type a question directly—no file upload required. *Note: CSV file analysis and auto visualisation is coming soon.*
"""
# Description for the "Multi-Doc QA" tab (user-facing Markdown).
demo_description6 = """
**Context**:
This is a **smart multi-document reasoning assistant**, powered by **LangGraph**, **CrewAI**, and **AutoGen**.
Upload zero to multiple files and ask anything — the system uses **embedding-based intent detection** to decide whether to summarise, extract, compare, or analyse relationships.
For complex multi-file tasks, it triggers a **collaborative AutoGen team** to deeply reason across documents and generate contextual, evidence-based answers.
**Supported Features**:
- 📄 Multi-document support (PDF, DOCX, TXT)
- 🔍 Embedding-based intent detection and semantic routing
- 🤖 Agents: Summariser, QA Agent, General Agent, Search Agent
- 🔀 Orchestrated by LangGraph + AutoGen (fallbacks + task handoff)
- 🧠 AutoGen multi-agent collaboration for cross-file reasoning
- 🌐 Online search fallback if none of the other agents can handle the task
**Sample Questions**:
1. Who are you? | What is GPT4? *(→ General Chat Agent)*
2. Summarise the document/file/your_doc_name. *(→ Summarisation Agent)*
3. What is LangChain used for? | What are the latest trends in AI startups in 2025? | Tell me the most recent breakthrough in quantum computing. *(→ Online Rag Agent)*
4. What's the title in the document? | What are the key ideas mentioned in this file? *(→ RAG QA Agent)*
5. Compare the proposals in DocA and DocB. | Summarise all files. | Is DocA one of the projects in DocB or DocC? | Which argument is stronger across these files? | Do these documents mention similar policies? | What's the difference between the files? *(→ AutoGen)*
6. What is LangChain used for? | What are the latest trends in AI startups in 2025? | Tell me the most recent breakthrough in quantum computing. *(→ Online Rag Agent)*
> Built for users who need clear, explainable, and context-aware answers — whether you’re working on documents in law, finance, research, or tech.
"""
# Assemble the six tabs. The order of `interface_list` must stay in step with
# `tab_names` further down.
demo = gr.TabbedInterface(
    interface_list=[
        # Tab "Multi-Doc QA": accepts zero or more files.
        gr.Interface(
            fn=langgraph_tab6_main,
            inputs=[
                gr.Textbox(label="Ask anything"),
                gr.File(label="Upload one or more files", file_types=[".pdf", ".txt", ".docx"], file_count="multiple")
            ],
            outputs="text",
            title="Smart Multi-Doc QA (LangGraph + AutoGen)",
            allow_flagging="never",
            description=demo_description6
        ),
        # Tab "Multi-Agent AI Assistant".
        # NOTE(review): the label mentions CSV but file_types does not
        # include ".csv" — confirm whether CSV upload is meant to be enabled.
        gr.Interface(
            fn=multi_agent_chat_advanced,
            inputs=[
                gr.Textbox(label="Enter your query"),
                gr.File(label="Upload file (CSV, PDF, TXT, DOCX)", file_types=[".pdf", ".txt", ".docx"], file_count="single")
            ],
            outputs="text",
            title="Multi-Agent AI Assistant",
            allow_flagging="never",
            description=demo_description5
        ),
        # Tab "Document Summarisation".
        gr.Interface(
            fn=document_summarize,
            inputs=[gr.File(label="Upload PDF, TXT, or DOCX", file_types=[".pdf", ".txt", ".docx"])],
            outputs="text",
            title="Document Summarisation",
            allow_flagging="never",
            description=demo_description4
        ),
        # Tab "Your Docs Q&A": the file comes first, then the question.
        gr.Interface(
            fn=upload_and_chat,
            inputs=[gr.File(label="Upload PDF, TXT, or DOCX", file_types=[".pdf", ".txt", ".docx"]), gr.Textbox(label="Ask a question")],
            outputs="text",
            title="Your Docs Q&A (Upload + GPT-4 RAG)",
            allow_flagging="never",
            description=demo_description3
        ),
        # Tab "Biden Q&A (GPT-4 RAG)".
        gr.Interface(
            fn=rag_gpt4_qa,
            inputs="text",
            outputs="text",
            title="Biden Q&A (GPT-4 RAG)",
            allow_flagging="never",
            description=demo_description2
        ),
        # Tab "Biden Q&A (LLaMA RAG)".
        gr.Interface(
            fn=rag_llama_qa,
            inputs="text",
            outputs="text",
            title="Biden Q&A (LLaMA RAG)",
            allow_flagging="never",
            description=demo_description
        ),
    ],
    tab_names=[
        "Multi-Doc QA",
        "Multi-Agent AI Assistant",
        "Document Summarisation",
        "Your Docs Q&A (Upload + GPT-4 RAG)",
        "Biden Q&A (GPT-4 RAG)",
        "Biden Q&A (LLaMA RAG)",
    ],
    title="Smart RAG + Multi-Agent Assistant (with Web + Document AI)"
)
# Launch only when run as a script: bind to all interfaces on port 7860,
# without creating a public share link.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, share=False)