import google.generativeai as genai_gen
import json, os
import time
from collections import defaultdict
import io
from PIL import Image
import traceback
from google.cloud import speech, texttospeech, storage
from system_instruction import system_instruction
import requests_handler
from linebot.models import MessageEvent, TextMessage, TextSendMessage, ImageSendMessage, ImageMessage, AudioMessage, AudioSendMessage
from rag_manager import RAGManager
from Image_generation import ImageGenerator
from pydub import AudioSegment
from pydub.exceptions import CouldntDecodeError  # import the specific decode error
from typing import Union, Tuple
creds_json = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
creds_path = "/tmp/google_creds.json"
if creds_json:
    with open(creds_path, "w") as f:
        f.write(creds_json)
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = creds_path
class ChatBot:
"""
核心聊天機器人類別:
- 與 Gemini 模型互動
- 處理使用者訊息(文字、圖片、語音)
- 根據使用者查詢決定是否檢索外部資訊(RAG)
- 提供 TTS 語音回覆並上傳到 GCS
"""
# --- 定義圖片生成觸發關鍵字 (類別層級) ---
IMAGE_GENERATION_KEYWORDS = ["生成一張", "畫一張"] # 使用 startswith 匹配
IMAGE_EDIT_KEYWORDS = ["編輯圖片", "幫我改圖", "編輯這張圖"]
    def __init__(self, google_api_key, system_instruction, google_search_api_key=None, google_search_cse_id=None):
        """
        Initialize the ChatBot instance.
        """
        self.image_generator = ImageGenerator()
        # The API key can also be supplied via environment variable configuration
        try:
            genai_gen.configure(api_key=google_api_key)
            print("Google AI API Key configured successfully via genai_gen.configure().")
        except AttributeError:
            print("Warning: genai_gen.configure not found. Assuming API key is configured via environment variable.")
        except Exception as config_err:
            print(f"Error configuring Google AI API Key: {config_err}")
        # Generation settings
        generation_config = genai_gen.types.GenerationConfig(max_output_tokens=2048, temperature=0.5, top_p=0.5, top_k=16)
        # Create and configure the Gemini chat model
        self.model_name = "gemini-2.0-flash"
        try:
            self.model = genai_gen.GenerativeModel(
                model_name=self.model_name,
                generation_config=generation_config,
                system_instruction=system_instruction,
            )
            print(f"Gemini model '{self.model_name}' initialized.")
        except Exception as model_init_err:
            print(f"Error initializing Gemini model '{self.model_name}': {model_init_err}")
            raise
        self.chat_sessions = {}
        self.user_message_history = defaultdict(list)
        self.bot_reply_history = defaultdict(list)
        self.rag_manager = RAGManager(
            google_search_api_key=google_search_api_key,
            google_search_cse_id=google_search_cse_id
        )
        print("RAG Manager initialized.")
    def start_chat_session(self, user_id):
        """Start a chat session for the user, or return the existing one."""
        if user_id not in self.chat_sessions:
            print(f"Starting new chat session for user {user_id}")
            try:
                self.chat_sessions[user_id] = self.model.start_chat(history=[])
            except Exception as start_chat_err:
                print(f"Error starting chat session for {user_id}: {start_chat_err}")
                return None
        return self.chat_sessions.get(user_id)
    # --- determine_query_intent, retrieve_relevant_info, _extract_city_name, augment_prompt_with_info, _get_info_type ---
    @staticmethod
    def determine_query_intent(user_query):
        specific_api_keywords = ["空氣品質", "aqi", "天氣", "地震資訊", "牌告匯率", "今日匯率"]
        factual_indicators = ["如何", "什麼是", "解釋", "定義", "告訴我關於", "幫我查", "最新", "新聞", "資訊", "多少", "為什麼", "何時", "哪裡", "誰是", "幾點", "幾月", "怎麼做", "教學", "方法", "步驟"]
        conversational_indicators = ["你好", "嗨", "哈囉", "你覺得", "你喜歡", "聊聊", "聊天", "無聊", "我想", "我覺得", "我希望", "我喜歡", "謝謝", "笑話", "有趣", "開心", "難過", "討厭", "害怕", "緊張", "期待"]
        user_query_lower = user_query.lower()
        if any(keyword in user_query_lower for keyword in specific_api_keywords): return "specific_api"
        factual_score = sum(1 for indicator in factual_indicators if indicator in user_query_lower)
        conversational_score = sum(1 for indicator in conversational_indicators if indicator in user_query_lower)
        return "factual" if factual_score > conversational_score else "conversational"
    def retrieve_relevant_info(self, user_query, user_id=None):
        intent = ChatBot.determine_query_intent(user_query)
        user_query_lower = user_query.lower()
        if intent == "specific_api":
            if "空氣品質" in user_query_lower or "aqi" in user_query_lower: return requests_handler.get_air_quality()
            elif "天氣" in user_query_lower: return requests_handler.get_weather(city_name=self._extract_city_name(user_query_lower))
            elif "地震資訊" in user_query_lower: return requests_handler.get_earthquake()
            elif "牌告匯率" in user_query_lower or "今日匯率" in user_query_lower: return requests_handler.get_exchange_rate()
        source_id = f"user_{user_id}" if user_id else None
        is_rag_enabled = self.rag_manager.rag_status.get(source_id, True) if source_id else True
        if intent == "factual" and is_rag_enabled:
            print(f"RAG enabled for {source_id}. Getting web context...")
            web_context = self.rag_manager.get_web_context_for_query(user_query, user_id=user_id)
            if web_context: return f"從網路搜尋獲得的資訊:\n{web_context}"
            else: print("RAG: No web context found.")
        elif intent == "factual" and not is_rag_enabled: print(f"RAG disabled for {source_id}.")
        return None
    def _extract_city_name(self, user_query_lower):
        city_keywords = {"臺北市": ["台北", "臺北", "台北市", "臺北市"], "新北市": ["新北", "新北市"], "桃園市": ["桃園", "桃園市"], "臺中市": ["台中", "臺中", "台中市", "臺中市"], "新竹市": ["新竹", "新竹市"]}
        for city, keywords in city_keywords.items():
            if any(keyword in user_query_lower for keyword in keywords): return city
        return None
    def augment_prompt_with_info(self, original_prompt, retrieved_info):
        if not retrieved_info: return f"""使用者: {original_prompt}\n橘橘: """
        base_instruction = "請以橘橘(一隻友善的貓)的身份回答使用者。使用繁體中文,語氣自然活潑,偶爾加入「喵」作為口頭禪。"
        if "從網路搜尋獲得的資訊" in retrieved_info:
            augmented_prompt = f"""{base_instruction} 請參考以下網路資訊:\n\n{retrieved_info}\n\n使用者問題:{original_prompt}\n\n回答要求:基於參考資料回答,保持對話自然。如果回答直接參考了某來源,請在句末附上網址 (來源: https://...)。若資料不足,請誠實說明。\n橘橘: """
        else:
            info_type = self._get_info_type(retrieved_info)
            augmented_prompt = f"""{base_instruction} 請根據以下{info_type}資訊回答:\n\n{info_type}資訊:\n{retrieved_info}\n\n使用者問題:{original_prompt}\n\n回答要求:融入資訊,保持對話自然。\n橘橘: """
        return augmented_prompt
    def _get_info_type(self, info):
        if isinstance(info, str):
            if "空氣品質" in info or "aqi" in info.lower(): return "空氣品質"
            if "天氣" in info: return "天氣預報"
            if "地震" in info: return "地震資訊"
            if "匯率" in info: return "牌告匯率"
            if "從網路搜尋獲得的資訊" in info: return "網路搜尋"
        return "相關資訊"
    def store_user_message(self, user_id, message_type, message_content):
        max_history_len = 20
        if len(self.user_message_history[user_id]) >= max_history_len: self.user_message_history[user_id].pop(0)
        self.user_message_history[user_id].append({"type": message_type, "content": message_content})
        print(f"Stored user message for {user_id}: type='{message_type}', content='{str(message_content)[:50]}...'")
    def store_bot_reply(self, user_id, reply_text, max_history=5):
        if user_id not in self.bot_reply_history: self.bot_reply_history[user_id] = []
        if reply_text and isinstance(reply_text, str):
            self.bot_reply_history[user_id].append(reply_text)
            if len(self.bot_reply_history[user_id]) > max_history: self.bot_reply_history[user_id] = self.bot_reply_history[user_id][-max_history:]
            print(f"Stored bot reply for {user_id}: '{reply_text[:50]}...'")
        else: print(f"Warning: Attempted to store invalid bot reply for {user_id}: {reply_text}")
    def get_last_bot_reply(self, user_id):
        if user_id in self.bot_reply_history and self.bot_reply_history[user_id]: return self.bot_reply_history[user_id][-1]
        return None
    # --- generate_speech_reply (produces M4A/AAC) ---
    def generate_speech_reply(self, text):
        """Generate an AAC (M4A) voice reply and upload it to GCS."""
        if not text or not isinstance(text, str):
            print("Error: Invalid text provided for speech generation.")
            return None, None
        try:
            tts_client = texttospeech.TextToSpeechClient()
            synthesis_input = texttospeech.SynthesisInput(text=text)
            voice = texttospeech.VoiceSelectionParams(language_code="cmn-TW", name="cmn-TW-Wavenet-C", ssml_gender=texttospeech.SsmlVoiceGender.FEMALE)
            print("Synthesizing speech (WAV)...")
            audio_config_wav = texttospeech.AudioConfig(audio_encoding=texttospeech.AudioEncoding.LINEAR16, sample_rate_hertz=16000)
            response_wav = tts_client.synthesize_speech(input=synthesis_input, voice=voice, audio_config=audio_config_wav)
            wav_content = response_wav.audio_content
            print("Speech synthesized as WAV successfully.")
            print("Converting WAV to AAC (M4A)...")
            try:
                audio = AudioSegment.from_wav(io.BytesIO(wav_content))
                m4a_buffer = io.BytesIO()
                audio.export(m4a_buffer, format="mp4", codec="aac", bitrate="64k")
                m4a_content = m4a_buffer.getvalue()
                print(f"Audio converted to M4A (AAC) successfully, size: {len(m4a_content)} bytes.")
            except Exception as convert_err:
                print(f"Error converting WAV to M4A: {convert_err}")
                traceback.print_exc()
                return None, None
            print("Calculating M4A audio duration...")
            duration = ChatBot.get_audio_duration_from_bytes(m4a_content, format="m4a")  # measure duration using the m4a format
            if duration is None:
                print("Error: Failed to calculate M4A audio duration.")
                return None, None
            print(f"Calculated M4A duration: {duration} ms")
            bucket_name = "stt_bucket_for_allen"
            file_extension = "m4a"
            content_type = "audio/m4a"
            gcs_file_path = f"audio_replies/reply_{int(time.time() * 1000)}.{file_extension}"
            storage_client = storage.Client()
            bucket = storage_client.bucket(bucket_name)
            blob = bucket.blob(gcs_file_path)
            print(f"Uploading audio reply ({content_type}) to gs://{bucket_name}/{gcs_file_path}")
            blob.upload_from_string(data=m4a_content, content_type=content_type)
            print("Audio uploaded to GCS.")
            public_url = blob.public_url
            print(f"Generated GCS public URL: {public_url}")
            if not public_url or not public_url.startswith("https://"):
                print(f"Error or Warning: Invalid public URL generated: {public_url}")
                if public_url and public_url.startswith("http://"):
                    public_url = "https://" + public_url[len("http://"):]
                    print(f"Corrected URL to HTTPS: {public_url}")
                else: return None, None
            print(f"Uploaded voice reply to GCS: {public_url}, audio duration: {duration} ms")
            return public_url, duration
        except Exception as e:
            print(f"Error generating the speech reply or uploading to GCS: {e}")
            traceback.print_exc()
            return None, None
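    # Note: blob.public_url is only reachable by the LINE client if the object (or bucket)
    # is publicly readable; otherwise a signed URL would have to be generated instead
    # (not implemented here).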
    # --- get_audio_duration_from_bytes (accepts a format argument) ---
    @staticmethod
    def get_audio_duration_from_bytes(audio_bytes, format="mp3"):
        try:
            print(f"Calculating duration for format: {format}")
            audio = AudioSegment.from_file(io.BytesIO(audio_bytes), format=format)
            duration_ms = len(audio)
            return min(duration_ms, 60000)
        except CouldntDecodeError as decode_err:
            print(f"Pydub decode error calculating duration (format: {format}): {decode_err}")
            traceback.print_exc()
            return None
        except Exception as e:
            print(f"Error calculating audio duration (format: {format}): {e}")
            traceback.print_exc()
            return None
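    # Usage sketch for the helper above, assuming m4a_content holds the bytes produced by
    # generate_speech_reply():
    #   duration_ms = ChatBot.get_audio_duration_from_bytes(m4a_content, format="m4a")
    # The result is the playback length in milliseconds, capped at 60000 by the helper.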
    def is_follow_up_question(self, user_id, current_query):
        recent_messages = self.user_message_history[user_id]
        if len(recent_messages) < 2: return False
        text_messages = [msg for msg in recent_messages if msg.get('type') == 'text']
        if len(text_messages) < 2: return False
        if len(current_query) < 10: return True
        reference_words = ["他", "她", "它", "這個", "那個", "這", "那", "這些", "那些", "以上", "剛剛"]
        if any(word in current_query for word in reference_words): return True
        return False
    def _get_source_id_from_event(self, event):
        source_type = event.source.type
        if source_type == 'user': return f"user_{event.source.user_id}"
        elif source_type == 'group': return f"group_{event.source.group_id}"
        elif source_type == 'room': return f"room_{event.source.room_id}"
        user_id = getattr(event.source, 'user_id', None)
        return f"unknown_{user_id}" if user_id else "unknown_source"
    def handle_text_message(self, event, line_bot_api):
        """
        Handle a text message event (integrates image generation, image editing,
        image analysis, RAG, chat, and voice-reply logic).
        """
        user_id = event.source.user_id
        prompt = event.message.text.strip()
        source_id = self._get_source_id_from_event(event)  # user/group/room ID with a type prefix
        print(f"Handling text message from {source_id}: '{prompt}'")
        # --- Resolve the raw ID used by the Push API ---
        push_target_id = None
        if event.source.type == 'user': push_target_id = event.source.user_id
        elif event.source.type == 'group': push_target_id = event.source.group_id
        elif event.source.type == 'room': push_target_id = event.source.room_id
        reply_text = ""
        should_store_user_message = True
        should_store_bot_reply = True
        # --- Step 1: check for an image-generation trigger (startswith) ---
        is_image_generation_request = False
        image_description = ""
        matched_keyword = None
        for keyword in self.IMAGE_GENERATION_KEYWORDS:
            if prompt.startswith(keyword):
                is_image_generation_request = True
                matched_keyword = keyword
                image_description = prompt[len(keyword):].strip()
                break
        if is_image_generation_request:
            print(f"Image generation request detected with keyword '{matched_keyword}'.")
            if not image_description:
                # Case: keyword only, no description
                print("Image description is empty.")
                reply_text = "喵~你想畫什麼圖片呢?請在關鍵字後面加上描述喔!例如:「生成一張 一隻貓在太空漫步」"
                should_store_bot_reply = True  # store this hint
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
                # Do not store an image_generation_pending_description state; the user should re-trigger with a description included
            else:
                # --- A description was provided: run the generation flow ---
                try:
                    # Reply with a "generating..." message first to avoid the reply-token timeout
                    print("Replying with 'Generating...' message.")
                    line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵~收到!圖片正在努力生成中..."))
                except Exception as reply_err:
                    print(f"Warning: Failed to send initial 'Generating...' reply: {reply_err}")
                    # Even if this reply fails, keep generating and push the result later
                print(f"Calling image generator for data with description: '{image_description}'")
                # Call the ImageGenerator method to obtain the raw image bytes
                image_bytes = self.image_generator.generate_image_with_gemini(image_description)  # expected to return bytes or None
                image_url = None  # initialize the URL
                if image_bytes:
                    # Bytes received successfully: upload them to GCS
                    print("Image data received. Uploading to GCS...")
                    image_url = self.image_generator.upload_image_to_gcs(image_bytes, file_name_prefix=f"gen_{user_id[:8]}")
                    if not image_url:
                        print("GCS upload failed.")
                else:
                    # generate_image_with_gemini returned None
                    print("Image generation failed, no image bytes received.")
                # --- Send the final result (image or error) via push_message ---
                if image_url:
                    safe_image_url = image_url + "#"
                    print(f"Image generated and uploaded. Pushing result to {push_target_id}. Safe URL: {safe_image_url}")
                    image_message = ImageSendMessage(original_content_url=safe_image_url, preview_image_url=safe_image_url)
                    final_text_output = "喵~圖片生成好了!"
                    text_message = TextSendMessage(text=final_text_output)
                    try:
                        line_bot_api.push_message(push_target_id, [image_message, text_message])
                        should_store_bot_reply = False
                    except Exception as push_err:
                        print(f"Error pushing final image message to {push_target_id}: {push_err}")
                        # Try to push an error text back to the source
                        try: line_bot_api.push_message(push_target_id, TextSendMessage(text="喵~圖片好了,但推送時好像有點問題..."))
                        except: pass  # give up on this last attempt
                else:
                    # Failure (generation failed or upload failed)
                    error_msg = "喵~糟糕,圖片生成或上傳失敗了,請稍後再試!"
                    print(f"Image generation/upload failed. Pushing error message to {push_target_id}.")
                    try:
                        line_bot_api.push_message(push_target_id, TextSendMessage(text=error_msg))  # push the error message to the source
                        should_store_bot_reply = False  # don't store the pushed error message as a bot reply
                    except Exception as push_err:
                        print(f"Error pushing final error message to {push_target_id}: {push_err}")
            # End of the image-generation flow: store the user message
            if should_store_user_message: self.store_user_message(user_id, "text", prompt)
            # If there is a reply_text (e.g. the empty-description hint), store it as the bot reply
            if should_store_bot_reply and reply_text: self.store_bot_reply(user_id, reply_text)
            return  # end of handle_text_message
        # --- Not an image-generation request: continue with the remaining steps ---
        print("Not an image generation request.")
        # --- Step 2: check for an image-edit trigger ---
        is_image_edit_request = False
        edit_description = ""
        matched_edit_keyword = None
        # Use IMAGE_EDIT_KEYWORDS (assumed to be defined on the class)
        if hasattr(self, 'IMAGE_EDIT_KEYWORDS'):
            for keyword in self.IMAGE_EDIT_KEYWORDS:
                if prompt.startswith(keyword):
                    is_image_edit_request = True
                    matched_edit_keyword = keyword
                    edit_description = prompt[len(keyword):].strip()
                    break
        else:
            # If no edit keywords are defined, optionally warn or simply ignore
            # print("Warning: IMAGE_EDIT_KEYWORDS not defined in ChatBot class.")
            pass
        if is_image_edit_request:
            print(f"Image edit request detected with keyword '{matched_edit_keyword}'.")
            # Check whether there is a pending image
            recent_image_info = next((msg for msg in reversed(self.user_message_history[user_id]) if msg.get('type') == 'image_pending'), None)
            if not recent_image_info:
                # Case: no image was uploaded first
                print("No pending image found for editing.")
                reply_text = "喵~要編輯哪張圖片呀?請先傳送圖片給我喔!"
                should_store_bot_reply = True
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
            elif not edit_description:
                # Case: an image exists but no edit instruction was given
                print("Edit description is empty.")
                reply_text = "喵~你想怎麼編輯這張圖片呢?請告訴我編輯指令喔! (例如:幫我改圖 加上太陽眼鏡)"
                should_store_bot_reply = True
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
            else:
                # --- Image and description both present: run the edit flow ---
                image_message_id = recent_image_info.get('content')
                print(f"Found pending image ID {image_message_id} for editing.")
                original_image_bytes = None  # initialize
                try:
                    # Reply with an "editing..." message first
                    print("Replying with 'Editing...' message.")
                    line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵~收到!正在幫你編輯圖片..."))
                except Exception as reply_err:
                    print(f"Warning: Failed to send initial 'Editing...' reply: {reply_err}")
                try:
                    # Fetch the original image bytes on demand
                    print(f"Fetching original image content for message ID: {image_message_id}")
                    message_content = line_bot_api.get_message_content(image_message_id)
                    original_image_bytes = message_content.content
                    print(f"Fetched original image content, size: {len(original_image_bytes)} bytes")
                except Exception as fetch_err:
                    print(f"Error fetching original image for editing: {fetch_err}")
                    # If fetching the original image fails, push an error message
                    try:
                        line_bot_api.push_message(push_target_id, TextSendMessage(text="喵~找不到你要編輯的原始圖片耶..."))
                    except Exception as push_fetch_err:  # catch push errors
                        print(f"Error pushing fetch error message: {push_fetch_err}")
                    # Clean up state and stop
                    try: self.user_message_history[user_id] = [msg for msg in self.user_message_history[user_id] if msg != recent_image_info]
                    except: pass
                    if should_store_user_message: self.store_user_message(user_id, "text", prompt)
                    return  # done
                edited_image_bytes = None
                if original_image_bytes:
                    print(f"Calling image editor with description: '{edit_description}'")
                    # Call the edit method
                    if hasattr(self.image_generator, 'edit_image_with_gemini'):
                        edited_image_bytes = self.image_generator.edit_image_with_gemini(original_image_bytes, edit_description)
                    else:
                        print("Error: image_generator does not have 'edit_image_with_gemini' method.")
                edited_image_url = None
                if edited_image_bytes:
                    print("Edited image data received. Uploading to GCS...")
                    edited_image_url = self.image_generator.upload_image_to_gcs(edited_image_bytes, file_name_prefix=f"edit_{user_id[:8]}")
                    if not edited_image_url: print("GCS upload failed for edited image.")
                else:
                    print("Image editing failed, no edited image bytes received.")
                # Push the result
                if edited_image_url:
                    safe_edited_url = edited_image_url + "#"
                    print(f"Image edited and uploaded. Pushing result to {push_target_id}. Safe URL: {safe_edited_url}")
                    image_message = ImageSendMessage(original_content_url=safe_edited_url, preview_image_url=safe_edited_url)
                    final_text_output = "喵~圖片編輯好了!"
                    text_message = TextSendMessage(text=final_text_output)
                    try:
                        line_bot_api.push_message(push_target_id, [image_message, text_message])
                        should_store_bot_reply = False
                    except Exception as push_err:
                        print(f"Error pushing final edited image message to {push_target_id}: {push_err}")
                        try:
                            line_bot_api.push_message(push_target_id, TextSendMessage(text="喵~圖片編輯好了,但推送時好像有點問題..."))
                        except: pass
                else:
                    error_msg = "喵~糟糕,圖片編輯失敗了,請稍後再試!"
                    print(f"Image editing/upload failed. Pushing error message to {push_target_id}.")
                    try:
                        line_bot_api.push_message(push_target_id, TextSendMessage(text=error_msg))
                        should_store_bot_reply = False
                    except Exception as push_err:
                        print(f"Error pushing final edit error message to {push_target_id}: {push_err}")
                # Clean up the pending-image state (whether or not the edit succeeded)
                try:
                    self.user_message_history[user_id] = [msg for msg in self.user_message_history[user_id] if msg != recent_image_info]
                    print("Removed image_pending state after edit attempt.")
                except Exception as remove_err:
                    print(f"Warning: Could not remove image_pending state after edit: {remove_err}")
            # End of the image-edit flow
            if should_store_user_message: self.store_user_message(user_id, "text", prompt)
            if should_store_bot_reply and reply_text: self.store_bot_reply(user_id, reply_text)
            return  # end of handle_text_message
        print("Not an image edit request.")
        # --- Step 3: handle a voice-reply request ---
        if prompt == "語音回覆":
            print("Voice reply request detected.")
            should_store_user_message = True
            should_store_bot_reply = False
            last_reply = self.get_last_bot_reply(user_id)
            reply_to_send = None
            if last_reply:
                print(f"Generating voice for last bot reply: '{last_reply[:50]}...'")
                audio_url, duration = self.generate_speech_reply(last_reply)  # generate_speech_reply is expected to produce M4A audio
                if audio_url and duration:
                    try:
                        safe_audio_url = audio_url + "#"  # append '#' (URL-suffix trick)
                        print(f"Sending audio reply. Safe URL: {safe_audio_url}, Duration: {duration}")
                        reply_to_send = [
                            AudioSendMessage(original_content_url=safe_audio_url, duration=duration),
                            TextSendMessage(text=f"這是上一句的回覆語音:\n「{last_reply}」")
                        ]
                    except Exception as send_err:
                        print(f"Error preparing the voice reply message: {send_err}.")
                        reply_to_send = TextSendMessage(text=f"喵~語音轉換失敗了(發送錯誤)... 上一句是:\n「{last_reply}」")
                        should_store_bot_reply = True
                else:
                    print("Failed to generate voice reply.")
                    reply_to_send = TextSendMessage(text=f"喵~語音轉換失敗了(生成失敗)... 上一句是:\n「{last_reply}」")
                    should_store_bot_reply = True
            else:
                print("No previous bot reply found to generate voice for.")
                reply_to_send = TextSendMessage(text="喵~找不到上一句話耶,沒辦法轉成語音...")
                should_store_bot_reply = True
            try:
                if isinstance(reply_to_send, list):
                    line_bot_api.reply_message(event.reply_token, reply_to_send)
                else:
                    line_bot_api.reply_message(event.reply_token, [reply_to_send])
            except Exception as final_reply_err:
                print(f"Error sending final voice reply/fallback: {final_reply_err}")
                traceback.print_exc()
                try: line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵~處理語音回覆時出錯了!"))
                except: pass
            if should_store_user_message: self.store_user_message(user_id, "text", prompt)
            if should_store_bot_reply:
                if isinstance(reply_to_send, TextSendMessage): self.store_bot_reply(user_id, reply_to_send.text)
                elif isinstance(reply_to_send, list) and isinstance(reply_to_send[-1], TextSendMessage): self.store_bot_reply(user_id, reply_to_send[-1].text)
            return  # done after handling the voice-reply request
        # --- Step 4: image analysis ---
        recent_image_info = next((msg for msg in reversed(self.user_message_history[user_id]) if msg.get('type') == 'image_pending'), None)
        if recent_image_info:
            image_message_id = recent_image_info.get('content')
            print(f"Found pending image message ID: {image_message_id}...")
            try:
                message_content = line_bot_api.get_message_content(image_message_id)
                image_bytes = io.BytesIO(message_content.content)
                image = Image.open(image_bytes)
                chat_session = self.start_chat_session(user_id)
                if not chat_session: raise Exception("Failed to start chat session")
                response = chat_session.send_message([prompt, image])  # analyze the image with the chat model
                reply_text = ""
                # Includes a check for an empty response from the chat model
                if response.candidates:
                    if response.candidates[0].finish_reason.name == 'SAFETY':
                        block_reason = response.prompt_feedback.block_reason if hasattr(response, 'prompt_feedback') else "未知"
                        reply_text = f"喵~分析圖片時好像遇到安全問題 ({block_reason})!"
                    elif response.text:
                        reply_text = response.text
                    else:
                        reply_text = "喵~我看不太懂這張圖片!"
                else:
                    block_reason_msg = ""
                    if hasattr(response, 'prompt_feedback') and response.prompt_feedback.block_reason:
                        block_reason_msg = f" (原因: {response.prompt_feedback.block_reason})"
                    reply_text = f"喵~分析圖片時好像遇到問題了!{block_reason_msg}"
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
                should_store_bot_reply = True
                # Clean up the pending-image state
                try: self.user_message_history[user_id] = [msg for msg in self.user_message_history[user_id] if msg != recent_image_info]; print("Removed image_pending state.")
                except Exception as remove_err: print(f"Warning: Could not remove image_pending state: {remove_err}")
            except Exception as e:
                print(f"Image handling error: {e}")
                traceback.print_exc()
                reply_text = "喵~處理圖片時出了點小狀況!"
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
                should_store_bot_reply = True
                try: self.user_message_history[user_id] = [msg for msg in self.user_message_history[user_id] if msg != recent_image_info]
                except: pass
            if should_store_user_message: self.store_user_message(user_id, "text", prompt)
            if should_store_bot_reply: self.store_bot_reply(user_id, reply_text)
            return
        print("Not an image analysis request.")
        # --- Step 5: RAG control commands ---
        if isinstance(event.message, TextMessage):
            message_text = prompt  # prompt is already stripped
            # Reuse the source_id resolved at the top of this method
            reply_text_rag_cmd = None  # avoid clobbering the outer reply_text
            if hasattr(self.rag_manager, 'rag_enable_command') and message_text == self.rag_manager.rag_enable_command:
                print(f"Enabling RAG for {source_id}")
                self.rag_manager.rag_status[source_id] = True
                reply_text_rag_cmd = "喵~我已開啟查詢模式!"
            elif hasattr(self.rag_manager, 'rag_disable_command') and message_text == self.rag_manager.rag_disable_command:
                print(f"Disabling RAG for {source_id}")
                self.rag_manager.rag_status[source_id] = False
                reply_text_rag_cmd = "喵~我已關閉查詢模式!"
            if reply_text_rag_cmd:  # this was a RAG command
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text_rag_cmd))
                self.store_bot_reply(user_id, reply_text_rag_cmd)  # store the bot reply
                self.store_user_message(user_id, "text", prompt)  # store the user command
                return  # done after handling the RAG command
        # --- Step 6: RAG / API lookup ---
        is_rag_enabled = self.rag_manager.rag_status.get(source_id, True)
        retrieved_info = None
        if is_rag_enabled:
            print(f"RAG is enabled for {source_id}. Retrieving info...")
            retrieved_info = self.retrieve_relevant_info(prompt, user_id=user_id)  # pass user_id through to the RAG manager
            if retrieved_info: print("Retrieved info:", retrieved_info[:100] + "...")
            else: print("No relevant info retrieved.")
        else:
            print(f"RAG is disabled for {source_id}. Skipping info retrieval.")
        # --- Step 7: prepare the prompt for Gemini ---
        prompt_for_gemini = prompt
        if not prompt_for_gemini: prompt_for_gemini = "喵~"
        print(f"Final prompt for Gemini: '{prompt_for_gemini}'")
        # --- Step 8: send the message to the Gemini chat model ---
        chat_session = self.start_chat_session(user_id)
        if not chat_session:
            reply_text = "喵~糟糕,無法開始對話..."
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
            if should_store_user_message: self.store_user_message(user_id, "text", prompt)
            if should_store_bot_reply: self.store_bot_reply(user_id, reply_text)
            return
        final_prompt_to_send = prompt_for_gemini
        if retrieved_info:
            final_prompt_to_send = self.augment_prompt_with_info(prompt_for_gemini, retrieved_info)
        reply_text = ""
        try:
            print("Sending final prompt to Gemini...")
            response = chat_session.send_message(final_prompt_to_send)
            # *** Includes a check for an empty response from the chat model ***
            if response.candidates:
                if response.candidates[0].finish_reason.name == 'SAFETY':
                    block_reason = response.prompt_feedback.block_reason if hasattr(response, 'prompt_feedback') else "未知"
                    reply_text = f"喵~這個話題有點敏感 ({block_reason})!"
                elif response.text:
                    reply_text = response.text
                    print("Received text response from Gemini:", reply_text[:100] + "...")
                else:
                    reply_text = "喵~橘橘不知道怎麼回答..."
            else:
                block_reason_msg = ""
                if hasattr(response, 'prompt_feedback') and response.prompt_feedback.block_reason:
                    block_reason_msg = f" (原因: {response.prompt_feedback.block_reason})"
                reply_text = f"喵~請求好像被擋下來了!{block_reason_msg}"
        except Exception as gemini_error:
            print(f"Error calling Gemini API: {gemini_error}")
            traceback.print_exc()
            reply_text = "喵~糟糕,我的腦袋好像有點打結了..."
        if not reply_text:
            reply_text = "喵~橘橘現在有點累..."
        # --- Step 9: send the final text reply ---
        print("Sending final text reply.")
        try:
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
            should_store_bot_reply = True
        except Exception as final_reply_err:
            print(f"Error sending final text reply: {final_reply_err}")
            should_store_bot_reply = False
        # --- Step 10: store the bot reply and the user message ---
        if should_store_bot_reply: self.store_bot_reply(user_id, reply_text)
        if should_store_user_message: self.store_user_message(user_id, "text", prompt)
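    # Registration sketch (an assumption about the surrounding webhook app, not part of this class):
    #   handler = WebhookHandler(channel_secret)
    #   @handler.add(MessageEvent, message=TextMessage)
    #   def on_text(event):
    #       chatbot.handle_text_message(event, line_bot_api)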
    def handle_image_message(self, event, line_bot_api, line_bot):
        user_id = event.source.user_id
        message_id = event.message.id
        print(f"Handling image message from {user_id}, message ID: {message_id}")
        try:
            self.store_user_message(user_id, "image_pending", message_id)
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵!圖片已收到,請告訴我你想知道關於這張圖片的什麼問題呢?"))
        except Exception as e:
            print(f"Error storing image message ID: {e}")
            traceback.print_exc()
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text=f"橘橘記錄圖片訊息時出錯了:{e}"))
    def handle_audio_message(self, event, line_bot_api, line_bot):
        user_id = event.source.user_id
        message_id = event.message.id
        source_id = self._get_source_id_from_event(event)
        print(f"Handling audio message from {source_id}, message ID: {message_id}")
        m4a_audio_path = None
        try:
            m4a_audio_path = line_bot.get_audio_url(message_id)
            if not m4a_audio_path: raise Exception("無法取得語音檔案路徑")
            print(f"Retrieved m4a audio path: {m4a_audio_path}")
            wav_content = None
            try:
                print("Converting M4A to WAV...")
                audio = AudioSegment.from_file(m4a_audio_path, format="m4a")
                audio = audio.set_frame_rate(16000).set_channels(1).set_sample_width(2)
                wav_buffer = io.BytesIO()
                audio.export(wav_buffer, format="wav")
                wav_content = wav_buffer.getvalue()
                print(f"Audio converted to WAV (LINEAR16) successfully, size: {len(wav_content)} bytes")
            except Exception as conversion_error:
                print(f"Audio conversion failed: {conversion_error}")
                traceback.print_exc()
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵~聽不懂這個語音格式耶!"))
                return
            finally:
                if m4a_audio_path and os.path.exists(m4a_audio_path):
                    try: os.remove(m4a_audio_path); print(f"Removed temporary m4a file: {m4a_audio_path}")
                    except OSError as remove_error: print(f"Error removing temporary m4a file: {remove_error}")
            text_from_audio = self._convert_audio_bytes_to_text(wav_content)
            if not text_from_audio:
                print("STT failed to transcribe audio.")
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵~聽不清楚你在說什麼,可以再說一次嗎?"))
                return
            print(f"STT transcription result: '{text_from_audio}'")
            self.store_user_message(user_id, "text", text_from_audio)
            chat_session = self.start_chat_session(user_id)
            if not chat_session: raise Exception("Chat session failed to start.")
            is_rag_enabled = self.rag_manager.rag_status.get(source_id, True)
            retrieved_info = None
            if is_rag_enabled:
                retrieved_info = self.retrieve_relevant_info(text_from_audio, user_id=user_id)
            final_prompt_to_send = text_from_audio
            if retrieved_info:
                final_prompt_to_send = self.augment_prompt_with_info(text_from_audio, retrieved_info)
            reply_text = ""
            try:
                print("Sending transcribed text to Gemini...")
                response = chat_session.send_message(final_prompt_to_send)
                if response.candidates and response.text:
                    reply_text = response.text
                else:
                    reply_text = "喵~我好像沒聽懂你的語音訊息!"
                print("Received Gemini response for audio transcription:", reply_text[:100] + "...")
            except Exception as gemini_error:
                print(f"Error calling Gemini API for audio transcription: {gemini_error}")
                traceback.print_exc()
                reply_text = "喵~我聽懂你說什麼了,但我的腦袋好像有點打結了!"
            if not reply_text: reply_text = "喵~我沒聽懂你的語音訊息!"
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
            self.store_bot_reply(user_id, reply_text)
        except Exception as e:
            print(f"Unexpected error while handling the audio message: {e}")
            traceback.print_exc()
            if m4a_audio_path and os.path.exists(m4a_audio_path):
                try: os.remove(m4a_audio_path)
                except: pass
            line_bot_api.reply_message(event.reply_token, TextSendMessage(text=f"橘橘無法處理語音訊息:{e}"))
    def _convert_audio_bytes_to_text(self, audio_bytes):
        try:
            client = speech.SpeechClient()
            audio = speech.RecognitionAudio(content=audio_bytes)
            config = speech.RecognitionConfig(encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16, sample_rate_hertz=16000, language_code="cmn-TW", enable_automatic_punctuation=True)
            print("Sending audio bytes to Google STT API...")
            response = client.recognize(config=config, audio=audio)
            print("Received response from Google STT API.")
            if response.results:
                transcript = response.results[0].alternatives[0].transcript
                print(f"STT API Transcription: '{transcript}'")
                return transcript
            else:
                print("STT API returned no results.")
                return None
        except Exception as e:
            print(f"Error calling Google STT API: {e}")
            traceback.print_exc()
            return None
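
# Minimal local smoke test (an assumption; the production entry point lives elsewhere and
# wires ChatBot into the LINE webhook). It only exercises the pure-Python intent classifier,
# so no Google Cloud or LINE credentials are needed beyond what the module imports require.
if __name__ == "__main__":
    for sample_query in ["台北今天天氣如何", "什麼是黑洞", "你好,聊聊天吧"]:
        print(sample_query, "->", ChatBot.determine_query_intent(sample_query))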