import os import json import copy import time import random import logging import numpy as np from typing import Any, Dict, List, Optional, Union import gradio as gr import logging import torch from PIL import Image import spaces from diffusers import DiffusionPipeline, AutoencoderTiny, AutoencoderKL, AutoPipelineForImage2Image from diffusers.utils import load_image from huggingface_hub import hf_hub_download, HfFileSystem, ModelCard, snapshot_download import requests import pandas as pd from transformers.utils import move_cache move_cache() from diffusers import ( DiffusionPipeline, AutoencoderTiny, AutoencoderKL, AutoPipelineForImage2Image, FluxPipeline, FlowMatchEulerDiscreteScheduler) from huggingface_hub import ( hf_hub_download, HfFileSystem, ModelCard, snapshot_download) from diffusers.utils import load_image from huggingface_hub import HfApi token = os.getenv("HF_TOKEN") #---if workspace = local or colab--- # Authenticate with Hugging Face # from huggingface_hub import login # Log in to Hugging Face using the provided token # hf_token = 'hf-token-authentication' # login(hf_token) def calculate_shift( image_seq_len, base_seq_len: int = 256, max_seq_len: int = 4096, base_shift: float = 0.5, max_shift: float = 1.16, ): m = (max_shift - base_shift) / (max_seq_len - base_seq_len) b = base_shift - m * base_seq_len mu = image_seq_len * m + b return mu def retrieve_timesteps( scheduler, num_inference_steps: Optional[int] = None, device: Optional[Union[str, torch.device]] = None, timesteps: Optional[List[int]] = None, sigmas: Optional[List[float]] = None, **kwargs, ): if timesteps is not None and sigmas is not None: raise ValueError("Apenas um entre `timesteps` ou `sigmas` pode ser passado. Por favor, escolha um para definir valores personalizados") scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs) timesteps = scheduler.timesteps num_inference_steps = len(timesteps) elif sigmas is not None: scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs) timesteps = scheduler.timesteps num_inference_steps = len(timesteps) else: scheduler.set_timesteps(num_inference_steps, device=device, **kwargs) timesteps = scheduler.timesteps return timesteps, num_inference_steps # FLUX pipeline @torch.inference_mode() def flux_pipe_call_that_returns_an_iterable_of_images( self, prompt: Union[str, List[str]] = None, prompt_2: Optional[Union[str, List[str]]] = None, height: Optional[int] = None, width: Optional[int] = None, num_inference_steps: int = 28, timesteps: List[int] = None, guidance_scale: float = 3.5, num_images_per_prompt: Optional[int] = 1, generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None, latents: Optional[torch.FloatTensor] = None, prompt_embeds: Optional[torch.FloatTensor] = None, pooled_prompt_embeds: Optional[torch.FloatTensor] = None, output_type: Optional[str] = "pil", return_dict: bool = True, joint_attention_kwargs: Optional[Dict[str, Any]] = None, max_sequence_length: int = 512, good_vae: Optional[Any] = None, ): height = height or self.default_sample_size * self.vae_scale_factor width = width or self.default_sample_size * self.vae_scale_factor self.check_inputs( prompt, prompt_2, height, width, prompt_embeds=prompt_embeds, pooled_prompt_embeds=pooled_prompt_embeds, max_sequence_length=max_sequence_length, ) self._guidance_scale = guidance_scale self._joint_attention_kwargs = joint_attention_kwargs self._interrupt = False batch_size = 1 if isinstance(prompt, str) else len(prompt) device = self._execution_device lora_scale = joint_attention_kwargs.get("scale", None) if joint_attention_kwargs is not None else None prompt_embeds, pooled_prompt_embeds, text_ids = self.encode_prompt( prompt=prompt, prompt_2=prompt_2, prompt_embeds=prompt_embeds, pooled_prompt_embeds=pooled_prompt_embeds, device=device, num_images_per_prompt=num_images_per_prompt, max_sequence_length=max_sequence_length, lora_scale=lora_scale, ) num_channels_latents = self.transformer.config.in_channels // 4 latents, latent_image_ids = self.prepare_latents( batch_size * num_images_per_prompt, num_channels_latents, height, width, prompt_embeds.dtype, device, generator, latents, ) sigmas = np.linspace(1.0, 1 / num_inference_steps, num_inference_steps) image_seq_len = latents.shape[1] mu = calculate_shift( image_seq_len, self.scheduler.config.base_image_seq_len, self.scheduler.config.max_image_seq_len, self.scheduler.config.base_shift, self.scheduler.config.max_shift, ) timesteps, num_inference_steps = retrieve_timesteps( self.scheduler, num_inference_steps, device, timesteps, sigmas, mu=mu, ) self._num_timesteps = len(timesteps) guidance = torch.full([1], guidance_scale, device=device, dtype=torch.float32).expand(latents.shape[0]) if self.transformer.config.guidance_embeds else None for i, t in enumerate(timesteps): if self.interrupt: continue timestep = t.expand(latents.shape[0]).to(latents.dtype) noise_pred = self.transformer( hidden_states=latents, timestep=timestep / 1000, guidance=guidance, pooled_projections=pooled_prompt_embeds, encoder_hidden_states=prompt_embeds, txt_ids=text_ids, img_ids=latent_image_ids, joint_attention_kwargs=self.joint_attention_kwargs, return_dict=False, )[0] latents_for_image = self._unpack_latents(latents, height, width, self.vae_scale_factor) latents_for_image = (latents_for_image / self.vae.config.scaling_factor) + self.vae.config.shift_factor image = self.vae.decode(latents_for_image, return_dict=False)[0] yield self.image_processor.postprocess(image, output_type=output_type)[0] latents = self.scheduler.step(noise_pred, t, latents, return_dict=False)[0] torch.cuda.empty_cache() latents = self._unpack_latents(latents, height, width, self.vae_scale_factor) latents = (latents / good_vae.config.scaling_factor) + good_vae.config.shift_factor image = good_vae.decode(latents, return_dict=False)[0] self.maybe_free_model_hooks() torch.cuda.empty_cache() yield self.image_processor.postprocess(image, output_type=output_type)[0] #------------------------------------------------------------------------------------------------------------------------------------------------------------# loras = [ #Super-Realism { "image": "https://huggingface.co/Collos/Jalves/resolve/main/images/jose.webp", "title": "Jose Alves", "repo": "Collos/Jalves", "weights": "Jalves.safetensors", "trigger_word": "José Alves" }, { "image": "https://huggingface.co/Collos/JulioCesar/resolve/main/images/WhatsApp%20Image%202024-12-10%20at%2009.33.50.jpeg", "title": "Júlio César", "repo": "Collos/JulioCesar", "weights": "julio.safetensorss", "trigger_word": "Júlio" }, { "image": "https://huggingface.co/Collos/PedroJr/resolve/main/images/WhatsApp%20Image%202024-12-10%20at%2009.34.01.jpeg", "title": "Pedro Jr.", "repo": "Collos/PedroJr", "weights": "pedrojr.safetensors", "trigger_word": "Pedro" }, { "image": "https://huggingface.co/Collos/JoseClovis/resolve/main/images/WhatsApp%20Image%202024-12-10%20at%2009.38.50.jpeg", "title": "José Clóvis", "repo": "Collos/JoseClovis", "weights": "clovis.safetensors", "trigger_word": "Clóvis" } #add new ] # Initialize the base model use_auth_token=True dtype = torch.bfloat16 device = "cuda" if torch.cuda.is_available() else "cpu" base_model = "black-forest-labs/FLUX.1-dev" taef1 = AutoencoderTiny.from_pretrained("madebyollin/taef1", torch_dtype=dtype).to(device) good_vae = AutoencoderKL.from_pretrained(base_model, subfolder="vae", torch_dtype=dtype).to(device) pipe = DiffusionPipeline.from_pretrained(base_model, torch_dtype=dtype, vae=taef1).to(device) pipe_i2i = AutoPipelineForImage2Image.from_pretrained( base_model, vae=good_vae, transformer=pipe.transformer, text_encoder=pipe.text_encoder, tokenizer=pipe.tokenizer, text_encoder_2=pipe.text_encoder_2, tokenizer_2=pipe.tokenizer_2, torch_dtype=dtype ) #TAEF1 is very tiny autoencoder which uses the same "latent API" as FLUX.1's VAE. FLUX.1 is useful for real-time previewing of the FLUX.1 generation process.# taef1 = AutoencoderTiny.from_pretrained("madebyollin/taef1", torch_dtype=dtype).to(device) good_vae = AutoencoderKL.from_pretrained(base_model, subfolder="vae", torch_dtype=dtype).to(device) pipe = DiffusionPipeline.from_pretrained(base_model, torch_dtype=dtype, vae=taef1).to(device) pipe_i2i = AutoPipelineForImage2Image.from_pretrained(base_model, vae=good_vae, transformer=pipe.transformer, text_encoder=pipe.text_encoder, tokenizer=pipe.tokenizer, text_encoder_2=pipe.text_encoder_2, tokenizer_2=pipe.tokenizer_2, torch_dtype=dtype ) MAX_SEED = 2**32-1 pipe.flux_pipe_call_that_returns_an_iterable_of_images = flux_pipe_call_that_returns_an_iterable_of_images.__get__(pipe) class calculateDuration: def __init__(self, activity_name=""): self.activity_name = activity_name def __enter__(self): self.start_time = time.time() return self def __exit__(self, exc_type, exc_value, traceback): self.end_time = time.time() self.elapsed_time = self.end_time - self.start_time if self.activity_name: print(f"tempo passado para {self.activity_name}: {self.elapsed_time:.6f} segundos") else: print(f"tempo passado: {self.elapsed_time:.6f} segundos") def update_selection(evt: gr.SelectData, width, height): selected_lora = loras[evt.index] new_placeholder = f"Digite o prompt para {selected_lora['title']}" lora_repo = selected_lora["repo"] updated_text = f"### Selecionado: [{lora_repo}](https://huggingface.co/{lora_repo}) ✅" if "aspect" in selected_lora: if selected_lora["aspect"] == "retrato": width = 768 height = 1024 elif selected_lora["aspect"] == "paisagem": width = 1024 height = 768 else: width = 1024 height = 1024 return ( gr.update(placeholder=new_placeholder), updated_text, evt.index, width, height, ) @spaces.GPU(duration=100) def generate_image(prompt_mash, steps, seed, cfg_scale, width, height, lora_scale, progress): pipe.to("cuda") generator = torch.Generator(device="cuda").manual_seed(seed) with calculateDuration("Generating image"): # Generate image for img in pipe.flux_pipe_call_that_returns_an_iterable_of_images( prompt=prompt_mash, num_inference_steps=steps, guidance_scale=cfg_scale, width=width, height=height, generator=generator, joint_attention_kwargs={"scale": lora_scale}, output_type="pil", good_vae=good_vae, ): yield img def generate_image_to_image(prompt_mash, image_input_path, image_strength, steps, cfg_scale, width, height, lora_scale, seed): generator = torch.Generator(device="cuda").manual_seed(seed) pipe_i2i.to("cuda") image_input = load_image(image_input_path) final_image = pipe_i2i( prompt=prompt_mash, image=image_input, strength=image_strength, num_inference_steps=steps, guidance_scale=cfg_scale, width=width, height=height, generator=generator, joint_attention_kwargs={"scale": lora_scale}, output_type="pil", ).images[0] return final_image @spaces.GPU(duration=100) def run_lora(prompt, image_input, image_strength, cfg_scale, steps, selected_index, randomize_seed, seed, width, height, lora_scale, progress=gr.Progress(track_tqdm=True)): if selected_index is None: raise gr.Error("Selecione um modelo para continuar.") selected_lora = loras[selected_index] lora_path = selected_lora["repo"] trigger_word = selected_lora["trigger_word"] if(trigger_word): if "trigger_position" in selected_lora: if selected_lora["trigger_position"] == "prepend": prompt_mash = f"{trigger_word} {prompt}" else: prompt_mash = f"{prompt} {trigger_word}" else: prompt_mash = f"{trigger_word} {prompt}" else: prompt_mash = prompt with calculateDuration("Carregando Modelo"): pipe.unload_lora_weights() pipe_i2i.unload_lora_weights() #LoRA weights flow with calculateDuration(f"Carregando modelo para {selected_lora['title']}"): pipe_to_use = pipe_i2i if image_input is not None else pipe weight_name = selected_lora.get("Pesos", None) pipe_to_use.load_lora_weights( lora_path, weight_name=weight_name, low_cpu_mem_usage=True ) with calculateDuration("Gerando fontes"): if randomize_seed: seed = random.randint(0, MAX_SEED) if(image_input is not None): final_image = generate_image_to_image(prompt_mash, image_input, image_strength, steps, cfg_scale, width, height, lora_scale, seed) yield final_image, seed, gr.update(visible=False) else: image_generator = generate_image(prompt_mash, steps, seed, cfg_scale, width, height, lora_scale, progress) final_image = None step_counter = 0 for image in image_generator: step_counter+=1 final_image = image progress_bar = f'
' yield image, seed, gr.update(value=progress_bar, visible=True) yield final_image, seed, gr.update(value=progress_bar, visible=False) def get_huggingface_safetensors(link): split_link = link.split("/") if(len(split_link) == 2): model_card = ModelCard.load(link) base_model = model_card.data.get("base_model") print(base_model) #Allows Both if((base_model != "black-forest-labs/FLUX.1-dev") and (base_model != "black-forest-labs/FLUX.1-schnell")): raise Exception("Flux LoRA Not Found!") # Only allow "black-forest-labs/FLUX.1-dev" #if base_model != "black-forest-labs/FLUX.1-dev": #raise Exception("Only FLUX.1-dev is supported, other LoRA models are not allowed!") image_path = model_card.data.get("widget", [{}])[0].get("output", {}).get("url", None) trigger_word = model_card.data.get("instance_prompt", "") image_url = f"https://huggingface.co/{link}/resolve/main/{image_path}" if image_path else None fs = HfFileSystem() try: list_of_files = fs.ls(link, detail=False) for file in list_of_files: if(file.endswith(".safetensors")): safetensors_name = file.split("/")[-1] if (not image_url and file.lower().endswith((".jpg", ".jpeg", ".png", ".webp"))): image_elements = file.split("/") image_url = f"https://huggingface.co/{link}/resolve/main/{image_elements[-1]}" except Exception as e: print(e) gr.Warning(f"You didn't include a link neither a valid Hugging Face repository with a *.safetensors LoRA") raise Exception(f"You didn't include a link neither a valid Hugging Face repository with a *.safetensors LoRA") return split_link[1], link, safetensors_name, trigger_word, image_url def check_custom_model(link): if(link.startswith("https://")): if(link.startswith("https://huggingface.co") or link.startswith("https://www.huggingface.co")): link_split = link.split("huggingface.co/") return get_huggingface_safetensors(link_split[1]) else: return get_huggingface_safetensors(link) def add_custom_lora(custom_lora): global loras if(custom_lora): try: title, repo, path, trigger_word, image = check_custom_model(custom_lora) print(f"Modelo Externo: {repo}") card = f'''
Loaded custom LoRA:

{title}

{"Usando: "+trigger_word+" como palavra-chave" if trigger_word else "Não encontramos a palavra-chave, se tiver, coloque-a no prompt."}
''' existing_item_index = next((index for (index, item) in enumerate(loras) if item['repo'] == repo), None) if(not existing_item_index): new_item = { "image": image, "title": title, "repo": repo, "weights": path, "trigger_word": trigger_word } print(new_item) existing_item_index = len(loras) loras.append(new_item) return gr.update(visible=True, value=card), gr.update(visible=True), gr.Gallery(selected_index=None), f"Custom: {path}", existing_item_index, trigger_word except Exception as e: gr.Warning(f"Modelo Inválido: ou o link está errado ou não é um FLUX") return gr.update(visible=True, value=f"Modelo Inválido: ou o link está errado ou não é um FLUX"), gr.update(visible=False), gr.update(), "", None, "" else: return gr.update(visible=False), gr.update(visible=False), gr.update(), "", None, "" def remove_custom_lora(): return gr.update(visible=False), gr.update(visible=False), gr.update(), "", None, "" run_lora.zerogpu = True #import gradio as gr collos = gr.themes.Soft( primary_hue="gray", secondary_hue="stone", neutral_hue="slate", radius_size=gr.themes.Size(lg="15px", md="8px", sm="6px", xl="16px", xs="4px", xxl="24px", xxs="2px") ).set( body_background_fill='*primary_100', embed_radius='*radius_lg', shadow_drop='0 1px 2px rgba(0, 0, 0, 0.1)', shadow_drop_lg='0 1px 2px rgba(0, 0, 0, 0.1)', shadow_inset='0 1px 2px rgba(0, 0, 0, 0.1)', shadow_spread='0 1px 2px rgba(0, 0, 0, 0.1)', shadow_spread_dark='0 1px 2px rgba(0, 0, 0, 0.1)', block_radius='*radius_lg', block_shadow='*shadow_drop', container_radius='*radius_lg' ) with gr.Blocks(theme=collos, delete_cache=(60, 60)) as app: title = gr.HTML( """Logo""", elem_id="title", ) selected_index = gr.State(None) with gr.Row(): with gr.Column(scale=3): prompt = gr.Textbox(label="Prompt", lines=1, placeholder=":/ Selecione o modelo ") with gr.Column(scale=1): generate_button = gr.Button("Gerar Imagem", variant="primary", elem_id="cta") with gr.Row(): with gr.Column(): selected_info = gr.Markdown("") gallery = gr.Gallery( [(item["image"], item["title"]) for item in loras], allow_preview=False, columns=3, show_share_button=False ) with gr.Group(): custom_lora = gr.Textbox(label="Selecione um Modelo Externo", placeholder="black-forest-labs/FLUX.1-dev") gr.Markdown("[Cheque a lista de modelos do Huggingface](https://huggingface.co/models?other=base_model:adapter:black-forest-labs/FLUX.1-dev)", elem_id="lora_list") custom_lora_info = gr.HTML(visible=False) custom_lora_button = gr.Button("Remova modelo Externo", visible=False) with gr.Column(): progress_bar = gr.Markdown(elem_id="progress", visible=False) result = gr.Image(label="Imagem Gerada") with gr.Row(): with gr.Accordion("Configurações Avançadas", open=False): with gr.Row(): input_image = gr.Image(label="Insira uma Imagem", type="filepath") image_strength = gr.Slider(label="Remossão de ruído", info="Valores mais baixos significam maior influência da imagem.", minimum=0.1, maximum=1.0, step=0.01, value=0.75) with gr.Column(): with gr.Row(): cfg_scale = gr.Slider(label="Aumentar Escala", minimum=1, maximum=20, step=0.5, value=3.5) steps = gr.Slider(label="Passos", minimum=1, maximum=50, step=1, value=28) with gr.Row(): width = gr.Slider(label="Largura", minimum=256, maximum=1536, step=64, value=1024) height = gr.Slider(label="Altura", minimum=256, maximum=1536, step=64, value=1024) with gr.Row(): randomize_seed = gr.Checkbox(True, label="Fonte Randomizada") seed = gr.Slider(label="Fontes", minimum=0, maximum=MAX_SEED, step=1, value=0, randomize=True) lora_scale = gr.Slider(label="Escala do Modelo", minimum=0, maximum=3, step=0.01, value=0.95) gallery.select( update_selection, inputs=[width, height], outputs=[prompt, selected_info, selected_index, width, height] ) custom_lora.input( add_custom_lora, inputs=[custom_lora], outputs=[custom_lora_info, custom_lora_button, gallery, selected_info, selected_index, prompt] ) custom_lora_button.click( remove_custom_lora, outputs=[custom_lora_info, custom_lora_button, gallery, selected_info, selected_index, custom_lora] ) gr.on( triggers=[generate_button.click, prompt.submit], fn=run_lora, inputs=[prompt, input_image, image_strength, cfg_scale, steps, selected_index, randomize_seed, seed, width, height, lora_scale], outputs=[result, seed, progress_bar] ) app.queue() app.launch()