Spaces:
Running
Running
import os | |
import io | |
import time | |
from PIL import Image | |
from io import BytesIO | |
from google import genai | |
try: | |
from google.genai import types | |
print("Imported 'types' from 'google.genai'.") | |
except ImportError: | |
print("Warning: 'types' not found under 'google.genai'. Config might not work.") | |
types = None | |
from google.cloud import storage | |
import traceback | |
from typing import Union | |
import base64 | |
# --- 創建全域 genai_client --- | |
genai_client = None | |
try: | |
genai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) | |
print("Successfully initialized global genai.Client (High-Level Facade).") | |
except AttributeError: | |
print("Warning: genai.Client not found. Trying API calls might fail.") | |
except Exception as client_err: | |
print(f"Error initializing global genai.Client: {client_err}") | |
class ImageGenerator: | |
def __init__(self): | |
""" | |
初始化 ImageGenerator。 | |
""" | |
pass # 使用全域 client | |
def generate_image_with_gemini(self, prompt) -> Union[bytes, None]: | |
""" | |
使用 Gemini 模型生成圖片。 | |
""" | |
if not genai_client: | |
print("Error: Global genai_client is not available.") | |
return None | |
try: | |
print(f"Requesting image data for prompt: '{prompt}' using global genai_client.") | |
model_name = "models/gemini-2.0-flash-exp-image-generation" | |
if not hasattr(genai_client, 'models') or not hasattr(genai_client.models, 'generate_content'): | |
print("Error: Global genai_client does not have 'models.generate_content' method.") | |
return None | |
gen_config = None | |
if types and hasattr(types, 'GenerateContentConfig'): | |
try: gen_config = types.GenerateContentConfig(response_modalities=['Text', 'Image']) | |
except: gen_config = None | |
else: print("types.GenerateContentConfig not available. Proceeding without config.") | |
print(">>> Calling genai_client.models.generate_content...") | |
start_time = time.time() | |
if gen_config: | |
response = genai_client.models.generate_content(model=model_name, contents=prompt, config=gen_config) | |
else: | |
response = genai_client.models.generate_content(model=model_name, contents=prompt) | |
end_time = time.time() | |
print(f"<<< Call to generate_content completed in {end_time - start_time:.2f} seconds.") | |
# print("--- Full API Response (generate) ---"); try: print(response); except: pass; print("---") # 可取消註解除錯 | |
image_bytes = None | |
if response.candidates: | |
candidate = response.candidates[0] | |
if candidate.content and candidate.content.parts: | |
for part in candidate.content.parts: | |
if hasattr(part, 'inline_data') and part.inline_data is not None: | |
print("Found inline image data.") | |
image_bytes = part.inline_data.data | |
try: _ = Image.open(BytesIO(image_bytes)); print(f"Image data verified, size: {len(image_bytes)} bytes.") | |
except Exception as img_verify_err: print(f"Warning: Received data might not be a valid image: {img_verify_err}"); return None | |
break | |
else: | |
print("Response has no candidates."); # ... (檢查 prompt_feedback) ... | |
if not image_bytes: print("No valid inline image data found in response.") | |
return image_bytes | |
except AttributeError as ae: print(f"圖像數據生成過程中發生屬性錯誤: {ae}"); traceback.print_exc(); return None | |
except Exception as e: print(f"圖像數據生成過程中發生未預期錯誤: {e}"); traceback.print_exc(); return None | |
# --- 圖片編輯方法 --- | |
def edit_image_with_gemini(self, image_bytes: bytes, edit_prompt: str) -> Union[bytes, None]: | |
""" | |
使用 Gemini 模型編輯圖片 (依照官方文件範例)。 | |
Args: | |
image_bytes (bytes): 要編輯的原始圖片的二進位數據。 | |
edit_prompt (str): 描述如何編輯圖片的文字提示。 | |
Returns: | |
Union[bytes, None]: 編輯後的圖片二進位數據或 None。 | |
""" | |
if not genai_client: | |
print("Error: Global genai_client is not available.") | |
return None | |
if not image_bytes: print("Error: No image bytes provided for editing."); return None | |
if not edit_prompt: print("Error: No edit prompt provided."); return None | |
try: | |
print(f"Requesting image edit with prompt: '{edit_prompt}' using global genai_client.") | |
model_name = "models/gemini-2.0-flash-exp-image-generation" # 同一個模型 | |
# --- 準備 contents 列表 (文字 + PIL Image 物件) --- | |
try: | |
img = Image.open(BytesIO(image_bytes)) | |
# 確保是 RGB 模式,如果需要的話 | |
# if img.mode == 'RGBA': img = img.convert('RGB') | |
except Exception as img_open_err: | |
print(f"Error opening provided image bytes for editing: {img_open_err}") | |
return None | |
contents_for_edit = [edit_prompt, img] # 列表包含文字和 PIL Image 物件 | |
# --- | |
# 檢查 API 調用方法是否存在 | |
if not hasattr(genai_client, 'models') or not hasattr(genai_client.models, 'generate_content'): | |
print("Error: Global genai_client does not have 'models.generate_content' method.") | |
return None | |
# 準備 config (同圖片生成) | |
gen_config = None | |
if types and hasattr(types, 'GenerateContentConfig'): | |
try: gen_config = types.GenerateContentConfig(response_modalities=['Text', 'Image']) | |
except: gen_config = None | |
print(">>> Calling genai_client.models.generate_content for image edit...") | |
start_time = time.time() | |
# 根據是否有 config 調用 API | |
if gen_config: | |
response = genai_client.models.generate_content( | |
model=model_name, | |
contents=contents_for_edit, # <--- 傳遞包含圖片和文字的列表 | |
config=gen_config | |
) | |
else: | |
response = genai_client.models.generate_content( | |
model=model_name, | |
contents=contents_for_edit # <--- 傳遞包含圖片和文字的列表 | |
) | |
end_time = time.time() | |
print(f"<<< Call to generate_content (edit) completed in {end_time - start_time:.2f} seconds.") | |
# print("--- Full API Response (edit) ---"); try: print(response); except: pass; print("---") # 可取消註解除錯 | |
# --- 處理回應 (尋找編輯後的圖片 inline_data) --- | |
edited_image_bytes = None | |
if response.candidates: | |
candidate = response.candidates[0] | |
if candidate.content and candidate.content.parts: | |
print("Iterating through edit response parts...") | |
for i, part in enumerate(candidate.content.parts): | |
if hasattr(part, 'inline_data') and part.inline_data is not None: | |
print(f" Part {i}: Found inline image data (edited).") | |
edited_image_bytes = part.inline_data.data | |
try: | |
_ = Image.open(BytesIO(edited_image_bytes)) | |
print(f" Edited image data received and verified, size: {len(edited_image_bytes)} bytes.") | |
except Exception as img_verify_err: | |
print(f" Warning: Received edited data might not be a valid image: {img_verify_err}") | |
return None | |
break | |
else: | |
print("Edit response has no candidates.") | |
if hasattr(response, 'prompt_feedback') and response.prompt_feedback.block_reason: | |
print(f"Block reason: {response.prompt_feedback.block_reason}") | |
if not edited_image_bytes: | |
print("No valid inline image data found in edit response.") | |
return edited_image_bytes # 返回編輯後的 bytes 或 None | |
except AttributeError as ae: | |
print(f"圖片編輯過程中發生屬性錯誤: {ae}") | |
traceback.print_exc() | |
return None | |
except Exception as e: | |
print(f"圖片編輯過程中發生未預期錯誤: {e}") | |
traceback.print_exc() | |
return None | |
def upload_image_to_gcs(self, image_bytes: bytes, file_name_prefix: str = "gemini-image") -> Union[str, None]: | |
""" | |
將圖片二進位數據 (bytes) 轉換為 JPG 並上傳到 Google Cloud Storage。 | |
""" | |
if not image_bytes: return None | |
try: | |
print("Converting received image bytes to JPG format...") | |
try: | |
image = Image.open(BytesIO(image_bytes)) | |
if image.mode == 'RGBA': image = image.convert('RGB') | |
jpg_buffer = io.BytesIO() | |
image.save(jpg_buffer, format="JPEG", quality=85) | |
jpg_bytes = jpg_buffer.getvalue() | |
print(f"Image converted to JPG successfully, size: {len(jpg_bytes)} bytes.") | |
except Exception as convert_err: | |
print(f"Error converting image to JPG: {convert_err}") | |
traceback.print_exc() | |
return None | |
bucket_name = "stt_bucket_for_allen" | |
timestamp = int(time.time() * 1000) | |
file_extension = "jpg" | |
content_type = "image/jpeg" | |
gcs_file_path = f"EJ/{file_name_prefix}_{timestamp}.{file_extension}" | |
storage_client = storage.Client() | |
bucket = storage_client.bucket(bucket_name) | |
blob = bucket.blob(gcs_file_path) | |
print(f"Uploading JPG image bytes to gs://{bucket_name}/{gcs_file_path}") | |
blob.upload_from_string(data=jpg_bytes, content_type=content_type) | |
print("Image uploaded to GCS.") | |
public_url = blob.public_url | |
print(f"Generated GCS public URL: {public_url}") | |
if not public_url or not public_url.startswith("https://"): | |
print(f"Error or Warning: Invalid public URL generated: {public_url}") | |
if public_url and public_url.startswith("http://"): | |
public_url = "https://" + public_url[len("http://"):] | |
print(f"Corrected URL to HTTPS: {public_url}") | |
else: return None | |
print(f"Image uploaded successfully to GCS: {public_url}") | |
return public_url | |
except Exception as e: | |
print(f"上傳圖片到 GCP 時出錯: {e}") | |
traceback.print_exc() | |
return None |