Spaces:
Runtime error
Runtime error
File size: 31,074 Bytes
165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 47c135d 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 47c135d 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 47c135d 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 47c135d 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 b0363b8 165c9f9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 |
# app.py
import os
import random
import textwrap
import logging
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Tuple
# Attempt to load environment variables from .env file for local testing
# In Hugging Face, secrets are injected directly as environment variables
from dotenv import load_dotenv
load_dotenv() # Load .env file if it exists
from PIL import Image, ImageDraw, ImageFont, ImageOps, ImageFilter, ImageEnhance, UnidentifiedImageError
import numpy as np
from telegram import Update, InputFile
from telegram.ext import Application, MessageHandler, filters, ContextTypes
from telegram.constants import ParseMode
from telegram.error import TelegramError
# --- Configuration ---
class Config:
    """Static settings for the bot, evaluated once when the module is imported."""

    # --- Credentials ---
    # Bot token taken from the environment (a Hugging Face Space secret, or a
    # local .env file loaded earlier via load_dotenv()).
    TELEGRAM_TOKEN = os.getenv('TELEGRAM_TOKEN')
    if not TELEGRAM_TOKEN:
        # Nothing can work without a token: make sure the message is visible
        # in the logs, then abort while the class body is still executing.
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        _missing_token_message = "TELEGRAM_TOKEN environment variable not set! Please set it in Hugging Face Space secrets."
        logging.critical(_missing_token_message)
        raise ValueError(_missing_token_message)

    # --- Filesystem locations (relative, so they work inside a container) ---
    PREDEFINED_TEMPLATES_DIR = "templates"
    OUTPUT_DIR = "generated_images"
    # Font is referenced by name; Pillow is expected to locate it among the
    # fonts installed on the system.
    FONT_PATH = "Arial"

    # --- Predefined templates ---
    TEMPLATE_SIZE = (1200, 900)        # canvas size a template image is expected to have
    PLACEHOLDER_SIZE = (700, 500)      # box the user image is fitted into
    PLACEHOLDER_POSITION = (50, 50)    # top-left corner of that box on the template

    # --- Auto-generated templates ---
    AUTO_TEMPLATES_COUNT = 5
    AUTO_TEMPLATE_SIZE = (1200, 900)   # canvas size of a generated image
    AUTO_USER_IMAGE_SIZE = (600, 450)  # size of the user image on that canvas
    MIN_FONT_SIZE = 30
    MAX_FONT_SIZE = 60
    TEXT_STROKE_WIDTH = 2
    NOISE_INTENSITY = 0.03             # strength of the numpy noise effect (0 to 1)

    # --- Miscellaneous ---
    MAX_CAPTION_WIDTH = 35             # characters per line when wrapping the caption
    JPEG_QUALITY = 85                  # quality setting used when saving JPEG output
# --- Logging Setup ---
# Root logging configuration: INFO and above, each record prefixed with a
# timestamp, the logger name and the level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# The HTTP client used underneath python-telegram-bot is noisy at INFO level;
# only let its warnings and errors through.
logging.getLogger("httpx").setLevel(logging.WARNING)
# Logger used by everything in this module.
logger = logging.getLogger(__name__)
# --- Setup ---
# Ensure necessary directories exist within the container's filesystem
# This runs when the script starts inside the container
try:
    # Both paths are relative, so they are created under the process working
    # directory. exist_ok=True makes a restart with existing folders harmless.
    os.makedirs(Config.PREDEFINED_TEMPLATES_DIR, exist_ok=True)
    os.makedirs(Config.OUTPUT_DIR, exist_ok=True)
except OSError as e:
    # Without these folders the bot can neither read templates nor write
    # results, so stop the process instead of limping along.
    logger.error(f"FATAL: Error creating essential directories: {e}", exc_info=True)
    raise SystemExit(f"FATAL: Cannot create directories {Config.PREDEFINED_TEMPLATES_DIR} or {Config.OUTPUT_DIR}") from e
else:
    logger.info(f"Ensured directories exist: {Config.PREDEFINED_TEMPLATES_DIR}, {Config.OUTPUT_DIR}")
# No explicit font check here: the font is looked up by name when text is
# drawn, and the drawing code falls back to Pillow's default font on failure.
logger.info(f"Application will attempt to use font '{Config.FONT_PATH}' via system font discovery (fontconfig).")
# --- Helper Functions ---
def add_noise_to_image(img: Image.Image, intensity: float = 0.02) -> Image.Image:
    """Return an RGB copy of ``img`` with Gaussian noise added to every channel.

    Args:
        img: Source image; converted to RGB first if it is in another mode.
        intensity: Standard deviation of the noise on a 0..1 pixel scale.

    Returns:
        The noisy RGB image. If anything goes wrong the error is logged and the
        (possibly RGB-converted) input image is returned unchanged.
    """
    try:
        # The array math below assumes three 8-bit channels.
        if img.mode != 'RGB':
            img = img.convert('RGB')
        # Work on a 0..1 float scale so `intensity` is independent of bit depth.
        pixels = np.array(img, dtype=np.float32) / 255.0
        # One standard-normal sample per channel value, scaled by intensity.
        noise = np.random.randn(*pixels.shape) * intensity
        # Keep the result inside the valid range before going back to bytes.
        noisy = np.clip(pixels + noise, 0.0, 1.0)
        # Round to nearest before casting: a bare astype(uint8) truncates,
        # which darkens the image by about half a level on average and changes
        # pixels even when intensity is 0.
        noisy_bytes = np.rint(noisy * 255.0).astype(np.uint8)
        # An (H, W, 3) uint8 array is interpreted as RGB without an explicit mode.
        return Image.fromarray(noisy_bytes)
    except Exception as e:
        # Noise is a cosmetic effect; never let it break image generation.
        logger.error(f"Error adding noise: {e}", exc_info=True)
        return img  # Return original image on error
# --- Image Processing Functions ---
def apply_template(user_image_path: str, caption: str, template_path: str) -> Optional[str]:
    """
    Paste the user image into a predefined template and draw the caption below it.

    The user image is cropped/resized to Config.PLACEHOLDER_SIZE and pasted at
    Config.PLACEHOLDER_POSITION on a copy of the template. The caption is wrapped
    to Config.MAX_CAPTION_WIDTH characters and drawn horizontally centered,
    20 px below the placeholder area. The result is saved as a JPEG.

    Args:
        user_image_path: Path to the downloaded user image.
        caption: Text caption provided by the user.
        template_path: Path to the predefined template image.

    Returns:
        Path to the generated image (in OUTPUT_DIR), or None if an error occurred.
        All errors are logged here; none are raised to the caller.
    """
    template_name = os.path.basename(template_path)
    # splitext strips only the final extension, so "meme.v1.png" and
    # "meme.v2.png" keep distinct base names (split('.')[0] mapped both to "meme").
    base_name = os.path.splitext(template_name)[0]
    # Random suffix keeps repeated renders of the same template from overwriting each other.
    output_filename = f"result_{base_name}_{random.randint(10000, 99999)}.jpg"
    output_path = os.path.join(Config.OUTPUT_DIR, output_filename)
    logger.debug(f"Applying template '{template_name}' to user image '{os.path.basename(user_image_path)}'. Output: {output_path}")
    try:
        # Manage the *opened files* with the context manager and take RGBA
        # copies from them; the copies stay valid after the files are closed.
        with Image.open(template_path) as template_file, \
                Image.open(user_image_path) as user_file:
            template = template_file.convert("RGBA")
            user_image_orig = user_file.convert("RGBA")

        # A differently sized template still works, but the placeholder and
        # caption coordinates were chosen for Config.TEMPLATE_SIZE.
        if template.size != Config.TEMPLATE_SIZE:
            logger.warning(f"Template {template_name} size {template.size} differs from expected {Config.TEMPLATE_SIZE}. Results may vary.")

        # Crop/resize the user image to exactly fill the placeholder box.
        logger.debug(f"Resizing user image to placeholder size {Config.PLACEHOLDER_SIZE}")
        user_image_resized = ImageOps.fit(user_image_orig, Config.PLACEHOLDER_SIZE, Image.Resampling.LANCZOS)

        # Work on a copy so the loaded template object itself is never modified.
        combined = template.copy()
        # The image is RGBA, so it doubles as its own mask: transparent areas
        # of the user image let the template show through.
        logger.debug(f"Pasting resized user image at {Config.PLACEHOLDER_POSITION}")
        combined.paste(user_image_resized, Config.PLACEHOLDER_POSITION, user_image_resized)

        # --- Add Caption ---
        logger.debug("Adding caption to template image")
        draw = ImageDraw.Draw(combined)
        try:
            # Medium size: half of the configured maximum.
            font_size = Config.MAX_FONT_SIZE // 2
            logger.debug(f"Loading font '{Config.FONT_PATH}' with size {font_size}")
            font = ImageFont.truetype(Config.FONT_PATH, font_size)
        except IOError:  # Raised when Pillow cannot find/open the font
            logger.warning(f"Failed to load font '{Config.FONT_PATH}' by name. Using Pillow's default. Ensure 'ttf-mscorefonts-installer' worked in Docker and fontconfig is effective.")
            font = ImageFont.load_default()

        wrapped_text = textwrap.fill(caption, width=Config.MAX_CAPTION_WIDTH)
        logger.debug(f"Wrapped caption text: \"{wrapped_text[:50]}...\"")

        # Measure the rendered block to center it horizontally on the canvas.
        text_bbox = draw.textbbox((0, 0), wrapped_text, font=font, align="center")
        text_width = text_bbox[2] - text_bbox[0]
        text_x = (combined.width - text_width) // 2
        # Directly below the placeholder, with 20 px of padding.
        text_y = Config.PLACEHOLDER_POSITION[1] + Config.PLACEHOLDER_SIZE[1] + 20
        logger.debug(f"Calculated text position: ({text_x}, {text_y})")

        # Black copy offset by one pixel acts as a drop shadow so the white
        # text stays readable on light templates.
        shadow_offset = 1
        draw.text((text_x + shadow_offset, text_y + shadow_offset), wrapped_text, font=font, fill="black", align="center")
        draw.text((text_x, text_y), wrapped_text, font=font, fill="white", align="center")

        # JPEG has no alpha channel, so flatten to RGB before saving.
        logger.debug("Converting final image to RGB")
        combined_rgb = combined.convert("RGB")
        logger.debug(f"Saving final image to {output_path} with quality {Config.JPEG_QUALITY}")
        combined_rgb.save(output_path, "JPEG", quality=Config.JPEG_QUALITY)
        logger.info(f"Generated image using template '{template_name}': {output_path}")
        return output_path
    except FileNotFoundError:
        logger.error(f"Template or user image not found. Template: '{template_path}', User Image: '{user_image_path}'")
    except UnidentifiedImageError:
        logger.error(f"Could not identify image file (corrupted or unsupported format?). Template: '{template_path}', User Image: '{user_image_path}'")
    except Exception as e:
        # Anything else: log with traceback and report failure via None.
        logger.error(f"Error applying template '{template_name}': {e}", exc_info=True)
    # Reached only when one of the handlers above ran.
    return None
def create_auto_template(user_image_path: str, caption: str, variant: int) -> Optional[str]:
    """
    Render the user image and caption onto a randomly styled, generated canvas.

    A dark random background of Config.AUTO_TEMPLATE_SIZE is created. The user
    image is fitted to Config.AUTO_USER_IMAGE_SIZE, given one randomly chosen
    effect (or none) and a light random border, then pasted horizontally
    centered. The wrapped caption is drawn 30 px below it with a dark stroke.

    Args:
        user_image_path: Path to the downloaded user image.
        caption: Text caption provided by the user.
        variant: Index used in the output filename and log messages; the visual
            variation itself comes from random choices, not from this value.

    Returns:
        Path to the generated image (in OUTPUT_DIR), or None if an error occurred.
        All errors are logged here; none are raised to the caller.
    """
    output_filename = f"auto_template_{variant}_{random.randint(10000, 99999)}.jpg"
    output_path = os.path.join(Config.OUTPUT_DIR, output_filename)
    logger.debug(f"Creating auto template variant {variant}. Output: {output_path}")
    try:
        # --- Create Background ---
        # Channels capped at 150 keep the background dark enough for light text.
        bg_color = (random.randint(0, 150), random.randint(0, 150), random.randint(0, 150))
        logger.debug(f"Auto template {variant}: Background color {bg_color}")
        bg = Image.new('RGB', Config.AUTO_TEMPLATE_SIZE, color=bg_color)

        # --- Process User Image ---
        with Image.open(user_image_path) as user_img_orig:
            # Normalise to RGB up front. The border below is filled with an RGB
            # tuple, which Pillow rejects for single-band images (e.g. a
            # grayscale JPEG), and the canvas is RGB anyway. convert() returns
            # an independent copy, so the file can be closed right away.
            user_img = user_img_orig.convert('RGB')

        logger.debug(f"Auto template {variant}: Resizing user image to {Config.AUTO_USER_IMAGE_SIZE}")
        user_img = ImageOps.fit(user_img, Config.AUTO_USER_IMAGE_SIZE, Image.Resampling.LANCZOS)

        # The effect is picked at random on every call (independent of `variant`).
        effect_choice = random.choice(['blur', 'noise', 'color', 'contrast', 'sharpness', 'none'])
        logger.debug(f"Auto template {variant}: Applying effect '{effect_choice}'")
        if effect_choice == 'blur':
            blur_radius = random.uniform(0.5, 1.8)
            logger.debug(f"Applying GaussianBlur with radius {blur_radius:.2f}")
            user_img = user_img.filter(ImageFilter.GaussianBlur(blur_radius))
        elif effect_choice == 'noise':
            logger.debug(f"Applying noise with intensity {Config.NOISE_INTENSITY:.2f}")
            user_img = add_noise_to_image(user_img, Config.NOISE_INTENSITY)
        elif effect_choice == 'color':
            # Factor < 1 desaturates, > 1 boosts saturation.
            color_factor = random.uniform(0.3, 1.7)
            logger.debug(f"Enhancing color with factor {color_factor:.2f}")
            user_img = ImageEnhance.Color(user_img).enhance(color_factor)
        elif effect_choice == 'contrast':
            # Factor < 1 flattens, > 1 increases contrast.
            contrast_factor = random.uniform(0.7, 1.4)
            logger.debug(f"Enhancing contrast with factor {contrast_factor:.2f}")
            user_img = ImageEnhance.Contrast(user_img).enhance(contrast_factor)
        elif effect_choice == 'sharpness':
            sharpness_factor = random.uniform(1.1, 2.0)
            logger.debug(f"Enhancing sharpness with factor {sharpness_factor:.2f}")
            user_img = ImageEnhance.Sharpness(user_img).enhance(sharpness_factor)
        # 'none' leaves the image untouched.

        # Light border (channels 180..255) to set the picture off from the dark canvas.
        border_color = (random.randint(180, 255), random.randint(180, 255), random.randint(180, 255))
        border_width = random.randint(8, 20)
        logger.debug(f"Adding border width {border_width} color {border_color}")
        user_img = ImageOps.expand(user_img, border=border_width, fill=border_color)

        # --- Paste User Image onto Background ---
        # Horizontally centered; vertically one third of the free space goes
        # above the image, leaving the larger share below for the caption.
        paste_x = (bg.width - user_img.width) // 2
        paste_y = (bg.height - user_img.height) // 3
        logger.debug(f"Pasting processed user image at ({paste_x}, {paste_y})")
        bg.paste(user_img, (paste_x, paste_y))

        # --- Add Styled Text ---
        logger.debug("Adding caption to auto template")
        draw = ImageDraw.Draw(bg)
        try:
            font_size = random.randint(Config.MIN_FONT_SIZE, Config.MAX_FONT_SIZE)
            logger.debug(f"Loading font '{Config.FONT_PATH}' with size {font_size}")
            font = ImageFont.truetype(Config.FONT_PATH, font_size)
        except IOError:  # Raised when Pillow cannot find/open the font
            logger.warning(f"Failed to load font '{Config.FONT_PATH}' by name. Using Pillow's default. Ensure 'ttf-mscorefonts-installer' worked in Docker and fontconfig is effective.")
            font = ImageFont.load_default()  # Fallback font

        wrapped_text = textwrap.fill(caption, width=Config.MAX_CAPTION_WIDTH)
        logger.debug(f"Wrapped caption text: \"{wrapped_text[:50]}...\"")

        # Measure the rendered block to center it horizontally.
        text_bbox = draw.textbbox((0, 0), wrapped_text, font=font, align="center")
        text_width = text_bbox[2] - text_bbox[0]
        text_x = (bg.width - text_width) // 2
        # Below the bordered image, with 30 px of padding.
        text_y = paste_y + user_img.height + 30
        logger.debug(f"Calculated text position: ({text_x}, {text_y})")

        # Bright fill with a near-black outline stays legible on any background.
        text_color = (random.randint(200, 255), random.randint(200, 255), random.randint(200, 255))
        stroke_color = (random.randint(0, 50), random.randint(0, 50), random.randint(0, 50))
        logger.debug(f"Text color {text_color}, stroke color {stroke_color}")
        draw.text((text_x, text_y), wrapped_text, font=font, fill=text_color,
                  stroke_width=Config.TEXT_STROKE_WIDTH, stroke_fill=stroke_color, align="center")

        logger.debug(f"Saving final auto template image to {output_path} with quality {Config.JPEG_QUALITY}")
        bg.save(output_path, "JPEG", quality=Config.JPEG_QUALITY)
        logger.info(f"Generated auto-template image (variant {variant}): {output_path}")
        return output_path
    except FileNotFoundError:
        logger.error(f"User image not found during auto-template creation: '{user_image_path}'")
    except UnidentifiedImageError:
        logger.error(f"Could not identify user image file during auto-template creation: '{user_image_path}'")
    except Exception as e:
        logger.error(f"Error creating auto-template variant {variant}: {e}", exc_info=True)
    # Reached only when one of the handlers above ran.
    return None
def load_predefined_templates() -> List[str]:
    """Return the paths of all template images in Config.PREDEFINED_TEMPLATES_DIR.

    A template is any regular file whose name ends in .png, .jpg or .jpeg
    (case-insensitive). Paths are returned in sorted filename order so the
    numbering of the generated variants is stable between runs.

    Returns:
        List of template paths; empty if the directory is missing, contains no
        matching files, or cannot be read (the error is logged, not raised).
    """
    templates: List[str] = []
    template_dir = Config.PREDEFINED_TEMPLATES_DIR
    logger.debug(f"Searching for templates in directory: {os.path.abspath(template_dir)}")
    try:
        if not os.path.isdir(template_dir):
            logger.warning(f"Predefined templates directory not found: '{template_dir}'")
            return []  # Return empty list if directory doesn't exist
        # os.listdir gives entries in arbitrary order; sort for determinism.
        entries = sorted(os.listdir(template_dir))
        logger.debug(f"Found {len(entries)} files/dirs in template directory.")
        for entry in entries:
            # Only consider common image extensions.
            if not entry.lower().endswith(('.png', '.jpg', '.jpeg')):
                continue
            full_path = os.path.join(template_dir, entry)
            if os.path.isfile(full_path):  # Skip directories that merely look like images
                templates.append(full_path)
                logger.debug(f"Found template: {full_path}")
            else:
                logger.warning(f"Found item with image extension but is not a file: {full_path}")
        if not templates:
            logger.warning(f"No valid template image files found in '{template_dir}'.")
        else:
            logger.info(f"Loaded {len(templates)} predefined templates.")
    except Exception as e:
        # Best effort: whatever was collected before the error is still returned.
        logger.error(f"Error loading predefined templates from '{template_dir}': {e}", exc_info=True)
    return templates
# This function orchestrates the image processing. It contains blocking Pillow calls,
# so it's designed to be run in a thread pool executor by the async handler.
def process_images(user_image_path: str, caption: str) -> List[str]:
    """
    Render the user image with every predefined template, then with the
    configured number of auto-generated templates.

    This function is SYNCHRONOUS: the rendering calls it makes are blocking,
    so the async handler runs it in an executor.

    Args:
        user_image_path: Path to the temporary user image file.
        caption: The user's caption.

    Returns:
        Paths of all images that were generated successfully, predefined
        templates first, auto-templates after. Failed renders are logged and
        left out.
    """
    logger.info("Starting image processing task...")
    outputs: List[str] = []

    # 1. Predefined templates
    template_paths = load_predefined_templates()
    if not template_paths:
        logger.info("Skipping predefined templates (none found or loaded).")
    else:
        logger.info(f"Processing {len(template_paths)} predefined templates...")
        for template_path in template_paths:
            rendered = apply_template(user_image_path, caption, template_path)
            if not rendered:
                logger.warning(f"Failed to generate image for template: {os.path.basename(template_path)}")
                continue
            outputs.append(rendered)

    # 2. Auto-generated templates
    logger.info(f"Generating {Config.AUTO_TEMPLATES_COUNT} auto-templates...")
    for variant_index in range(Config.AUTO_TEMPLATES_COUNT):
        rendered = create_auto_template(user_image_path, caption, variant_index)
        if not rendered:
            logger.warning(f"Failed to generate auto-template variant {variant_index}")
            continue
        outputs.append(rendered)

    logger.info(f"Image processing task finished. Generated {len(outputs)} images in total.")
    return outputs
# --- Telegram Bot Handler ---
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle one incoming photo-with-caption message end to end.

    Steps: download the highest-resolution photo into OUTPUT_DIR, render all
    styled variants in an executor (the rendering is blocking), send each
    result back as a photo, and delete every file that was written. User-facing
    status and error texts are sent as replies.

    Args:
        update: Incoming update; ignored unless it carries a message with both
            a photo and a caption.
        context: Handler context; its bot is used to edit or delete the
            temporary "Processing..." status message.
    """
    message = update.message
    # The registered filter should guarantee these, but stay defensive.
    if not message or not message.photo or not message.caption:
        logger.warning("Handler invoked for message missing photo or caption. This shouldn't happen with the current filters.")
        return

    user = message.from_user
    # Some messages carry no sender; use a placeholder for file naming/logging.
    user_id = user.id if user else "UnknownUser"
    user_info = f"user_id={user_id}" + (f", username={user.username}" if user and user.username else "")
    caption = message.caption
    message_id = message.message_id
    chat_id = message.chat_id
    logger.info(f"Received photo with caption from {user_info} in chat {chat_id} (message_id={message_id}).")

    loop = asyncio.get_running_loop()
    # user id + message id make the download path unique per message.
    temp_user_image_path = os.path.join(Config.OUTPUT_DIR, f"user_{user_id}_{message_id}.jpg")
    generated_images: List[str] = []

    # Everything that can create files runs inside this try so that the
    # finally block removes them on every exit path, including exceptions
    # raised while replying to the user.
    try:
        # --- Download User Image ---
        download_start_time = loop.time()
        try:
            photo = message.photo[-1]  # Last entry is the highest resolution
            logger.info(f"Attempting download photo (file_id: {photo.file_id}, size: {photo.width}x{photo.height})...")
            photo_file = await photo.get_file()
            await photo_file.download_to_drive(temp_user_image_path)
            download_time = loop.time() - download_start_time
            logger.info(f"Photo downloaded successfully to '{temp_user_image_path}' in {download_time:.2f} seconds.")
        except TelegramError as e:
            logger.error(f"Telegram error downloading photo for message {message_id}: {e}", exc_info=True)
            await message.reply_text("❌ Sorry, there was a Telegram error downloading the image. Please try sending it again.")
            return
        except Exception as e:
            logger.error(f"Unexpected error downloading photo for message {message_id}: {e}", exc_info=True)
            await message.reply_text("❌ Sorry, I couldn't download the image due to an unexpected error.")
            return

        # Safety net: the download call returned but left no file behind.
        if not os.path.exists(temp_user_image_path):
            logger.error(f"Download reported success but file '{temp_user_image_path}' does not exist.")
            await message.reply_text("❌ An internal error occurred after downloading the image.")
            return

        # --- Status message ---
        processing_message = None
        try:
            # reply_to_message_id quotes the user's message. It replaces the
            # `quote=True` shortcut, which newer python-telegram-bot releases
            # deprecate, and has the same effect.
            processing_message = await message.reply_text(
                "⏳ Processing your image with different styles...",
                reply_to_message_id=message_id,
            )
        except TelegramError as e:
            # The status message is a nicety; keep going without it.
            logger.warning(f"Could not send 'Processing...' message to chat {chat_id}: {e}")
        message_to_delete = processing_message.message_id if processing_message else None

        # --- Process Images in Executor ---
        processing_failed = False
        processing_start_time = loop.time()
        try:
            logger.info(f"Submitting image processing task to executor for user image '{os.path.basename(temp_user_image_path)}'")
            # None selects the event loop's default executor.
            generated_images = await loop.run_in_executor(
                None,
                process_images,
                temp_user_image_path,
                caption,
            )
            processing_time = loop.time() - processing_start_time
            logger.info(f"Image processing task completed in {processing_time:.2f} seconds.")
        except Exception as e:
            processing_failed = True
            logger.error(f"Error during image processing executor call for message {message_id}: {e}", exc_info=True)
            error_message = "❌ An unexpected error occurred during processing. Please try again later."
            if message_to_delete:
                # Prefer turning the status message into the error message.
                try:
                    await context.bot.edit_message_text(
                        chat_id=chat_id,
                        message_id=message_to_delete,
                        text=error_message,
                    )
                    message_to_delete = None  # It now shows the error; keep it
                except TelegramError as edit_err:
                    logger.warning(f"Could not edit processing message {message_to_delete} to show error: {edit_err}")
                    await message.reply_text(error_message)
            else:
                # No status message exists; send the error as a new message.
                await message.reply_text(error_message)

        # Remove the status message unless it was turned into the error text.
        if message_to_delete:
            try:
                await context.bot.delete_message(chat_id=chat_id, message_id=message_to_delete)
                logger.debug(f"Deleted 'Processing...' message {message_to_delete}")
            except TelegramError as del_err:
                logger.warning(f"Could not delete 'Processing...' message ({message_to_delete}): {del_err}")

        if processing_failed:
            return

        # --- Send Results ---
        if not generated_images:
            logger.warning(f"Image processing finished but generated 0 images for message {message_id}.")
            await message.reply_text("😕 Sorry, I couldn't generate any styled images this time. There might be an issue with the templates.")
            return

        send_start_time = loop.time()
        total = len(generated_images)
        logger.info(f"Sending {total} generated images back to user {user_id} for message {message_id}.")
        sent_count = 0
        for i, img_path in enumerate(generated_images):
            if not os.path.exists(img_path):
                logger.error(f"Generated image file not found before sending: '{img_path}'")
                if total > 1:
                    await message.reply_text(f"⚠️ Couldn't send style variant {i+1} (internal file missing).")
                continue

            caption_text = f"Style variant {i+1}" if total > 1 else "🖼️ Here's your styled image!"
            try:
                # Upload from an open binary file handle. Wrapping the path
                # string in InputFile would upload the *text of the path* as
                # the file content, because InputFile treats str as data.
                with open(img_path, "rb") as image_file:
                    await message.reply_photo(photo=image_file, caption=caption_text)
                sent_count += 1
                logger.debug(f"Sent photo {os.path.basename(img_path)}")
            except TelegramError as e:
                # e.g. file too large, chat not found, bot blocked by the user
                logger.error(f"Telegram error sending photo {os.path.basename(img_path)}: {e}", exc_info=True)
                if total > 1:
                    await message.reply_text(f"⚠️ Couldn't send style variant {i+1} due to a Telegram error.")
            except Exception as e:
                logger.error(f"Unexpected error sending photo {os.path.basename(img_path)}: {e}", exc_info=True)
                if total > 1:
                    await message.reply_text(f"⚠️ Couldn't send style variant {i+1} due to an unexpected error.")
            finally:
                # Free disk space as soon as each image has been attempted.
                _discard_generated_image(img_path)

        send_time = loop.time() - send_start_time
        logger.info(f"Finished sending results for message {message_id}. Sent {sent_count}/{total} images in {send_time:.2f} seconds.")
    finally:
        # --- Final Cleanup ---
        # Images the send loop never reached (it was interrupted by an
        # exception) are still on disk; already-removed ones are skipped.
        for leftover_path in generated_images:
            _discard_generated_image(leftover_path)
        # The downloaded user image (possibly partial after a failed download).
        try:
            if os.path.exists(temp_user_image_path):
                os.remove(temp_user_image_path)
                logger.info(f"Cleaned up temporary user image: {os.path.basename(temp_user_image_path)}")
        except OSError as e:
            # Cleanup failure is logged but never surfaced to the user.
            logger.error(f"Error cleaning up user image '{temp_user_image_path}': {e}", exc_info=True)


def _discard_generated_image(img_path: str) -> None:
    """Delete a generated image if it still exists; log instead of raising on failure."""
    try:
        if os.path.exists(img_path):
            os.remove(img_path)
            logger.debug(f"Cleaned up generated image: {os.path.basename(img_path)}")
    except OSError as e:
        logger.error(f"Error deleting generated image file '{img_path}': {e}")
# --- Main Execution ---
if __name__ == "__main__":
    # Script entry point: build the bot, register the handler, poll until stopped.
    logger.info("Initializing Telegram Bot Application...")
    # A missing token has already aborted the import inside Config.
    try:
        app = Application.builder().token(Config.TELEGRAM_TOKEN).build()
        # Only messages carrying both a photo and a caption reach handle_message.
        app.add_handler(MessageHandler(filters.PHOTO & filters.CAPTION, handle_message))
        logger.info("Bot application built successfully. Starting polling for updates...")
        # Blocks until the process is stopped. All update types are requested
        # even though only messages are handled; this could be narrowed.
        app.run_polling(allowed_updates=Update.ALL_TYPES)
    except Exception as e:
        # Build or runtime failure: log with traceback and exit non-zero.
        logger.critical(f"FATAL error initializing or running the bot application: {e}", exc_info=True)
        # Raise SystemExit directly: the bare exit() helper is installed by the
        # `site` module for interactive use and is not guaranteed in programs.
        raise SystemExit(1) from e
|