"""Generate and edit images with Gemini and publish the result to GCS.

The module creates one process-wide ``genai.Client`` at import time (from the
``GOOGLE_API_KEY`` environment variable) and exposes :class:`ImageGenerator`,
whose methods use that shared client.
"""

import base64
import io
import os
import time
import traceback
from io import BytesIO
from typing import Union

from PIL import Image
from google import genai
from google.cloud import storage

try:
    from google.genai import types
    print("Imported 'types' from 'google.genai'.")
except ImportError:
    print("Warning: 'types' not found under 'google.genai'. Config might not work.")
    types = None

# Model used for both image generation and image editing.
IMAGE_MODEL_NAME = "models/gemini-2.0-flash-exp-image-generation"

# Default upload destination; both can be overridden per call.
DEFAULT_GCS_BUCKET = "stt_bucket_for_allen"
DEFAULT_GCS_FOLDER = "EJ"

# Pillow modes the JPEG writer accepts as-is; anything else (RGBA, P, LA,
# I;16, ...) must be converted first or ``Image.save`` raises.
_JPEG_WRITABLE_MODES = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")

# --- Create the global genai_client ---
genai_client = None
try:
    genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
    print("Successfully initialized global genai.Client (High-Level Facade).")
except AttributeError:
    print("Warning: genai.Client not found. Trying API calls might fail.")
except Exception as client_err:
    # Includes a missing GOOGLE_API_KEY (KeyError); the module still imports
    # and every method reports that the client is unavailable.
    print(f"Error initializing global genai.Client: {client_err}")


def _client_supports_generate_content() -> bool:
    """Return True when the global client exposes ``models.generate_content``."""
    models = getattr(genai_client, "models", None)
    return models is not None and hasattr(models, "generate_content")


def _build_image_config():
    """Build a config requesting text + image output, or None if unavailable."""
    if not (types and hasattr(types, "GenerateContentConfig")):
        print("types.GenerateContentConfig not available. Proceeding without config.")
        return None
    try:
        return types.GenerateContentConfig(response_modalities=['Text', 'Image'])
    except Exception as config_err:
        # Best effort: fall back to the default config, but say why.
        print(f"Warning: could not build GenerateContentConfig: {config_err}")
        return None


def _timed_generate_content(contents, gen_config):
    """Call ``generate_content`` and return ``(response, elapsed_seconds)``."""
    call_kwargs = {"model": IMAGE_MODEL_NAME, "contents": contents}
    if gen_config:
        call_kwargs["config"] = gen_config
    started = time.monotonic()
    response = genai_client.models.generate_content(**call_kwargs)
    return response, time.monotonic() - started


def _first_inline_payload(parts):
    """Return ``(index, data)`` of the first part carrying inline data, else None."""
    for index, part in enumerate(parts or ()):
        inline = getattr(part, "inline_data", None)
        if inline is not None:
            return index, inline.data
    return None


def _probe_image(data) -> None:
    """Raise if Pillow cannot open *data* as an image."""
    with Image.open(BytesIO(data)):
        pass


def _report_block_reason(response) -> None:
    """Print the prompt block reason when the response carries one."""
    # prompt_feedback may be absent *or* present-but-None; guard both.
    feedback = getattr(response, "prompt_feedback", None)
    reason = getattr(feedback, "block_reason", None)
    if reason:
        print(f"Block reason: {reason}")


def _encode_as_jpeg(image_bytes: bytes, quality: int = 85) -> bytes:
    """Decode *image_bytes* with Pillow and re-encode them as JPEG bytes."""
    with Image.open(BytesIO(image_bytes)) as image:
        if image.mode not in _JPEG_WRITABLE_MODES:
            # JPEG has no alpha/palette support; flatten to plain RGB.
            image = image.convert('RGB')
        jpg_buffer = io.BytesIO()
        image.save(jpg_buffer, format="JPEG", quality=quality)
    return jpg_buffer.getvalue()


class ImageGenerator:
    """Thin wrapper around the global Gemini client and GCS upload."""

    def __init__(self):
        """Initialise the generator; all API calls use the global client."""
        pass

    def generate_image_with_gemini(self, prompt) -> Union[bytes, None]:
        """Generate an image from a text prompt with the Gemini model.

        Args:
            prompt: Text (or other ``contents`` value) sent to the model.

        Returns:
            The raw bytes of the first inline image in the response, or
            ``None`` when the client is unavailable, the response holds no
            decodable image, or any error occurs (errors are printed, not
            raised).
        """
        if not genai_client:
            print("Error: Global genai_client is not available.")
            return None

        try:
            print(f"Requesting image data for prompt: '{prompt}' using global genai_client.")

            if not _client_supports_generate_content():
                print("Error: Global genai_client does not have 'models.generate_content' method.")
                return None

            gen_config = _build_image_config()

            print(">>> Calling genai_client.models.generate_content...")
            response, elapsed = _timed_generate_content(prompt, gen_config)
            print(f"<<< Call to generate_content completed in {elapsed:.2f} seconds.")

            image_bytes = None
            if response.candidates:
                candidate = response.candidates[0]
                parts = candidate.content.parts if candidate.content else None
                found = _first_inline_payload(parts)
                if found is not None:
                    print("Found inline image data.")
                    _, image_bytes = found
                    try:
                        _probe_image(image_bytes)
                        print(f"Image data verified, size: {len(image_bytes)} bytes.")
                    except Exception as img_verify_err:
                        print(f"Warning: Received data might not be a valid image: {img_verify_err}")
                        return None
            else:
                print("Response has no candidates.")
                _report_block_reason(response)

            if not image_bytes:
                print("No valid inline image data found in response.")
            return image_bytes

        except AttributeError as ae:
            print(f"圖像數據生成過程中發生屬性錯誤: {ae}")
            traceback.print_exc()
            return None
        except Exception as e:
            print(f"圖像數據生成過程中發生未預期錯誤: {e}")
            traceback.print_exc()
            return None

    # --- Image editing ---
    def edit_image_with_gemini(self, image_bytes: bytes, edit_prompt: str) -> Union[bytes, None]:
        """Edit an existing image with the Gemini model.

        Args:
            image_bytes: Binary data of the original image to edit.
            edit_prompt: Text describing how the image should be edited.

        Returns:
            The raw bytes of the edited image, or ``None`` when an input is
            missing, the client is unavailable, the response holds no
            decodable image, or any error occurs (errors are printed, not
            raised).
        """
        if not genai_client:
            print("Error: Global genai_client is not available.")
            return None
        if not image_bytes:
            print("Error: No image bytes provided for editing.")
            return None
        if not edit_prompt:
            print("Error: No edit prompt provided.")
            return None

        try:
            print(f"Requesting image edit with prompt: '{edit_prompt}' using global genai_client.")

            # The request contents are the prompt text plus a PIL Image
            # object; the image stays open because the client reads it when
            # the call is made.
            try:
                img = Image.open(BytesIO(image_bytes))
            except Exception as img_open_err:
                print(f"Error opening provided image bytes for editing: {img_open_err}")
                return None
            contents_for_edit = [edit_prompt, img]

            if not _client_supports_generate_content():
                print("Error: Global genai_client does not have 'models.generate_content' method.")
                return None

            # Same config as plain generation.
            gen_config = _build_image_config()

            print(">>> Calling genai_client.models.generate_content for image edit...")
            response, elapsed = _timed_generate_content(contents_for_edit, gen_config)
            print(f"<<< Call to generate_content (edit) completed in {elapsed:.2f} seconds.")

            # Look for the edited image among the response parts.
            edited_image_bytes = None
            if response.candidates:
                candidate = response.candidates[0]
                parts = candidate.content.parts if candidate.content else None
                if parts:
                    print("Iterating through edit response parts...")
                found = _first_inline_payload(parts)
                if found is not None:
                    part_index, edited_image_bytes = found
                    print(f"  Part {part_index}: Found inline image data (edited).")
                    try:
                        _probe_image(edited_image_bytes)
                        print(f"  Edited image data received and verified, size: {len(edited_image_bytes)} bytes.")
                    except Exception as img_verify_err:
                        print(f"  Warning: Received edited data might not be a valid image: {img_verify_err}")
                        return None
            else:
                print("Edit response has no candidates.")
                _report_block_reason(response)

            if not edited_image_bytes:
                print("No valid inline image data found in edit response.")
            return edited_image_bytes

        except AttributeError as ae:
            print(f"圖片編輯過程中發生屬性錯誤: {ae}")
            traceback.print_exc()
            return None
        except Exception as e:
            print(f"圖片編輯過程中發生未預期錯誤: {e}")
            traceback.print_exc()
            return None

    def upload_image_to_gcs(
        self,
        image_bytes: bytes,
        file_name_prefix: str = "gemini-image",
        *,
        bucket_name: str = DEFAULT_GCS_BUCKET,
        folder: str = DEFAULT_GCS_FOLDER,
    ) -> Union[str, None]:
        """Convert image bytes to JPEG and upload them to Google Cloud Storage.

        Args:
            image_bytes: Binary image data in any format Pillow can decode.
            file_name_prefix: Prefix of the object name; a millisecond
                timestamp and ``.jpg`` are appended.
            bucket_name: Destination bucket.
            folder: Object-name prefix ("directory") inside the bucket; pass
                an empty string to upload at the bucket root.

        Returns:
            The blob's HTTPS public URL, or ``None`` when the input is empty,
            the conversion or upload fails, or no usable URL is produced
            (errors are printed, not raised).
        """
        if not image_bytes:
            return None

        try:
            print("Converting received image bytes to JPG format...")
            try:
                jpg_bytes = _encode_as_jpeg(image_bytes)
                print(f"Image converted to JPG successfully, size: {len(jpg_bytes)} bytes.")
            except Exception as convert_err:
                print(f"Error converting image to JPG: {convert_err}")
                traceback.print_exc()
                return None

            timestamp = int(time.time() * 1000)
            object_name = f"{file_name_prefix}_{timestamp}.jpg"
            gcs_file_path = f"{folder}/{object_name}" if folder else object_name

            storage_client = storage.Client()
            blob = storage_client.bucket(bucket_name).blob(gcs_file_path)

            print(f"Uploading JPG image bytes to gs://{bucket_name}/{gcs_file_path}")
            blob.upload_from_string(data=jpg_bytes, content_type="image/jpeg")
            print("Image uploaded to GCS.")

            public_url = blob.public_url
            print(f"Generated GCS public URL: {public_url}")

            if not public_url or not public_url.startswith("https://"):
                print(f"Error or Warning: Invalid public URL generated: {public_url}")
                if public_url and public_url.startswith("http://"):
                    # Upgrade a plain-HTTP URL instead of rejecting it.
                    public_url = "https://" + public_url[len("http://"):]
                    print(f"Corrected URL to HTTPS: {public_url}")
                else:
                    return None

            print(f"Image uploaded successfully to GCS: {public_url}")
            return public_url

        except Exception as e:
            print(f"上傳圖片到 GCP 時出錯: {e}")
            traceback.print_exc()
            return None