# Hugging Face Spaces status: Running
# -*- coding: utf-8 -*- | |
# 財政部財政資訊中心 江信宗 | |
import os | |
from dotenv import load_dotenv | |
load_dotenv() | |
import gradio as gr | |
from openai import OpenAI | |
from langchain_community.utils import user_agent | |
from langchain_groq import ChatGroq | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_huggingface import HuggingFaceEmbeddings | |
from langchain_community.vectorstores import Chroma | |
from langchain.chains import RetrievalQA | |
from langchain_community.document_loaders import WebBaseLoader, TextLoader | |
from langchain.prompts import PromptTemplate | |
from langchain.schema import Document | |
import resend | |
import requests | |
import re | |
import time | |
def load_documents(sources):
    """Load every entry of *sources* into a flat list of documents.

    Each entry is handled according to its type:

    * ``str`` starting with ``'http'`` -- fetched with ``WebBaseLoader``.
    * any other ``str`` -- read with ``TextLoader``.
    * ``dict`` -- wrapped in a ``Document`` built from its ``'content'`` key
      and optional ``'metadata'`` key.
    * anything else -- ignored.

    A failure on one entry is printed and does not stop the remaining
    entries from being processed.
    """
    loaded = []
    for entry in sources:
        try:
            if isinstance(entry, dict):
                inline_doc = Document(
                    page_content=entry['content'],
                    metadata=entry.get('metadata', {}),
                )
                loaded.append(inline_doc)
                continue
            if not isinstance(entry, str):
                # Unsupported source type: skipped without a message.
                continue
            loader_cls = WebBaseLoader if entry.startswith('http') else TextLoader
            loaded.extend(loader_cls(entry).load())
        except Exception as err:
            print(f"Error loading source {entry}: {str(err)}")
    return loaded
# Knowledge-base inputs. None of these strings starts with 'http', so
# load_documents() reads each of them with TextLoader, relative to the
# current working directory.
# NOTE(review): judging by the names these are tax Q&A data sets followed by
# the texts of individual tax acts -- confirm against the actual files.
sources = [
    "TaxQADataSet_Slim1.txt",
    "TaxQADataSet_Slim2.txt",
    "TaxQADataSet_Slim3.txt",
    "TaxQADataSet_Slim4.txt",
    "TaxQADataSet_Slim5.txt",
    "TaxQADataSet_Slim6.txt",
    "TaxQADataSet_ntpc1.txt",
    "TaxQADataSet_ntpc2.txt",
    "TaxQADataSet_kctax.txt",
    "TaxQADataSet_chutax.txt",
    "LandTaxAct1100623.txt",
    "TheEnforcementRulesoftheLandTaxAct1100923.txt",
    "HouseTaxAct1130103.txt",
    "VehicleLicenseTaxAct1101230.txt",
    "TaxCollectionAct1101217.txt",
    "AmusementTaxAct960523.txt",
    "StampTaxAct910515.txt",
    "DeedTaxAct990505.txt"
]
# Runs at import time. Sources that fail to load are only reported on stdout
# by load_documents(), so the count below may be lower than len(sources).
documents = load_documents(sources)
print(f"\n成功載入 {len(documents)} 個檔案")
# Chunking: at most 512 characters per chunk (length measured with len) and
# a 50-character overlap. The separators are literal strings (not regexes),
# listed from triple newline down to the Chinese full stop.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=50,
    length_function=len,
    is_separator_regex=False,
    separators=["\n\n\n","\n\n", "\n", "。"]
)
split_docs = text_splitter.split_documents(documents)
print(f"分割後的文件數量:{len(split_docs)}")
# Embedding model used for both indexing and querying.
embeddings = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large")
print(f"\n成功初始化 Microsoft 嵌入模型")
print(f"\n開始建立向量資料庫")
# The vector store is built at import time and persisted under
# ./Knowledge-base.
# NOTE(review): if that directory survives between runs, the same chunks may
# be added again on every start -- confirm whether it should be cleared first.
vectorstore = Chroma.from_documents(split_docs, embeddings, persist_directory="./Knowledge-base")
print(f"成功建立 Chroma 向量資料庫,共有 {len(split_docs)} 個文檔")
# MMR retriever: k and fetch_k are capped by the number of chunks so they
# never exceed the size of the collection.
retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={
        "k": min(4, len(split_docs)),
        "fetch_k": min(20, len(split_docs)),
        "lambda_mult": 0.8
    }
)
print(f"檢索演算法:Maximum Marginal Relevance Retrieval")
print(f"檢索文檔數量:k={min(4, len(split_docs))}, fetch_k={min(20, len(split_docs))}")
# Prompt for the RetrievalQA "stuff" chain. It instructs the model to answer
# in Traditional Chinese, to rely on the retrieved context only, not to
# mention that the answer is based on context, and to say so honestly when
# the context is insufficient. {context} and {question} are the two template
# variables declared in PROMPT below.
template = """Let's work this out in a step by step way to be sure we have the right answer. Must reply to me in Taiwanese Traditional Chinese.
在回答之前,請仔細分析檢索到的上下文,確保你的回答準確完整反映了上下文中的訊息,而不是依賴先前的知識,在回應的答案中絕對不要提到是根據上下文回答。
如果檢索到的多個上下文之間存在聯繫,請整合這些訊息以提供更全面的回答,但要避免過度推斷。
如果檢索到的上下文不包含足夠回答問題的訊息,請誠實的說明,不要試圖編造答案。
上下文: {context}
問題: {question}
答案:"""
PROMPT = PromptTemplate(
    template=template, input_variables=["context", "question"]
)
def create_chain(llm):
    """Return a RetrievalQA chain of type "stuff" driven by *llm*.

    The chain uses the module-level ``retriever`` and ``PROMPT`` and is
    configured to return the source documents alongside the answer.
    """
    chain_options = {
        "llm": llm,
        "chain_type": "stuff",
        "retriever": retriever,
        "return_source_documents": True,
        "chain_type_kwargs": {"prompt": PROMPT},
    }
    return RetrievalQA.from_chain_type(**chain_options)


# Start-up log line only: no chain is created here, create_chain() is called
# per request.
print(f"成功建立 RAG Chain")
def initialize_llm(api_key):
    """Create a ChatGroq client for 'llama-3.3-70b-versatile' using *api_key*."""
    groq_settings = {
        "groq_api_key": api_key,
        "model_name": 'llama-3.3-70b-versatile',
    }
    return ChatGroq(**groq_settings)
def generate_insight_questions(query, api_key):
    """Ask the LLM for three follow-up ("insight") questions about *query*.

    Args:
        query: The user's original question.
        api_key: Groq API key passed to ``initialize_llm``.

    Returns:
        A list of exactly three non-empty strings. Missing questions are
        padded with a generic fallback; if the LLM call fails, three fixed
        fallback questions are returned.
    """
    fallback_question = "提供更多地方稅資訊"
    llm = initialize_llm(api_key)
    prompt = f"""Let's work this out in a step by step way to be sure we have the right answer. Must reply to me in "Traditional Chinese".
根據以下回答,生成3個相關的洞察問題:
原始問題: {query}
請提供3個簡短但有深度的問題,這些問題應該符合:
1. 與原始問題緊密相關
2. 準確重新描述原始問題
3. 引導更深入的解決原始問題
請直接列出這3個問題,每個問題一行,不要添加編號或其他文字。
"""
    try:
        response = llm.invoke(prompt)
        raw_text = response.content if hasattr(response, 'content') else str(response)
        # Models often separate the questions with blank lines. Dropping
        # empty lines keeps them from occupying one of the three slots and
        # ending up as empty button labels.
        questions = [line.strip() for line in raw_text.split('\n') if line.strip()]
        # A negative multiplier yields an empty list, so this only pads.
        questions.extend([fallback_question] * (3 - len(questions)))
        return questions[:3]
    except Exception as e:
        print(f"Error generating insight questions:{str(e)}")
        return ["提供更多地方稅資訊", "提供其他地方稅問題", "還想了解什麼地方稅目"]
def answer_question(query, api_key):
    """Answer one question through the RAG chain.

    Returns a tuple ``(answer, insight_questions)`` where
    ``insight_questions`` holds three follow-up questions. On any error the
    answer is an apology message containing the error text and the question
    list is empty.
    """
    try:
        gr.Info("檢索地方稅知識庫中......")
        qa_chain = create_chain(initialize_llm(api_key))
        answer = qa_chain.invoke({"query": query})["result"]
        followups = generate_insight_questions(query, api_key)
        shortfall = 3 - len(followups)
        if shortfall > 0:
            # Pad to three entries with the generic follow-up.
            followups.extend(["提供更多地方稅資訊"] * shortfall)
        return answer, followups[:3]
    except Exception as err:
        return f"抱歉,處理您的問題時發生錯誤:{str(err)}", []
def split_questions(query):
    """Split *query* into individual questions.

    The text is cut at every character of the class ``[?!。 ]``; each piece
    is stripped and empty pieces are discarded.
    """
    pieces = []
    for fragment in re.split(r'[?!。 ]', query):
        trimmed = fragment.strip()
        if trimmed:
            pieces.append(trimmed)
    return pieces
def answer_multiple_questions(query, api_key):
    """Answer every question contained in *query*.

    The query is split with ``split_questions`` and each part is answered
    with ``answer_question``. With more than one question, every answer is
    prefixed with its question and the blocks are separated by three
    newlines; otherwise the single answer is returned as is.

    Returns a tuple ``(combined_answer, insight_questions)`` where
    ``insight_questions`` contains at most the first three follow-up
    questions collected across all answers.
    """
    questions = split_questions(query)
    is_batch = len(questions) > 1
    answers, followups = [], []
    for question in questions:
        answer, insight_questions = answer_question(question, api_key)
        answers.append(f"【問題】{question}\n答案:{answer}" if is_batch else answer)
        followups.extend(insight_questions)
    separator = "\n\n\n" if is_batch else "\n"
    return separator.join(answers), followups[:3]
def get_tax_law(tax_type):
    """Map a tax name to the name of the statute that governs it.

    Returns "無稅法" when *tax_type* is not a known tax name.
    """
    name_to_law = dict((
        ("房屋稅", "房屋稅條例"),
        ("地價稅", "土地稅法"),
        ("土地增值稅", "土地稅法"),
        ("增值稅", "土地稅法"),
        ("契稅", "契稅條例"),
        ("娛樂稅", "娛樂稅法"),
        ("印花稅", "印花稅法"),
        ("使用牌照稅", "使用牌照稅法"),
        ("牌照稅", "使用牌照稅法"),
        ("稅捐稽徵法", "稅捐稽徵法"),
        ("綜合所得稅", "所得稅法"),
        ("所得稅", "所得稅法"),
        ("遺產稅", "遺產及贈與稅法"),
        ("贈與稅", "遺產及贈與稅法"),
        ("營業稅", "營業稅法"),
    ))
    if tax_type in name_to_law:
        return name_to_law[tax_type]
    return "無稅法"
def _build_law_query_payload(tax_law, tax_version, keyword):
    """Return the form fields of one FB10001 request to the ruling search API."""
    return {
        "FunctionID": "FB10001",
        "ObjParams[TaxAct]": tax_law,
        "ObjParams[TaxVer]": tax_version,
        "ObjParams[Chapter]": "請選擇",
        "ObjParams[Article]": "請選擇",
        "ObjParams[Content]": keyword,
        "ObjParams[Operator01]": "0",
        "ObjParams[Content01]": "",
        "ObjParams[Operator02]": "0",
        "ObjParams[Content02]": ""
    }


def _lookup_latest_law_version(url, headers, tax_law, timeout):
    """Return the first version listed for *tax_law*, or "請選擇" if unavailable."""
    try:
        version_response = requests.post(
            url,
            data=_build_law_query_payload(tax_law, "請選擇", ""),
            headers=headers,
            timeout=timeout,
        )
        version_response.raise_for_status()
        version_data = version_response.json()
        if version_data["Code"] == "1" and "Table1" in version_data["Data"]:
            for item in version_data["Data"]["Table1"]:
                if item["TaxAct"] == tax_law:
                    return item["TaxVer"]
            print(f"未找到 {tax_law} 的對應版本,使用預設選項。")
        else:
            gr.Warning("無法獲取稅法版本資訊,使用預設選項。")
    except Exception as e:
        print(f"獲取稅法版本時發生錯誤:{str(e)}")
    return "請選擇"


def _score_relevance(client, query, content):
    """Ask the LLM how relevant *content* is to *query*; return its raw reply.

    The 405B model is tried first and the 70B model second. If both calls
    fail, the reply "相關性:0%" is returned.
    """
    system_prompt = f"""
請判斷以下函釋內容與user提問的內容。
請給出一個0到100之間的相關性百分比,不要任何說明或理由。
回答格式為:
相關性:XX%
函釋內容:```{content}```
"""
    prompt = f"""```{query}```"""
    for model_name in ("Meta-Llama-3.1-405B-Instruct", "Meta-Llama-3.1-70B-Instruct"):
        try:
            response = client.chat.completions.create(
                model=model_name,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7
            )
            return response.choices[0].message.content.strip()
        except Exception as e:
            print(f"Relevance scoring with {model_name} failed: {str(e)}")
    return "相關性:0%"


def _parse_relevance_percentage(reply):
    """Extract the relevance score (clamped to 0-100) from an LLM reply.

    The first run of digits is used, so replies written with a full-width
    colon, without a colon, or with extra words still parse. Returns 0 when
    the reply contains no number.
    """
    match = re.search(r'\d+', reply or "")
    if match is None:
        print(f"Warning: Could not parse relevance percentage from '{reply}'")
        return 0
    return max(0, min(100, int(match.group())))


def fetch_law_summary(query, tax_law, keywords):
    """Search the MOF tax-ruling system and render the relevant hits as HTML.

    For each keyword the ruling search API is queried for *tax_law* (using
    the version reported by the API when one is found). Results are
    de-duplicated by their ``TaxSN`` field, the first 20 are scored against
    *query* by an LLM, and every result with a score above 0 is rendered as
    a collapsible ``<details>`` element.

    Args:
        query: The user's question, used for relevance scoring.
        tax_law: Statute name sent as ``TaxAct``.
        keywords: Iterable of search keywords, one request per keyword.

    Returns:
        An HTML string. When the search yields nothing, a short "no results"
        paragraph is returned instead.
    """
    url = "https://ttc.mof.gov.tw/Api/GetData"
    headers = {
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
        "accept": "application/json, text/javascript, */*; q=0.01",
        "accept-encoding": "gzip, deflate, br, zstd",
        "accept-language": "zh-TW,zh;q=0.9,en-US;q=0.8,en;q=0.7",
        "referer": "https://ttc.mof.gov.tw/"
    }
    # Without a timeout a stalled connection would block the request handler
    # indefinitely.
    request_timeout = 30
    gr.Info("檢索法令彙編函釋中......")
    latest_version = _lookup_latest_law_version(url, headers, tax_law, request_timeout)

    all_results = []
    for keyword in keywords:
        if isinstance(keyword, str):
            # Keywords come from a comma-split string and may carry spaces.
            keyword = keyword.strip()
        try:
            response = requests.post(
                url,
                data=_build_law_query_payload(tax_law, latest_version, keyword),
                headers=headers,
                timeout=request_timeout,
            )
            response.raise_for_status()
            data = response.json()
            if data["Code"] == "1" and "Table" in data["Data"]:
                all_results.extend(data["Data"]["Table"])
        except Exception as e:
            print(f"檢索關鍵字 '{keyword}' 的法令彙編函釋時發生錯誤:{str(e)}")

    if not all_results:
        return "<p>未檢索到相關法令彙編函釋。</p>"

    # Keep the first occurrence of every TaxSN; results without a TaxSN all
    # share the '' key.
    unique_results = {}
    for result in all_results:
        unique_results.setdefault(result.get('TaxSN', ''), result)

    # One client is enough for all scoring calls.
    client = OpenAI(
        api_key=os.environ.get("YOUR_API_TOKEN"),
        base_url="https://api.sambanova.ai/v1",
    )
    summary = f"<h3>相關法令彙編函釋檢索結果({tax_law} {latest_version}):</h3>"
    for result in list(unique_results.values())[:20]:  # first 20 unique results only
        title = result.get('Title', '')
        content = result.get('Content', '')
        percentage = _parse_relevance_percentage(_score_relevance(client, query, content))
        if percentage > 0:
            # NOTE(review): title and content come from the remote API and
            # are inserted unescaped -- confirm that they are trusted HTML.
            summary += f"""
<details>
<summary style="cursor: pointer; color: #0066cc;">{title} (相關性:{percentage} %)</summary>
<p>{content}</p>
</details>
"""
    return summary
def llm_openai_api(query, answer):
    """Ask the LLM for the tax name and up to three keywords of a Q&A pair.

    Returns the model's reply text, which is expected to be a dictionary
    literal with the keys "TaxName" and "KeyWord". If the request fails, a
    warning is shown and a dictionary literal with empty values is returned.
    """
    user_prompt = f"""
「題目:{query}
答案:{answer}」
請詳細分析答案內容後,依據與題目相關性最高的稅目名稱及最多3個重點關鍵字回應我,提供的3個重點關鍵字不能與稅目名稱相同,問題與答案中的稅目名稱列入TaxName,關鍵字列入KeyWord,只須根據格式回應,不要寫其他的。
# 回應字典格式範例:
{{"TaxName": "地價稅", "KeyWord": "宿舍用地,醫護人員"}}
"""
    conversation = [
        {"role": "system", "content": "Must reply to user in Traditional Chinese."},
        {"role": "user", "content": user_prompt}
    ]
    sambanova = OpenAI(
        api_key=os.environ.get("YOUR_API_TOKEN"),
        base_url="https://api.sambanova.ai/v1",
    )
    try:
        completion = sambanova.chat.completions.create(
            model='Meta-Llama-3.1-405B-Instruct',
            messages=conversation,
            temperature=0.7,
            top_p=0.9
        )
        reply = completion.choices[0].message.content
        return reply.strip()
    except Exception as err:
        print(f"檢索法令彙編函釋 API Key!Error: {str(err)}")
        gr.Warning(f"檢索法令彙編函釋 API Key 額度不足!!")
        return '{"TaxName": "", "KeyWord": ""}'
def handle_interaction(query, api_key, state):
    """Handle one submit click: answer the query and look up related rulings.

    Args:
        query: Raw text typed by the user (may hold several questions).
        api_key: Groq API key entered by the user. When empty, the key from
            the ``YOUR_API_KEY`` environment variable is used and a
            notification e-mail containing the query is sent.
        state: Session state dict with a "history" list, or ``None`` on the
            first call.

    Returns:
        A 7-tuple ``(answer, insight_q1, insight_q2, insight_q3, state,
        query, law_summary_html)`` matching the Gradio outputs. The ruling
        summary is only produced for single-question queries.
    """
    import ast
    import json

    gr.Info("開始處理問題,請稍待片刻......")
    start_time = time.time()
    if state is None:
        state = {"history": []}
    if not api_key:
        # NOTE(review): the user's query is mailed to a fixed address
        # whenever the shared key is used -- confirm that this is intended.
        resend_key = os.environ.get("YOUR_USE_API_KEY")
        if resend_key:
            resend.api_key = resend_key
            params: resend.Emails.SendParams = {
                "from": "Tax_KM <onboarding@resend.dev>",
                "to": ["antivir7@gmail.com"],
                "subject": "地方稅知識庫 API KEY",
                "html": f"<strong>檢索內容:<br>{query}</strong>",
            }
            try:
                email_response = resend.Emails.send(params)
                print(f"Email sent successfully. Response:{email_response}")
            except Exception as e:
                print(f"Failed to send email:{str(e)}")
        else:
            # The notification is best effort; a missing key must not abort
            # the request.
            print("Failed to send email:YOUR_USE_API_KEY is not set")
        api_key = os.getenv("YOUR_API_KEY")
    query = convert_punctuation(query)
    answer, insight_questions = answer_multiple_questions(query, api_key)
    questions = split_questions(query)
    if len(questions) == 1:
        api_response = llm_openai_api(query, answer)
        tax_name = ""
        keywords = []
        print(f"LLM剖析:{api_response}")
        try:
            # The reply is untrusted model output, so it is parsed as data
            # (JSON first, then a Python literal) and never evaluated as code.
            # Surrounding text such as code fences is ignored.
            literal = re.search(r'\{.*\}', api_response, re.DOTALL)
            payload_text = literal.group() if literal else api_response
            try:
                response_dict = json.loads(payload_text)
            except ValueError:
                response_dict = ast.literal_eval(payload_text)
            tax_name = response_dict.get("TaxName", "")
            keywords = [kw.strip() for kw in response_dict.get("KeyWord", "").split(",")]
        except (ValueError, SyntaxError, TypeError, AttributeError):
            print("剖析相關法令彙編函釋失敗!!")
        if not isinstance(tax_name, str):
            tax_name = ""
        # The statute must be resolved before it is logged.
        tax_law = get_tax_law(tax_name)
        print(f"Tax Law: {tax_law}")
        print(f"Keywords: {keywords}")
        law_summary_content = fetch_law_summary(query, tax_law, keywords)
    else:
        law_summary_content = ""
        gr.Info(f"多個問題不會提供法令彙編函釋檢索結果。")
    state["history"].append((query, answer))
    # answer_multiple_questions may return fewer than three follow-ups
    # (for example on errors); pad so the three outputs always exist.
    while len(insight_questions) < 3:
        insight_questions.append("提供更多地方稅資訊")
    end_time = time.time()
    gr.Info(f"AI知識庫已答覆,執行時間: {(end_time - start_time):.2f} 秒。")
    return answer, insight_questions[0], insight_questions[1], insight_questions[2], state, query, law_summary_content
def convert_punctuation(text):
    """Replace full-width '?', ',', '!' and the ideographic space in *text*
    with their ASCII counterparts."""
    replacements = (
        ('?', '?'),
        (',', ','),
        ('!', '!'),
        (' ', ' '),
    )
    for wide, narrow in replacements:
        text = text.replace(wide, narrow)
    return text
def clear_outputs():
    """Return values that blank two text fields and hide the third component."""
    hidden_and_empty = gr.update(value="", visible=False)
    return "", "", hidden_and_empty
# Stylesheet handed to gr.Blocks(css=...). The selectors correspond to the
# elem_classes / elem_id values assigned to the components of the UI
# (query-input, answer-box, center-text, submit-btn, insight-btn,
# api-key-input, text-background, clear-button).
custom_css = """
.query-input {
background-color: #B7E0FF !important;
padding: 15px !important;
border-radius: 10px !important;
margin: 0 !important;
}
.query-input textarea {
font-size: 18px !important;
background-color: #ffffff;
border: 1px solid #f0f8ff;
border-radius: 8px;
}
.answer-box {
background-color: #FFF5CD !important;
padding: 10px !important;
border-radius: 10px !important;
margin: 0 !important;
}
.answer-box textarea {
font-size: 18px !important;
background-color: #ffffff;
border: 1px solid #f0f8ff;
border-radius: 8px;
}
.center-text {
text-align: center !important;
color: #ff4081;
text-shadow: 2px 2px 4px rgba(0,0,0,0.1);
margin-bottom: 0 !important;
}
#submit-btn {
border-radius: 10px !important;
border: none !important;
background-color: #ff4081 !important;
color: white !important;
font-weight: bold !important;
transition: all 0.3s ease !important;
margin: 0 !important;
}
#submit-btn:hover {
background-color: #f50057 !important;
transform: scale(1.05);
}
.insight-btn {
border-radius: 10px !important;
border: none !important;
background-color: #4dd8e2 !important;
}
.insight-btn:hover {
background-color: #00bcd4 !important;
}
.gr-form {
background-color: #e8f5e9 !important;
padding: 15px !important;
border-radius: 10px !important;
}
.api-key-input {
background-color: #FFCFB3 !important;
padding: 15px !important;
border-radius: 10px !important;
margin: 0 !important;
}
.text-background {
font-size: 18px !important;
padding: 5px !important;
border-radius: 10px !important;
border: 2px solid #B7E0FF !important;
margin: 0 !important;
}
.clear-button {
color: white !important;
background-color: #000000 !important;
padding: 5px !important;
border-radius: 10px !important;
margin: 0 !important;
}
.clear-button:hover {
background-color: #000000 !important;
transform: scale(1.05);
}
"""
# Gradio UI: layout, component definitions and event wiring.
with gr.Blocks(theme=gr.themes.Soft(), css=custom_css) as iface:
    # Page title and disclaimer banner.
    gr.Markdown("""
# 地方稅知識庫系統 - 財政部財政資訊中心
> ### **※ RAG-based KM 以地方稅極少知識資料作示範,僅供參考,準確資訊請依地方稅稽徵機關回覆為準。系統部署:江信宗,LLM:Llama-3.1-70B。**
""", elem_classes="center-text")
    with gr.Row():
        query_input = gr.Textbox(label="輸入您的問題,系統將基於學習到的知識資料提供相關答案。", placeholder="支援同時輸入多個問題,例如:問題1?問題2?", autofocus=True, scale=3, max_lines=5, elem_classes="query-input")
        api_key_input = gr.Textbox(label="輸入您的 API Key", type="password", placeholder="API authentication key", scale=1, elem_classes="api-key-input")
    answer_output = gr.Textbox(label="知識庫答案", interactive=False, max_lines=40, elem_classes="answer-box")
    # Follow-up question buttons; hidden until update_ui() gives them a value.
    with gr.Row():
        insight_q1 = gr.Button("洞察問題 1", visible=False, elem_classes=["insight-btn"])
        insight_q2 = gr.Button("洞察問題 2", visible=False, elem_classes=["insight-btn"])
        insight_q3 = gr.Button("洞察問題 3", visible=False, elem_classes=["insight-btn"])
    # Per-session state; handle_interaction() initialises it when it is None.
    state = gr.State()
    current_question = gr.Textbox(lines=2, label="當前問題", visible=False)
    law_summary = gr.HTML(label="法令彙編函釋檢索", elem_classes="text-background", visible=False)
    with gr.Row():
        submit_btn = gr.Button("傳送", variant="primary", scale=3, elem_id="submit-btn")
        clear_button = gr.Button("清除", variant="secondary", scale=1, elem_classes="clear-button")
    gr.HTML(
        """
<span style="font-size: 18px; color: black;">※ 財政部各稅法令函釋檢索系統:</span><a href="https://ttc.mof.gov.tw/" title="財政部各稅法令函釋檢索系統" style="font-size: 18px; color: red;">https://ttc.mof.gov.tw/</a>
"""
    )
    def update_ui(answer, q1, q2, q3, state, current_q, law_summary):
        """Pass the handler outputs through and set component visibility.

        Each insight button is shown only when its text is non-empty, and
        the ruling summary is shown only when it contains non-whitespace
        text.
        """
        return [
            answer,
            gr.update(value=q1, visible=bool(q1)),
            gr.update(value=q2, visible=bool(q2)),
            gr.update(value=q3, visible=bool(q3)),
            state,
            current_q,
            gr.update(value=law_summary, visible=bool(law_summary.strip()))
        ]
    # Two-step submit: handle_interaction() fills the components, then
    # update_ui() re-reads them to toggle visibility.
    submit_btn.click(
        fn=handle_interaction,
        inputs=[query_input, api_key_input, state],
        outputs=[answer_output, insight_q1, insight_q2, insight_q3, state, current_question, law_summary]
    ).then(
        fn=update_ui,
        inputs=[answer_output, insight_q1, insight_q2, insight_q3, state, current_question, law_summary],
        outputs=[answer_output, insight_q1, insight_q2, insight_q3, state, current_question, law_summary]
    )
    # Clicking an insight button copies its label into the query box; the
    # user still has to press submit.
    for btn in [insight_q1, insight_q2, insight_q3]:
        btn.click(
            lambda x: x,
            inputs=[btn],
            outputs=[query_input]
        )
    def clear_outputs():
        """Blank the query and answer boxes and hide the ruling summary."""
        return "", "", gr.update(value="", visible=False)
    # Only the three listed components are reset; the insight buttons are
    # not among the outputs.
    clear_button.click(
        fn=clear_outputs,
        inputs=[],
        outputs=[query_input, answer_output, law_summary]
    )
if __name__ == "__main__":
    # With SPACE_ID in the environment the app is launched with defaults;
    # otherwise a share link is requested and the API page is hidden.
    has_space_id = "SPACE_ID" in os.environ
    launch_options = {} if has_space_id else {"share": True, "show_api": False}
    iface.launch(**launch_options)