import os import json import copy import time import random import logging import numpy as np from typing import Any, Dict, List, Optional, Union import gradio as gr import logging import torch from PIL import Image import spaces from diffusers import DiffusionPipeline, AutoencoderTiny, AutoencoderKL, AutoPipelineForImage2Image from diffusers.utils import load_image from huggingface_hub import hf_hub_download, HfFileSystem, ModelCard, snapshot_download import requests import pandas as pd from transformers.utils import move_cache move_cache() from diffusers import ( DiffusionPipeline, AutoencoderTiny, AutoencoderKL, AutoPipelineForImage2Image, FluxPipeline, FlowMatchEulerDiscreteScheduler) from huggingface_hub import ( hf_hub_download, HfFileSystem, ModelCard, snapshot_download) from diffusers.utils import load_image from huggingface_hub import HfApi token = os.getenv("HF_TOKEN") #---if workspace = local or colab--- # Authenticate with Hugging Face # from huggingface_hub import login # Log in to Hugging Face using the provided token # hf_token = 'hf-token-authentication' # login(hf_token) def calculate_shift( image_seq_len, base_seq_len: int = 256, max_seq_len: int = 4096, base_shift: float = 0.5, max_shift: float = 1.16, ): m = (max_shift - base_shift) / (max_seq_len - base_seq_len) b = base_shift - m * base_seq_len mu = image_seq_len * m + b return mu def retrieve_timesteps( scheduler, num_inference_steps: Optional[int] = None, device: Optional[Union[str, torch.device]] = None, timesteps: Optional[List[int]] = None, sigmas: Optional[List[float]] = None, **kwargs, ): if timesteps is not None and sigmas is not None: raise ValueError("Apenas um entre `timesteps` ou `sigmas` pode ser passado. Por favor, escolha um para definir valores personalizados") scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs) timesteps = scheduler.timesteps num_inference_steps = len(timesteps) elif sigmas is not None: scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs) timesteps = scheduler.timesteps num_inference_steps = len(timesteps) else: scheduler.set_timesteps(num_inference_steps, device=device, **kwargs) timesteps = scheduler.timesteps return timesteps, num_inference_steps # FLUX pipeline @torch.inference_mode() def flux_pipe_call_that_returns_an_iterable_of_images( self, prompt: Union[str, List[str]] = None, prompt_2: Optional[Union[str, List[str]]] = None, height: Optional[int] = None, width: Optional[int] = None, num_inference_steps: int = 28, timesteps: List[int] = None, guidance_scale: float = 3.5, num_images_per_prompt: Optional[int] = 1, generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None, latents: Optional[torch.FloatTensor] = None, prompt_embeds: Optional[torch.FloatTensor] = None, pooled_prompt_embeds: Optional[torch.FloatTensor] = None, output_type: Optional[str] = "pil", return_dict: bool = True, joint_attention_kwargs: Optional[Dict[str, Any]] = None, max_sequence_length: int = 512, good_vae: Optional[Any] = None, ): height = height or self.default_sample_size * self.vae_scale_factor width = width or self.default_sample_size * self.vae_scale_factor self.check_inputs( prompt, prompt_2, height, width, prompt_embeds=prompt_embeds, pooled_prompt_embeds=pooled_prompt_embeds, max_sequence_length=max_sequence_length, ) self._guidance_scale = guidance_scale self._joint_attention_kwargs = joint_attention_kwargs self._interrupt = False batch_size = 1 if isinstance(prompt, str) else len(prompt) device = self._execution_device lora_scale = joint_attention_kwargs.get("scale", None) if joint_attention_kwargs is not None else None prompt_embeds, pooled_prompt_embeds, text_ids = self.encode_prompt( prompt=prompt, prompt_2=prompt_2, prompt_embeds=prompt_embeds, pooled_prompt_embeds=pooled_prompt_embeds, device=device, num_images_per_prompt=num_images_per_prompt, max_sequence_length=max_sequence_length, lora_scale=lora_scale, ) num_channels_latents = self.transformer.config.in_channels // 4 latents, latent_image_ids = self.prepare_latents( batch_size * num_images_per_prompt, num_channels_latents, height, width, prompt_embeds.dtype, device, generator, latents, ) sigmas = np.linspace(1.0, 1 / num_inference_steps, num_inference_steps) image_seq_len = latents.shape[1] mu = calculate_shift( image_seq_len, self.scheduler.config.base_image_seq_len, self.scheduler.config.max_image_seq_len, self.scheduler.config.base_shift, self.scheduler.config.max_shift, ) timesteps, num_inference_steps = retrieve_timesteps( self.scheduler, num_inference_steps, device, timesteps, sigmas, mu=mu, ) self._num_timesteps = len(timesteps) guidance = torch.full([1], guidance_scale, device=device, dtype=torch.float32).expand(latents.shape[0]) if self.transformer.config.guidance_embeds else None for i, t in enumerate(timesteps): if self.interrupt: continue timestep = t.expand(latents.shape[0]).to(latents.dtype) noise_pred = self.transformer( hidden_states=latents, timestep=timestep / 1000, guidance=guidance, pooled_projections=pooled_prompt_embeds, encoder_hidden_states=prompt_embeds, txt_ids=text_ids, img_ids=latent_image_ids, joint_attention_kwargs=self.joint_attention_kwargs, return_dict=False, )[0] latents_for_image = self._unpack_latents(latents, height, width, self.vae_scale_factor) latents_for_image = (latents_for_image / self.vae.config.scaling_factor) + self.vae.config.shift_factor image = self.vae.decode(latents_for_image, return_dict=False)[0] yield self.image_processor.postprocess(image, output_type=output_type)[0] latents = self.scheduler.step(noise_pred, t, latents, return_dict=False)[0] torch.cuda.empty_cache() latents = self._unpack_latents(latents, height, width, self.vae_scale_factor) latents = (latents / good_vae.config.scaling_factor) + good_vae.config.shift_factor image = good_vae.decode(latents, return_dict=False)[0] self.maybe_free_model_hooks() torch.cuda.empty_cache() yield self.image_processor.postprocess(image, output_type=output_type)[0] #------------------------------------------------------------------------------------------------------------------------------------------------------------# loras = [ #Super-Realism { "image": "https://huggingface.co/Collos/Jalves/resolve/main/images/jose.webp", "title": "Jose Alves", "repo": "Collos/Jalves", "weights": "Jalves.safetensors", "trigger_word": "José Alves" }, { "image": "https://huggingface.co/Collos/JulioCesar/resolve/main/images/WhatsApp%20Image%202024-12-10%20at%2009.33.50.jpeg", "title": "Júlio César", "repo": "Collos/JulioCesar", "weights": "julio.safetensorss", "trigger_word": "Júlio" }, { "image": "https://huggingface.co/Collos/PedroJr/resolve/main/images/WhatsApp%20Image%202024-12-10%20at%2009.34.01.jpeg", "title": "Pedro Jr.", "repo": "Collos/PedroJr", "weights": "pedrojr.safetensors", "trigger_word": "Pedro" }, { "image": "https://huggingface.co/Collos/JoseClovis/resolve/main/images/WhatsApp%20Image%202024-12-10%20at%2009.38.50.jpeg", "title": "José Clóvis", "repo": "Collos/JoseClovis", "weights": "clovis.safetensors", "trigger_word": "Clóvis" } #add new ] # Initialize the base model use_auth_token=True dtype = torch.bfloat16 device = "cuda" if torch.cuda.is_available() else "cpu" base_model = "black-forest-labs/FLUX.1-dev" taef1 = AutoencoderTiny.from_pretrained("madebyollin/taef1", torch_dtype=dtype).to(device) good_vae = AutoencoderKL.from_pretrained(base_model, subfolder="vae", torch_dtype=dtype).to(device) pipe = DiffusionPipeline.from_pretrained(base_model, torch_dtype=dtype, vae=taef1).to(device) pipe_i2i = AutoPipelineForImage2Image.from_pretrained( base_model, vae=good_vae, transformer=pipe.transformer, text_encoder=pipe.text_encoder, tokenizer=pipe.tokenizer, text_encoder_2=pipe.text_encoder_2, tokenizer_2=pipe.tokenizer_2, torch_dtype=dtype ) #TAEF1 is very tiny autoencoder which uses the same "latent API" as FLUX.1's VAE. FLUX.1 is useful for real-time previewing of the FLUX.1 generation process.# taef1 = AutoencoderTiny.from_pretrained("madebyollin/taef1", torch_dtype=dtype).to(device) good_vae = AutoencoderKL.from_pretrained(base_model, subfolder="vae", torch_dtype=dtype).to(device) pipe = DiffusionPipeline.from_pretrained(base_model, torch_dtype=dtype, vae=taef1).to(device) pipe_i2i = AutoPipelineForImage2Image.from_pretrained(base_model, vae=good_vae, transformer=pipe.transformer, text_encoder=pipe.text_encoder, tokenizer=pipe.tokenizer, text_encoder_2=pipe.text_encoder_2, tokenizer_2=pipe.tokenizer_2, torch_dtype=dtype ) MAX_SEED = 2**32-1 pipe.flux_pipe_call_that_returns_an_iterable_of_images = flux_pipe_call_that_returns_an_iterable_of_images.__get__(pipe) class calculateDuration: def __init__(self, activity_name=""): self.activity_name = activity_name def __enter__(self): self.start_time = time.time() return self def __exit__(self, exc_type, exc_value, traceback): self.end_time = time.time() self.elapsed_time = self.end_time - self.start_time if self.activity_name: print(f"tempo passado para {self.activity_name}: {self.elapsed_time:.6f} segundos") else: print(f"tempo passado: {self.elapsed_time:.6f} segundos") def update_selection(evt: gr.SelectData, width, height): selected_lora = loras[evt.index] new_placeholder = f"Digite o prompt para {selected_lora['title']}" lora_repo = selected_lora["repo"] updated_text = f"### Selecionado: [{lora_repo}](https://huggingface.co/{lora_repo}) ✅" if "aspect" in selected_lora: if selected_lora["aspect"] == "retrato": width = 768 height = 1024 elif selected_lora["aspect"] == "paisagem": width = 1024 height = 768 else: width = 1024 height = 1024 return ( gr.update(placeholder=new_placeholder), updated_text, evt.index, width, height, ) @spaces.GPU(duration=100) def generate_image(prompt_mash, steps, seed, cfg_scale, width, height, lora_scale, progress): pipe.to("cuda") generator = torch.Generator(device="cuda").manual_seed(seed) with calculateDuration("Generating image"): # Generate image for img in pipe.flux_pipe_call_that_returns_an_iterable_of_images( prompt=prompt_mash, num_inference_steps=steps, guidance_scale=cfg_scale, width=width, height=height, generator=generator, joint_attention_kwargs={"scale": lora_scale}, output_type="pil", good_vae=good_vae, ): yield img def generate_image_to_image(prompt_mash, image_input_path, image_strength, steps, cfg_scale, width, height, lora_scale, seed): generator = torch.Generator(device="cuda").manual_seed(seed) pipe_i2i.to("cuda") image_input = load_image(image_input_path) final_image = pipe_i2i( prompt=prompt_mash, image=image_input, strength=image_strength, num_inference_steps=steps, guidance_scale=cfg_scale, width=width, height=height, generator=generator, joint_attention_kwargs={"scale": lora_scale}, output_type="pil", ).images[0] return final_image @spaces.GPU(duration=100) def run_lora(prompt, image_input, image_strength, cfg_scale, steps, selected_index, randomize_seed, seed, width, height, lora_scale, progress=gr.Progress(track_tqdm=True)): if selected_index is None: raise gr.Error("Selecione um modelo para continuar.") selected_lora = loras[selected_index] lora_path = selected_lora["repo"] trigger_word = selected_lora["trigger_word"] if(trigger_word): if "trigger_position" in selected_lora: if selected_lora["trigger_position"] == "prepend": prompt_mash = f"{trigger_word} {prompt}" else: prompt_mash = f"{prompt} {trigger_word}" else: prompt_mash = f"{trigger_word} {prompt}" else: prompt_mash = prompt with calculateDuration("Carregando Modelo"): pipe.unload_lora_weights() pipe_i2i.unload_lora_weights() #LoRA weights flow with calculateDuration(f"Carregando modelo para {selected_lora['title']}"): pipe_to_use = pipe_i2i if image_input is not None else pipe weight_name = selected_lora.get("Pesos", None) pipe_to_use.load_lora_weights( lora_path, weight_name=weight_name, low_cpu_mem_usage=True ) with calculateDuration("Gerando fontes"): if randomize_seed: seed = random.randint(0, MAX_SEED) if(image_input is not None): final_image = generate_image_to_image(prompt_mash, image_input, image_strength, steps, cfg_scale, width, height, lora_scale, seed) yield final_image, seed, gr.update(visible=False) else: image_generator = generate_image(prompt_mash, steps, seed, cfg_scale, width, height, lora_scale, progress) final_image = None step_counter = 0 for image in image_generator: step_counter+=1 final_image = image progress_bar = f'
"+trigger_word+"
como palavra-chave" if trigger_word else "Não encontramos a palavra-chave, se tiver, coloque-a no prompt."}