Spaces:
Sleeping
Sleeping
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| from diffusers import StableDiffusionPipeline, StableDiffusionXLPipeline, DPMSolverMultistepScheduler | |
| from diffusers.models import UNet2DConditionModel | |
| import torch | |
| import os | |
| from PIL import Image | |
| import base64 | |
| import time | |
| import logging | |
| # Disable GPU detection | |
| os.environ["CUDA_VISIBLE_DEVICES"] = "" | |
| os.environ["CUDA_DEVICE_ORDER"] = "" | |
| os.environ["TORCH_CUDA_ARCH_LIST"] = "" | |
| torch.set_default_device("cpu") | |
| app = Flask(__name__, static_folder='static') | |
| CORS(app) | |
| # Configure logging | |
| logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| # Log device in use | |
| logger.info(f"Device in use: {torch.device('cpu')}") | |
| # Model cache | |
| model_cache = {} | |
| model_paths = { | |
| "ssd-1b": "remiai3/ssd-1b", | |
| "sd-v1-5": "remiai3/stable-diffusion-v1-5" | |
| } | |
| # Image ratio to dimensions (optimized for CPU) | |
| ratio_to_dims = { | |
| "1:1": (256, 256), | |
| "3:4": (192, 256), | |
| "16:9": (256, 144) | |
| } | |
| def load_model(model_id): | |
| if model_id not in model_cache: | |
| logger.info(f"Loading model {model_id}...") | |
| try: | |
| if model_id == "ssd-1b": | |
| # Try StableDiffusionXLPipeline first | |
| try: | |
| logger.info(f"Attempting StableDiffusionXLPipeline for {model_id}") | |
| pipe = StableDiffusionXLPipeline.from_pretrained( | |
| model_paths[model_id], | |
| torch_dtype=torch.float32, | |
| use_auth_token=os.getenv("HF_TOKEN"), | |
| use_safetensors=True, | |
| low_cpu_mem_usage=True, | |
| force_download=True | |
| ) | |
| except Exception as e: | |
| logger.warning(f"StableDiffusionXLPipeline failed for {model_id}: {str(e)}") | |
| logger.info(f"Falling back to StableDiffusionPipeline for {model_id}") | |
| # Fallback to StableDiffusionPipeline with patched UNet | |
| unet_config = UNet2DConditionModel.load_config( | |
| f"{model_paths[model_id]}/unet", | |
| use_auth_token=os.getenv("HF_TOKEN"), | |
| force_download=True | |
| ) | |
| if "reverse_transformer_layers_per_block" in unet_config: | |
| logger.info(f"Original UNet config for {model_id}: {unet_config}") | |
| unet_config["reverse_transformer_layers_per_block"] = None | |
| logger.info(f"Patched UNet config for {model_id}: {unet_config}") | |
| unet = UNet2DConditionModel.from_config(unet_config) | |
| unet.load_state_dict( | |
| torch.load( | |
| f"{model_paths[model_id]}/unet/diffusion_pytorch_model.bin", | |
| map_location="cpu" | |
| ) | |
| ) | |
| pipe = StableDiffusionPipeline.from_pretrained( | |
| model_paths[model_id], | |
| unet=unet, | |
| torch_dtype=torch.float32, | |
| use_auth_token=os.getenv("HF_TOKEN"), | |
| use_safetensors=True, | |
| low_cpu_mem_usage=True, | |
| force_download=True | |
| ) | |
| else: | |
| # Standard loading for sd-v1-5 | |
| pipe = StableDiffusionPipeline.from_pretrained( | |
| model_paths[model_id], | |
| torch_dtype=torch.float32, | |
| use_auth_token=os.getenv("HF_TOKEN"), | |
| use_safetensors=True, | |
| low_cpu_mem_usage=True, | |
| force_download=True | |
| ) | |
| logger.info(f"Pipeline components loading for {model_id}...") | |
| pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config) | |
| pipe.enable_attention_slicing() | |
| pipe.to(torch.device("cpu")) | |
| model_cache[model_id] = pipe | |
| logger.info(f"Model {model_id} loaded successfully") | |
| except Exception as e: | |
| logger.error(f"Error loading model {model_id}: {str(e)}") | |
| raise | |
| return model_cache[model_id] | |
| def index(): | |
| return app.send_static_file('index.html') | |
| def serve_assets(filename): | |
| return app.send_static_file(os.path.join('assets', filename)) | |
| def generate(): | |
| try: | |
| data = request.json | |
| model_id = data.get('model', 'ssd-1b') | |
| prompt = data.get('prompt', '') | |
| ratio = data.get('ratio', '1:1') | |
| num_images = min(int(data.get('num_images', 1)), 4) | |
| guidance_scale = float(data.get('guidance_scale', 7.5)) | |
| if not prompt: | |
| return jsonify({"error": "Prompt is required"}), 400 | |
| if model_id == 'ssd-1b' and num_images > 1: | |
| return jsonify({"error": "SSD-1B allows only 1 image per generation"}), 400 | |
| if model_id == 'ssd-1b' and ratio != '1:1': | |
| return jsonify({"error": "SSD-1B supports only 1:1 ratio"}), 400 | |
| if model_id == 'sd-v1-5' and len(prompt.split()) > 77: | |
| return jsonify({"error": "Prompt exceeds 77 tokens for Stable Diffusion v1.5"}), 400 | |
| width, height = ratio_to_dims.get(ratio, (256, 256)) | |
| pipe = load_model(model_id) | |
| pipe.to(torch.device("cpu")) | |
| images = [] | |
| num_inference_steps = 20 if model_id == 'ssd-1b' else 30 | |
| for _ in range(num_images): | |
| image = pipe( | |
| prompt=prompt, | |
| height=height, | |
| width=width, | |
| num_inference_steps=num_inference_steps, | |
| guidance_scale=guidance_scale | |
| ).images[0] | |
| images.append(image) | |
| output_dir = "outputs" | |
| os.makedirs(output_dir, exist_ok=True) | |
| image_urls = [] | |
| for i, img in enumerate(images): | |
| img_path = os.path.join(output_dir, f"generated_{int(time.time())}_{i}.png") | |
| img.save(img_path) | |
| with open(img_path, "rb") as f: | |
| img_data = base64.b64encode(f.read()).decode('utf-8') | |
| image_urls.append(f"data:image/png;base64,{img_data}") | |
| os.remove(img_path) | |
| return jsonify({"images": image_urls}) | |
| except Exception as e: | |
| logger.error(f"Image generation failed: {str(e)}") | |
| return jsonify({"error": f"Image generation failed: {str(e)}"}), 500 | |
| if __name__ == '__main__': | |
| app.run(host='0.0.0.0', port=7860) |