# gemiline / Image_generation.py
# motaer0206's picture
# Update Image_generation.py
# 52e6f04 verified
import os
import io
import time
from PIL import Image
from io import BytesIO
from google import genai
# `types` supplies GenerateContentConfig; fall back to None so the rest of the
# module can detect its absence and call the API without a config object.
try:
    from google.genai import types
except ImportError:
    types = None
    print("Warning: 'types' not found under 'google.genai'. Config might not work.")
else:
    print("Imported 'types' from 'google.genai'.")
from google.cloud import storage
import traceback
from typing import Union
import base64
# --- Create the module-wide genai client ---
# The client is built once at import time and shared by every ImageGenerator
# method. It stays None when construction fails, so callers can check for
# that before issuing any request.
genai_client = None
try:
    genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
except AttributeError:
    # Raised when the imported `genai` module exposes no `Client` attribute.
    print("Warning: genai.Client not found. Trying API calls might fail.")
except Exception as client_err:
    # Covers everything else, including a missing GOOGLE_API_KEY variable.
    print(f"Error initializing global genai.Client: {client_err}")
else:
    print("Successfully initialized global genai.Client (High-Level Facade).")
class ImageGenerator:
    """Generate, edit, and upload images using Gemini and Google Cloud Storage.

    Every Gemini request goes through the module-level ``genai_client``; the
    instance itself keeps no state.  The public methods report problems with
    ``print`` and return ``None`` rather than raising.
    """

    # Model used for both image generation and image editing.
    MODEL_NAME = "models/gemini-2.0-flash-exp-image-generation"
    # Destination bucket and object-name prefix for uploaded JPEG files.
    GCS_BUCKET_NAME = "stt_bucket_for_allen"
    GCS_PATH_PREFIX = "EJ"
    # Modes Pillow's JPEG writer can save directly; every other mode
    # (RGBA, P, LA, ...) is converted to RGB before saving.
    _JPEG_WRITABLE_MODES = frozenset({"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"})

    def __init__(self):
        """Initialize the generator; it relies on the global ``genai_client``."""

    # --- Private helpers shared by generation and editing ---

    @staticmethod
    def _client_supports_generate_content() -> bool:
        """Return True when the global client exposes ``models.generate_content``."""
        models = getattr(genai_client, "models", None)
        return models is not None and hasattr(models, "generate_content")

    @staticmethod
    def _build_image_config():
        """Return a config requesting text and image output, or None if unavailable."""
        if not (types and hasattr(types, "GenerateContentConfig")):
            print("types.GenerateContentConfig not available. Proceeding without config.")
            return None
        try:
            return types.GenerateContentConfig(response_modalities=['Text', 'Image'])
        except Exception as config_err:
            # Best effort: the request is still attempted without a config,
            # but the reason is reported instead of being silently dropped.
            print(f"Warning: Could not build GenerateContentConfig: {config_err}. Proceeding without config.")
            return None

    def _call_generate_content(self, contents, gen_config):
        """Send ``contents`` to the model, attaching ``gen_config`` only when set."""
        request = {"model": self.MODEL_NAME, "contents": contents}
        if gen_config:
            request["config"] = gen_config
        return genai_client.models.generate_content(**request)

    @staticmethod
    def _find_inline_image(response) -> Union[tuple, None]:
        """Return ``(part_index, data)`` for the first part with inline data.

        Only the first candidate is inspected.  Returns None when the response
        has no candidates, no content parts, or no part with ``inline_data``.
        """
        candidates = getattr(response, "candidates", None)
        if not candidates:
            return None
        content = getattr(candidates[0], "content", None)
        parts = getattr(content, "parts", None)
        for index, part in enumerate(parts or ()):
            inline_data = getattr(part, "inline_data", None)
            if inline_data is not None:
                return index, inline_data.data
        return None

    @staticmethod
    def _block_reason(response):
        """Return the response's prompt-feedback block reason, or None.

        Tolerates a missing or None ``prompt_feedback`` so that reporting a
        blocked prompt can never itself raise AttributeError.
        """
        feedback = getattr(response, "prompt_feedback", None)
        return getattr(feedback, "block_reason", None)

    @staticmethod
    def _image_open_error(data):
        """Return the exception Pillow raises opening ``data``, or None on success.

        ``Image.open`` only identifies the file, so this is a sanity check
        that the bytes look like an image, not a full decode.
        """
        try:
            with Image.open(BytesIO(data)):
                pass
        except Exception as open_err:
            return open_err
        return None

    @classmethod
    def _needs_rgb_conversion(cls, mode: str) -> bool:
        """Return True when an image in ``mode`` must be converted before JPEG save."""
        return mode not in cls._JPEG_WRITABLE_MODES

    def _encode_as_jpeg(self, image_bytes: bytes) -> bytes:
        """Decode ``image_bytes`` with Pillow and re-encode them as JPEG (quality 85)."""
        with Image.open(BytesIO(image_bytes)) as source:
            if self._needs_rgb_conversion(source.mode):
                jpeg_ready = source.convert('RGB')
            else:
                jpeg_ready = source
            jpg_buffer = io.BytesIO()
            jpeg_ready.save(jpg_buffer, format="JPEG", quality=85)
        return jpg_buffer.getvalue()

    # --- Image generation ---

    def generate_image_with_gemini(self, prompt: str) -> Union[bytes, None]:
        """Generate an image from a text prompt with the Gemini model.

        Args:
            prompt: Text description of the image to generate.

        Returns:
            The raw bytes of the first inline image in the response, or None
            when the client is unavailable, the prompt is empty, the response
            carries no image, the data cannot be opened as an image, or any
            error occurs during the request.
        """
        if not genai_client:
            print("Error: Global genai_client is not available.")
            return None
        if not prompt:
            print("Error: No prompt provided for image generation.")
            return None
        try:
            print(f"Requesting image data for prompt: '{prompt}' using global genai_client.")
            if not self._client_supports_generate_content():
                print("Error: Global genai_client does not have 'models.generate_content' method.")
                return None
            gen_config = self._build_image_config()

            print(">>> Calling genai_client.models.generate_content...")
            started = time.perf_counter()
            response = self._call_generate_content(prompt, gen_config)
            elapsed = time.perf_counter() - started
            print(f"<<< Call to generate_content completed in {elapsed:.2f} seconds.")

            if not getattr(response, "candidates", None):
                print("Response has no candidates.")
                block_reason = self._block_reason(response)
                if block_reason:
                    print(f"Block reason: {block_reason}")

            found = self._find_inline_image(response)
            if found is None:
                print("No valid inline image data found in response.")
                return None

            _, image_bytes = found
            print("Found inline image data.")
            verify_err = self._image_open_error(image_bytes)
            if verify_err is not None:
                print(f"Warning: Received data might not be a valid image: {verify_err}")
                return None
            print(f"Image data verified, size: {len(image_bytes)} bytes.")
            return image_bytes
        except AttributeError as ae:
            print(f"圖像數據生成過程中發生屬性錯誤: {ae}")
            traceback.print_exc()
            return None
        except Exception as e:
            print(f"圖像數據生成過程中發生未預期錯誤: {e}")
            traceback.print_exc()
            return None

    # --- Image editing ---

    def edit_image_with_gemini(self, image_bytes: bytes, edit_prompt: str) -> Union[bytes, None]:
        """Edit an existing image with the Gemini model.

        The request contents are a list holding the text prompt followed by
        the image as a PIL ``Image`` object.

        Args:
            image_bytes: Binary data of the original image to edit.
            edit_prompt: Text describing how the image should be edited.

        Returns:
            The raw bytes of the edited image, or None when an argument is
            empty, the client is unavailable, the input cannot be opened as
            an image, the response carries no valid image, or any error
            occurs during the request.
        """
        if not genai_client:
            print("Error: Global genai_client is not available.")
            return None
        if not image_bytes:
            print("Error: No image bytes provided for editing.")
            return None
        if not edit_prompt:
            print("Error: No edit prompt provided.")
            return None
        try:
            print(f"Requesting image edit with prompt: '{edit_prompt}' using global genai_client.")
            try:
                img = Image.open(BytesIO(image_bytes))
            except Exception as img_open_err:
                print(f"Error opening provided image bytes for editing: {img_open_err}")
                return None
            contents_for_edit = [edit_prompt, img]

            if not self._client_supports_generate_content():
                print("Error: Global genai_client does not have 'models.generate_content' method.")
                return None
            gen_config = self._build_image_config()

            print(">>> Calling genai_client.models.generate_content for image edit...")
            started = time.perf_counter()
            response = self._call_generate_content(contents_for_edit, gen_config)
            elapsed = time.perf_counter() - started
            print(f"<<< Call to generate_content (edit) completed in {elapsed:.2f} seconds.")

            if getattr(response, "candidates", None):
                print("Iterating through edit response parts...")
            else:
                print("Edit response has no candidates.")
                block_reason = self._block_reason(response)
                if block_reason:
                    print(f"Block reason: {block_reason}")

            found = self._find_inline_image(response)
            if found is None:
                print("No valid inline image data found in edit response.")
                return None

            part_index, edited_image_bytes = found
            print(f" Part {part_index}: Found inline image data (edited).")
            verify_err = self._image_open_error(edited_image_bytes)
            if verify_err is not None:
                print(f" Warning: Received edited data might not be a valid image: {verify_err}")
                return None
            print(f" Edited image data received and verified, size: {len(edited_image_bytes)} bytes.")
            return edited_image_bytes
        except AttributeError as ae:
            print(f"圖片編輯過程中發生屬性錯誤: {ae}")
            traceback.print_exc()
            return None
        except Exception as e:
            print(f"圖片編輯過程中發生未預期錯誤: {e}")
            traceback.print_exc()
            return None

    # --- Upload ---

    def upload_image_to_gcs(self, image_bytes: bytes, file_name_prefix: str = "gemini-image") -> Union[str, None]:
        """Convert image bytes to JPEG and upload them to Google Cloud Storage.

        The object is written to
        ``<GCS_PATH_PREFIX>/<file_name_prefix>_<millisecond timestamp>.jpg``
        in ``GCS_BUCKET_NAME``.

        Args:
            image_bytes: Binary image data in any format Pillow can open.
            file_name_prefix: Leading part of the uploaded object's file name.

        Returns:
            The blob's ``public_url`` (forced to https), or None when the
            input is empty, the JPEG conversion fails, the URL is unusable,
            or the upload raises.  This method does not check whether the
            object is actually publicly readable.
        """
        if not image_bytes:
            return None
        try:
            print("Converting received image bytes to JPG format...")
            try:
                jpg_bytes = self._encode_as_jpeg(image_bytes)
            except Exception as convert_err:
                print(f"Error converting image to JPG: {convert_err}")
                traceback.print_exc()
                return None
            print(f"Image converted to JPG successfully, size: {len(jpg_bytes)} bytes.")

            bucket_name = self.GCS_BUCKET_NAME
            timestamp = int(time.time() * 1000)
            gcs_file_path = f"{self.GCS_PATH_PREFIX}/{file_name_prefix}_{timestamp}.jpg"
            blob = storage.Client().bucket(bucket_name).blob(gcs_file_path)

            print(f"Uploading JPG image bytes to gs://{bucket_name}/{gcs_file_path}")
            blob.upload_from_string(data=jpg_bytes, content_type="image/jpeg")
            print("Image uploaded to GCS.")

            public_url = blob.public_url
            print(f"Generated GCS public URL: {public_url}")
            if not public_url or not public_url.startswith("https://"):
                print(f"Error or Warning: Invalid public URL generated: {public_url}")
                if public_url and public_url.startswith("http://"):
                    # Upgrade a plain-http URL instead of rejecting it.
                    public_url = "https://" + public_url[len("http://"):]
                    print(f"Corrected URL to HTTPS: {public_url}")
                else:
                    return None
            print(f"Image uploaded successfully to GCS: {public_url}")
            return public_url
        except Exception as e:
            print(f"上傳圖片到 GCP 時出錯: {e}")
            traceback.print_exc()
            return None