import google.generativeai as genai_gen
import json, os
import time
from collections import defaultdict
import io
from PIL import Image
import traceback
from google.cloud import speech, texttospeech, storage
from system_instruction import system_instruction
import requests_handler
from linebot.models import MessageEvent, TextMessage, TextSendMessage, ImageSendMessage, ImageMessage, AudioMessage, AudioSendMessage
from rag_manager import RAGManager
from Image_generation import ImageGenerator
from pydub import AudioSegment
from pydub.exceptions import CouldntDecodeError  # import the specific decode error
from typing import Union, Tuple
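# The Google Cloud client libraries read credentials from the file path in
# GOOGLE_APPLICATION_CREDENTIALS. Because the deployment only provides the raw
# service-account JSON (GOOGLE_APPLICATION_CREDENTIALS_JSON), the block below
# writes it to /tmp and points the standard variable at that file.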
creds_json = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
creds_path = "/tmp/google_creds.json"
if creds_json:
with open(creds_path, "w") as f:
f.write(creds_json)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = creds_path
class ChatBot:
"""
核心聊天機器人類別:
- 與 Gemini 模型互動
- 處理使用者訊息(文字、圖片、語音)
- 根據使用者查詢決定是否檢索外部資訊(RAG)
- 提供 TTS 語音回覆並上傳到 GCS
"""
# --- 定義圖片生成觸發關鍵字 (類別層級) ---
IMAGE_GENERATION_KEYWORDS = ["生成一張", "畫一張"] # 使用 startswith 匹配
IMAGE_EDIT_KEYWORDS = ["編輯圖片", "幫我改圖", "編輯這張圖"]
def __init__(self, google_api_key, system_instruction, google_search_api_key=None, google_search_cse_id=None):
"""
初始化 ChatBot 實例
"""
self.image_generator = ImageGenerator()
        # The API key is expected to be configurable via an environment variable
try:
genai_gen.configure(api_key=google_api_key)
print("Google AI API Key configured successfully via genai_gen.configure().")
except AttributeError:
print("Warning: genai_gen.configure not found. Assuming API key is configured via environment variable.")
except Exception as config_err:
print(f"Error configuring Google AI API Key: {config_err}")
        # Generation settings
        generation_config = genai_gen.types.GenerationConfig(max_output_tokens=2048, temperature=0.5, top_p=0.5, top_k=16)
        # Create and configure the Gemini chat model
self.model_name = "gemini-2.0-flash"
try:
self.model = genai_gen.GenerativeModel(
model_name=self.model_name,
generation_config=generation_config,
system_instruction=system_instruction,
)
print(f"Gemini model '{self.model_name}' initialized.")
except Exception as model_init_err:
print(f"Error initializing Gemini model '{self.model_name}': {model_init_err}")
raise
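        # Per-user state: active Gemini chat sessions, recent user messages
        # (capped at 20 entries in store_user_message) and recent bot replies
        # (capped at 5 entries in store_bot_reply).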
self.chat_sessions = {}
self.user_message_history = defaultdict(list)
self.bot_reply_history = defaultdict(list)
self.rag_manager = RAGManager(
google_search_api_key=google_search_api_key,
google_search_cse_id=google_search_cse_id
)
print("RAG Manager initialized.")
def start_chat_session(self, user_id):
"""啟動或獲取聊天會話"""
if user_id not in self.chat_sessions:
print(f"Starting new chat session for user {user_id}")
try:
self.chat_sessions[user_id] = self.model.start_chat(history=[])
except Exception as start_chat_err:
print(f"Error starting chat session for {user_id}: {start_chat_err}")
return None
return self.chat_sessions.get(user_id)
# --- determine_query_intent, retrieve_relevant_info, _extract_city_name, augment_prompt_with_info, _get_info_type ---
@staticmethod
def determine_query_intent(user_query):
specific_api_keywords = ["空氣品質", "aqi", "天氣", "地震資訊", "牌告匯率", "今日匯率"]
factual_indicators = ["如何", "什麼是", "解釋", "定義", "告訴我關於", "幫我查", "最新", "新聞", "資訊", "多少", "為什麼", "何時", "哪裡", "誰是", "幾點", "幾月", "怎麼做", "教學", "方法", "步驟"]
conversational_indicators = ["你好", "嗨", "哈囉", "你覺得", "你喜歡", "聊聊", "聊天", "無聊", "我想", "我覺得", "我希望", "我喜歡", "謝謝", "笑話", "有趣", "開心", "難過", "討厭", "害怕", "緊張", "期待"]
user_query_lower = user_query.lower()
if any(keyword in user_query_lower for keyword in specific_api_keywords): return "specific_api"
factual_score = sum(1 for indicator in factual_indicators if indicator in user_query_lower)
conversational_score = sum(1 for indicator in conversational_indicators if indicator in user_query_lower)
return "factual" if factual_score > conversational_score else "conversational"
def retrieve_relevant_info(self, user_query, user_id=None):
intent = ChatBot.determine_query_intent(user_query)
user_query_lower = user_query.lower()
if intent == "specific_api":
if "空氣品質" in user_query_lower or "aqi" in user_query_lower: return requests_handler.get_air_quality()
elif "天氣" in user_query_lower: return requests_handler.get_weather(city_name=self._extract_city_name(user_query_lower))
elif "地震資訊" in user_query_lower: return requests_handler.get_earthquake()
elif "牌告匯率" in user_query_lower or "今日匯率" in user_query_lower: return requests_handler.get_exchange_rate()
source_id = f"user_{user_id}" if user_id else None
is_rag_enabled = self.rag_manager.rag_status.get(source_id, True) if source_id else True
if intent == "factual" and is_rag_enabled:
print(f"RAG enabled for {source_id}. Getting web context...")
web_context = self.rag_manager.get_web_context_for_query(user_query, user_id=user_id)
if web_context: return f"從網路搜尋獲得的資訊:\n{web_context}"
else: print("RAG: No web context found.")
elif intent == "factual" and not is_rag_enabled: print(f"RAG disabled for {source_id}.")
return None
def _extract_city_name(self, user_query_lower):
city_keywords = {"臺北市": ["台北", "臺北", "台北市", "臺北市"], "新北市": ["新北", "新北市"], "桃園市": ["桃園", "桃園市"], "臺中市": ["台中", "臺中", "台中市", "臺中市"], "新竹市": ["新竹", "新竹市"]}
for city, keywords in city_keywords.items():
if any(keyword in user_query_lower for keyword in keywords): return city
return None
def augment_prompt_with_info(self, original_prompt, retrieved_info):
if not retrieved_info: return f"""使用者: {original_prompt}\n橘橘: """
base_instruction = "請以橘橘(一隻友善的貓)的身份回答使用者。使用繁體中文,語氣自然活潑,偶爾加入「喵」作為口頭禪。"
if "從網路搜尋獲得的資訊" in retrieved_info:
augmented_prompt = f"""{base_instruction} 請參考以下網路資訊:\n\n{retrieved_info}\n\n使用者問題:{original_prompt}\n\n回答要求:基於參考資料回答,保持對話自然。如果回答直接參考了某來源,請在句末附上網址 (來源: https://...)。若資料不足,請誠實說明。\n橘橘: """
else:
info_type = self._get_info_type(retrieved_info)
augmented_prompt = f"""{base_instruction} 請根據以下{info_type}資訊回答:\n\n{info_type}資訊:\n{retrieved_info}\n\n使用者問題:{original_prompt}\n\n回答要求:融入資訊,保持對話自然。\n橘橘: """
return augmented_prompt
def _get_info_type(self, info):
if isinstance(info, str):
if "空氣品質" in info or "aqi" in info.lower(): return "空氣品質"
if "天氣" in info: return "天氣預報"
if "地震" in info: return "地震資訊"
if "匯率" in info: return "牌告匯率"
if "從網路搜尋獲得的資訊" in info: return "網路搜尋"
return "相關資訊"
def store_user_message(self, user_id, message_type, message_content):
max_history_len = 20
if len(self.user_message_history[user_id]) >= max_history_len: self.user_message_history[user_id].pop(0)
self.user_message_history[user_id].append({"type": message_type, "content": message_content})
print(f"Stored user message for {user_id}: type='{message_type}', content='{str(message_content)[:50]}...'")
def store_bot_reply(self, user_id, reply_text, max_history=5):
if user_id not in self.bot_reply_history: self.bot_reply_history[user_id] = []
if reply_text and isinstance(reply_text, str):
self.bot_reply_history[user_id].append(reply_text)
if len(self.bot_reply_history[user_id]) > max_history: self.bot_reply_history[user_id] = self.bot_reply_history[user_id][-max_history:]
print(f"Stored bot reply for {user_id}: '{reply_text[:50]}...'")
else: print(f"Warning: Attempted to store invalid bot reply for {user_id}: {reply_text}")
def get_last_bot_reply(self, user_id):
if user_id in self.bot_reply_history and self.bot_reply_history[user_id]: return self.bot_reply_history[user_id][-1]
return None
    # --- generate_speech_reply (produces M4A/AAC) ---
    def generate_speech_reply(self, text):
        """Generate an AAC (M4A) voice reply and upload it to GCS."""
if not text or not isinstance(text, str):
print("Error: Invalid text provided for speech generation.")
return None, None
try:
tts_client = texttospeech.TextToSpeechClient()
synthesis_input = texttospeech.SynthesisInput(text=text)
voice = texttospeech.VoiceSelectionParams(language_code="cmn-TW", name="cmn-TW-Wavenet-C", ssml_gender=texttospeech.SsmlVoiceGender.FEMALE)
print("Synthesizing speech (WAV)...")
audio_config_wav = texttospeech.AudioConfig(audio_encoding=texttospeech.AudioEncoding.LINEAR16, sample_rate_hertz=16000)
response_wav = tts_client.synthesize_speech(input=synthesis_input, voice=voice, audio_config=audio_config_wav)
wav_content = response_wav.audio_content
print("Speech synthesized as WAV successfully.")
print("Converting WAV to AAC (M4A)...")
try:
audio = AudioSegment.from_wav(io.BytesIO(wav_content))
m4a_buffer = io.BytesIO()
audio.export(m4a_buffer, format="mp4", codec="aac", bitrate="64k")
m4a_content = m4a_buffer.getvalue()
print(f"Audio converted to M4A (AAC) successfully, size: {len(m4a_content)} bytes.")
except Exception as convert_err:
print(f"Error converting WAV to M4A: {convert_err}")
traceback.print_exc()
return None, None
print("Calculating M4A audio duration...")
            duration = ChatBot.get_audio_duration_from_bytes(m4a_content, format="m4a")  # measure using the m4a container
if duration is None:
print("Error: Failed to calculate M4A audio duration.")
return None, None
print(f"Calculated M4A duration: {duration} ms")
bucket_name = "stt_bucket_for_allen"
file_extension = "m4a"
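            # "audio/m4a" is used as the GCS content type here; "audio/mp4" or
            # "audio/x-m4a" are also commonly seen MIME types for AAC-in-MP4 audio.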
content_type = "audio/m4a"
gcs_file_path = f"audio_replies/reply_{int(time.time() * 1000)}.{file_extension}"
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(gcs_file_path)
print(f"Uploading audio reply ({content_type}) to gs://{bucket_name}/{gcs_file_path}")
blob.upload_from_string(data=m4a_content, content_type=content_type)
print("Audio uploaded to GCS.")
public_url = blob.public_url
print(f"Generated GCS public URL: {public_url}")
if not public_url or not public_url.startswith("https://"):
print(f"Error or Warning: Invalid public URL generated: {public_url}")
if public_url and public_url.startswith("http://"):
public_url = "https://" + public_url[len("http://"):]
print(f"Corrected URL to HTTPS: {public_url}")
else: return None, None
print(f"已上傳語音檔案到 GCS: {public_url},音訊長度: {duration} ms")
return public_url, duration
except Exception as e:
print(f"產生語音回覆或上傳 GCS 時出錯: {e}")
traceback.print_exc()
return None, None
    # --- get_audio_duration_from_bytes (accepts a format argument) ---
@staticmethod
def get_audio_duration_from_bytes(audio_bytes, format="mp3"):
try:
print(f"Calculating duration for format: {format}")
audio = AudioSegment.from_file(io.BytesIO(audio_bytes), format=format)
duration_ms = len(audio)
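            # Cap the reported duration at 60 seconds, presumably to stay within
            # the duration LINE accepts for an audio message.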
return min(duration_ms, 60000)
except CouldntDecodeError as decode_err:
print(f"Pydub decode error calculating duration (format: {format}): {decode_err}")
traceback.print_exc()
return None
except Exception as e:
print(f"Error calculating audio duration (format: {format}): {e}")
traceback.print_exc()
return None
def is_follow_up_question(self, user_id, current_query):
recent_messages = self.user_message_history[user_id]
if len(recent_messages) < 2: return False
text_messages = [msg for msg in recent_messages if msg.get('type') == 'text']
if len(text_messages) < 2: return False
if len(current_query) < 10: return True
reference_words = ["他", "她", "它", "這個", "那個", "這", "那", "這些", "那些", "以上", "剛剛"]
if any(word in current_query for word in reference_words): return True
return False
def _get_source_id_from_event(self, event):
source_type = event.source.type
if source_type == 'user': return f"user_{event.source.user_id}"
elif source_type == 'group': return f"group_{event.source.group_id}"
elif source_type == 'room': return f"room_{event.source.room_id}"
user_id = getattr(event.source, 'user_id', None)
return f"unknown_{user_id}" if user_id else "unknown_source"
def handle_text_message(self, event, line_bot_api):
"""
處理文字訊息事件:(已整合圖片生成、圖片編輯、圖片分析、RAG、聊天、語音回覆邏輯)
"""
user_id = event.source.user_id
prompt = event.message.text.strip()
        source_id = self._get_source_id_from_event(event)  # prefixed user/group/room ID from the helper
        print(f"Handling text message from {source_id}: '{prompt}'")
        # --- Raw ID used for the Push API ---
push_target_id = None
if event.source.type == 'user': push_target_id = event.source.user_id
elif event.source.type == 'group': push_target_id = event.source.group_id
elif event.source.type == 'room': push_target_id = event.source.room_id
reply_text = ""
should_store_user_message = True
should_store_bot_reply = True
        # --- Step 1: check for an image-generation trigger (startswith) ---
is_image_generation_request = False
image_description = ""
matched_keyword = None
for keyword in self.IMAGE_GENERATION_KEYWORDS:
if prompt.startswith(keyword):
is_image_generation_request = True
matched_keyword = keyword
image_description = prompt[len(keyword):].strip()
break
if is_image_generation_request:
print(f"Image generation request detected with keyword '{matched_keyword}'.")
if not image_description:
                # Case: keyword only, no description
                print("Image description is empty.")
                reply_text = "喵~你想畫什麼圖片呢?請在關鍵字後面加上描述喔!例如:「生成一張 一隻貓在太空漫步」"
                should_store_bot_reply = True  # store this hint as the bot reply
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
                # No pending-description state is stored; the user simply resends the keyword with a description.
else:
                # --- Description present: run the generation flow ---
                try:
                    # Reply "generating..." first via reply_message so the reply token does not time out
                    print("Replying with 'Generating...' message.")
                    line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵~收到!圖片正在努力生成中..."))
                except Exception as reply_err:
                    print(f"Warning: Failed to send initial 'Generating...' reply: {reply_err}")
                    # Even if the reply fails, keep generating and push the result later
                print(f"Calling image generator for data with description: '{image_description}'")
                # Ask the ImageGenerator for the raw image bytes
                image_bytes = self.image_generator.generate_image_with_gemini(image_description)  # expected to return bytes or None
                image_url = None  # URL placeholder
if image_bytes:
                    # Bytes received: upload them to GCS
                    print("Image data received. Uploading to GCS...")
                    image_url = self.image_generator.upload_image_to_gcs(image_bytes, file_name_prefix=f"gen_{user_id[:8]}")
                    if not image_url:
                        print("GCS upload failed.")
                else:
                    # generate_image_with_gemini returned None
                    print("Image generation failed, no image bytes received.")
                # --- Push the final result (image or error) via push_message ---
if image_url:
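                    # Appending '#' leaves the URL functionally identical; it appears to be a
                    # workaround for how LINE handles/caches content URLs.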
safe_image_url = image_url + "#"
print(f"Image generated and uploaded. Pushing result to {push_target_id}. Safe URL: {safe_image_url}") # 使用 push_target_id 打印
image_message = ImageSendMessage(original_content_url=safe_image_url, preview_image_url=safe_image_url)
final_text_output = "喵~圖片生成好了!"
text_message = TextSendMessage(text=final_text_output)
try:
line_bot_api.push_message(push_target_id, [image_message, text_message])
should_store_bot_reply = False
except Exception as push_err:
print(f"Error pushing final image message to {push_target_id}: {push_err}")
                        # Try to push a plain-text error to the source as a fallback
                        try: line_bot_api.push_message(push_target_id, TextSendMessage(text="喵~圖片好了,但推送時好像有點問題..."))
                        except: pass  # give up after this final attempt
                else:
                    # Failure (generation or upload failed)
                    error_msg = "喵~糟糕,圖片生成或上傳失敗了,請稍後再試!"
                    print(f"Image generation/upload failed. Pushing error message to {push_target_id}.")
                    try:
                        line_bot_api.push_message(push_target_id, TextSendMessage(text=error_msg))  # push the error message to the source
                        should_store_bot_reply = False  # pushed error messages are not stored as bot replies
                    except Exception as push_err:
                        print(f"Error pushing final error message to {push_target_id}: {push_err}")
            # End of the image-generation flow: store the user message
            if should_store_user_message: self.store_user_message(user_id, "text", prompt)
            # If there is a reply_text (e.g. the empty-description hint), store the bot reply
            if should_store_bot_reply and reply_text: self.store_bot_reply(user_id, reply_text)
            return  # end of handle_text_message
        # --- Not an image-generation request: continue with the remaining steps ---
print("Not an image generation request.")
        # --- Step 2: check for an image-edit trigger ---
is_image_edit_request = False
edit_description = ""
matched_edit_keyword = None
        # IMAGE_EDIT_KEYWORDS is defined at class level; guard with hasattr in case it is missing
if hasattr(self, 'IMAGE_EDIT_KEYWORDS'):
for keyword in self.IMAGE_EDIT_KEYWORDS:
if prompt.startswith(keyword):
is_image_edit_request = True
matched_edit_keyword = keyword
edit_description = prompt[len(keyword):].strip()
break
        else:
            # No edit keywords are defined; either warn or silently ignore
            # print("Warning: IMAGE_EDIT_KEYWORDS not defined in ChatBot class.")
            pass
if is_image_edit_request:
print(f"Image edit request detected with keyword '{matched_edit_keyword}'.")
            # Check whether there is a pending image for this user
recent_image_info = next((msg for msg in reversed(self.user_message_history[user_id]) if msg.get('type') == 'image_pending'), None)
            if not recent_image_info:
                # Case: no image was sent beforehand
print("No pending image found for editing.")
reply_text = "喵~要編輯哪張圖片呀?請先傳送圖片給我喔!"
should_store_bot_reply = True
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
            elif not edit_description:
                # Case: an image is pending but no edit description was given
print("Edit description is empty.")
reply_text = "喵~你想怎麼編輯這張圖片呢?請告訴我編輯指令喔! (例如:幫我改圖 加上太陽眼鏡)"
should_store_bot_reply = True
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
            else:
                # --- Image and description present: run the edit flow ---
                image_message_id = recent_image_info.get('content')
                print(f"Found pending image ID {image_message_id} for editing.")
                original_image_bytes = None  # will hold the original image bytes
try:
                    # Reply "editing..." first via reply_message
print("Replying with 'Editing...' message.")
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵~收到!正在幫你編輯圖片..."))
except Exception as reply_err:
print(f"Warning: Failed to send initial 'Editing...' reply: {reply_err}")
try:
                    # Fetch the original image bytes on demand
print(f"Fetching original image content for message ID: {image_message_id}")
message_content = line_bot_api.get_message_content(image_message_id)
original_image_bytes = message_content.content
print(f"Fetched original image content, size: {len(original_image_bytes)} bytes")
except Exception as fetch_err:
print(f"Error fetching original image for editing: {fetch_err}")
                    # If the original image cannot be fetched, push an error message
try:
line_bot_api.push_message(push_target_id, TextSendMessage(text="喵~找不到你要編輯的原始圖片耶..."))
                    except Exception as push_fetch_err:  # catch push failures as well
print(f"Error pushing fetch error message: {push_fetch_err}")
                    # Clean up state and stop
                    try: self.user_message_history[user_id] = [msg for msg in self.user_message_history[user_id] if msg != recent_image_info]
                    except: pass
                    if should_store_user_message: self.store_user_message(user_id, "text", prompt)
                    return  # stop here
edited_image_bytes = None
if original_image_bytes:
print(f"Calling image editor with description: '{edit_description}'")
                    # Call the edit method
if hasattr(self.image_generator, 'edit_image_with_gemini'):
edited_image_bytes = self.image_generator.edit_image_with_gemini(original_image_bytes, edit_description)
else:
print("Error: image_generator does not have 'edit_image_with_gemini' method.")
edited_image_url = None
if edited_image_bytes:
print("Edited image data received. Uploading to GCS...")
edited_image_url = self.image_generator.upload_image_to_gcs(edited_image_bytes, file_name_prefix=f"edit_{user_id[:8]}")
if not edited_image_url: print("GCS upload failed for edited image.")
else:
print("Image editing failed, no edited image bytes received.")
                # Push the result
if edited_image_url:
safe_edited_url = edited_image_url + "#"
print(f"Image edited and uploaded. Pushing result to {push_target_id}. Safe URL: {safe_edited_url}")
image_message = ImageSendMessage(original_content_url=safe_edited_url, preview_image_url=safe_edited_url)
final_text_output = "喵~圖片編輯好了!"
text_message = TextSendMessage(text=final_text_output)
try:
line_bot_api.push_message(push_target_id, [image_message, text_message])
should_store_bot_reply = False
except Exception as push_err:
print(f"Error pushing final edited image message to {push_target_id}: {push_err}")
try:
line_bot_api.push_message(push_target_id, TextSendMessage(text="喵~圖片編輯好了,但推送時好像有點問題..."))
except: pass
else:
error_msg = "喵~糟糕,圖片編輯失敗了,請稍後再試!"
print(f"Image editing/upload failed. Pushing error message to {push_target_id}.")
try:
line_bot_api.push_message(push_target_id, TextSendMessage(text=error_msg))
should_store_bot_reply = False
except Exception as push_err:
print(f"Error pushing final edit error message to {push_target_id}: {push_err}")
                # Clear the pending-image state (whether or not the edit succeeded)
try:
self.user_message_history[user_id] = [msg for msg in self.user_message_history[user_id] if msg != recent_image_info]
print("Removed image_pending state after edit attempt.")
except Exception as remove_err:
print(f"Warning: Could not remove image_pending state after edit: {remove_err}")
            # End of the image-edit flow
            if should_store_user_message: self.store_user_message(user_id, "text", prompt)
            if should_store_bot_reply and reply_text: self.store_bot_reply(user_id, reply_text)
            return  # end of handle_text_message
        print("Not an image edit request.")
        # --- Step 3: handle a voice-reply request ---
if prompt == "語音回覆":
print("Voice reply request detected.")
should_store_user_message = True
should_store_bot_reply = False
last_reply = self.get_last_bot_reply(user_id)
reply_to_send = None
if last_reply:
print(f"Generating voice for last bot reply: '{last_reply[:50]}...'")
audio_url, duration = self.generate_speech_reply(last_reply) # 假設此方法已更新為生成 M4A
if audio_url and duration:
try:
                        safe_audio_url = audio_url + "#"  # same '#' suffix workaround as for image URLs
print(f"Sending audio reply. Safe URL: {safe_audio_url}, Duration: {duration}")
                        reply_to_send = [
                            AudioSendMessage(original_content_url=safe_audio_url, duration=duration),  # the linebot SDK uses snake_case kwargs
                            TextSendMessage(text=f"這是上一句的回覆語音:\n「{last_reply}」")
                        ]
except Exception as send_err:
print(f"傳送語音訊息時出錯: {send_err}.")
reply_to_send = TextSendMessage(text=f"喵~語音轉換失敗了(發送錯誤)... 上一句是:\n「{last_reply}」")
should_store_bot_reply = True
else:
print("Failed to generate voice reply.")
reply_to_send = TextSendMessage(text=f"喵~語音轉換失敗了(生成失敗)... 上一句是:\n「{last_reply}」")
should_store_bot_reply = True
else:
print("No previous bot reply found to generate voice for.")
reply_to_send = TextSendMessage(text="喵~找不到上一句話耶,沒辦法轉成語音...")
should_store_bot_reply = True
try:
if isinstance(reply_to_send, list):
line_bot_api.reply_message(event.reply_token, reply_to_send)
else:
line_bot_api.reply_message(event.reply_token, [reply_to_send])
except Exception as final_reply_err:
print(f"Error sending final voice reply/fallback: {final_reply_err}")
traceback.print_exc()
try: line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵~處理語音回覆時出錯了!"))
except: pass
if should_store_user_message: self.store_user_message(user_id, "text", prompt)
if should_store_bot_reply:
if isinstance(reply_to_send, TextSendMessage): self.store_bot_reply(user_id, reply_to_send.text)
elif isinstance(reply_to_send, list) and isinstance(reply_to_send[-1], TextSendMessage): self.store_bot_reply(user_id, reply_to_send[-1].text)
            return  # done handling the voice-reply request
        # --- Step 4: image analysis ---
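        # A previously received image is stored as an "image_pending" entry in the user's
        # history (see handle_image_message); the current text is treated as the question
        # about that image.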
recent_image_info = next((msg for msg in reversed(self.user_message_history[user_id]) if msg.get('type') == 'image_pending'), None)
if recent_image_info:
image_message_id = recent_image_info.get('content')
print(f"Found pending image message ID: {image_message_id}...")
try:
message_content = line_bot_api.get_message_content(image_message_id)
image_bytes = io.BytesIO(message_content.content)
image = Image.open(image_bytes)
chat_session = self.start_chat_session(user_id)
if not chat_session: raise Exception("Failed to start chat session")
                response = chat_session.send_message([prompt, image])  # analyse the image with the chat model
                reply_text = ""
                # Also guard against an empty response from the chat model
                if response.candidates:
                    if response.candidates[0].finish_reason.name == 'SAFETY':  # finish_reason is an enum; compare by name
                        block_reason = response.prompt_feedback.block_reason if hasattr(response, 'prompt_feedback') else "未知"
                        reply_text = f"喵~分析圖片時好像遇到安全問題 ({block_reason})!"
elif response.text:
reply_text = response.text
else:
reply_text = "喵~我看不太懂這張圖片!"
else:
block_reason_msg = ""
if hasattr(response, 'prompt_feedback') and response.prompt_feedback.block_reason:
block_reason_msg = f" (原因: {response.prompt_feedback.block_reason})"
reply_text = f"喵~分析圖片時好像遇到問題了!{block_reason_msg}"
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
should_store_bot_reply = True
                # Clean up the pending-image state
try: self.user_message_history[user_id] = [msg for msg in self.user_message_history[user_id] if msg != recent_image_info]; print(f"Removed image_pending state.")
except Exception as remove_err: print(f"Warning: Could not remove image_pending state: {remove_err}")
except Exception as e:
print(f"圖像處理錯誤: {e}")
traceback.print_exc()
reply_text = "喵~處理圖片時出了點小狀況!"
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
should_store_bot_reply = True
try: self.user_message_history[user_id] = [msg for msg in self.user_message_history[user_id] if msg != recent_image_info]
except: pass
if should_store_user_message: self.store_user_message(user_id, "text", prompt)
if should_store_bot_reply: self.store_bot_reply(user_id, reply_text)
return
print("Not an image analysis request.")
        # --- Step 5: RAG control commands ---
if isinstance(event.message, TextMessage):
            message_text = prompt  # already stripped
            # Use the source_id computed at the top of this function
            reply_text_rag_cmd = None  # avoid clashing with the outer reply_text
if hasattr(self.rag_manager, 'rag_enable_command') and message_text == self.rag_manager.rag_enable_command:
print(f"Enabling RAG for {source_id}") # 使用 source_id
self.rag_manager.rag_status[source_id] = True
reply_text_rag_cmd = "喵~我已開啟查詢模式!"
elif hasattr(self.rag_manager, 'rag_disable_command') and message_text == self.rag_manager.rag_disable_command:
print(f"Disabling RAG for {source_id}") # 使用 source_id
self.rag_manager.rag_status[source_id] = False
reply_text_rag_cmd = "喵~我已關閉查詢模式!"
            if reply_text_rag_cmd:  # this was a RAG control command
                line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text_rag_cmd))
                self.store_bot_reply(user_id, reply_text_rag_cmd)  # store the bot reply
                self.store_user_message(user_id, "text", prompt)  # store the user command
                return  # done handling the RAG command
        # --- Step 6: RAG/API lookup ---
        is_rag_enabled = self.rag_manager.rag_status.get(source_id, True)
retrieved_info = None
if is_rag_enabled:
print(f"RAG is enabled for {source_id}. Retrieving info...") # 使用 source_id
retrieved_info = self.retrieve_relevant_info(prompt, user_id=user_id) # 傳遞 user_id 給 RAG
if retrieved_info: print("Retrieved info:", retrieved_info[:100] + "...")
else: print("No relevant info retrieved.")
else:
print(f"RAG is disabled for {source_id}. Skipping info retrieval.") # 使用 source_id
# --- 步驟 7: 準備給 Gemini 的提示 ---
prompt_for_gemini = prompt
if not prompt_for_gemini: prompt_for_gemini = "喵~"
print(f"Final prompt for Gemini: '{prompt_for_gemini}'")
        # --- Step 8: send the message to the Gemini chat model ---
chat_session = self.start_chat_session(user_id)
if not chat_session:
reply_text = "喵~糟糕,無法開始對話..."
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
if should_store_user_message: self.store_user_message(user_id, "text", prompt)
if should_store_bot_reply: self.store_bot_reply(user_id, reply_text)
return
final_prompt_to_send = prompt_for_gemini
if retrieved_info:
final_prompt_to_send = self.augment_prompt_with_info(prompt_for_gemini, retrieved_info)
reply_text = ""
try:
print("Sending final prompt to Gemini...")
response = chat_session.send_message(final_prompt_to_send)
            # *** Also guard against an empty response from the chat model ***
            if response.candidates:
                if response.candidates[0].finish_reason.name == 'SAFETY':  # finish_reason is an enum; compare by name
                    block_reason = response.prompt_feedback.block_reason if hasattr(response, 'prompt_feedback') else "未知"
                    reply_text = f"喵~這個話題有點敏感 ({block_reason})!"
elif response.text:
reply_text = response.text
print("Received text response from Gemini:", reply_text[:100] + "...")
else:
reply_text = "喵~橘橘不知道怎麼回答..."
else:
block_reason_msg = ""
if hasattr(response, 'prompt_feedback') and response.prompt_feedback.block_reason:
block_reason_msg = f" (原因: {response.prompt_feedback.block_reason})"
reply_text = f"喵~請求好像被擋下來了!{block_reason_msg}"
except Exception as gemini_error:
print(f"Error calling Gemini API: {gemini_error}")
traceback.print_exc()
reply_text = "喵~糟糕,我的腦袋好像有點打結了..."
if not reply_text:
reply_text = "喵~橘橘現在有點累..."
        # --- Step 9: send the final text reply ---
print("Sending final text reply.")
try:
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
should_store_bot_reply = True
except Exception as final_reply_err:
print(f"Error sending final text reply: {final_reply_err}")
should_store_bot_reply = False
        # --- Step 10: store the bot reply and the user message ---
if should_store_bot_reply: self.store_bot_reply(user_id, reply_text)
if should_store_user_message: self.store_user_message(user_id, "text", prompt)
def handle_image_message(self, event, line_bot_api, line_bot):
user_id = event.source.user_id
message_id = event.message.id
print(f"Handling image message from {user_id}, message ID: {message_id}")
try:
self.store_user_message(user_id, "image_pending", message_id)
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵!圖片已收到,請告訴我你想知道關於這張圖片的什麼問題呢?"))
except Exception as e:
print(f"Error storing image message ID: {e}")
traceback.print_exc()
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=f"橘橘記錄圖片訊息時出錯了:{e}"))
def handle_audio_message(self, event, line_bot_api, line_bot):
user_id = event.source.user_id
message_id = event.message.id
source_id = self._get_source_id_from_event(event)
print(f"Handling audio message from {source_id}, message ID: {message_id}")
m4a_audio_path = None
try:
m4a_audio_path = line_bot.get_audio_url(message_id)
if not m4a_audio_path: raise Exception("無法取得語音檔案路徑")
print(f"Retrieved m4a audio path: {m4a_audio_path}")
wav_content = None
try:
print("Converting M4A to WAV...")
audio = AudioSegment.from_file(m4a_audio_path, format="m4a")
audio = audio.set_frame_rate(16000).set_channels(1).set_sample_width(2)
wav_buffer = io.BytesIO()
audio.export(wav_buffer, format="wav")
wav_content = wav_buffer.getvalue()
print(f"Audio converted to WAV (LINEAR16) successfully, size: {len(wav_content)} bytes")
except Exception as conversion_error:
print(f"音訊轉換失敗: {conversion_error}")
traceback.print_exc()
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵~聽不懂這個語音格式耶!"))
return
finally:
if m4a_audio_path and os.path.exists(m4a_audio_path):
try: os.remove(m4a_audio_path); print(f"Removed temporary m4a file: {m4a_audio_path}")
except OSError as remove_error: print(f"Error removing temporary m4a file: {remove_error}")
text_from_audio = self._convert_audio_bytes_to_text(wav_content)
if not text_from_audio:
print("STT failed to transcribe audio.")
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="喵~聽不清楚你在說什麼,可以再說一次嗎?"))
return
print(f"STT transcription result: '{text_from_audio}'")
self.store_user_message(user_id, "text", text_from_audio)
chat_session = self.start_chat_session(user_id)
if not chat_session: raise Exception("Chat session failed to start.")
is_rag_enabled = self.rag_manager.rag_status.get(source_id, True)
retrieved_info = None
if is_rag_enabled:
retrieved_info = self.retrieve_relevant_info(text_from_audio, user_id=user_id)
final_prompt_to_send = text_from_audio
if retrieved_info:
final_prompt_to_send = self.augment_prompt_with_info(text_from_audio, retrieved_info)
reply_text = ""
try:
print("Sending transcribed text to Gemini...")
response = chat_session.send_message(final_prompt_to_send)
if response.candidates and response.text:
reply_text = response.text
else:
reply_text = "喵~我好像沒聽懂你的語音訊息!"
print("Received Gemini response for audio transcription:", reply_text[:100] + "...")
except Exception as gemini_error:
print(f"Error calling Gemini API for audio transcription: {gemini_error}")
traceback.print_exc()
reply_text = "喵~我聽懂你說什麼了,但我的腦袋好像有點打結了!"
if not reply_text: reply_text="喵~我沒聽懂你的語音訊息!"
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=reply_text))
self.store_bot_reply(user_id, reply_text)
except Exception as e:
print(f"處理語音訊息時發生未預期錯誤: {e}")
traceback.print_exc()
if m4a_audio_path and os.path.exists(m4a_audio_path):
try: os.remove(m4a_audio_path)
except: pass
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=f"橘橘無法處理語音訊息:{e}"))
def _convert_audio_bytes_to_text(self, audio_bytes):
try:
client = speech.SpeechClient()
audio = speech.RecognitionAudio(content=audio_bytes)
config = speech.RecognitionConfig(encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16, sample_rate_hertz=16000, language_code="cmn-TW", enable_automatic_punctuation=True)
print("Sending audio bytes to Google STT API...")
response = client.recognize(config=config, audio=audio)
print("Received response from Google STT API.")
if response.results:
transcript = response.results[0].alternatives[0].transcript
print(f"STT API Transcription: '{transcript}'")
return transcript
else:
print("STT API returned no results.")
return None
except Exception as e:
print(f"Error calling Google STT API: {e}")
traceback.print_exc()
            return None
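# --- Usage sketch (illustrative only) ---
# A minimal example of constructing the bot. The environment variable names
# GOOGLE_API_KEY, GOOGLE_SEARCH_API_KEY and GOOGLE_SEARCH_CSE_ID are assumptions
# about the deployment configuration, not requirements of this module. In the
# real application, the handle_* methods are invoked from the LINE webhook
# handlers with the incoming event and a LineBotApi instance.
if __name__ == "__main__":
    bot = ChatBot(
        google_api_key=os.getenv("GOOGLE_API_KEY"),                  # assumed env var name
        system_instruction=system_instruction,
        google_search_api_key=os.getenv("GOOGLE_SEARCH_API_KEY"),    # assumed env var name
        google_search_cse_id=os.getenv("GOOGLE_SEARCH_CSE_ID"),      # assumed env var name
    )
    print("ChatBot initialized with model:", bot.model_name)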