import google.generativeai as genai_gen
import json
import os
import time
from collections import defaultdict
import io
from PIL import Image
import traceback
from google.cloud import speech, texttospeech, storage
from system_instruction import system_instruction
import requests_handler
from linebot.models import (
    MessageEvent, TextMessage, TextSendMessage, ImageSendMessage,
    ImageMessage, AudioMessage, AudioSendMessage,
)
from rag_manager import RAGManager
from Image_generation import ImageGenerator
from pydub import AudioSegment
from pydub.exceptions import CouldntDecodeError  # import the specific decode error
from typing import Union, Tuple

# If service-account credentials are provided as JSON in an environment variable,
# write them to a temp file and point GOOGLE_APPLICATION_CREDENTIALS at it.
creds_json = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
creds_path = "/tmp/google_creds.json"
if creds_json:
    with open(creds_path, "w") as f:
        f.write(creds_json)
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = creds_path


class ChatBot:
    """
    Core chatbot class:
      - Talks to the Gemini model
      - Handles user messages (text, image, audio)
      - Decides whether to retrieve external information (RAG) for a query
      - Generates TTS voice replies and uploads them to GCS
    """

    # --- Keywords that trigger image generation (class level) ---
    IMAGE_GENERATION_KEYWORDS = ["生成一張", "畫一張"]  # matched with startswith
    IMAGE_EDIT_KEYWORDS = ["編輯圖片", "幫我改圖", "編輯這張圖"]

    def __init__(self, google_api_key, system_instruction, google_search_api_key=None, google_search_cse_id=None):
        """Initialize a ChatBot instance."""
        self.image_generator = ImageGenerator()
        # The API key can also be supplied via environment variable as a fallback.
        try:
            genai_gen.configure(api_key=google_api_key)
            print("Google AI API Key configured successfully via genai_gen.configure().")
        except AttributeError:
            print("Warning: genai_gen.configure not found. Assuming API key is configured via environment variable.")
        except Exception as config_err:
            print(f"Error configuring Google AI API Key: {config_err}")

        # Generation configuration
        generation_config = genai_gen.types.GenerationConfig(
            max_output_tokens=2048, temperature=0.5, top_p=0.5, top_k=16
        )

        # Create and configure the Gemini chat model
        self.model_name = "gemini-2.0-flash"
        try:
            self.model = genai_gen.GenerativeModel(
                model_name=self.model_name,
                generation_config=generation_config,
                system_instruction=system_instruction,
            )
            print(f"Gemini model '{self.model_name}' initialized.")
        except Exception as model_init_err:
            print(f"Error initializing Gemini model '{self.model_name}': {model_init_err}")
            raise

        self.chat_sessions = {}
        self.user_message_history = defaultdict(list)
        self.bot_reply_history = defaultdict(list)
        self.rag_manager = RAGManager(
            google_search_api_key=google_search_api_key,
            google_search_cse_id=google_search_cse_id,
        )
        print("RAG Manager initialized.")

    def start_chat_session(self, user_id):
        """Start (or return an existing) chat session for a user."""
        if user_id not in self.chat_sessions:
            print(f"Starting new chat session for user {user_id}")
            try:
                self.chat_sessions[user_id] = self.model.start_chat(history=[])
            except Exception as start_chat_err:
                print(f"Error starting chat session for {user_id}: {start_chat_err}")
                return None
        return self.chat_sessions.get(user_id)

    # --- determine_query_intent, retrieve_relevant_info, _extract_city_name, augment_prompt_with_info, _get_info_type ---
    @staticmethod
    def determine_query_intent(user_query):
        specific_api_keywords = ["空氣品質", "aqi", "天氣", "地震資訊", "牌告匯率", "今日匯率"]
        factual_indicators = ["如何", "什麼是", "解釋", "定義", "告訴我關於", "幫我查", "最新", "新聞", "資訊",
                              "多少", "為什麼", "何時", "哪裡", "誰是", "幾點", "幾月", "怎麼做", "教學", "方法", "步驟"]
        conversational_indicators = ["你好", "嗨", "哈囉", "你覺得", "你喜歡", "聊聊", "聊天", "無聊", "我想",
                                     "我覺得", "我希望", "我喜歡", "謝謝", "笑話", "有趣", "開心", "難過", "討厭",
                                     "害怕", "緊張", "期待"]
        user_query_lower = user_query.lower()
        if any(keyword in user_query_lower for keyword in specific_api_keywords):
            return "specific_api"
        factual_score = sum(1 for indicator in factual_indicators if indicator in user_query_lower)
        conversational_score = sum(1 for indicator in conversational_indicators if indicator in user_query_lower)
        return "factual" if factual_score > conversational_score else "conversational"
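    # Illustrative examples of how the scoring above classifies a few queries:
    #   determine_query_intent("今天台北的天氣如何")  -> "specific_api"   (contains the keyword "天氣")
    #   determine_query_intent("什麼是量子電腦")      -> "factual"        (factual indicator "什麼是")
    #   determine_query_intent("你好,我好無聊")      -> "conversational" (conversational indicators outscore factual ones)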
    def retrieve_relevant_info(self, user_query, user_id=None):
        intent = ChatBot.determine_query_intent(user_query)
        user_query_lower = user_query.lower()
        if intent == "specific_api":
            if "空氣品質" in user_query_lower or "aqi" in user_query_lower:
                return requests_handler.get_air_quality()
            elif "天氣" in user_query_lower:
                return requests_handler.get_weather(city_name=self._extract_city_name(user_query_lower))
            elif "地震資訊" in user_query_lower:
                return requests_handler.get_earthquake()
            elif "牌告匯率" in user_query_lower or "今日匯率" in user_query_lower:
                return requests_handler.get_exchange_rate()
        source_id = f"user_{user_id}" if user_id else None
        is_rag_enabled = self.rag_manager.rag_status.get(source_id, True) if source_id else True
        if intent == "factual" and is_rag_enabled:
            print(f"RAG enabled for {source_id}. Getting web context...")
            web_context = self.rag_manager.get_web_context_for_query(user_query, user_id=user_id)
            if web_context:
                return f"從網路搜尋獲得的資訊:\n{web_context}"
            else:
                print("RAG: No web context found.")
        elif intent == "factual" and not is_rag_enabled:
            print(f"RAG disabled for {source_id}.")
        return None

    def _extract_city_name(self, user_query_lower):
        city_keywords = {
            "臺北市": ["台北", "臺北", "台北市", "臺北市"],
            "新北市": ["新北", "新北市"],
            "桃園市": ["桃園", "桃園市"],
            "臺中市": ["台中", "臺中", "台中市", "臺中市"],
            "新竹市": ["新竹", "新竹市"],
        }
        for city, keywords in city_keywords.items():
            if any(keyword in user_query_lower for keyword in keywords):
                return city
        return None

    def augment_prompt_with_info(self, original_prompt, retrieved_info):
        if not retrieved_info:
            return f"""使用者: {original_prompt}\n橘橘: """
        base_instruction = "請以橘橘(一隻友善的貓)的身份回答使用者。使用繁體中文,語氣自然活潑,偶爾加入「喵」作為口頭禪。"
        if "從網路搜尋獲得的資訊" in retrieved_info:
            augmented_prompt = f"""{base_instruction} 請參考以下網路資訊:\n\n{retrieved_info}\n\n使用者問題:{original_prompt}\n\n回答要求:基於參考資料回答,保持對話自然。如果回答直接參考了某來源,請在句末附上網址 (來源: https://...)。若資料不足,請誠實說明。\n橘橘: """
        else:
            info_type = self._get_info_type(retrieved_info)
            augmented_prompt = f"""{base_instruction} 請根據以下{info_type}資訊回答:\n\n{info_type}資訊:\n{retrieved_info}\n\n使用者問題:{original_prompt}\n\n回答要求:融入資訊,保持對話自然。\n橘橘: """
        return augmented_prompt
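    # Illustrative flow (assuming requests_handler.get_weather returns a text summary containing "天氣"):
    #   info = self.retrieve_relevant_info("台北今天天氣如何", user_id="U123")   # hypothetical user id; hits the weather branch
    #   prompt = self.augment_prompt_with_info("台北今天天氣如何", info)         # labelled 天氣預報 via _get_info_type below
    # The resulting prompt asks the model to answer as 橘橘 using that retrieved info.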
    def _get_info_type(self, info):
        if isinstance(info, str):
            if "空氣品質" in info or "aqi" in info.lower():
                return "空氣品質"
            if "天氣" in info:
                return "天氣預報"
            if "地震" in info:
                return "地震資訊"
            if "匯率" in info:
                return "牌告匯率"
            if "從網路搜尋獲得的資訊" in info:
                return "網路搜尋"
        return "相關資訊"

    def store_user_message(self, user_id, message_type, message_content):
        max_history_len = 20
        if len(self.user_message_history[user_id]) >= max_history_len:
            self.user_message_history[user_id].pop(0)
        self.user_message_history[user_id].append({"type": message_type, "content": message_content})
        print(f"Stored user message for {user_id}: type='{message_type}', content='{str(message_content)[:50]}...'")

    def store_bot_reply(self, user_id, reply_text, max_history=5):
        if user_id not in self.bot_reply_history:
            self.bot_reply_history[user_id] = []
        if reply_text and isinstance(reply_text, str):
            self.bot_reply_history[user_id].append(reply_text)
            if len(self.bot_reply_history[user_id]) > max_history:
                self.bot_reply_history[user_id] = self.bot_reply_history[user_id][-max_history:]
            print(f"Stored bot reply for {user_id}: '{reply_text[:50]}...'")
        else:
            print(f"Warning: Attempted to store invalid bot reply for {user_id}: {reply_text}")

    def get_last_bot_reply(self, user_id):
        if user_id in self.bot_reply_history and self.bot_reply_history[user_id]:
            return self.bot_reply_history[user_id][-1]
        return None

    # --- generate_speech_reply (produces M4A/AAC) ---
    def generate_speech_reply(self, text):
        """Generate an AAC (M4A) voice reply and upload it to GCS."""
        if not text or not isinstance(text, str):
            print("Error: Invalid text provided for speech generation.")
            return None, None
        try:
            tts_client = texttospeech.TextToSpeechClient()
            synthesis_input = texttospeech.SynthesisInput(text=text)
            voice = texttospeech.VoiceSelectionParams(
                language_code="cmn-TW",
                name="cmn-TW-Wavenet-C",
                ssml_gender=texttospeech.SsmlVoiceGender.FEMALE,
            )
            print("Synthesizing speech (WAV)...")
            audio_config_wav = texttospeech.AudioConfig(
                audio_encoding=texttospeech.AudioEncoding.LINEAR16, sample_rate_hertz=16000
            )
            response_wav = tts_client.synthesize_speech(
                input=synthesis_input, voice=voice, audio_config=audio_config_wav
            )
            wav_content = response_wav.audio_content
            print("Speech synthesized as WAV successfully.")

            print("Converting WAV to AAC (M4A)...")
            try:
                audio = AudioSegment.from_wav(io.BytesIO(wav_content))
                m4a_buffer = io.BytesIO()
                audio.export(m4a_buffer, format="mp4", codec="aac", bitrate="64k")
                m4a_content = m4a_buffer.getvalue()
                print(f"Audio converted to M4A (AAC) successfully, size: {len(m4a_content)} bytes.")
            except Exception as convert_err:
                print(f"Error converting WAV to M4A: {convert_err}")
                traceback.print_exc()
                return None, None

            print("Calculating M4A audio duration...")
            duration = ChatBot.get_audio_duration_from_bytes(m4a_content, format="m4a")  # compute duration from the m4a data
            if duration is None:
                print("Error: Failed to calculate M4A audio duration.")
                return None, None
            print(f"Calculated M4A duration: {duration} ms")

            bucket_name = "stt_bucket_for_allen"
            file_extension = "m4a"
            content_type = "audio/m4a"
            gcs_file_path = f"audio_replies/reply_{int(time.time() * 1000)}.{file_extension}"
            storage_client = storage.Client()
            bucket = storage_client.bucket(bucket_name)
            blob = bucket.blob(gcs_file_path)
            print(f"Uploading audio reply ({content_type}) to gs://{bucket_name}/{gcs_file_path}")
            blob.upload_from_string(data=m4a_content, content_type=content_type)
            print("Audio uploaded to GCS.")
            public_url = blob.public_url
            print(f"Generated GCS public URL: {public_url}")
            if not public_url or not public_url.startswith("https://"):
                print(f"Error or Warning: Invalid public URL generated: {public_url}")
                if public_url and public_url.startswith("http://"):
                    public_url = "https://" + public_url[len("http://"):]
                    print(f"Corrected URL to HTTPS: {public_url}")
                else:
                    return None, None
            print(f"已上傳語音檔案到 GCS: {public_url},音訊長度: {duration} ms")
            return public_url, duration
        except Exception as e:
            print(f"產生語音回覆或上傳 GCS 時出錯: {e}")
            traceback.print_exc()
            return None, None
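    # Note: blob.public_url is only fetchable by LINE if the bucket (or object) is publicly
    # readable; the helper below also caps the reported duration at 60 seconds (60000 ms).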
    # --- get_audio_duration_from_bytes (accepts a format argument) ---
    @staticmethod
    def get_audio_duration_from_bytes(audio_bytes, format="mp3"):
        try:
            print(f"Calculating duration for format: {format}")
            audio = AudioSegment.from_file(io.BytesIO(audio_bytes), format=format)
            duration_ms = len(audio)
            return min(duration_ms, 60000)
        except CouldntDecodeError as decode_err:
            print(f"Pydub decode error calculating duration (format: {format}): {decode_err}")
            traceback.print_exc()
            return None
        except Exception as e:
            print(f"Error calculating audio duration (format: {format}): {e}")
            traceback.print_exc()
            return None

    def is_follow_up_question(self, user_id, current_query):
        recent_messages = self.user_message_history[user_id]
        if len(recent_messages) < 2:
            return False
        text_messages = [msg for msg in recent_messages if msg.get('type') == 'text']
        if len(text_messages) < 2:
            return False
        if len(current_query) < 10:
            return True
        reference_words = ["他", "她", "它", "這個", "那個", "這", "那", "這些", "那些", "以上", "剛剛"]
        if any(word in current_query for word in reference_words):
            return True
        return False

    def _get_source_id_from_event(self, event):
        source_type = event.source.type
        if source_type == 'user':
            return f"user_{event.source.user_id}"
        elif source_type == 'group':
            return f"group_{event.source.group_id}"
        elif source_type == 'room':
            return f"room_{event.source.room_id}"
        user_id = getattr(event.source, 'user_id', None)
        return f"unknown_{user_id}" if user_id else "unknown_source"

    def handle_text_message(self, event, line_bot_api):
        """
        Handle a text message event (covers image generation, image editing,
        image analysis, RAG, chat, and voice-reply logic).
        """
        user_id = event.source.user_id
        prompt = event.message.text.strip()
        source_id = self._get_source_id_from_event(event)  # prefixed user/group/room ID from the helper
        print(f"Handling text message from {source_id}: '{prompt}'")

        # --- Resolve the raw ID used for the Push API ---
        push_target_id = None
        if event.source.type == 'user':
            push_target_id = event.source.user_id
        elif event.source.type == 'group':
            push_target_id = event.source.group_id
        elif event.source.type == 'room':
            push_target_id = event.source.room_id

        reply_text = ""
        should_store_user_message = True
        should_store_bot_reply = True

        # --- Step 1: check for an image-generation trigger (startswith) ---
        is_image_generation_request = False
        image_description = ""
        matched_keyword = None
        for keyword in self.IMAGE_GENERATION_KEYWORDS:
            if prompt.startswith(keyword):
                is_image_generation_request = True
                matched_keyword = keyword
                image_description = prompt[len(keyword):].strip()
                break

        if is_image_generation_request:
            print(f"Image generation request detected with keyword '{matched_keyword}'.")
            if not image_description:
                # Case: keyword only, no description
                print("Image description is empty.")
                reply_text = "喵~你想畫什麼圖片呢?請在關鍵字後面加上描述喔!例如:「生成一張 一隻貓在太空漫步」"
                should_store_bot_reply = True  # this hint needs to be stored
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
                # No image_generation_pending_description state is stored; the user simply
                # re-triggers the keyword with a description.
            else:
                # --- A description is present: run the generation flow ---
                try:
                    # Reply with a "generating..." message first to avoid hitting the reply timeout
                    print("Replying with 'Generating...' message.")
                    line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵~收到!圖片正在努力生成中..."))
                except Exception as reply_err:
                    print(f"Warning: Failed to send initial 'Generating...' reply: {reply_err}")
                    # Even if this reply fails, keep generating and fall back to push later.

                print(f"Calling image generator for data with description: '{image_description}'")
                # Call the ImageGenerator method to get the image bytes
                image_bytes = self.image_generator.generate_image_with_gemini(image_description)  # expected to return bytes or None
                image_url = None  # initialize the URL
                if image_bytes:
                    # Bytes received successfully: upload to GCS
                    print("Image data received. Uploading to GCS...")
                    image_url = self.image_generator.upload_image_to_gcs(image_bytes, file_name_prefix=f"gen_{user_id[:8]}")
                    if not image_url:
                        print("GCS upload failed.")
                else:
                    # generate_image_with_gemini returned None
                    print("Image generation failed, no image bytes received.")

                # --- Send the final result (image or error) with push_message ---
                if image_url:
                    safe_image_url = image_url + "#"
                    print(f"Image generated and uploaded. Pushing result to {push_target_id}. Safe URL: {safe_image_url}")
                    image_message = ImageSendMessage(original_content_url=safe_image_url, preview_image_url=safe_image_url)
                    final_text_output = "喵~圖片生成好了!"
                    text_message = TextSendMessage(text=final_text_output)
                    try:
                        line_bot_api.push_message(push_target_id, [image_message, text_message])
                        should_store_bot_reply = False
                    except Exception as push_err:
                        print(f"Error pushing final image message to {push_target_id}: {push_err}")
                        # Try to push an error text back to the source
                        try:
                            line_bot_api.push_message(push_target_id, TextSendMessage(text="喵~圖片好了,但推送時好像有點問題..."))
                        except:
                            pass  # give up on the last attempt
                else:
                    # Failure (generation failed or upload failed)
                    error_msg = "喵~糟糕,圖片生成或上傳失敗了,請稍後再試!"
                    print(f"Image generation/upload failed. Pushing error message to {push_target_id}.")
                    try:
                        line_bot_api.push_message(push_target_id, TextSendMessage(text=error_msg))  # push the error message to the source
                        should_store_bot_reply = False  # do not store the pushed error as a bot reply
                    except Exception as push_err:
                        print(f"Error pushing final error message to {push_target_id}: {push_err}")

            # End of the image-generation flow: store the user message
            if should_store_user_message:
                self.store_user_message(user_id, "text", prompt)
            # If there is a reply_text (e.g. the empty-description hint), store the bot reply
            if should_store_bot_reply and reply_text:
                self.store_bot_reply(user_id, reply_text)
            return  # end of handle_text_message

        # --- Not an image-generation request: continue with the following steps ---
        print("Not an image generation request.")

        # --- Step 2: check for an image-edit trigger ---
        is_image_edit_request = False
        edit_description = ""
        matched_edit_keyword = None
        # Uses the IMAGE_EDIT_KEYWORDS defined earlier (assumed to exist on the class)
        if hasattr(self, 'IMAGE_EDIT_KEYWORDS'):
            for keyword in self.IMAGE_EDIT_KEYWORDS:
                if prompt.startswith(keyword):
                    is_image_edit_request = True
                    matched_edit_keyword = keyword
                    edit_description = prompt[len(keyword):].strip()
                    break
        else:
            # If no edit keywords are defined, optionally warn or just ignore
            # print("Warning: IMAGE_EDIT_KEYWORDS not defined in ChatBot class.")
            pass

        if is_image_edit_request:
            print(f"Image edit request detected with keyword '{matched_edit_keyword}'.")
            # Check whether there is a pending image to edit
            recent_image_info = next((msg for msg in reversed(self.user_message_history[user_id]) if msg.get('type') == 'image_pending'), None)
            if not recent_image_info:
                # Case: no image was uploaded first
                print("No pending image found for editing.")
                reply_text = "喵~要編輯哪張圖片呀?請先傳送圖片給我喔!"
                should_store_bot_reply = True
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
            elif not edit_description:
                # Case: an image exists but there is no edit description
                print("Edit description is empty.")
                reply_text = "喵~你想怎麼編輯這張圖片呢?請告訴我編輯指令喔! (例如:幫我改圖 加上太陽眼鏡)"
                should_store_bot_reply = True
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
            else:
                # --- Image and description are both present: run the edit flow ---
                image_message_id = recent_image_info.get('content')
                print(f"Found pending image ID {image_message_id} for editing.")
                original_image_bytes = None  # initialize
                try:
                    # Reply with an "editing..." message first
                    print("Replying with 'Editing...' message.")
                    line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵~收到!正在幫你編輯圖片..."))
                except Exception as reply_err:
                    print(f"Warning: Failed to send initial 'Editing...' reply: {reply_err}")
reply: {reply_err}") try: # 即時獲取原始圖片 bytes print(f"Fetching original image content for message ID: {image_message_id}") message_content = line_bot_api.get_message_content(image_message_id) original_image_bytes = message_content.content print(f"Fetched original image content, size: {len(original_image_bytes)} bytes") except Exception as fetch_err: print(f"Error fetching original image for editing: {fetch_err}") # 如果獲取原始圖片失敗,推送錯誤訊息 try: line_bot_api.push_message(push_target_id, TextSendMessage(text="喵~找不到你要編輯的原始圖片耶...")) except Exception as push_fetch_err: # 捕捉推送錯誤 print(f"Error pushing fetch error message: {push_fetch_err}") # 清理狀態並結束 try: self.user_message_history[user_id] = [msg for msg in self.user_message_history[user_id] if msg != recent_image_info] except: pass if should_store_user_message: self.store_user_message(user_id, "text", prompt) return # 結束 edited_image_bytes = None if original_image_bytes: print(f"Calling image editor with description: '{edit_description}'") # 呼叫編輯方法 if hasattr(self.image_generator, 'edit_image_with_gemini'): edited_image_bytes = self.image_generator.edit_image_with_gemini(original_image_bytes, edit_description) else: print("Error: image_generator does not have 'edit_image_with_gemini' method.") edited_image_url = None if edited_image_bytes: print("Edited image data received. Uploading to GCS...") edited_image_url = self.image_generator.upload_image_to_gcs(edited_image_bytes, file_name_prefix=f"edit_{user_id[:8]}") if not edited_image_url: print("GCS upload failed for edited image.") else: print("Image editing failed, no edited image bytes received.") # 推送結果 if edited_image_url: safe_edited_url = edited_image_url + "#" print(f"Image edited and uploaded. Pushing result to {push_target_id}. Safe URL: {safe_edited_url}") image_message = ImageSendMessage(original_content_url=safe_edited_url, preview_image_url=safe_edited_url) final_text_output = "喵~圖片編輯好了!" text_message = TextSendMessage(text=final_text_output) try: line_bot_api.push_message(push_target_id, [image_message, text_message]) should_store_bot_reply = False except Exception as push_err: print(f"Error pushing final edited image message to {push_target_id}: {push_err}") try: line_bot_api.push_message(push_target_id, TextSendMessage(text="喵~圖片編輯好了,但推送時好像有點問題...")) except: pass else: error_msg = "喵~糟糕,圖片編輯失敗了,請稍後再試!" print(f"Image editing/upload failed. 
                    try:
                        line_bot_api.push_message(push_target_id, TextSendMessage(text=error_msg))
                        should_store_bot_reply = False
                    except Exception as push_err:
                        print(f"Error pushing final edit error message to {push_target_id}: {push_err}")

                # Clean up the pending-image state (whether or not the edit succeeded)
                try:
                    self.user_message_history[user_id] = [msg for msg in self.user_message_history[user_id] if msg != recent_image_info]
                    print("Removed image_pending state after edit attempt.")
                except Exception as remove_err:
                    print(f"Warning: Could not remove image_pending state after edit: {remove_err}")

            # End of the image-edit flow
            if should_store_user_message:
                self.store_user_message(user_id, "text", prompt)
            if should_store_bot_reply and reply_text:
                self.store_bot_reply(user_id, reply_text)
            return  # end of handle_text_message

        print("Not an image edit request.")

        # --- Step 3: handle a voice-reply request ---
        if prompt == "語音回覆":
            print("Voice reply request detected.")
            should_store_user_message = True
            should_store_bot_reply = False
            last_reply = self.get_last_bot_reply(user_id)
            reply_to_send = None
            if last_reply:
                print(f"Generating voice for last bot reply: '{last_reply[:50]}...'")
                audio_url, duration = self.generate_speech_reply(last_reply)  # this method produces an M4A file
                if audio_url and duration:
                    try:
                        safe_audio_url = audio_url + "#"  # the '#' suffix trick
                        print(f"Sending audio reply. Safe URL: {safe_audio_url}, Duration: {duration}")
                        reply_to_send = [
                            AudioSendMessage(original_content_url=safe_audio_url, duration=duration),
                            TextSendMessage(text=f"這是上一句的回覆語音:\n「{last_reply}」"),
                        ]
                    except Exception as send_err:
                        print(f"傳送語音訊息時出錯: {send_err}.")
                        reply_to_send = TextSendMessage(text=f"喵~語音轉換失敗了(發送錯誤)... 上一句是:\n「{last_reply}」")
                        should_store_bot_reply = True
                else:
                    print("Failed to generate voice reply.")
                    reply_to_send = TextSendMessage(text=f"喵~語音轉換失敗了(生成失敗)... 上一句是:\n「{last_reply}」")
                    should_store_bot_reply = True
            else:
                print("No previous bot reply found to generate voice for.")
                reply_to_send = TextSendMessage(text="喵~找不到上一句話耶,沒辦法轉成語音...")
                should_store_bot_reply = True

            try:
                if isinstance(reply_to_send, list):
                    line_bot_api.reply_message(event.reply_token, reply_to_send)
                else:
                    line_bot_api.reply_message(event.reply_token, [reply_to_send])
            except Exception as final_reply_err:
                print(f"Error sending final voice reply/fallback: {final_reply_err}")
                traceback.print_exc()
                try:
                    line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵~處理語音回覆時出錯了!"))
                except:
                    pass

            if should_store_user_message:
                self.store_user_message(user_id, "text", prompt)
            if should_store_bot_reply:
                if isinstance(reply_to_send, TextSendMessage):
                    self.store_bot_reply(user_id, reply_to_send.text)
                elif isinstance(reply_to_send, list) and isinstance(reply_to_send[-1], TextSendMessage):
                    self.store_bot_reply(user_id, reply_to_send[-1].text)
            return  # done handling the voice-reply request

        # --- Step 4: image analysis ---
        recent_image_info = next((msg for msg in reversed(self.user_message_history[user_id]) if msg.get('type') == 'image_pending'), None)
        if recent_image_info:
            image_message_id = recent_image_info.get('content')
            print(f"Found pending image message ID: {image_message_id}...")
            try:
                message_content = line_bot_api.get_message_content(image_message_id)
                image_bytes = io.BytesIO(message_content.content)
                image = Image.open(image_bytes)
                chat_session = self.start_chat_session(user_id)
                if not chat_session:
                    raise Exception("Failed to start chat session")
                response = chat_session.send_message([prompt, image])  # analyze the image with the chat model
                reply_text = ""
                # Also check for an empty response from the chat model
                if response.candidates:
                    if response.candidates[0].finish_reason == 'SAFETY':
                        block_reason = response.prompt_feedback.block_reason if hasattr(response, 'prompt_feedback') else "未知"
                        reply_text = f"喵~分析圖片時好像遇到安全問題 ({block_reason})!"
                    elif response.text:
                        reply_text = response.text
                    else:
                        reply_text = "喵~我看不太懂這張圖片!"
                else:
                    block_reason_msg = ""
                    if hasattr(response, 'prompt_feedback') and response.prompt_feedback.block_reason:
                        block_reason_msg = f" (原因: {response.prompt_feedback.block_reason})"
                    reply_text = f"喵~分析圖片時好像遇到問題了!{block_reason_msg}"
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
                should_store_bot_reply = True
                # Clean up the pending-image state
                try:
                    self.user_message_history[user_id] = [msg for msg in self.user_message_history[user_id] if msg != recent_image_info]
                    print("Removed image_pending state.")
                except Exception as remove_err:
                    print(f"Warning: Could not remove image_pending state: {remove_err}")
            except Exception as e:
                print(f"圖像處理錯誤: {e}")
                traceback.print_exc()
                reply_text = "喵~處理圖片時出了點小狀況!"
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
                should_store_bot_reply = True
                try:
                    self.user_message_history[user_id] = [msg for msg in self.user_message_history[user_id] if msg != recent_image_info]
                except:
                    pass

            if should_store_user_message:
                self.store_user_message(user_id, "text", prompt)
            if should_store_bot_reply:
                self.store_bot_reply(user_id, reply_text)
            return

        print("Not an image analysis request.")

        # --- Step 5: RAG control commands ---
        if isinstance(event.message, TextMessage):
            message_text = prompt  # already stripped
            # Uses the source_id resolved at the top of this method;
            # there is no need to call _get_source_id_from_event again.
            reply_text_rag_cmd = None  # avoid clashing with the outer reply_text
            if hasattr(self.rag_manager, 'rag_enable_command') and message_text == self.rag_manager.rag_enable_command:
                print(f"Enabling RAG for {source_id}")
                self.rag_manager.rag_status[source_id] = True
                reply_text_rag_cmd = "喵~我已開啟查詢模式!"
            elif hasattr(self.rag_manager, 'rag_disable_command') and message_text == self.rag_manager.rag_disable_command:
                print(f"Disabling RAG for {source_id}")
                self.rag_manager.rag_status[source_id] = False
                reply_text_rag_cmd = "喵~我已關閉查詢模式!"
            if reply_text_rag_cmd:
                # This was a RAG command
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text_rag_cmd))
                self.store_bot_reply(user_id, reply_text_rag_cmd)  # store the bot reply
                self.store_user_message(user_id, "text", prompt)   # store the user command
                return  # done handling the RAG command

        # --- Step 6: RAG/API retrieval ---
        is_rag_enabled = self.rag_manager.rag_status.get(source_id, True)
        retrieved_info = None
        if is_rag_enabled:
            print(f"RAG is enabled for {source_id}. Retrieving info...")
            retrieved_info = self.retrieve_relevant_info(prompt, user_id=user_id)  # pass user_id through to RAG
            if retrieved_info:
                print("Retrieved info:", retrieved_info[:100] + "...")
            else:
                print("No relevant info retrieved.")
        else:
            print(f"RAG is disabled for {source_id}. Skipping info retrieval.")

        # --- Step 7: prepare the prompt for Gemini ---
        prompt_for_gemini = prompt
        if not prompt_for_gemini:
            prompt_for_gemini = "喵~"
        print(f"Final prompt for Gemini: '{prompt_for_gemini}'")

        # --- Step 8: send the message to the Gemini chat model ---
        chat_session = self.start_chat_session(user_id)
        if not chat_session:
            reply_text = "喵~糟糕,無法開始對話..."
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
            if should_store_user_message:
                self.store_user_message(user_id, "text", prompt)
            if should_store_bot_reply:
                self.store_bot_reply(user_id, reply_text)
            return

        final_prompt_to_send = prompt_for_gemini
        if retrieved_info:
            final_prompt_to_send = self.augment_prompt_with_info(prompt_for_gemini, retrieved_info)

        reply_text = ""
        try:
            print("Sending final prompt to Gemini...")
            response = chat_session.send_message(final_prompt_to_send)
            # *** Also check for an empty response from the chat model ***
            if response.candidates:
                if response.candidates[0].finish_reason == 'SAFETY':
                    block_reason = response.prompt_feedback.block_reason if hasattr(response, 'prompt_feedback') else "未知"
                    reply_text = f"喵~這個話題有點敏感 ({block_reason})!"
                elif response.text:
                    reply_text = response.text
                    print("Received text response from Gemini:", reply_text[:100] + "...")
                else:
                    reply_text = "喵~橘橘不知道怎麼回答..."
            else:
                block_reason_msg = ""
                if hasattr(response, 'prompt_feedback') and response.prompt_feedback.block_reason:
                    block_reason_msg = f" (原因: {response.prompt_feedback.block_reason})"
                reply_text = f"喵~請求好像被擋下來了!{block_reason_msg}"
        except Exception as gemini_error:
            print(f"Error calling Gemini API: {gemini_error}")
            traceback.print_exc()
            reply_text = "喵~糟糕,我的腦袋好像有點打結了..."

        if not reply_text:
            reply_text = "喵~橘橘現在有點累..."

        # --- Step 9: send the final text reply ---
        print("Sending final text reply.")
        try:
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
            should_store_bot_reply = True
        except Exception as final_reply_err:
            print(f"Error sending final text reply: {final_reply_err}")
            should_store_bot_reply = False

        # --- Step 10: store the bot reply and the user message ---
        if should_store_bot_reply:
            self.store_bot_reply(user_id, reply_text)
        if should_store_user_message:
            self.store_user_message(user_id, "text", prompt)
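    # Image messages are handled in two stages: handle_image_message below only stores the
    # message ID as an 'image_pending' entry, and the user's next text message either
    # analyzes the image (Step 4 above) or edits it (Step 2 above).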
    def handle_image_message(self, event, line_bot_api, line_bot):
        user_id = event.source.user_id
        message_id = event.message.id
        print(f"Handling image message from {user_id}, message ID: {message_id}")
        try:
            self.store_user_message(user_id, "image_pending", message_id)
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵!圖片已收到,請告訴我你想知道關於這張圖片的什麼問題呢?"))
        except Exception as e:
            print(f"Error storing image message ID: {e}")
            traceback.print_exc()
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text=f"橘橘記錄圖片訊息時出錯了:{e}"))

    def handle_audio_message(self, event, line_bot_api, line_bot):
        user_id = event.source.user_id
        message_id = event.message.id
        source_id = self._get_source_id_from_event(event)
        print(f"Handling audio message from {source_id}, message ID: {message_id}")
        m4a_audio_path = None
        try:
            m4a_audio_path = line_bot.get_audio_url(message_id)
            if not m4a_audio_path:
                raise Exception("無法取得語音檔案路徑")
            print(f"Retrieved m4a audio path: {m4a_audio_path}")

            wav_content = None
            try:
                print("Converting M4A to WAV...")
                audio = AudioSegment.from_file(m4a_audio_path, format="m4a")
                audio = audio.set_frame_rate(16000).set_channels(1).set_sample_width(2)
                wav_buffer = io.BytesIO()
                audio.export(wav_buffer, format="wav")
                wav_content = wav_buffer.getvalue()
                print(f"Audio converted to WAV (LINEAR16) successfully, size: {len(wav_content)} bytes")
            except Exception as conversion_error:
                print(f"音訊轉換失敗: {conversion_error}")
                traceback.print_exc()
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵~聽不懂這個語音格式耶!"))
                return
            finally:
                if m4a_audio_path and os.path.exists(m4a_audio_path):
                    try:
                        os.remove(m4a_audio_path)
                        print(f"Removed temporary m4a file: {m4a_audio_path}")
                    except OSError as remove_error:
                        print(f"Error removing temporary m4a file: {remove_error}")

            text_from_audio = self._convert_audio_bytes_to_text(wav_content)
            if not text_from_audio:
                print("STT failed to transcribe audio.")
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵~聽不清楚你在說什麼,可以再說一次嗎?"))
                return
            print(f"STT transcription result: '{text_from_audio}'")
            self.store_user_message(user_id, "text", text_from_audio)

            chat_session = self.start_chat_session(user_id)
            if not chat_session:
                raise Exception("Chat session failed to start.")
            is_rag_enabled = self.rag_manager.rag_status.get(source_id, True)
            retrieved_info = None
            if is_rag_enabled:
                retrieved_info = self.retrieve_relevant_info(text_from_audio, user_id=user_id)
            final_prompt_to_send = text_from_audio
            if retrieved_info:
                final_prompt_to_send = self.augment_prompt_with_info(text_from_audio, retrieved_info)

            reply_text = ""
            try:
                print("Sending transcribed text to Gemini...")
                response = chat_session.send_message(final_prompt_to_send)
                if response.candidates and response.text:
                    reply_text = response.text
                else:
                    reply_text = "喵~我好像沒聽懂你的語音訊息!"
                print("Received Gemini response for audio transcription:", reply_text[:100] + "...")
            except Exception as gemini_error:
                print(f"Error calling Gemini API for audio transcription: {gemini_error}")
                traceback.print_exc()
                reply_text = "喵~我聽懂你說什麼了,但我的腦袋好像有點打結了!"
            if not reply_text:
                reply_text = "喵~我沒聽懂你的語音訊息!"
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
            self.store_bot_reply(user_id, reply_text)
        except Exception as e:
            print(f"處理語音訊息時發生未預期錯誤: {e}")
            traceback.print_exc()
            if m4a_audio_path and os.path.exists(m4a_audio_path):
                try:
                    os.remove(m4a_audio_path)
                except:
                    pass
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text=f"橘橘無法處理語音訊息:{e}"))

    def _convert_audio_bytes_to_text(self, audio_bytes):
        try:
            client = speech.SpeechClient()
            audio = speech.RecognitionAudio(content=audio_bytes)
            config = speech.RecognitionConfig(
                encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
                sample_rate_hertz=16000,
                language_code="cmn-TW",
                enable_automatic_punctuation=True,
            )
            print("Sending audio bytes to Google STT API...")
            response = client.recognize(config=config, audio=audio)
            print("Received response from Google STT API.")
            if response.results:
                transcript = response.results[0].alternatives[0].transcript
                print(f"STT API Transcription: '{transcript}'")
                return transcript
            else:
                print("STT API returned no results.")
                return None
        except Exception as e:
            print(f"Error calling Google STT API: {e}")
            traceback.print_exc()
            return None
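

# A minimal wiring sketch (not the bot itself): it assumes a Flask app, the line-bot-sdk v1
# WebhookHandler, and environment variables named LINE_CHANNEL_ACCESS_TOKEN, LINE_CHANNEL_SECRET,
# GOOGLE_API_KEY, GOOGLE_SEARCH_API_KEY and GOOGLE_SEARCH_CSE_ID (all hypothetical names).
if __name__ == "__main__":
    from flask import Flask, request, abort
    from linebot import LineBotApi, WebhookHandler
    from linebot.exceptions import InvalidSignatureError

    app = Flask(__name__)
    line_bot_api = LineBotApi(os.getenv("LINE_CHANNEL_ACCESS_TOKEN"))
    handler = WebhookHandler(os.getenv("LINE_CHANNEL_SECRET"))
    bot = ChatBot(
        google_api_key=os.getenv("GOOGLE_API_KEY"),
        system_instruction=system_instruction,
        google_search_api_key=os.getenv("GOOGLE_SEARCH_API_KEY"),
        google_search_cse_id=os.getenv("GOOGLE_SEARCH_CSE_ID"),
    )

    @app.route("/callback", methods=["POST"])
    def callback():
        # Verify the LINE signature and dispatch the webhook body to the registered handlers.
        signature = request.headers.get("X-Line-Signature", "")
        body = request.get_data(as_text=True)
        try:
            handler.handle(body, signature)
        except InvalidSignatureError:
            abort(400)
        return "OK"

    @handler.add(MessageEvent, message=TextMessage)
    def on_text(event):
        bot.handle_text_message(event, line_bot_api)

    @handler.add(MessageEvent, message=ImageMessage)
    def on_image(event):
        # handle_image_message does not use its third argument, so None is passed here.
        bot.handle_image_message(event, line_bot_api, None)

    # Audio messages would additionally need handler.add(MessageEvent, message=AudioMessage)
    # plus a project-specific helper object exposing get_audio_url(), so they are omitted here.

    app.run(port=8000)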