import os from PIL import Image, ImageEnhance import io from skimage import exposure import numpy as np import cv2 import base64 from replicate.client import Client from datetime import datetime from helpercodes.r2_uploader import upload_image_to_s3 from dotenv import load_dotenv from PIL import Image, ImageOps import random import logging # Configure Logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) load_dotenv() def _encode_image_to_base64(image_path) -> str: """ Helper to read an image file from disk and encode it to base64. """ buffer = io.BytesIO() image_path.save(buffer, format="JPEG") image_bytes = buffer.getvalue() image_string = base64.b64encode(image_bytes).decode("utf-8") final_string = f"data:image/jpeg;base64,{image_string}" return final_string def apply_image_properties(image: Image.Image) -> Image.Image: """ Apply various image transformations (brightness, contrast, etc.) to a PIL image, returning the modified image. """ pil_image = image # Example properties (these could be parametrized) properties = { 'Brightness': 100, # as percentage 'Contrast': 90, 'Sharpness': -100, 'Saturation': 85, 'Noise': 10, 'Vignette': 10, 'DynamicRange': 78 } # Brightness if 'Brightness' in properties: enhancer = ImageEnhance.Brightness(pil_image) pil_image = enhancer.enhance(properties['Brightness'] / 100) # Contrast if 'Contrast' in properties: enhancer = ImageEnhance.Contrast(pil_image) pil_image = enhancer.enhance(properties['Contrast'] / 100) # Sharpness if 'Sharpness' in properties: enhancer = ImageEnhance.Sharpness(pil_image) pil_image = enhancer.enhance(properties['Sharpness'] / 100) # Saturation if 'Saturation' in properties: enhancer = ImageEnhance.Color(pil_image) pil_image = enhancer.enhance(properties['Saturation'] / 100) cv_image = np.array(pil_image) # Noise if 'Noise' in properties: noise_level = properties['Noise'] / 100 noise = np.random.normal(0, noise_level * 50, cv_image.shape).astype(np.int16) noisy_image = cv_image + noise noisy_image = np.clip(noisy_image, 0, 255).astype(np.uint8) cv_image = noisy_image # Vignette if 'Vignette' in properties: rows, cols = cv_image.shape[:2] kernel_x = cv2.getGaussianKernel(cols, cols // 2) kernel_y = cv2.getGaussianKernel(rows, rows // 2) kernel = kernel_y * kernel_x.T mask = 255 * kernel / np.linalg.norm(kernel) for i in range(3): cv_image[..., i] = cv2.addWeighted( cv_image[..., i], 1 - properties['Vignette'] / 100, mask.astype(np.uint8), properties['Vignette'] / 100, 0 ) # Dynamic Range if 'DynamicRange' in properties: dynamic_range = properties['DynamicRange'] / 100 cv_image = exposure.adjust_gamma(cv_image, gamma=dynamic_range) return Image.fromarray(cv_image) def add_random_border(image: Image.Image) -> Image.Image: """ Adds a random border to a given PIL image. Args: image (Image.Image): Input PIL image. Returns: Image.Image: Image with a random border. """ try: # Generate random thickness between 1 and 5 pixels thickness = random.randint(1, 5) # Generate a random color (RGB) random_color = ( random.randint(0, 255), # Red random.randint(0, 255), # Green random.randint(0, 255), # Blue ) # Add border using ImageOps.expand bordered_image = ImageOps.expand(image, border=thickness, fill=random_color) return bordered_image except Exception as e: logging.error(f"Error adding border: {e}", exc_info=True) return image def generate_image(prompt: str, aspect_ratio='1:1', design="None") -> Image.Image: try: logging.info(f"Generating image with prompt: '{prompt}', aspect_ratio: {aspect_ratio}, design: {design}") # Initialize Replicate client api_key = os.getenv("REPLICATE_KEY") if not api_key: logging.error("Missing REPLICATE_KEY in environment variables.") return None replicate_client = Client(api_token=api_key) # inputs = { # "prompt": f"{prompt}.", # "go_fast": True, # "guidance": 3, # "megapixels": "1", # "num_outputs": 1, # "aspect_ratio": "1:1", # "output_format": "jpg", # "output_quality": 50, # "prompt_strength": 0.8, # "num_inference_steps": 28 # } # output = replicate_client.run("black-forest-labs/flux-dev", input=inputs) # image_data = output[0].read() inputs = { "prompt": f"{prompt}.", "aspect_ratio": aspect_ratio, "magic_prompt_option": "Off", "style_type": design } output = replicate_client.run("ideogram-ai/ideogram-v2-turbo", input=inputs) if not output: logging.error("API response is empty. Image generation failed.") return None image_data = output.read() image = Image.open(io.BytesIO(image_data)) image = add_random_border(image) # Create a unique filename timestamp = datetime.now().strftime("%Y%m%d%H%M%S") object_name = f"generated_images/{prompt.replace(' ', '_')}_{timestamp}.png" # Upload to S3 and return URL s3_url = upload_image_to_s3(image, object_name) if not s3_url: logging.error("S3 upload failed. Image URL unavailable.") return None # print (s3_url, object_name) image_url = output.url logging.info(f"Image successfully generated and uploaded: {s3_url}") return image_url except Exception as e: logging.error(f"Error in generate_image: {e}", exc_info=True) return None