Spaces:
Runtime error
Runtime error
| # app.py | |
| import os | |
| import random | |
| import textwrap | |
| import logging | |
| import asyncio | |
| from concurrent.futures import ThreadPoolExecutor | |
| from typing import List, Optional, Tuple | |
| # Attempt to load environment variables from .env file for local testing | |
| # In Hugging Face, secrets are injected directly as environment variables | |
| from dotenv import load_dotenv | |
| load_dotenv() # Load .env file if it exists | |
| from PIL import Image, ImageDraw, ImageFont, ImageOps, ImageFilter, ImageEnhance, UnidentifiedImageError | |
| import numpy as np | |
| from telegram import Update, InputFile | |
| from telegram.ext import Application, MessageHandler, filters, ContextTypes | |
| from telegram.constants import ParseMode | |
| from telegram.error import TelegramError | |
| # --- Configuration --- | |
| class Config: | |
| # Read Telegram token from environment variable (set in HF Secrets) | |
| TELEGRAM_TOKEN = os.getenv('TELEGRAM_TOKEN') | |
| if not TELEGRAM_TOKEN: | |
| # Log critical error if token is missing | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
| logging.critical("TELEGRAM_TOKEN environment variable not set! Please set it in Hugging Face Space secrets.") | |
| raise ValueError("TELEGRAM_TOKEN environment variable not set! Please set it in Hugging Face Space secrets.") | |
| # Directories (relative paths for container compatibility) | |
| PREDEFINED_TEMPLATES_DIR = "templates" | |
| OUTPUT_DIR = "generated_images" | |
| # UPDATED: Use the font name "Arial". Pillow should find the system-installed font. | |
| FONT_PATH = "Arial" | |
| # Predefined Template Settings | |
| TEMPLATE_SIZE = (1200, 900) # Expected size of predefined template canvas | |
| PLACEHOLDER_SIZE = (700, 500) # Size to fit user image into within the template | |
| PLACEHOLDER_POSITION = (50, 50) # Top-left corner to paste user image in template | |
| # Auto Template Settings | |
| AUTO_TEMPLATES_COUNT = 5 | |
| AUTO_TEMPLATE_SIZE = (1200, 900) # Canvas size for auto-generated images | |
| AUTO_USER_IMAGE_SIZE = (600, 450) # Size for user image within auto-template | |
| MIN_FONT_SIZE = 30 | |
| MAX_FONT_SIZE = 60 | |
| TEXT_STROKE_WIDTH = 2 | |
| NOISE_INTENSITY = 0.03 # Intensity for numpy noise effect (0 to 1) | |
| # Other | |
| MAX_CAPTION_WIDTH = 35 # Characters per line for text wrapping | |
| JPEG_QUALITY = 85 # Quality for saving JPG images | |
| # --- Logging Setup --- | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| ) | |
| # Reduce verbosity of underlying HTTP library logger used by python-telegram-bot | |
| logging.getLogger("httpx").setLevel(logging.WARNING) | |
| logger = logging.getLogger(__name__) | |
| # --- Setup --- | |
| # Ensure necessary directories exist within the container's filesystem | |
| # This runs when the script starts inside the container | |
| try: | |
| # These paths are relative to the WORKDIR /app set in the Dockerfile | |
| os.makedirs(Config.PREDEFINED_TEMPLATES_DIR, exist_ok=True) | |
| os.makedirs(Config.OUTPUT_DIR, exist_ok=True) | |
| logger.info(f"Ensured directories exist: {Config.PREDEFINED_TEMPLATES_DIR}, {Config.OUTPUT_DIR}") | |
| except OSError as e: | |
| logger.error(f"FATAL: Error creating essential directories: {e}", exc_info=True) | |
| # If directories cannot be created, the app likely cannot function | |
| raise SystemExit(f"FATAL: Cannot create directories {Config.PREDEFINED_TEMPLATES_DIR} or {Config.OUTPUT_DIR}") from e | |
| # Font check is now less critical as we rely on system fonts, but Pillow will log if "Arial" can't be found. | |
| logger.info(f"Application will attempt to use font '{Config.FONT_PATH}' via system font discovery (fontconfig).") | |
| # --- Helper Functions --- | |
| def add_noise_to_image(img: Image.Image, intensity: float = 0.02) -> Image.Image: | |
| """Adds subtle noise to a PIL image using numpy.""" | |
| try: | |
| # Ensure image is in RGB mode for numpy array processing | |
| if img.mode != 'RGB': | |
| img = img.convert('RGB') | |
| img_array = np.array(img, dtype=np.float32) / 255.0 | |
| # Generate Gaussian noise matching image dimensions | |
| noise = np.random.randn(*img_array.shape) * intensity | |
| # Add noise and clip values to valid range [0, 1] | |
| noisy_img_array = np.clip(img_array + noise, 0.0, 1.0) | |
| # Convert back to PIL Image | |
| noisy_img = Image.fromarray((noisy_img_array * 255).astype(np.uint8), 'RGB') | |
| return noisy_img | |
| except Exception as e: | |
| logger.error(f"Error adding noise: {e}", exc_info=True) | |
| return img # Return original image on error | |
| # --- Image Processing Functions --- | |
| def apply_template(user_image_path: str, caption: str, template_path: str) -> Optional[str]: | |
| """ | |
| Applies user image and caption to a predefined template using pasting. | |
| Args: | |
| user_image_path: Path to the downloaded user image. | |
| caption: Text caption provided by the user. | |
| template_path: Path to the predefined template image. | |
| Returns: | |
| Path to the generated image (in OUTPUT_DIR), or None if an error occurred. | |
| """ | |
| # Generate a unique-ish output filename to avoid conflicts | |
| base_name = os.path.basename(template_path).split('.')[0] | |
| # Include a random element to prevent overwriting if called rapidly | |
| output_filename = f"result_{base_name}_{random.randint(10000, 99999)}.jpg" | |
| output_path = os.path.join(Config.OUTPUT_DIR, output_filename) | |
| logger.debug(f"Applying template '{os.path.basename(template_path)}' to user image '{os.path.basename(user_image_path)}'. Output: {output_path}") | |
| try: | |
| # Use 'with' statement for automatic resource cleanup (file closing) | |
| with Image.open(template_path).convert("RGBA") as template, \ | |
| Image.open(user_image_path).convert("RGBA") as user_image_orig: | |
| # Optional: Check if template size matches config (for consistency) | |
| if template.size != Config.TEMPLATE_SIZE: | |
| logger.warning(f"Template {os.path.basename(template_path)} size {template.size} differs from expected {Config.TEMPLATE_SIZE}. Results may vary.") | |
| # Resize user image to fit the placeholder area using Lanczos resampling for quality | |
| logger.debug(f"Resizing user image to placeholder size {Config.PLACEHOLDER_SIZE}") | |
| user_image_resized = ImageOps.fit(user_image_orig, Config.PLACEHOLDER_SIZE, Image.Resampling.LANCZOS) | |
| # Create a working copy of the template to paste onto | |
| combined = template.copy() | |
| # Paste the resized user image into the placeholder position | |
| # The third argument (mask) uses the alpha channel of the user image for smooth edges if it has transparency | |
| logger.debug(f"Pasting resized user image at {Config.PLACEHOLDER_POSITION}") | |
| combined.paste(user_image_resized, Config.PLACEHOLDER_POSITION, user_image_resized if user_image_resized.mode == 'RGBA' else None) | |
| # --- Add Caption --- | |
| logger.debug("Adding caption to template image") | |
| draw = ImageDraw.Draw(combined) | |
| try: | |
| # Use a medium font size relative to config | |
| font_size = Config.MAX_FONT_SIZE // 2 | |
| logger.debug(f"Loading font '{Config.FONT_PATH}' with size {font_size}") | |
| font = ImageFont.truetype(Config.FONT_PATH, font_size) | |
| except IOError: # This can happen if font "Arial" is not found by Pillow | |
| logger.warning(f"Failed to load font '{Config.FONT_PATH}' by name. Using Pillow's default. Ensure 'ttf-mscorefonts-installer' worked in Docker and fontconfig is effective.") | |
| font = ImageFont.load_default() | |
| # Wrap text according to configured width | |
| wrapped_text = textwrap.fill(caption, width=Config.MAX_CAPTION_WIDTH) | |
| logger.debug(f"Wrapped caption text: \"{wrapped_text[:50]}...\"") | |
| # Calculate text position (e.g., centered below the placeholder area) | |
| # Use textbbox for more accurate positioning | |
| text_bbox = draw.textbbox((0, 0), wrapped_text, font=font, align="center") | |
| text_width = text_bbox[2] - text_bbox[0] | |
| text_height = text_bbox[3] - text_bbox[1] | |
| # Center horizontally | |
| text_x = (combined.width - text_width) // 2 | |
| # Position below placeholder, add some padding | |
| text_y = Config.PLACEHOLDER_POSITION[1] + Config.PLACEHOLDER_SIZE[1] + 20 # Adjust padding as needed | |
| logger.debug(f"Calculated text position: ({text_x}, {text_y})") | |
| # Draw text with a simple shadow/stroke for better visibility | |
| # Draw shadow slightly offset | |
| shadow_offset = 1 | |
| draw.text((text_x + shadow_offset, text_y + shadow_offset), wrapped_text, font=font, fill="black", align="center") | |
| # Draw main text | |
| draw.text((text_x, text_y), wrapped_text, font=font, fill="white", align="center") | |
| # Convert to RGB before saving as JPG (removes alpha channel) | |
| logger.debug("Converting final image to RGB") | |
| combined_rgb = combined.convert("RGB") | |
| logger.debug(f"Saving final image to {output_path} with quality {Config.JPEG_QUALITY}") | |
| combined_rgb.save(output_path, "JPEG", quality=Config.JPEG_QUALITY) | |
| logger.info(f"Generated image using template '{os.path.basename(template_path)}': {output_path}") | |
| return output_path | |
| except FileNotFoundError: | |
| logger.error(f"Template or user image not found. Template: '{template_path}', User Image: '{user_image_path}'") | |
| except UnidentifiedImageError: | |
| logger.error(f"Could not identify image file (corrupted or unsupported format?). Template: '{template_path}', User Image: '{user_image_path}'") | |
| except Exception as e: | |
| # Log detailed error including traceback | |
| logger.error(f"Error applying template '{os.path.basename(template_path)}': {e}", exc_info=True) | |
| # Explicitly return None on any error during the process | |
| return None | |
| def create_auto_template(user_image_path: str, caption: str, variant: int) -> Optional[str]: | |
| """ | |
| Generates a dynamic template with various effects. | |
| Args: | |
| user_image_path: Path to the downloaded user image. | |
| caption: Text caption provided by the user. | |
| variant: An integer index to introduce variation. | |
| Returns: | |
| Path to the generated image (in OUTPUT_DIR), or None if an error occurred. | |
| """ | |
| output_filename = f"auto_template_{variant}_{random.randint(10000, 99999)}.jpg" | |
| output_path = os.path.join(Config.OUTPUT_DIR, output_filename) | |
| logger.debug(f"Creating auto template variant {variant}. Output: {output_path}") | |
| try: | |
| # --- Create Background --- | |
| # Generate a random somewhat dark color for the background | |
| bg_color = (random.randint(0, 150), random.randint(0, 150), random.randint(0, 150)) | |
| logger.debug(f"Auto template {variant}: Background color {bg_color}") | |
| bg = Image.new('RGB', Config.AUTO_TEMPLATE_SIZE, color=bg_color) | |
| # --- Process User Image --- | |
| with Image.open(user_image_path) as user_img_orig: | |
| # Work on a copy | |
| user_img = user_img_orig.copy() | |
| # Resize to fit the designated area within the auto-template | |
| logger.debug(f"Auto template {variant}: Resizing user image to {Config.AUTO_USER_IMAGE_SIZE}") | |
| user_img = ImageOps.fit(user_img, Config.AUTO_USER_IMAGE_SIZE, Image.Resampling.LANCZOS) | |
| # Apply random effects based on a random choice (not just variant index) | |
| effect_choice = random.choice(['blur', 'noise', 'color', 'contrast', 'sharpness', 'none']) | |
| logger.debug(f"Auto template {variant}: Applying effect '{effect_choice}'") | |
| if effect_choice == 'blur': | |
| blur_radius = random.uniform(0.5, 1.8) | |
| logger.debug(f"Applying GaussianBlur with radius {blur_radius:.2f}") | |
| user_img = user_img.filter(ImageFilter.GaussianBlur(blur_radius)) | |
| elif effect_choice == 'noise': | |
| logger.debug(f"Applying noise with intensity {Config.NOISE_INTENSITY:.2f}") | |
| user_img = add_noise_to_image(user_img, Config.NOISE_INTENSITY) | |
| elif effect_choice == 'color': # Enhance or reduce color saturation | |
| color_factor = random.uniform(0.3, 1.7) | |
| logger.debug(f"Enhancing color with factor {color_factor:.2f}") | |
| enhancer = ImageEnhance.Color(user_img) | |
| user_img = enhancer.enhance(color_factor) | |
| elif effect_choice == 'contrast': # Enhance or reduce contrast | |
| contrast_factor = random.uniform(0.7, 1.4) | |
| logger.debug(f"Enhancing contrast with factor {contrast_factor:.2f}") | |
| enhancer = ImageEnhance.Contrast(user_img) | |
| user_img = enhancer.enhance(contrast_factor) | |
| elif effect_choice == 'sharpness': # Enhance sharpness | |
| sharpness_factor = random.uniform(1.1, 2.0) | |
| logger.debug(f"Enhancing sharpness with factor {sharpness_factor:.2f}") | |
| enhancer = ImageEnhance.Sharpness(user_img) | |
| user_img = enhancer.enhance(sharpness_factor) | |
| # 'none' applies no extra filter | |
| # Add a decorative border with a random light color | |
| border_color = (random.randint(180, 255), random.randint(180, 255), random.randint(180, 255)) | |
| border_width = random.randint(8, 20) | |
| logger.debug(f"Adding border width {border_width} color {border_color}") | |
| user_img = ImageOps.expand(user_img, border=border_width, fill=border_color) | |
| # --- Paste User Image onto Background --- | |
| # Calculate position to center the (bordered) user image horizontally, and place it in the upper part vertically | |
| paste_x = (bg.width - user_img.width) // 2 | |
| paste_y = (bg.height - user_img.height) // 3 # Position slightly above vertical center | |
| logger.debug(f"Pasting processed user image at ({paste_x}, {paste_y})") | |
| bg.paste(user_img, (paste_x, paste_y)) | |
| # --- Add Styled Text --- | |
| logger.debug("Adding caption to auto template") | |
| draw = ImageDraw.Draw(bg) | |
| try: | |
| # Random font size within configured range | |
| font_size = random.randint(Config.MIN_FONT_SIZE, Config.MAX_FONT_SIZE) | |
| logger.debug(f"Loading font '{Config.FONT_PATH}' with size {font_size}") | |
| font = ImageFont.truetype(Config.FONT_PATH, font_size) | |
| except IOError: # This can happen if font "Arial" is not found by Pillow | |
| logger.warning(f"Failed to load font '{Config.FONT_PATH}' by name. Using Pillow's default. Ensure 'ttf-mscorefonts-installer' worked in Docker and fontconfig is effective.") | |
| font = ImageFont.load_default() # Fallback font | |
| # Wrap text | |
| wrapped_text = textwrap.fill(caption, width=Config.MAX_CAPTION_WIDTH) | |
| logger.debug(f"Wrapped caption text: \"{wrapped_text[:50]}...\"") | |
| # Calculate text position (centered horizontally, below the pasted image) | |
| text_bbox = draw.textbbox((0, 0), wrapped_text, font=font, align="center") | |
| text_width = text_bbox[2] - text_bbox[0] | |
| text_height = text_bbox[3] - text_bbox[1] | |
| text_x = (bg.width - text_width) // 2 | |
| # Position below the image + border, add padding | |
| text_y = paste_y + user_img.height + 30 | |
| logger.debug(f"Calculated text position: ({text_x}, {text_y})") | |
| # Random bright text color and dark stroke color | |
| text_color = (random.randint(200, 255), random.randint(200, 255), random.randint(200, 255)) | |
| stroke_color = (random.randint(0, 50), random.randint(0, 50), random.randint(0, 50)) | |
| logger.debug(f"Text color {text_color}, stroke color {stroke_color}") | |
| # Draw text with stroke | |
| draw.text((text_x, text_y), wrapped_text, font=font, fill=text_color, | |
| stroke_width=Config.TEXT_STROKE_WIDTH, stroke_fill=stroke_color, align="center") | |
| # Save the final image | |
| logger.debug(f"Saving final auto template image to {output_path} with quality {Config.JPEG_QUALITY}") | |
| bg.save(output_path, "JPEG", quality=Config.JPEG_QUALITY) | |
| logger.info(f"Generated auto-template image (variant {variant}): {output_path}") | |
| return output_path | |
| except FileNotFoundError: | |
| logger.error(f"User image not found during auto-template creation: '{user_image_path}'") | |
| except UnidentifiedImageError: | |
| logger.error(f"Could not identify user image file during auto-template creation: '{user_image_path}'") | |
| except Exception as e: | |
| logger.error(f"Error creating auto-template variant {variant}: {e}", exc_info=True) | |
| return None | |
| def load_predefined_templates() -> List[str]: | |
| """Loads paths of all valid template images from the predefined directory.""" | |
| templates = [] | |
| template_dir = Config.PREDEFINED_TEMPLATES_DIR | |
| logger.debug(f"Searching for templates in directory: {os.path.abspath(template_dir)}") | |
| try: | |
| if not os.path.isdir(template_dir): | |
| logger.warning(f"Predefined templates directory not found: '{template_dir}'") | |
| return [] # Return empty list if directory doesn't exist | |
| files = os.listdir(template_dir) | |
| logger.debug(f"Found {len(files)} files/dirs in template directory.") | |
| for file in files: | |
| # Check for common image extensions | |
| if file.lower().endswith(('.png', '.jpg', '.jpeg')): | |
| full_path = os.path.join(template_dir, file) | |
| if os.path.isfile(full_path): # Ensure it's actually a file | |
| templates.append(full_path) | |
| logger.debug(f"Found template: {full_path}") | |
| else: | |
| logger.warning(f"Found item with image extension but is not a file: {full_path}") | |
| if not templates: | |
| logger.warning(f"No valid template image files found in '{template_dir}'.") | |
| else: | |
| logger.info(f"Loaded {len(templates)} predefined templates.") | |
| except Exception as e: | |
| logger.error(f"Error loading predefined templates from '{template_dir}': {e}", exc_info=True) | |
| return templates | |
| # This function orchestrates the image processing. It contains blocking Pillow calls, | |
| # so it's designed to be run in a thread pool executor by the async handler. | |
| def process_images(user_image_path: str, caption: str) -> List[str]: | |
| """ | |
| Processes the user image against predefined and auto-generated templates. | |
| This is a SYNCHRONOUS function. | |
| Args: | |
| user_image_path: Path to the temporary user image file. | |
| caption: The user's caption. | |
| Returns: | |
| A list of paths to the generated images. | |
| """ | |
| logger.info("Starting image processing task...") | |
| generated_image_paths: List[str] = [] | |
| predefined_templates = load_predefined_templates() | |
| # 1. Process predefined templates | |
| if predefined_templates: | |
| logger.info(f"Processing {len(predefined_templates)} predefined templates...") | |
| for template_path in predefined_templates: | |
| result_path = apply_template(user_image_path, caption, template_path) | |
| if result_path: | |
| generated_image_paths.append(result_path) | |
| else: | |
| logger.warning(f"Failed to generate image for template: {os.path.basename(template_path)}") | |
| else: | |
| logger.info("Skipping predefined templates (none found or loaded).") | |
| # 2. Generate auto templates | |
| logger.info(f"Generating {Config.AUTO_TEMPLATES_COUNT} auto-templates...") | |
| for i in range(Config.AUTO_TEMPLATES_COUNT): | |
| result_path = create_auto_template(user_image_path, caption, i) | |
| if result_path: | |
| generated_image_paths.append(result_path) | |
| else: | |
| logger.warning(f"Failed to generate auto-template variant {i}") | |
| logger.info(f"Image processing task finished. Generated {len(generated_image_paths)} images in total.") | |
| return generated_image_paths | |
| # --- Telegram Bot Handler --- | |
| async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| """Handles incoming messages with photos and captions.""" | |
| # Basic check for essential message components | |
| if not update.message or not update.message.photo or not update.message.caption: | |
| logger.warning("Handler invoked for message missing photo or caption. This shouldn't happen with the current filters.") | |
| return | |
| user = update.message.from_user | |
| # Use 'UnknownUser' or similar if user info isn't available | |
| user_id = user.id if user else "UnknownUser" | |
| user_info = f"user_id={user_id}" + (f", username={user.username}" if user and user.username else "") | |
| caption = update.message.caption | |
| message_id = update.message.message_id | |
| chat_id = update.message.chat_id | |
| logger.info(f"Received photo with caption from {user_info} in chat {chat_id} (message_id={message_id}).") | |
| # --- Download User Image --- | |
| # Create a unique temporary path for the downloaded image within the OUTPUT_DIR | |
| temp_user_image_path = os.path.join(Config.OUTPUT_DIR, f"user_{user_id}_{message_id}.jpg") | |
| file_downloaded = False | |
| download_start_time = asyncio.get_running_loop().time() | |
| try: | |
| photo = update.message.photo[-1] # Get the highest resolution photo available | |
| logger.info(f"Attempting download photo (file_id: {photo.file_id}, size: {photo.width}x{photo.height})...") | |
| photo_file = await photo.get_file() | |
| await photo_file.download_to_drive(temp_user_image_path) | |
| download_time = asyncio.get_running_loop().time() - download_start_time | |
| logger.info(f"Photo downloaded successfully to '{temp_user_image_path}' in {download_time:.2f} seconds.") | |
| file_downloaded = True | |
| except TelegramError as e: | |
| logger.error(f"Telegram error downloading photo for message {message_id}: {e}", exc_info=True) | |
| await update.message.reply_text("❌ Sorry, there was a Telegram error downloading the image. Please try sending it again.") | |
| return # Stop processing if download fails | |
| except Exception as e: | |
| logger.error(f"Unexpected error downloading photo for message {message_id}: {e}", exc_info=True) | |
| await update.message.reply_text("❌ Sorry, I couldn't download the image due to an unexpected error.") | |
| return # Stop processing if download fails | |
| # Safety check, though exceptions should prevent reaching here if download failed | |
| if not file_downloaded or not os.path.exists(temp_user_image_path): | |
| logger.error(f"Download reported success but file '{temp_user_image_path}' does not exist.") | |
| await update.message.reply_text("❌ An internal error occurred after downloading the image.") | |
| return | |
| # --- Process Images in Executor --- | |
| # Notify user that processing has started | |
| processing_message = None | |
| try: | |
| # Quote the original message for context | |
| processing_message = await update.message.reply_text("⏳ Processing your image with different styles...", quote=True) | |
| except TelegramError as e: | |
| logger.warning(f"Could not send 'Processing...' message to chat {chat_id}: {e}") | |
| # Continue processing even if status message fails | |
| message_to_delete = processing_message.message_id if processing_message else None | |
| loop = asyncio.get_running_loop() | |
| generated_images = [] | |
| processing_failed = False | |
| processing_start_time = loop.time() | |
| try: | |
| logger.info(f"Submitting image processing task to executor for user image '{os.path.basename(temp_user_image_path)}'") | |
| # Run the blocking image processing function in the default thread pool executor | |
| generated_images = await loop.run_in_executor( | |
| None, # Use default ThreadPoolExecutor | |
| process_images, # The synchronous function to run | |
| temp_user_image_path, # Argument 1 for process_images | |
| caption # Argument 2 for process_images | |
| ) | |
| processing_time = loop.time() - processing_start_time | |
| logger.info(f"Image processing task completed in {processing_time:.2f} seconds.") | |
| except Exception as e: | |
| processing_failed = True | |
| logger.error(f"Error during image processing executor call for message {message_id}: {e}", exc_info=True) | |
| error_message = "❌ An unexpected error occurred during processing. Please try again later." | |
| # Try to edit the "Processing..." message to show error | |
| if message_to_delete: | |
| try: | |
| await context.bot.edit_message_text( | |
| chat_id=chat_id, | |
| message_id=message_to_delete, | |
| text=error_message | |
| ) | |
| message_to_delete = None # Mark as handled, don't delete later | |
| except TelegramError as edit_err: | |
| logger.warning(f"Could not edit processing message {message_to_delete} to show error: {edit_err}") | |
| # Fallback reply if editing failed | |
| await update.message.reply_text(error_message) | |
| else: # If sending initial status failed, just send error as new message | |
| await update.message.reply_text(error_message) | |
| # Delete the "Processing..." message if it was sent and processing didn't fail or error wasn't edited in | |
| if message_to_delete: | |
| try: | |
| await context.bot.delete_message( | |
| chat_id=chat_id, | |
| message_id=message_to_delete | |
| ) | |
| logger.debug(f"Deleted 'Processing...' message {message_to_delete}") | |
| except TelegramError as del_err: | |
| # Log warning, but don't bother user if deleting status message fails | |
| logger.warning(f"Could not delete 'Processing...' message ({message_to_delete}): {del_err}") | |
| # --- Send Results --- | |
| # Only proceed if processing didn't explicitly fail with an exception | |
| if not processing_failed: | |
| if not generated_images: | |
| logger.warning(f"Image processing finished but generated 0 images for message {message_id}.") | |
| # Inform user if no images were created (e.g., no templates found, or all failed internally) | |
| await update.message.reply_text("😕 Sorry, I couldn't generate any styled images this time. There might be an issue with the templates.") | |
| else: | |
| send_start_time = loop.time() | |
| logger.info(f"Sending {len(generated_images)} generated images back to user {user_id} for message {message_id}.") | |
| sent_count = 0 | |
| for i, img_path in enumerate(generated_images): | |
| # Check if file exists before attempting to send | |
| if not os.path.exists(img_path): | |
| logger.error(f"Generated image file not found before sending: '{img_path}'") | |
| if len(generated_images) > 1: | |
| await update.message.reply_text(f"⚠️ Couldn't send style variant {i+1} (internal file missing).") | |
| continue # Skip to next image | |
| caption_text = f"Style variant {i+1}" if len(generated_images) > 1 else "🖼️ Here's your styled image!" | |
| try: | |
| # Send the photo from the generated path using InputFile | |
| await update.message.reply_photo( | |
| photo=InputFile(img_path), # PTB handles opening/closing when path is given | |
| caption=caption_text, | |
| # Consider adding reply_to_message_id=message_id for clearer context in group chats | |
| # reply_to_message_id=message_id | |
| ) | |
| sent_count += 1 | |
| logger.debug(f"Sent photo {os.path.basename(img_path)}") | |
| except TelegramError as e: | |
| # Log specific Telegram errors (e.g., file too large, chat not found, blocked by user) | |
| logger.error(f"Telegram error sending photo {os.path.basename(img_path)}: {e}", exc_info=True) | |
| if len(generated_images) > 1: | |
| await update.message.reply_text(f"⚠️ Couldn't send style variant {i+1} due to a Telegram error.") | |
| except Exception as e: | |
| # Catch other potential errors during sending | |
| logger.error(f"Unexpected error sending photo {os.path.basename(img_path)}: {e}", exc_info=True) | |
| if len(generated_images) > 1: | |
| await update.message.reply_text(f"⚠️ Couldn't send style variant {i+1} due to an unexpected error.") | |
| finally: | |
| # Clean up the generated image file after attempting to send it | |
| try: | |
| if os.path.exists(img_path): | |
| os.remove(img_path) | |
| logger.debug(f"Cleaned up generated image: {os.path.basename(img_path)}") | |
| except OSError as e: | |
| logger.error(f"Error deleting generated image file '{img_path}': {e}") | |
| send_time = loop.time() - send_start_time | |
| logger.info(f"Finished sending results for message {message_id}. Sent {sent_count}/{len(generated_images)} images in {send_time:.2f} seconds.") | |
| # --- Final Cleanup --- | |
| # Clean up the originally downloaded user image | |
| try: | |
| if os.path.exists(temp_user_image_path): | |
| os.remove(temp_user_image_path) | |
| logger.info(f"Cleaned up temporary user image: {os.path.basename(temp_user_image_path)}") | |
| except OSError as e: | |
| # Log error but don't bother user if temporary file cleanup fails | |
| logger.error(f"Error cleaning up user image '{temp_user_image_path}': {e}", exc_info=True) | |
| # --- Main Execution --- | |
| if __name__ == "__main__": | |
| logger.info("Initializing Telegram Bot Application...") | |
| # Token check is done in Config class now, exiting if it fails there. | |
| try: | |
| # Build the application instance | |
| app = Application.builder().token(Config.TELEGRAM_TOKEN).build() | |
| # Add the handler for messages containing both a photo and a caption | |
| # This ensures the 'handle_message' function only receives relevant updates | |
| app.add_handler(MessageHandler(filters.PHOTO & filters.CAPTION, handle_message)) | |
| logger.info("Bot application built successfully. Starting polling for updates...") | |
| # Start the bot polling for updates indefinitely | |
| app.run_polling(allowed_updates=Update.ALL_TYPES) # Consider specifying only needed updates | |
| except Exception as e: | |
| # Catch potential errors during application build or startup | |
| logger.critical(f"FATAL error initializing or running the bot application: {e}", exc_info=True) | |
| # Exit with a non-zero code to indicate failure | |
| exit(1) | |