Spaces:
Runtime error
Runtime error
# app.py | |
import os | |
import random | |
import textwrap | |
import logging | |
import asyncio | |
from concurrent.futures import ThreadPoolExecutor | |
from typing import List, Optional, Tuple | |
# Attempt to load environment variables from .env file for local testing | |
# In Hugging Face, secrets are injected directly as environment variables | |
from dotenv import load_dotenv | |
load_dotenv() # Load .env file if it exists | |
from PIL import Image, ImageDraw, ImageFont, ImageOps, ImageFilter, ImageEnhance, UnidentifiedImageError | |
import numpy as np | |
from telegram import Update, InputFile | |
from telegram.ext import Application, MessageHandler, filters, ContextTypes | |
from telegram.constants import ParseMode | |
from telegram.error import TelegramError | |
# --- Configuration --- | |
class Config: | |
# Read Telegram token from environment variable (set in HF Secrets) | |
TELEGRAM_TOKEN = os.getenv('TELEGRAM_TOKEN') | |
if not TELEGRAM_TOKEN: | |
# Log critical error if token is missing | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logging.critical("TELEGRAM_TOKEN environment variable not set! Please set it in Hugging Face Space secrets.") | |
raise ValueError("TELEGRAM_TOKEN environment variable not set! Please set it in Hugging Face Space secrets.") | |
# Directories (relative paths for container compatibility) | |
PREDEFINED_TEMPLATES_DIR = "templates" | |
OUTPUT_DIR = "generated_images" | |
# UPDATED: Use the font name "Arial". Pillow should find the system-installed font. | |
FONT_PATH = "Arial" | |
# Predefined Template Settings | |
TEMPLATE_SIZE = (1200, 900) # Expected size of predefined template canvas | |
PLACEHOLDER_SIZE = (700, 500) # Size to fit user image into within the template | |
PLACEHOLDER_POSITION = (50, 50) # Top-left corner to paste user image in template | |
# Auto Template Settings | |
AUTO_TEMPLATES_COUNT = 5 | |
AUTO_TEMPLATE_SIZE = (1200, 900) # Canvas size for auto-generated images | |
AUTO_USER_IMAGE_SIZE = (600, 450) # Size for user image within auto-template | |
MIN_FONT_SIZE = 30 | |
MAX_FONT_SIZE = 60 | |
TEXT_STROKE_WIDTH = 2 | |
NOISE_INTENSITY = 0.03 # Intensity for numpy noise effect (0 to 1) | |
# Other | |
MAX_CAPTION_WIDTH = 35 # Characters per line for text wrapping | |
JPEG_QUALITY = 85 # Quality for saving JPG images | |
# --- Logging Setup --- | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
) | |
# Reduce verbosity of underlying HTTP library logger used by python-telegram-bot | |
logging.getLogger("httpx").setLevel(logging.WARNING) | |
logger = logging.getLogger(__name__) | |
# --- Setup --- | |
# Ensure necessary directories exist within the container's filesystem | |
# This runs when the script starts inside the container | |
try: | |
# These paths are relative to the WORKDIR /app set in the Dockerfile | |
os.makedirs(Config.PREDEFINED_TEMPLATES_DIR, exist_ok=True) | |
os.makedirs(Config.OUTPUT_DIR, exist_ok=True) | |
logger.info(f"Ensured directories exist: {Config.PREDEFINED_TEMPLATES_DIR}, {Config.OUTPUT_DIR}") | |
except OSError as e: | |
logger.error(f"FATAL: Error creating essential directories: {e}", exc_info=True) | |
# If directories cannot be created, the app likely cannot function | |
raise SystemExit(f"FATAL: Cannot create directories {Config.PREDEFINED_TEMPLATES_DIR} or {Config.OUTPUT_DIR}") from e | |
# Font check is now less critical as we rely on system fonts, but Pillow will log if "Arial" can't be found. | |
logger.info(f"Application will attempt to use font '{Config.FONT_PATH}' via system font discovery (fontconfig).") | |
# --- Helper Functions --- | |
def add_noise_to_image(img: Image.Image, intensity: float = 0.02) -> Image.Image: | |
"""Adds subtle noise to a PIL image using numpy.""" | |
try: | |
# Ensure image is in RGB mode for numpy array processing | |
if img.mode != 'RGB': | |
img = img.convert('RGB') | |
img_array = np.array(img, dtype=np.float32) / 255.0 | |
# Generate Gaussian noise matching image dimensions | |
noise = np.random.randn(*img_array.shape) * intensity | |
# Add noise and clip values to valid range [0, 1] | |
noisy_img_array = np.clip(img_array + noise, 0.0, 1.0) | |
# Convert back to PIL Image | |
noisy_img = Image.fromarray((noisy_img_array * 255).astype(np.uint8), 'RGB') | |
return noisy_img | |
except Exception as e: | |
logger.error(f"Error adding noise: {e}", exc_info=True) | |
return img # Return original image on error | |
# --- Image Processing Functions --- | |
def apply_template(user_image_path: str, caption: str, template_path: str) -> Optional[str]: | |
""" | |
Applies user image and caption to a predefined template using pasting. | |
Args: | |
user_image_path: Path to the downloaded user image. | |
caption: Text caption provided by the user. | |
template_path: Path to the predefined template image. | |
Returns: | |
Path to the generated image (in OUTPUT_DIR), or None if an error occurred. | |
""" | |
# Generate a unique-ish output filename to avoid conflicts | |
base_name = os.path.basename(template_path).split('.')[0] | |
# Include a random element to prevent overwriting if called rapidly | |
output_filename = f"result_{base_name}_{random.randint(10000, 99999)}.jpg" | |
output_path = os.path.join(Config.OUTPUT_DIR, output_filename) | |
logger.debug(f"Applying template '{os.path.basename(template_path)}' to user image '{os.path.basename(user_image_path)}'. Output: {output_path}") | |
try: | |
# Use 'with' statement for automatic resource cleanup (file closing) | |
with Image.open(template_path).convert("RGBA") as template, \ | |
Image.open(user_image_path).convert("RGBA") as user_image_orig: | |
# Optional: Check if template size matches config (for consistency) | |
if template.size != Config.TEMPLATE_SIZE: | |
logger.warning(f"Template {os.path.basename(template_path)} size {template.size} differs from expected {Config.TEMPLATE_SIZE}. Results may vary.") | |
# Resize user image to fit the placeholder area using Lanczos resampling for quality | |
logger.debug(f"Resizing user image to placeholder size {Config.PLACEHOLDER_SIZE}") | |
user_image_resized = ImageOps.fit(user_image_orig, Config.PLACEHOLDER_SIZE, Image.Resampling.LANCZOS) | |
# Create a working copy of the template to paste onto | |
combined = template.copy() | |
# Paste the resized user image into the placeholder position | |
# The third argument (mask) uses the alpha channel of the user image for smooth edges if it has transparency | |
logger.debug(f"Pasting resized user image at {Config.PLACEHOLDER_POSITION}") | |
combined.paste(user_image_resized, Config.PLACEHOLDER_POSITION, user_image_resized if user_image_resized.mode == 'RGBA' else None) | |
# --- Add Caption --- | |
logger.debug("Adding caption to template image") | |
draw = ImageDraw.Draw(combined) | |
try: | |
# Use a medium font size relative to config | |
font_size = Config.MAX_FONT_SIZE // 2 | |
logger.debug(f"Loading font '{Config.FONT_PATH}' with size {font_size}") | |
font = ImageFont.truetype(Config.FONT_PATH, font_size) | |
except IOError: # This can happen if font "Arial" is not found by Pillow | |
logger.warning(f"Failed to load font '{Config.FONT_PATH}' by name. Using Pillow's default. Ensure 'ttf-mscorefonts-installer' worked in Docker and fontconfig is effective.") | |
font = ImageFont.load_default() | |
# Wrap text according to configured width | |
wrapped_text = textwrap.fill(caption, width=Config.MAX_CAPTION_WIDTH) | |
logger.debug(f"Wrapped caption text: \"{wrapped_text[:50]}...\"") | |
# Calculate text position (e.g., centered below the placeholder area) | |
# Use textbbox for more accurate positioning | |
text_bbox = draw.textbbox((0, 0), wrapped_text, font=font, align="center") | |
text_width = text_bbox[2] - text_bbox[0] | |
text_height = text_bbox[3] - text_bbox[1] | |
# Center horizontally | |
text_x = (combined.width - text_width) // 2 | |
# Position below placeholder, add some padding | |
text_y = Config.PLACEHOLDER_POSITION[1] + Config.PLACEHOLDER_SIZE[1] + 20 # Adjust padding as needed | |
logger.debug(f"Calculated text position: ({text_x}, {text_y})") | |
# Draw text with a simple shadow/stroke for better visibility | |
# Draw shadow slightly offset | |
shadow_offset = 1 | |
draw.text((text_x + shadow_offset, text_y + shadow_offset), wrapped_text, font=font, fill="black", align="center") | |
# Draw main text | |
draw.text((text_x, text_y), wrapped_text, font=font, fill="white", align="center") | |
# Convert to RGB before saving as JPG (removes alpha channel) | |
logger.debug("Converting final image to RGB") | |
combined_rgb = combined.convert("RGB") | |
logger.debug(f"Saving final image to {output_path} with quality {Config.JPEG_QUALITY}") | |
combined_rgb.save(output_path, "JPEG", quality=Config.JPEG_QUALITY) | |
logger.info(f"Generated image using template '{os.path.basename(template_path)}': {output_path}") | |
return output_path | |
except FileNotFoundError: | |
logger.error(f"Template or user image not found. Template: '{template_path}', User Image: '{user_image_path}'") | |
except UnidentifiedImageError: | |
logger.error(f"Could not identify image file (corrupted or unsupported format?). Template: '{template_path}', User Image: '{user_image_path}'") | |
except Exception as e: | |
# Log detailed error including traceback | |
logger.error(f"Error applying template '{os.path.basename(template_path)}': {e}", exc_info=True) | |
# Explicitly return None on any error during the process | |
return None | |
def create_auto_template(user_image_path: str, caption: str, variant: int) -> Optional[str]: | |
""" | |
Generates a dynamic template with various effects. | |
Args: | |
user_image_path: Path to the downloaded user image. | |
caption: Text caption provided by the user. | |
variant: An integer index to introduce variation. | |
Returns: | |
Path to the generated image (in OUTPUT_DIR), or None if an error occurred. | |
""" | |
output_filename = f"auto_template_{variant}_{random.randint(10000, 99999)}.jpg" | |
output_path = os.path.join(Config.OUTPUT_DIR, output_filename) | |
logger.debug(f"Creating auto template variant {variant}. Output: {output_path}") | |
try: | |
# --- Create Background --- | |
# Generate a random somewhat dark color for the background | |
bg_color = (random.randint(0, 150), random.randint(0, 150), random.randint(0, 150)) | |
logger.debug(f"Auto template {variant}: Background color {bg_color}") | |
bg = Image.new('RGB', Config.AUTO_TEMPLATE_SIZE, color=bg_color) | |
# --- Process User Image --- | |
with Image.open(user_image_path) as user_img_orig: | |
# Work on a copy | |
user_img = user_img_orig.copy() | |
# Resize to fit the designated area within the auto-template | |
logger.debug(f"Auto template {variant}: Resizing user image to {Config.AUTO_USER_IMAGE_SIZE}") | |
user_img = ImageOps.fit(user_img, Config.AUTO_USER_IMAGE_SIZE, Image.Resampling.LANCZOS) | |
# Apply random effects based on a random choice (not just variant index) | |
effect_choice = random.choice(['blur', 'noise', 'color', 'contrast', 'sharpness', 'none']) | |
logger.debug(f"Auto template {variant}: Applying effect '{effect_choice}'") | |
if effect_choice == 'blur': | |
blur_radius = random.uniform(0.5, 1.8) | |
logger.debug(f"Applying GaussianBlur with radius {blur_radius:.2f}") | |
user_img = user_img.filter(ImageFilter.GaussianBlur(blur_radius)) | |
elif effect_choice == 'noise': | |
logger.debug(f"Applying noise with intensity {Config.NOISE_INTENSITY:.2f}") | |
user_img = add_noise_to_image(user_img, Config.NOISE_INTENSITY) | |
elif effect_choice == 'color': # Enhance or reduce color saturation | |
color_factor = random.uniform(0.3, 1.7) | |
logger.debug(f"Enhancing color with factor {color_factor:.2f}") | |
enhancer = ImageEnhance.Color(user_img) | |
user_img = enhancer.enhance(color_factor) | |
elif effect_choice == 'contrast': # Enhance or reduce contrast | |
contrast_factor = random.uniform(0.7, 1.4) | |
logger.debug(f"Enhancing contrast with factor {contrast_factor:.2f}") | |
enhancer = ImageEnhance.Contrast(user_img) | |
user_img = enhancer.enhance(contrast_factor) | |
elif effect_choice == 'sharpness': # Enhance sharpness | |
sharpness_factor = random.uniform(1.1, 2.0) | |
logger.debug(f"Enhancing sharpness with factor {sharpness_factor:.2f}") | |
enhancer = ImageEnhance.Sharpness(user_img) | |
user_img = enhancer.enhance(sharpness_factor) | |
# 'none' applies no extra filter | |
# Add a decorative border with a random light color | |
border_color = (random.randint(180, 255), random.randint(180, 255), random.randint(180, 255)) | |
border_width = random.randint(8, 20) | |
logger.debug(f"Adding border width {border_width} color {border_color}") | |
user_img = ImageOps.expand(user_img, border=border_width, fill=border_color) | |
# --- Paste User Image onto Background --- | |
# Calculate position to center the (bordered) user image horizontally, and place it in the upper part vertically | |
paste_x = (bg.width - user_img.width) // 2 | |
paste_y = (bg.height - user_img.height) // 3 # Position slightly above vertical center | |
logger.debug(f"Pasting processed user image at ({paste_x}, {paste_y})") | |
bg.paste(user_img, (paste_x, paste_y)) | |
# --- Add Styled Text --- | |
logger.debug("Adding caption to auto template") | |
draw = ImageDraw.Draw(bg) | |
try: | |
# Random font size within configured range | |
font_size = random.randint(Config.MIN_FONT_SIZE, Config.MAX_FONT_SIZE) | |
logger.debug(f"Loading font '{Config.FONT_PATH}' with size {font_size}") | |
font = ImageFont.truetype(Config.FONT_PATH, font_size) | |
except IOError: # This can happen if font "Arial" is not found by Pillow | |
logger.warning(f"Failed to load font '{Config.FONT_PATH}' by name. Using Pillow's default. Ensure 'ttf-mscorefonts-installer' worked in Docker and fontconfig is effective.") | |
font = ImageFont.load_default() # Fallback font | |
# Wrap text | |
wrapped_text = textwrap.fill(caption, width=Config.MAX_CAPTION_WIDTH) | |
logger.debug(f"Wrapped caption text: \"{wrapped_text[:50]}...\"") | |
# Calculate text position (centered horizontally, below the pasted image) | |
text_bbox = draw.textbbox((0, 0), wrapped_text, font=font, align="center") | |
text_width = text_bbox[2] - text_bbox[0] | |
text_height = text_bbox[3] - text_bbox[1] | |
text_x = (bg.width - text_width) // 2 | |
# Position below the image + border, add padding | |
text_y = paste_y + user_img.height + 30 | |
logger.debug(f"Calculated text position: ({text_x}, {text_y})") | |
# Random bright text color and dark stroke color | |
text_color = (random.randint(200, 255), random.randint(200, 255), random.randint(200, 255)) | |
stroke_color = (random.randint(0, 50), random.randint(0, 50), random.randint(0, 50)) | |
logger.debug(f"Text color {text_color}, stroke color {stroke_color}") | |
# Draw text with stroke | |
draw.text((text_x, text_y), wrapped_text, font=font, fill=text_color, | |
stroke_width=Config.TEXT_STROKE_WIDTH, stroke_fill=stroke_color, align="center") | |
# Save the final image | |
logger.debug(f"Saving final auto template image to {output_path} with quality {Config.JPEG_QUALITY}") | |
bg.save(output_path, "JPEG", quality=Config.JPEG_QUALITY) | |
logger.info(f"Generated auto-template image (variant {variant}): {output_path}") | |
return output_path | |
except FileNotFoundError: | |
logger.error(f"User image not found during auto-template creation: '{user_image_path}'") | |
except UnidentifiedImageError: | |
logger.error(f"Could not identify user image file during auto-template creation: '{user_image_path}'") | |
except Exception as e: | |
logger.error(f"Error creating auto-template variant {variant}: {e}", exc_info=True) | |
return None | |
def load_predefined_templates() -> List[str]: | |
"""Loads paths of all valid template images from the predefined directory.""" | |
templates = [] | |
template_dir = Config.PREDEFINED_TEMPLATES_DIR | |
logger.debug(f"Searching for templates in directory: {os.path.abspath(template_dir)}") | |
try: | |
if not os.path.isdir(template_dir): | |
logger.warning(f"Predefined templates directory not found: '{template_dir}'") | |
return [] # Return empty list if directory doesn't exist | |
files = os.listdir(template_dir) | |
logger.debug(f"Found {len(files)} files/dirs in template directory.") | |
for file in files: | |
# Check for common image extensions | |
if file.lower().endswith(('.png', '.jpg', '.jpeg')): | |
full_path = os.path.join(template_dir, file) | |
if os.path.isfile(full_path): # Ensure it's actually a file | |
templates.append(full_path) | |
logger.debug(f"Found template: {full_path}") | |
else: | |
logger.warning(f"Found item with image extension but is not a file: {full_path}") | |
if not templates: | |
logger.warning(f"No valid template image files found in '{template_dir}'.") | |
else: | |
logger.info(f"Loaded {len(templates)} predefined templates.") | |
except Exception as e: | |
logger.error(f"Error loading predefined templates from '{template_dir}': {e}", exc_info=True) | |
return templates | |
# This function orchestrates the image processing. It contains blocking Pillow calls, | |
# so it's designed to be run in a thread pool executor by the async handler. | |
def process_images(user_image_path: str, caption: str) -> List[str]: | |
""" | |
Processes the user image against predefined and auto-generated templates. | |
This is a SYNCHRONOUS function. | |
Args: | |
user_image_path: Path to the temporary user image file. | |
caption: The user's caption. | |
Returns: | |
A list of paths to the generated images. | |
""" | |
logger.info("Starting image processing task...") | |
generated_image_paths: List[str] = [] | |
predefined_templates = load_predefined_templates() | |
# 1. Process predefined templates | |
if predefined_templates: | |
logger.info(f"Processing {len(predefined_templates)} predefined templates...") | |
for template_path in predefined_templates: | |
result_path = apply_template(user_image_path, caption, template_path) | |
if result_path: | |
generated_image_paths.append(result_path) | |
else: | |
logger.warning(f"Failed to generate image for template: {os.path.basename(template_path)}") | |
else: | |
logger.info("Skipping predefined templates (none found or loaded).") | |
# 2. Generate auto templates | |
logger.info(f"Generating {Config.AUTO_TEMPLATES_COUNT} auto-templates...") | |
for i in range(Config.AUTO_TEMPLATES_COUNT): | |
result_path = create_auto_template(user_image_path, caption, i) | |
if result_path: | |
generated_image_paths.append(result_path) | |
else: | |
logger.warning(f"Failed to generate auto-template variant {i}") | |
logger.info(f"Image processing task finished. Generated {len(generated_image_paths)} images in total.") | |
return generated_image_paths | |
# --- Telegram Bot Handler --- | |
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
"""Handles incoming messages with photos and captions.""" | |
# Basic check for essential message components | |
if not update.message or not update.message.photo or not update.message.caption: | |
logger.warning("Handler invoked for message missing photo or caption. This shouldn't happen with the current filters.") | |
return | |
user = update.message.from_user | |
# Use 'UnknownUser' or similar if user info isn't available | |
user_id = user.id if user else "UnknownUser" | |
user_info = f"user_id={user_id}" + (f", username={user.username}" if user and user.username else "") | |
caption = update.message.caption | |
message_id = update.message.message_id | |
chat_id = update.message.chat_id | |
logger.info(f"Received photo with caption from {user_info} in chat {chat_id} (message_id={message_id}).") | |
# --- Download User Image --- | |
# Create a unique temporary path for the downloaded image within the OUTPUT_DIR | |
temp_user_image_path = os.path.join(Config.OUTPUT_DIR, f"user_{user_id}_{message_id}.jpg") | |
file_downloaded = False | |
download_start_time = asyncio.get_running_loop().time() | |
try: | |
photo = update.message.photo[-1] # Get the highest resolution photo available | |
logger.info(f"Attempting download photo (file_id: {photo.file_id}, size: {photo.width}x{photo.height})...") | |
photo_file = await photo.get_file() | |
await photo_file.download_to_drive(temp_user_image_path) | |
download_time = asyncio.get_running_loop().time() - download_start_time | |
logger.info(f"Photo downloaded successfully to '{temp_user_image_path}' in {download_time:.2f} seconds.") | |
file_downloaded = True | |
except TelegramError as e: | |
logger.error(f"Telegram error downloading photo for message {message_id}: {e}", exc_info=True) | |
await update.message.reply_text("❌ Sorry, there was a Telegram error downloading the image. Please try sending it again.") | |
return # Stop processing if download fails | |
except Exception as e: | |
logger.error(f"Unexpected error downloading photo for message {message_id}: {e}", exc_info=True) | |
await update.message.reply_text("❌ Sorry, I couldn't download the image due to an unexpected error.") | |
return # Stop processing if download fails | |
# Safety check, though exceptions should prevent reaching here if download failed | |
if not file_downloaded or not os.path.exists(temp_user_image_path): | |
logger.error(f"Download reported success but file '{temp_user_image_path}' does not exist.") | |
await update.message.reply_text("❌ An internal error occurred after downloading the image.") | |
return | |
# --- Process Images in Executor --- | |
# Notify user that processing has started | |
processing_message = None | |
try: | |
# Quote the original message for context | |
processing_message = await update.message.reply_text("⏳ Processing your image with different styles...", quote=True) | |
except TelegramError as e: | |
logger.warning(f"Could not send 'Processing...' message to chat {chat_id}: {e}") | |
# Continue processing even if status message fails | |
message_to_delete = processing_message.message_id if processing_message else None | |
loop = asyncio.get_running_loop() | |
generated_images = [] | |
processing_failed = False | |
processing_start_time = loop.time() | |
try: | |
logger.info(f"Submitting image processing task to executor for user image '{os.path.basename(temp_user_image_path)}'") | |
# Run the blocking image processing function in the default thread pool executor | |
generated_images = await loop.run_in_executor( | |
None, # Use default ThreadPoolExecutor | |
process_images, # The synchronous function to run | |
temp_user_image_path, # Argument 1 for process_images | |
caption # Argument 2 for process_images | |
) | |
processing_time = loop.time() - processing_start_time | |
logger.info(f"Image processing task completed in {processing_time:.2f} seconds.") | |
except Exception as e: | |
processing_failed = True | |
logger.error(f"Error during image processing executor call for message {message_id}: {e}", exc_info=True) | |
error_message = "❌ An unexpected error occurred during processing. Please try again later." | |
# Try to edit the "Processing..." message to show error | |
if message_to_delete: | |
try: | |
await context.bot.edit_message_text( | |
chat_id=chat_id, | |
message_id=message_to_delete, | |
text=error_message | |
) | |
message_to_delete = None # Mark as handled, don't delete later | |
except TelegramError as edit_err: | |
logger.warning(f"Could not edit processing message {message_to_delete} to show error: {edit_err}") | |
# Fallback reply if editing failed | |
await update.message.reply_text(error_message) | |
else: # If sending initial status failed, just send error as new message | |
await update.message.reply_text(error_message) | |
# Delete the "Processing..." message if it was sent and processing didn't fail or error wasn't edited in | |
if message_to_delete: | |
try: | |
await context.bot.delete_message( | |
chat_id=chat_id, | |
message_id=message_to_delete | |
) | |
logger.debug(f"Deleted 'Processing...' message {message_to_delete}") | |
except TelegramError as del_err: | |
# Log warning, but don't bother user if deleting status message fails | |
logger.warning(f"Could not delete 'Processing...' message ({message_to_delete}): {del_err}") | |
# --- Send Results --- | |
# Only proceed if processing didn't explicitly fail with an exception | |
if not processing_failed: | |
if not generated_images: | |
logger.warning(f"Image processing finished but generated 0 images for message {message_id}.") | |
# Inform user if no images were created (e.g., no templates found, or all failed internally) | |
await update.message.reply_text("😕 Sorry, I couldn't generate any styled images this time. There might be an issue with the templates.") | |
else: | |
send_start_time = loop.time() | |
logger.info(f"Sending {len(generated_images)} generated images back to user {user_id} for message {message_id}.") | |
sent_count = 0 | |
for i, img_path in enumerate(generated_images): | |
# Check if file exists before attempting to send | |
if not os.path.exists(img_path): | |
logger.error(f"Generated image file not found before sending: '{img_path}'") | |
if len(generated_images) > 1: | |
await update.message.reply_text(f"⚠️ Couldn't send style variant {i+1} (internal file missing).") | |
continue # Skip to next image | |
caption_text = f"Style variant {i+1}" if len(generated_images) > 1 else "🖼️ Here's your styled image!" | |
try: | |
# Send the photo from the generated path using InputFile | |
await update.message.reply_photo( | |
photo=InputFile(img_path), # PTB handles opening/closing when path is given | |
caption=caption_text, | |
# Consider adding reply_to_message_id=message_id for clearer context in group chats | |
# reply_to_message_id=message_id | |
) | |
sent_count += 1 | |
logger.debug(f"Sent photo {os.path.basename(img_path)}") | |
except TelegramError as e: | |
# Log specific Telegram errors (e.g., file too large, chat not found, blocked by user) | |
logger.error(f"Telegram error sending photo {os.path.basename(img_path)}: {e}", exc_info=True) | |
if len(generated_images) > 1: | |
await update.message.reply_text(f"⚠️ Couldn't send style variant {i+1} due to a Telegram error.") | |
except Exception as e: | |
# Catch other potential errors during sending | |
logger.error(f"Unexpected error sending photo {os.path.basename(img_path)}: {e}", exc_info=True) | |
if len(generated_images) > 1: | |
await update.message.reply_text(f"⚠️ Couldn't send style variant {i+1} due to an unexpected error.") | |
finally: | |
# Clean up the generated image file after attempting to send it | |
try: | |
if os.path.exists(img_path): | |
os.remove(img_path) | |
logger.debug(f"Cleaned up generated image: {os.path.basename(img_path)}") | |
except OSError as e: | |
logger.error(f"Error deleting generated image file '{img_path}': {e}") | |
send_time = loop.time() - send_start_time | |
logger.info(f"Finished sending results for message {message_id}. Sent {sent_count}/{len(generated_images)} images in {send_time:.2f} seconds.") | |
# --- Final Cleanup --- | |
# Clean up the originally downloaded user image | |
try: | |
if os.path.exists(temp_user_image_path): | |
os.remove(temp_user_image_path) | |
logger.info(f"Cleaned up temporary user image: {os.path.basename(temp_user_image_path)}") | |
except OSError as e: | |
# Log error but don't bother user if temporary file cleanup fails | |
logger.error(f"Error cleaning up user image '{temp_user_image_path}': {e}", exc_info=True) | |
# --- Main Execution --- | |
if __name__ == "__main__": | |
logger.info("Initializing Telegram Bot Application...") | |
# Token check is done in Config class now, exiting if it fails there. | |
try: | |
# Build the application instance | |
app = Application.builder().token(Config.TELEGRAM_TOKEN).build() | |
# Add the handler for messages containing both a photo and a caption | |
# This ensures the 'handle_message' function only receives relevant updates | |
app.add_handler(MessageHandler(filters.PHOTO & filters.CAPTION, handle_message)) | |
logger.info("Bot application built successfully. Starting polling for updates...") | |
# Start the bot polling for updates indefinitely | |
app.run_polling(allowed_updates=Update.ALL_TYPES) # Consider specifying only needed updates | |
except Exception as e: | |
# Catch potential errors during application build or startup | |
logger.critical(f"FATAL error initializing or running the bot application: {e}", exc_info=True) | |
# Exit with a non-zero code to indicate failure | |
exit(1) | |