import html
import logging
import os
import time
import urllib.parse

import gradio as gr
import pandas as pd
import requests
from dotenv import load_dotenv
from huggingface_hub import InferenceClient
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from openai import OpenAI
from PIL import Image

import emotion
|
|
# Merge variables from a local .env file (if any) into the process environment
# before the credentials below are read.
load_dotenv()

# API credentials; each is None when the variable is not set.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
HF_TOKEN = os.environ.get("HF_TOKEN")

# Shared state filled in by init_rag_system() on first use.
global_df = global_mood_df = global_retriever = None
rag_initialized = False
|
|
def init_rag_system():
    """Load the CSV data and the FAISS retriever once per process.

    Populates the module globals ``global_df`` (restaurants.csv),
    ``global_mood_df`` (mood_food_guide.csv) and ``global_retriever``
    (a retriever over the local ``faiss_index`` directory, when present).

    Loading is best-effort: a failure leaves the affected global at its
    previous value and is logged instead of being silently discarded.
    ``rag_initialized`` is set to True regardless, so a failed load is not
    retried on later calls.
    """
    global global_df, global_mood_df, global_retriever, rag_initialized
    if rag_initialized:
        return

    log = logging.getLogger(__name__)

    try:
        global_df = pd.read_csv('restaurants.csv')
        global_df['RAG_Content'] = global_df['RAG_Content'].fillna("")
        global_df['Category'] = global_df['Category'].fillna("其他")
        global_mood_df = pd.read_csv('mood_food_guide.csv')
    except Exception as exc:
        # Deliberately broad: the app must still start without its data files,
        # but the reason has to be visible to whoever operates it.
        log.warning("Could not load restaurant/mood CSV data: %s", exc)

    if os.path.exists("faiss_index"):
        try:
            embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
            # SECURITY: allow_dangerous_deserialization=True makes FAISS unpickle
            # the index files. Only ever point this at an index built locally
            # from trusted data.
            vectorstore = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True)
            global_retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
        except Exception as exc:
            log.warning("Could not load FAISS index from 'faiss_index': %s", exc)

    rag_initialized = True
|
|
| |
| |
| |
def get_restaurant_data(mood_score_str, food_choice):
    """Pick one restaurant matching the mood score and food preference.

    Args:
        mood_score_str: Radio value such as ``"3 (普通)"``; the text before
            the first space is parsed as the integer score. Unparseable
            values fall back to 3.
        food_choice: ``"吃飯"``, ``"吃麵"`` or anything else (no keyword filter).

    Returns:
        A 4-tuple ``(restaurant, is_random, rec_categories, mood_reason)``.
        ``restaurant`` is a row (pandas Series) sampled from the filtered
        candidates, or from the whole table with ``is_random=True`` when no
        candidate survives the filters. When the restaurant table is not
        loaded the tuple is ``(None, True, "資料庫未載入", "無建議")``.
    """
    init_rag_system()
    if global_df is None or global_df.empty:
        return None, True, "資料庫未載入", "無建議"

    try:
        score = int(str(mood_score_str).split(' ')[0])
    except ValueError:
        score = 3

    rec_categories = ""
    mood_reason = "隨意探索"
    # The mood guide is loaded after the restaurant table, so it can be missing
    # even though global_df is available; fall back to the defaults then.
    if global_mood_df is not None:
        mood_info = global_mood_df[global_mood_df['分數'] == score]
        if not mood_info.empty:
            guide_row = mood_info.iloc[0]
            rec_categories = guide_row['推薦料理類別']
            mood_reason = guide_row['原因']
            # Empty CSV cells arrive as NaN, which is truthy and would
            # otherwise be matched against categories as the text "nan".
            if pd.isna(rec_categories):
                rec_categories = ""
            if pd.isna(mood_reason):
                mood_reason = "隨意探索"

    candidates = global_df.copy()
    if rec_categories:
        wanted = str(rec_categories)
        # Substring match in either direction between the restaurant category
        # and the guide's recommended-categories text.
        category_match = candidates['Category'].apply(
            lambda cat: str(cat) in wanted or wanted in str(cat)
        )
        candidates = candidates[category_match]

    food_keyword = "飯" if food_choice == "吃飯" else "麵" if food_choice == "吃麵" else ""
    if food_keyword:
        # regex=False: the keyword is a literal character, not a pattern.
        in_name = candidates['Name'].str.contains(food_keyword, case=False, na=False, regex=False)
        in_content = candidates['RAG_Content'].str.contains(food_keyword, case=False, na=False, regex=False)
        candidates = candidates[in_name | in_content]

    if candidates.empty:
        result = global_df.sample(1).iloc[0]
        is_random = True
    else:
        result = candidates.sample(1).iloc[0]
        is_random = False
    return result, is_random, rec_categories, mood_reason
|
|
def generate_content_with_groq(restaurant_name, restaurant_detail, user_diary, mood_score, mood_guide_reason, debug_mode=False):
    """Ask the Groq-hosted chat model to write the recommendation text.

    Args:
        restaurant_name: Name of the chosen restaurant.
        restaurant_detail: Descriptive text about the restaurant.
        user_diary: The user's free-form diary entry.
        mood_score: Mood score value interpolated into the prompt.
        mood_guide_reason: Reason text taken from the mood food guide.
        debug_mode: When truthy, also return a Markdown dump of both prompts.

    Returns:
        ``(text, debug_log)``. ``text`` is the model reply, a notice when
        ``GROQ_API_KEY`` is unset, or ``"Groq Error: ..."`` when the request
        raises. ``debug_log`` is ``""`` unless ``debug_mode`` is truthy (and
        always ``""`` when the key is missing).
    """
    if not GROQ_API_KEY:
        return "⚠️ 請設定 GROQ_API_KEY", ""

    # Groq exposes an OpenAI-compatible endpoint, hence the OpenAI client.
    groq_client = OpenAI(api_key=GROQ_API_KEY, base_url="https://api.groq.com/openai/v1")

    system_prompt = "你是一個幽默、懂吃且善解人意的 AI 朋友。請根據使用者的日記、心情以及「心情美食指南」來推薦餐廳。"
    user_msg = f"""
【狀態】心情分數:{mood_score},日記:{user_diary}
【心情美食指南建議】
因為分數是 {mood_score},建議吃這類食物的原因是:「{mood_guide_reason}」。

【推薦餐廳】
名稱:{restaurant_name}
資料:{restaurant_detail}

任務:
請用繁體中文寫一段溫暖有趣的回覆:
1. 先回應他的日記與測驗人設。
2. 引用「心情美食指南」的原因,告訴他為什麼現在適合吃這家餐廳(例如:「就像指南說的,現在你需要一點多巴胺...」)。
3. 介紹這家餐廳的特色。
(只需要回覆文字內容)
"""

    if debug_mode:
        debug_log = f"""
### 🔧 Groq Prompt Debug
**System Prompt:**
{system_prompt}

**User Message:**
{user_msg}
"""
    else:
        debug_log = ""

    chat_messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_msg},
    ]
    try:
        completion = groq_client.chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=chat_messages,
        )
        reply_text = completion.choices[0].message.content
    except Exception as e:
        # Surface the failure in the UI text rather than raising into Gradio.
        return f"Groq Error: {str(e)}", debug_log
    return reply_text, debug_log
|
|
def generate_image_huggingface(prompt):
    """Generate a food image for ``prompt`` via the Hugging Face Inference API.

    Args:
        prompt: Text prompt for the image model. Non-string values (e.g. a
            missing CSV cell read as None/NaN) and blank strings are rejected
            up front.

    Returns:
        The object returned by ``InferenceClient.text_to_image``, or None when
        ``HF_TOKEN`` is unset, the prompt is unusable, or the request fails.
    """
    if not HF_TOKEN:
        return None
    # Skip the remote call entirely when there is nothing meaningful to draw.
    if not isinstance(prompt, str) or not prompt.strip():
        return None
    try:
        hf_client = InferenceClient(token=HF_TOKEN)
        return hf_client.text_to_image(prompt=prompt, model="stabilityai/stable-diffusion-xl-base-1.0")
    except Exception as exc:
        # Best-effort: the image is optional, but record why it is missing.
        logging.getLogger(__name__).warning("Image generation failed: %s", exc)
        return None
|
|
def mood_agent_logic(score_input, food_input, diary_input, debug_mode=False):
    """Gradio generator handler: recommend a restaurant, then add its image.

    Yields 4-tuples ``(markdown_text, image, map_html, debug_update)`` twice:
    first with ``image=None`` so the text appears immediately, then again with
    the generated image (or None when generation is unavailable).

    Args:
        score_input: Mood score radio value, e.g. ``"3 (普通)"``.
        food_input: Food preference radio value.
        diary_input: Free-form diary text.
        debug_mode: When truthy, the prompts are shown in the debug panel.
            Defaults to False so the handler also works when the UI wires
            only the first three inputs.
    """
    restaurant, _is_random, _rec_categories, mood_reason = get_restaurant_data(score_input, food_input)
    if restaurant is None:
        yield "資料庫讀取錯誤", None, "", gr.update()
        return

    show_debug = bool(debug_mode)
    name = restaurant['Name']
    url = restaurant['URL']
    img_prompt = restaurant.get('Visual_prompt')

    # Start from the CSV description; prefer retrieved documents when available.
    rag_info = str(restaurant.get('RAG_Content', ''))
    if global_retriever:
        try:
            docs = global_retriever.invoke(name)
        except Exception as exc:
            # Retrieval is an enhancement; fall back to the CSV text on failure.
            logging.getLogger(__name__).warning("Retriever lookup failed for %r: %s", name, exc)
            docs = []
        if docs:
            rag_info = "\n".join([d.page_content for d in docs])

    ai_text, groq_debug_log = generate_content_with_groq(name, rag_info, diary_input, score_input, mood_reason, show_debug)

    full_debug_log = ""
    if show_debug:
        img_debug_log = f"""
### 🎨 Image Prompt Debug
**Visual Prompt:**
{img_prompt}
"""
        full_debug_log = groq_debug_log + "\n\n" + img_debug_log

    debug_output_update = gr.update(value=full_debug_log, visible=show_debug)

    final_response = f"### 🍽️ 推薦:{name}\n\n{ai_text}"
    # Escape the URL so characters such as '&' or '"' cannot break the attribute.
    safe_url = html.escape(str(url), quote=True)
    map_html = f'<div style="text-align:center"><a href="{safe_url}" target="_blank" style="background:#4CAF50;color:white;padding:8px 16px;border-radius:20px;text-decoration:none">🗺️ Google Map 導航</a></div>'

    # First update: text and map right away, image still pending.
    yield final_response, None, map_html, debug_output_update

    image_output = generate_image_huggingface(img_prompt)
    yield final_response, image_output, map_html, debug_output_update
|
|
|
|
| |
| |
| |
|
|
def _score_to_radio_value(score):
    """Map a numeric mood score to the matching radio label.

    Args:
        score: Anything ``int()`` accepts (int, float, numeric string).

    Returns:
        One of the five radio labels; ``"3 (普通)"`` when ``score`` cannot be
        converted to an integer or falls outside 1-5.
    """
    mapping = {1: "1 (心情差)", 2: "2 (不太好)", 3: "3 (普通)", 4: "4 (不錯)", 5: "5 (超棒)"}
    try:
        score = int(score)
    except (TypeError, ValueError, OverflowError):
        # None / non-numeric text / NaN / infinity all fall back to neutral.
        score = 3
    return mapping.get(score, "3 (普通)")
|
|
def bridge_start_click(st):
    """Adapt ``emotion.on_restart`` to the outputs of the start button.

    Picks items 0, 1, 4 and 5 of the sequence returned by
    ``emotion.on_restart(st)`` and reorders them to match the
    ``btn_start.click`` outputs: result markdown, camera, stop button,
    start button, state, go-to-dining button. The stop button is shown and
    the go-to-dining button is hidden.
    """
    restart = emotion.on_restart(st)
    cam_value, result_value = restart[0], restart[1]
    state_value, start_btn_value = restart[4], restart[5]
    show_stop = gr.update(visible=True)
    hide_go = gr.update(visible=False)
    return result_value, cam_value, show_stop, start_btn_value, state_value, hide_go
|
|
def bridge_stop_click(st):
    """Adapt ``emotion.on_stop`` to the outputs of the stop button.

    Picks items 0, 1, 4 and 5 of the sequence returned by
    ``emotion.on_stop(st)`` and reorders them to match the ``btn_stop.click``
    outputs: result markdown, camera, stop button, start button, state.
    The stop button is hidden.
    """
    stopped = emotion.on_stop(st)
    cam_value, result_value = stopped[0], stopped[1]
    state_value, start_btn_value = stopped[4], stopped[5]
    hide_stop = gr.update(visible=False)
    return result_value, cam_value, hide_stop, start_btn_value, state_value
|
|
def bridge_predict_frame(frame, st):
    """Process one webcam frame and, once finished, publish the mood score.

    Delegates to ``emotion.on_stream(frame, st)`` and uses items 0, 1, 4 and 5
    of its result (camera, result markdown, state, start button). While the
    state is not finished the score radio and stop button are left untouched
    and the go-to-dining button stays hidden. When the state reports
    ``finished`` and carries a ``final_score``, the score radio is set to the
    matching label, the stop button is hidden and the go-to-dining button is
    shown.

    Returns:
        ``(cam, result, state, stop_btn, start_btn, score, go_btn)`` in the
        order expected by the ``cam.stream`` outputs.
    """
    res = emotion.on_stream(frame, st)
    out_cam, out_result, out_st, out_btn_start = res[0], res[1], res[4], res[5]

    # Defaults: no-op updates, with the go-to-dining button hidden.
    score_update = gr.update()
    out_btn_stop = gr.update()
    btn_go_visible = gr.update(visible=False)

    if out_st.finished and hasattr(out_st, 'final_score'):
        new_val = _score_to_radio_value(out_st.final_score)
        score_update = gr.update(value=new_val)
        out_btn_stop = gr.update(visible=False)
        btn_go_visible = gr.update(visible=True)

    return out_cam, out_result, out_st, out_btn_stop, out_btn_start, score_update, btn_go_visible
|
|
|
|
| |
| |
| |
# Constrains the Step 1 column (elem_id="app_container") to a centered 960px width.
css_ = "#app_container { max-width: 960px; margin: 0 auto; }"

# NOTE(review): widget nesting below is reconstructed from statement order —
# confirm against the intended layout.
with gr.Blocks(title="AI 心情食堂", css=css_) as demo:

    # Session state passed to and returned from the emotion handlers.
    st_state = gr.State(emotion.AppState())

    with gr.Tabs() as tabs:

        # ---- Step 1: webcam emotion recognition ----
        with gr.TabItem("😊 情緒辨識 (Step 1)", id=0):
            with gr.Column(elem_id="app_container"):
                gr.Markdown("### 第一步:測測你的心情能量\n讓 AI 看看你的表情,自動幫你決定心情分數!(辨識完畢會自動跳轉)")

                with gr.Row():
                    btn_start = gr.Button("📸 開啟攝影機辨識", variant="primary")
                    btn_stop = gr.Button("⏹️ 停止", variant="secondary", visible=False)

                cam = gr.Image(sources=["webcam"], streaming=True, type="numpy", label="攝影機畫面", visible=False)
                result_markdown = gr.Markdown(emotion._hint_html("請按「開啟攝影機辨識」並允許瀏覽器使用相機。"))
                btn_go_dining = gr.Button("🚀 確定心情,來找餐廳!", variant="primary", visible=False, size="lg")

        # ---- Step 2: restaurant recommendation ----
        with gr.TabItem("🍽️ AI 心情食堂 (Step 2)", id=1):
            with gr.Column():
                gr.Markdown("## 🍱 今天想吃點什麼?")
                with gr.Row():
                    with gr.Column(scale=1):
                        score_input = gr.Radio(
                            ["1 (心情差)", "2 (不太好)", "3 (普通)", "4 (不錯)", "5 (超棒)"],
                            label="1. 心情分數 (由 Tab 1 自動填入)",
                            value="3 (普通)",
                        )
                        food_input = gr.Radio(["吃飯", "吃麵", "隨便"], label="2. 想吃什麼", value="隨便")
                        diary_input = gr.Textbox(lines=4, label="3. 心情日記", placeholder="寫下今天發生的事...")

                        # Feeds the handler's fourth parameter (debug_mode) and
                        # controls whether the debug panel below is revealed.
                        debug_input = gr.Checkbox(label="🔧 顯示除錯資訊 (Debug)", value=False)

                        submit_btn = gr.Button("🍱 送出給 Agent", variant="primary")

                        debug_output = gr.Markdown(label="除錯資訊 (Debug Log)", visible=False)

                    with gr.Column(scale=1):
                        agent_output = gr.Markdown(label="AI 回應")
                        image_output = gr.Image(label="AI 推薦美食圖", type="pil", width=400)
                        map_output = gr.HTML(label="地圖導航")

    # ---- Event wiring ----
    # Each outputs list must match the order of the handler's return tuple.
    btn_start.click(
        fn=bridge_start_click,
        inputs=[st_state],
        outputs=[result_markdown, cam, btn_stop, btn_start, st_state, btn_go_dining],
        show_progress="minimal",
    )

    btn_stop.click(
        fn=bridge_stop_click,
        inputs=[st_state],
        outputs=[result_markdown, cam, btn_stop, btn_start, st_state],
        show_progress="minimal",
    )

    cam.stream(
        fn=bridge_predict_frame,
        inputs=[cam, st_state],
        outputs=[cam, result_markdown, st_state, btn_stop, btn_start, score_input, btn_go_dining],
        show_progress="minimal",
    )

    # Switch to the Step 2 tab (id=1).
    btn_go_dining.click(
        fn=lambda: gr.Tabs(selected=1),
        inputs=None,
        outputs=tabs,
    )

    # mood_agent_logic takes four arguments; the checkbox supplies debug_mode.
    submit_btn.click(
        fn=mood_agent_logic,
        inputs=[score_input, food_input, diary_input, debug_input],
        outputs=[agent_output, image_output, map_output, debug_output],
    )


if __name__ == "__main__":
    demo.launch(ssr_mode=False)