Spaces:
Runtime error
Runtime error
| # -*- coding: utf-8 -*- | |
| # 財政部財政資訊中心 江信宗 | |
| import os | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| import gradio as gr | |
| from openai import OpenAI | |
| from langchain_community.utils import user_agent | |
| from langchain_groq import ChatGroq | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import Chroma | |
| from langchain.chains import RetrievalQA | |
| from langchain_community.document_loaders import WebBaseLoader, TextLoader | |
| from langchain.prompts import PromptTemplate | |
| from langchain.schema import Document | |
| import resend | |
| import requests | |
| import re | |
| import time | |
def load_documents(sources):
    """Load every entry of *sources* into one flat list of documents.

    A string starting with ``http`` is loaded with ``WebBaseLoader``; any
    other string is loaded with ``TextLoader``. A dict is wrapped in a
    ``Document`` built from its ``content`` key and optional ``metadata``
    key. Entries of any other type are ignored. A failure on one entry is
    printed and does not stop the remaining entries from loading.
    """
    loaded = []
    for entry in sources:
        try:
            if isinstance(entry, dict):
                loaded.append(
                    Document(
                        page_content=entry['content'],
                        metadata=entry.get('metadata', {}),
                    )
                )
                continue
            if not isinstance(entry, str):
                continue
            loader_cls = WebBaseLoader if entry.startswith('http') else TextLoader
            loaded.extend(loader_cls(entry).load())
        except Exception as e:
            print(f"Error loading source {entry}: {str(e)}")
    return loaded
# Local text files that make up the knowledge base; each one is read by
# load_documents().
sources = [
    "TaxQADataSet_Slim1.txt",
    "TaxQADataSet_Slim2.txt",
    "TaxQADataSet_Slim3.txt",
    "TaxQADataSet_Slim4.txt",
    "TaxQADataSet_Slim5.txt",
    "TaxQADataSet_Slim6.txt",
    "TaxQADataSet_ntpc1.txt",
    "TaxQADataSet_ntpc2.txt",
    "TaxQADataSet_kctax.txt",
    "TaxQADataSet_chutax.txt",
    "LandTaxAct1100623.txt",
    "TheEnforcementRulesoftheLandTaxAct1100923.txt",
    "HouseTaxAct1130103.txt",
    "VehicleLicenseTaxAct1101230.txt",
    "TaxCollectionAct1101217.txt",
    "AmusementTaxAct960523.txt",
    "StampTaxAct910515.txt",
    "DeedTaxAct990505.txt",
]
documents = load_documents(sources)
print(f"\n成功載入 {len(documents)} 個檔案")

# Split the loaded documents into overlapping chunks before embedding them.
text_splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n\n", "\n\n", "\n", "。"],
    is_separator_regex=False,
    length_function=len,
    chunk_size=512,
    chunk_overlap=50,
)
split_docs = text_splitter.split_documents(documents)
print(f"分割後的文件數量:{len(split_docs)}")

embeddings = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large")
print("\n成功初始化 Microsoft 嵌入模型")

print("\n開始建立向量資料庫")
vectorstore = Chroma.from_documents(split_docs, embeddings, persist_directory="./Knowledge-base")
print(f"成功建立 Chroma 向量資料庫,共有 {len(split_docs)} 個文檔")

# Cap k / fetch_k at the number of chunks actually available.
chunk_total = len(split_docs)
mmr_k = min(4, chunk_total)
mmr_fetch_k = min(20, chunk_total)
retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={"k": mmr_k, "fetch_k": mmr_fetch_k, "lambda_mult": 0.8},
)
print("檢索演算法:Maximum Marginal Relevance Retrieval")
print(f"檢索文檔數量:k={mmr_k}, fetch_k={mmr_fetch_k}")

# Prompt for the QA chain; {context} and {question} are the two template
# variables declared on PROMPT below.
template = """Let's work this out in a step by step way to be sure we have the right answer. Must reply to me in Taiwanese Traditional Chinese.
在回答之前,請仔細分析檢索到的上下文,確保你的回答準確完整反映了上下文中的訊息,而不是依賴先前的知識,在回應的答案中絕對不要提到是根據上下文回答。
如果檢索到的多個上下文之間存在聯繫,請整合這些訊息以提供更全面的回答,但要避免過度推斷。
如果檢索到的上下文不包含足夠回答問題的訊息,請誠實的說明,不要試圖編造答案。
上下文: {context}
問題: {question}
答案:"""
PROMPT = PromptTemplate(
    input_variables=["context", "question"],
    template=template,
)
def create_chain(llm):
    """Return a "stuff" RetrievalQA chain for *llm*.

    The chain uses the module-level ``retriever`` and ``PROMPT`` and is
    configured to return its source documents alongside the answer.
    """
    chain_options = {
        "llm": llm,
        "chain_type": "stuff",
        "retriever": retriever,
        "return_source_documents": True,
        "chain_type_kwargs": {"prompt": PROMPT},
    }
    return RetrievalQA.from_chain_type(**chain_options)


print("成功建立 RAG Chain")
def initialize_llm(api_key):
    """Return a ``ChatGroq`` client for the llama-3.3-70b-versatile model using *api_key*."""
    groq_settings = {
        "groq_api_key": api_key,
        "model_name": 'llama-3.3-70b-versatile',
    }
    return ChatGroq(**groq_settings)
def generate_insight_questions(query, api_key):
    """Ask the LLM for three follow-up ("insight") questions about *query*.

    Args:
        query: The user's original question.
        api_key: API key forwarded to ``initialize_llm``.

    Returns:
        A list of exactly three non-empty strings. Blank lines in the model
        reply are dropped; if fewer than three questions remain, the list is
        padded with a generic prompt. If the LLM cannot be initialised or
        invoked, three canned fallback questions are returned instead.
    """
    filler = "提供更多地方稅資訊"
    fallback_questions = [filler, "提供其他地方稅問題", "還想了解什麼地方稅目"]
    prompt = f"""Let's work this out in a step by step way to be sure we have the right answer. Must reply to me in "Traditional Chinese".
根據以下回答,生成3個相關的洞察問題:
原始問題: {query}
請提供3個簡短但有深度的問題,這些問題應該符合:
1. 與原始問題緊密相關
2. 準確重新描述原始問題
3. 引導更深入的解決原始問題
請直接列出這3個問題,每個問題一行,不要添加編號或其他文字。
"""
    try:
        # Initialisation sits inside the try so that a failure here degrades
        # to the fallback questions instead of propagating to the caller.
        llm = initialize_llm(api_key)
        response = llm.invoke(prompt)
        raw_text = response.content if hasattr(response, 'content') else str(response)
        # Drop blank lines: an empty "question" would render as an empty,
        # hidden button in the UI.
        questions = [line.strip() for line in raw_text.split('\n') if line.strip()]
        while len(questions) < 3:
            questions.append(filler)
        return questions[:3]
    except Exception as e:
        print(f"Error generating insight questions:{str(e)}")
        return fallback_questions
def answer_question(query, api_key):
    """Answer one question through the RAG chain.

    Returns a ``(answer, insight_questions)`` tuple where the second item
    holds three follow-up questions. If anything raises, the first item is
    an apology message containing the error text and the second is an
    empty list.
    """
    filler = "提供更多地方稅資訊"
    try:
        gr.Info("檢索地方稅知識庫中......")
        chain = create_chain(initialize_llm(api_key))
        answer = chain.invoke({"query": query})["result"]
        suggestions = generate_insight_questions(query, api_key)
        # Pad to three entries in case fewer came back.
        missing = 3 - len(suggestions)
        for _ in range(missing):
            suggestions.append(filler)
        return answer, suggestions[:3]
    except Exception as e:
        return f"抱歉,處理您的問題時發生錯誤:{str(e)}", []
def split_questions(query):
    """Split *query* into individual questions.

    The text is cut at ``?``, ``!``, ``。`` and spaces; each piece is
    stripped and empty pieces are discarded.
    """
    trimmed = (piece.strip() for piece in re.split(r'[?!。 ]', query))
    return [piece for piece in trimmed if piece]
def answer_multiple_questions(query, api_key):
    """Answer every question found in *query* and merge the results.

    Returns ``(combined_answer, insight_questions)``. With more than one
    question, each answer is prefixed with its question and the answers are
    separated by blank lines; with a single question the bare answer is
    returned. Only the first three collected insight questions are kept.
    """
    questions = split_questions(query)
    is_batch = len(questions) > 1
    answers = []
    suggestions = []
    for item in questions:
        reply, item_suggestions = answer_question(item, api_key)
        answers.append(f"【問題】{item}\n答案:{reply}" if is_batch else reply)
        suggestions.extend(item_suggestions)
    separator = "\n\n\n" if is_batch else "\n"
    return separator.join(answers), suggestions[:3]
def get_tax_law(tax_type):
    """Return the name of the statute that governs *tax_type*.

    Returns ``"無稅法"`` when the tax name is not recognised.
    """
    # Grouped by statute; flattened below into a tax-name -> statute lookup.
    taxes_by_law = {
        "房屋稅條例": ("房屋稅",),
        "土地稅法": ("地價稅", "土地增值稅", "增值稅"),
        "契稅條例": ("契稅",),
        "娛樂稅法": ("娛樂稅",),
        "印花稅法": ("印花稅",),
        "使用牌照稅法": ("使用牌照稅", "牌照稅"),
        "稅捐稽徵法": ("稅捐稽徵法",),
        "所得稅法": ("綜合所得稅", "所得稅"),
        "遺產及贈與稅法": ("遺產稅", "贈與稅"),
        "營業稅法": ("營業稅",),
    }
    law_for_tax = {
        tax: law
        for law, taxes in taxes_by_law.items()
        for tax in taxes
    }
    return law_for_tax.get(tax_type, "無稅法")
_LAW_API_URL = "https://ttc.mof.gov.tw/Api/GetData"
# Seconds to wait on the law-search API so a stalled upstream cannot hang the UI request.
_LAW_API_TIMEOUT = 30
# Scoring models in order of preference; the second is tried only if the first call fails.
_RELEVANCE_MODELS = ("Meta-Llama-3.1-405B-Instruct", "Meta-Llama-3.1-70B-Instruct")


def _law_search_payload(tax_law, tax_version, keyword):
    """Build the form payload for one FB10001 request to the law-search API."""
    return {
        "FunctionID": "FB10001",
        "ObjParams[TaxAct]": tax_law,
        "ObjParams[TaxVer]": tax_version,
        "ObjParams[Chapter]": "請選擇",
        "ObjParams[Article]": "請選擇",
        "ObjParams[Content]": keyword,
        "ObjParams[Operator01]": "0",
        "ObjParams[Content01]": "",
        "ObjParams[Operator02]": "0",
        "ObjParams[Content02]": "",
    }


def _parse_relevance(reply):
    """Return the first integer found in *reply*, capped at 100; 0 if there is none."""
    match = re.search(r'\d+', reply or "")
    if match is None:
        print(f"Warning: Could not parse relevance percentage from '{reply}'")
        return 0
    return min(int(match.group()), 100)


def _score_relevance(client, query, content):
    """Ask the scoring models how relevant *content* is to *query*; return 0-100."""
    system_prompt = f"""
請判斷以下函釋內容與user提問的內容。
請給出一個0到100之間的相關性百分比,不要任何說明或理由。
回答格式為:
相關性:XX%
函釋內容:```{content}```
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"""```{query}```"""},
    ]
    for model in _RELEVANCE_MODELS:
        try:
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=0.7,
            )
            return _parse_relevance(response.choices[0].message.content.strip())
        except Exception as e:
            print(f"Relevance scoring with {model} failed: {str(e)}")
    # Both models failed: treat the ruling as not relevant.
    return 0


def fetch_law_summary(query, tax_law, keywords):
    """Search the tax-ruling API and return an HTML summary of relevant rulings.

    Args:
        query: The user's question; used to score each ruling's relevance.
        tax_law: Statute name sent as the ``TaxAct`` search parameter.
        keywords: Iterable of keywords; one search request is made per keyword.

    Returns:
        An HTML string. When at least one ruling is found, it is a heading
        followed by a collapsible ``<details>`` element for each of the
        first 20 unique rulings (by ``TaxSN``) whose relevance score is
        above 0. Otherwise it is a paragraph saying nothing was found.

    Network and API errors are printed and treated as "no data"; they are
    never raised to the caller.
    """
    headers = {
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
        "accept": "application/json, text/javascript, */*; q=0.01",
        "accept-encoding": "gzip, deflate, br, zstd",
        "accept-language": "zh-TW,zh;q=0.9,en-US;q=0.8,en;q=0.7",
        "referer": "https://ttc.mof.gov.tw/",
    }
    gr.Info("檢索法令彙編函釋中......")

    # Step 1: look up the version string for this statute; "請選擇" is the
    # API's "no selection" placeholder and serves as the default.
    latest_version = "請選擇"
    try:
        version_response = requests.post(
            _LAW_API_URL,
            data=_law_search_payload(tax_law, "請選擇", ""),
            headers=headers,
            timeout=_LAW_API_TIMEOUT,
        )
        version_response.raise_for_status()
        version_data = version_response.json()
        if version_data["Code"] == "1" and "Table1" in version_data["Data"]:
            for item in version_data["Data"]["Table1"]:
                if item["TaxAct"] == tax_law:
                    latest_version = item["TaxVer"]
                    break
            if latest_version == "請選擇":
                print(f"未找到 {tax_law} 的對應版本,使用預設選項。")
        else:
            gr.Warning("無法獲取稅法版本資訊,使用預設選項。")
    except Exception as e:
        print(f"獲取稅法版本時發生錯誤:{str(e)}")
        latest_version = "請選擇"

    # Step 2: one search per keyword; a failure on one keyword does not stop the rest.
    all_results = []
    for keyword in keywords:
        try:
            response = requests.post(
                _LAW_API_URL,
                data=_law_search_payload(tax_law, latest_version, keyword),
                headers=headers,
                timeout=_LAW_API_TIMEOUT,
            )
            response.raise_for_status()
            data = response.json()
            if data["Code"] == "1" and "Table" in data["Data"]:
                all_results.extend(data["Data"]["Table"])
        except Exception as e:
            print(f"檢索關鍵字 '{keyword}' 的法令彙編函釋時發生錯誤:{str(e)}")

    if not all_results:
        return "<p>未檢索到相關法令彙編函釋。</p>"

    # Step 3: de-duplicate by TaxSN, keeping the first occurrence of each.
    unique_results = {}
    for result in all_results:
        unique_results.setdefault(result.get('TaxSN', ''), result)

    # One client for all scoring calls rather than one per ruling.
    client = OpenAI(
        api_key=os.environ.get("YOUR_API_TOKEN"),
        base_url="https://api.sambanova.ai/v1",
    )
    parts = [f"<h3>相關法令彙編函釋檢索結果({tax_law} {latest_version}):</h3>"]
    for result in list(unique_results.values())[:20]:  # only the first 20 unique rulings are scored
        title = result.get('Title', '')
        content = result.get('Content', '')
        percentage = _score_relevance(client, query, content)
        if percentage > 0:
            # NOTE(review): Title/Content are inserted as-is, presumably
            # because the API returns HTML-formatted text - confirm before
            # adding escaping.
            parts.append(f"""
<details>
<summary style="cursor: pointer; color: #0066cc;">{title} (相關性:{percentage} %)</summary>
<p>{content}</p>
</details>
""")
    return "".join(parts)
def llm_openai_api(query, answer):
    """Ask the LLM for the tax name and keywords of a question/answer pair.

    Returns the model's reply text, which the prompt requests in the form
    ``{"TaxName": ..., "KeyWord": ...}``. If the API call fails, the error
    is printed, a UI warning is raised, and a dict-shaped string with empty
    values is returned.
    """
    user_prompt = f"""
「題目:{query}
答案:{answer}」
請詳細分析答案內容後,依據與題目相關性最高的稅目名稱及最多3個重點關鍵字回應我,提供的3個重點關鍵字不能與稅目名稱相同,問題與答案中的稅目名稱列入TaxName,關鍵字列入KeyWord,只須根據格式回應,不要寫其他的。
# 回應字典格式範例:
{{"TaxName": "地價稅", "KeyWord": "宿舍用地,醫護人員"}}
"""
    conversation = [
        {"role": "system", "content": "Must reply to user in Traditional Chinese."},
        {"role": "user", "content": user_prompt},
    ]
    client = OpenAI(
        base_url="https://api.sambanova.ai/v1",
        api_key=os.environ.get("YOUR_API_TOKEN"),
    )
    try:
        completion = client.chat.completions.create(
            model='Meta-Llama-3.1-405B-Instruct',
            messages=conversation,
            temperature=0.7,
            top_p=0.9,
        )
        reply_text = completion.choices[0].message.content.strip()
        return reply_text
    except Exception as e:
        print(f"檢索法令彙編函釋 API Key!Error: {str(e)}")
        gr.Warning("檢索法令彙編函釋 API Key 額度不足!!")
        return '{"TaxName": "", "KeyWord": ""}'
def handle_interaction(query, api_key, state):
    """Handle one submit: answer the query and, for a single question, look up rulings.

    Args:
        query: Raw text from the question box; may contain several questions.
        api_key: User-supplied API key. When empty, the query is e-mailed to
            the address configured below and the ``YOUR_API_KEY``
            environment variable is used instead.
        state: Session state dict with a ``"history"`` list, or ``None`` on
            the first call.

    Returns:
        A 7-tuple ``(answer, insight_q1, insight_q2, insight_q3, state,
        query, law_summary_html)``. ``law_summary_html`` is an empty string
        when the query contains anything other than exactly one question.
    """
    import ast
    import html

    gr.Info("開始處理問題,請稍待片刻......")
    start_time = time.time()
    if state is None:
        state = {"history": []}
    if not api_key:
        try:
            # Reading the key inside the try keeps a missing environment
            # variable from aborting the request; only the notification is lost.
            resend.api_key = os.environ["YOUR_USE_API_KEY"]
            params: resend.Emails.SendParams = {
                "from": "Tax_KM <onboarding@resend.dev>",
                "to": ["antivir7@gmail.com"],
                "subject": "地方稅知識庫 API KEY",
                # The query is untrusted user text, so escape it before
                # embedding it in the HTML body.
                "html": f"<strong>檢索內容:<br>{html.escape(query)}</strong>",
            }
            email_response = resend.Emails.send(params)
            print(f"Email sent successfully. Response:{email_response}")
        except Exception as e:
            print(f"Failed to send email:{str(e)}")
        api_key = os.getenv("YOUR_API_KEY")
    query = convert_punctuation(query)
    answer, insight_questions = answer_multiple_questions(query, api_key)
    questions = split_questions(query)
    if len(questions) == 1:
        api_response = llm_openai_api(query, answer)
        tax_name = ""
        keywords = []
        print(f"LLM剖析:{api_response}")
        try:
            # SECURITY: this was eval() on model output that is derived from
            # user input. literal_eval accepts the same dict literal but
            # cannot execute code.
            response_dict = ast.literal_eval(api_response)
            tax_name = response_dict.get("TaxName", "")
            # Accept ASCII and full-width separators, trim each keyword and
            # drop empties so that no blank-keyword search is issued.
            keywords = [
                keyword.strip()
                for keyword in re.split(r'[,\uff0c\u3001]', response_dict.get("KeyWord", ""))
                if keyword.strip()
            ]
        except Exception as e:
            print(f"剖析相關法令彙編函釋失敗!!{str(e)}")
        # Resolve the statute before logging it; it was previously printed
        # before assignment, which raised UnboundLocalError.
        tax_law = get_tax_law(tax_name)
        print(f"Tax Law: {tax_law}")
        print(f"Keywords: {keywords}")
        law_summary_content = fetch_law_summary(query, tax_law, keywords)
    else:
        law_summary_content = ""
        gr.Info("多個問題不會提供法令彙編函釋檢索結果。")
    state["history"].append((query, answer))
    # The UI has three insight buttons; pad so all three indexes exist.
    while len(insight_questions) < 3:
        insight_questions.append("提供更多地方稅資訊")
    end_time = time.time()
    gr.Info(f"AI知識庫已答覆,執行時間: {(end_time - start_time):.2f} 秒。")
    return answer, insight_questions[0], insight_questions[1], insight_questions[2], state, query, law_summary_content
# Full-width question mark, comma, exclamation mark and ideographic space,
# mapped to their ASCII counterparts. Written as escapes so the table
# survives any character normalisation of this source file.
_FULLWIDTH_TO_ASCII = str.maketrans({
    '\uff1f': '?',
    '\uff0c': ',',
    '\uff01': '!',
    '\u3000': ' ',
})


def convert_punctuation(text):
    """Return *text* with full-width ``?``, ``,``, ``!`` and space converted to ASCII.

    The previous implementation replaced each ASCII character with itself
    and therefore changed nothing. All other characters, including ``。``,
    are left unchanged.
    """
    return text.translate(_FULLWIDTH_TO_ASCII)
def clear_outputs():
    """Return two empty strings and an update that empties and hides a component."""
    hidden_and_empty = gr.update(value="", visible=False)
    return "", "", hidden_and_empty
# Stylesheet passed to gr.Blocks(css=...). Most selectors match the
# elem_classes / elem_id values assigned to components in the UI definition
# (query-input, answer-box, center-text, submit-btn, insight-btn,
# api-key-input, text-background, clear-button).
custom_css = """
.query-input {
background-color: #B7E0FF !important;
padding: 15px !important;
border-radius: 10px !important;
margin: 0 !important;
}
.query-input textarea {
font-size: 18px !important;
background-color: #ffffff;
border: 1px solid #f0f8ff;
border-radius: 8px;
}
.answer-box {
background-color: #FFF5CD !important;
padding: 10px !important;
border-radius: 10px !important;
margin: 0 !important;
}
.answer-box textarea {
font-size: 18px !important;
background-color: #ffffff;
border: 1px solid #f0f8ff;
border-radius: 8px;
}
.center-text {
text-align: center !important;
color: #ff4081;
text-shadow: 2px 2px 4px rgba(0,0,0,0.1);
margin-bottom: 0 !important;
}
#submit-btn {
border-radius: 10px !important;
border: none !important;
background-color: #ff4081 !important;
color: white !important;
font-weight: bold !important;
transition: all 0.3s ease !important;
margin: 0 !important;
}
#submit-btn:hover {
background-color: #f50057 !important;
transform: scale(1.05);
}
.insight-btn {
border-radius: 10px !important;
border: none !important;
background-color: #4dd8e2 !important;
}
.insight-btn:hover {
background-color: #00bcd4 !important;
}
.gr-form {
background-color: #e8f5e9 !important;
padding: 15px !important;
border-radius: 10px !important;
}
.api-key-input {
background-color: #FFCFB3 !important;
padding: 15px !important;
border-radius: 10px !important;
margin: 0 !important;
}
.text-background {
font-size: 18px !important;
padding: 5px !important;
border-radius: 10px !important;
border: 2px solid #B7E0FF !important;
margin: 0 !important;
}
.clear-button {
color: white !important;
background-color: #000000 !important;
padding: 5px !important;
border-radius: 10px !important;
margin: 0 !important;
}
.clear-button:hover {
background-color: #000000 !important;
transform: scale(1.05);
}
"""
# Gradio UI: layout first, then event wiring. Components are created in the
# same order as before, so the rendered layout is unchanged.
with gr.Blocks(theme=gr.themes.Soft(), css=custom_css) as iface:
    gr.Markdown("""
# 地方稅知識庫系統 - 財政部財政資訊中心
> ### **※ RAG-based KM 以地方稅極少知識資料作示範,僅供參考,準確資訊請依地方稅稽徵機關回覆為準。系統部署:江信宗,LLM:Llama-3.1-70B。**
""", elem_classes="center-text")

    # Question box and API-key box share one row.
    with gr.Row():
        query_input = gr.Textbox(
            label="輸入您的問題,系統將基於學習到的知識資料提供相關答案。",
            placeholder="支援同時輸入多個問題,例如:問題1?問題2?",
            autofocus=True,
            scale=3,
            max_lines=5,
            elem_classes="query-input",
        )
        api_key_input = gr.Textbox(
            label="輸入您的 API Key",
            type="password",
            placeholder="API authentication key",
            scale=1,
            elem_classes="api-key-input",
        )

    answer_output = gr.Textbox(
        label="知識庫答案",
        interactive=False,
        max_lines=40,
        elem_classes="answer-box",
    )

    # Follow-up question buttons start hidden; update_ui shows the ones
    # that have text.
    with gr.Row():
        insight_q1 = gr.Button("洞察問題 1", visible=False, elem_classes=["insight-btn"])
        insight_q2 = gr.Button("洞察問題 2", visible=False, elem_classes=["insight-btn"])
        insight_q3 = gr.Button("洞察問題 3", visible=False, elem_classes=["insight-btn"])

    state = gr.State()
    current_question = gr.Textbox(lines=2, label="當前問題", visible=False)
    law_summary = gr.HTML(label="法令彙編函釋檢索", elem_classes="text-background", visible=False)

    with gr.Row():
        submit_btn = gr.Button("傳送", variant="primary", scale=3, elem_id="submit-btn")
        clear_button = gr.Button("清除", variant="secondary", scale=1, elem_classes="clear-button")

    gr.HTML(
        """
<span style="font-size: 18px; color: black;">※ 財政部各稅法令函釋檢索系統:</span><a href="https://ttc.mof.gov.tw/" title="財政部各稅法令函釋檢索系統" style="font-size: 18px; color: red;">https://ttc.mof.gov.tw/</a>
"""
    )

    def update_ui(answer, q1, q2, q3, state, current_q, law_summary):
        """Pass values through, showing each button/summary only when it has content."""
        button_updates = [
            gr.update(value=text, visible=bool(text))
            for text in (q1, q2, q3)
        ]
        summary_update = gr.update(value=law_summary, visible=bool(law_summary.strip()))
        return [answer, *button_updates, state, current_q, summary_update]

    # The same seven components receive handle_interaction's results and
    # are then passed through update_ui to fix up visibility.
    result_components = (
        answer_output,
        insight_q1,
        insight_q2,
        insight_q3,
        state,
        current_question,
        law_summary,
    )
    interaction_event = submit_btn.click(
        fn=handle_interaction,
        inputs=[query_input, api_key_input, state],
        outputs=list(result_components),
    )
    interaction_event.then(
        fn=update_ui,
        inputs=list(result_components),
        outputs=list(result_components),
    )

    # Clicking an insight button copies its text into the question box.
    for insight_button in (insight_q1, insight_q2, insight_q3):
        insight_button.click(
            lambda button_text: button_text,
            inputs=[insight_button],
            outputs=[query_input],
        )

    def clear_outputs():
        """Empty the question and answer boxes and hide the ruling summary."""
        hidden_summary = gr.update(value="", visible=False)
        return "", "", hidden_summary

    clear_button.click(
        fn=clear_outputs,
        inputs=[],
        outputs=[query_input, answer_output, law_summary],
    )
if __name__ == "__main__":
    # When SPACE_ID is set, launch with defaults; otherwise create a share
    # link and hide the API page.
    running_in_space = "SPACE_ID" in os.environ
    launch_options = {} if running_in_space else {"share": True, "show_api": False}
    iface.launch(**launch_options)