import os
import json
import copy
import time
import random
import logging
import datetime
from io import BytesIO
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd
import requests
import torch
import gradio as gr
import spaces
from PIL import Image

from diffusers import (
    DiffusionPipeline,
    AutoencoderTiny,
    AutoencoderKL,
    AutoPipelineForImage2Image,
    FluxPipeline,
    FlowMatchEulerDiscreteScheduler,
)
from diffusers.utils import load_image
from huggingface_hub import (
    hf_hub_download,
    HfFileSystem,
    ModelCard,
    snapshot_download,
    HfApi,
)
from datasets import load_dataset, Dataset
from transformers.utils import move_cache

move_cache()

token = os.getenv("HF_TOKEN")

# Name of the inference-log dataset on the Hugging Face Hub
DATASET_NAME = "vcollos/Inferencias"


def salvar_inferencia_no_dataset(prompt, image, seed, selected_lora):
    try:
        # Load the existing dataset from the Hugging Face Hub
        dataset = load_dataset(DATASET_NAME)['train']
    except FileNotFoundError:
        # Create an empty dataset if it does not exist yet
        dataset = Dataset.from_dict({
            "prompt": [],
            "image": [],
            "seed": [],
            "datetime": [],
            "lora_repo1": []
        })

    # Convert the image to PNG bytes
    buffered = BytesIO()
    image.save(buffered, format="PNG")
    image_bytes = buffered.getvalue()

    # New record for this inference
    new_data = {
        "prompt": [prompt],
        "image": [image_bytes],
        "seed": [seed],
        "datetime": [datetime.datetime.now().isoformat()],
        "lora_repo1": [selected_lora["repo"]]
    }

    # Merge the existing rows with the new record
    updated_dataset = Dataset.from_dict({
        "prompt": dataset["prompt"] + new_data["prompt"],
        "image": dataset["image"] + new_data["image"],
        "seed": dataset["seed"] + new_data["seed"],
        "datetime": dataset["datetime"] + new_data["datetime"],
        "lora_repo1": dataset["lora_repo1"] + new_data["lora_repo1"]
    })

    # Push the updated dataset back to the Hugging Face Hub
    updated_dataset.push_to_hub(DATASET_NAME, commit_message="Adicionando nova inferência.")


def calculate_shift(
    image_seq_len,
    base_seq_len: int = 256,
    max_seq_len: int = 4096,
    base_shift: float = 0.5,
    max_shift: float = 1.16,
):
    # Linear interpolation of the timestep shift (mu) as a function of the latent sequence length
    m = (max_shift - base_shift) / (max_seq_len - base_seq_len)
    b = base_shift - m * base_seq_len
    mu = image_seq_len * m + b
    return mu
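# Worked example (assuming FLUX's 2x2 latent packing, i.e. a 16x spatial reduction):
# a 1024x1024 image packs to (1024/16)**2 = 4096 tokens, so mu = 0.5 + 0.66*(4096-256)/3840 = 1.16;
# a 512x512 image packs to 1024 tokens, so mu = 0.5 + 0.66*(1024-256)/3840 ≈ 0.632.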
def retrieve_timesteps(
    scheduler,
    num_inference_steps: Optional[int] = None,
    device: Optional[Union[str, torch.device]] = None,
    timesteps: Optional[List[int]] = None,
    sigmas: Optional[List[float]] = None,
    **kwargs,
):
    if timesteps is not None and sigmas is not None:
        raise ValueError(
            "Apenas um entre `timesteps` ou `sigmas` pode ser passado. "
            "Por favor, escolha um para definir valores personalizados"
        )
    if timesteps is not None:
        scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
        timesteps = scheduler.timesteps
        num_inference_steps = len(timesteps)
    elif sigmas is not None:
        scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
        timesteps = scheduler.timesteps
        num_inference_steps = len(timesteps)
    else:
        scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
        timesteps = scheduler.timesteps
    return timesteps, num_inference_steps
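# The custom call below mirrors FluxPipeline's denoising loop but runs as a generator:
# at every step it decodes the current latents with the lightweight preview VAE attached
# to `pipe` (TAEF1) and yields the intermediate image, then performs one final decode with
# the full-quality VAE passed in as `good_vae`.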
# FLUX pipeline
@torch.inference_mode()
def flux_pipe_call_that_returns_an_iterable_of_images(
    self,
    prompt: Union[str, List[str]] = None,
    prompt_2: Optional[Union[str, List[str]]] = None,
    height: Optional[int] = None,
    width: Optional[int] = None,
    num_inference_steps: int = 32,
    timesteps: List[int] = None,
    guidance_scale: float = 3.5,
    num_images_per_prompt: Optional[int] = 2,
    generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
    latents: Optional[torch.FloatTensor] = None,
    prompt_embeds: Optional[torch.FloatTensor] = None,
    pooled_prompt_embeds: Optional[torch.FloatTensor] = None,
    output_type: Optional[str] = "pil",
    return_dict: bool = True,
    joint_attention_kwargs: Optional[Dict[str, Any]] = None,
    max_sequence_length: int = 512,
    good_vae: Optional[Any] = None,
):
    height = height or self.default_sample_size * self.vae_scale_factor
    width = width or self.default_sample_size * self.vae_scale_factor

    self.check_inputs(
        prompt,
        prompt_2,
        height,
        width,
        prompt_embeds=prompt_embeds,
        pooled_prompt_embeds=pooled_prompt_embeds,
        max_sequence_length=max_sequence_length,
    )

    self._guidance_scale = guidance_scale
    self._joint_attention_kwargs = joint_attention_kwargs
    self._interrupt = False

    batch_size = 1 if isinstance(prompt, str) else len(prompt)
    device = self._execution_device

    lora_scale = joint_attention_kwargs.get("scale", None) if joint_attention_kwargs is not None else None
    prompt_embeds, pooled_prompt_embeds, text_ids = self.encode_prompt(
        prompt=prompt,
        prompt_2=prompt_2,
        prompt_embeds=prompt_embeds,
        pooled_prompt_embeds=pooled_prompt_embeds,
        device=device,
        num_images_per_prompt=num_images_per_prompt,
        max_sequence_length=max_sequence_length,
        lora_scale=lora_scale,
    )

    num_channels_latents = self.transformer.config.in_channels // 4
    latents, latent_image_ids = self.prepare_latents(
        batch_size * num_images_per_prompt,
        num_channels_latents,
        height,
        width,
        prompt_embeds.dtype,
        device,
        generator,
        latents,
    )

    sigmas = np.linspace(1.0, 1 / num_inference_steps, num_inference_steps)
    image_seq_len = latents.shape[1]
    mu = calculate_shift(
        image_seq_len,
        self.scheduler.config.base_image_seq_len,
        self.scheduler.config.max_image_seq_len,
        self.scheduler.config.base_shift,
        self.scheduler.config.max_shift,
    )
    timesteps, num_inference_steps = retrieve_timesteps(
        self.scheduler,
        num_inference_steps,
        device,
        timesteps,
        sigmas,
        mu=mu,
    )
    self._num_timesteps = len(timesteps)

    guidance = torch.full([1], guidance_scale, device=device, dtype=torch.float32).expand(latents.shape[0]) if self.transformer.config.guidance_embeds else None

    for i, t in enumerate(timesteps):
        if self.interrupt:
            continue

        timestep = t.expand(latents.shape[0]).to(latents.dtype)

        noise_pred = self.transformer(
            hidden_states=latents,
            timestep=timestep / 1000,
            guidance=guidance,
            pooled_projections=pooled_prompt_embeds,
            encoder_hidden_states=prompt_embeds,
            txt_ids=text_ids,
            img_ids=latent_image_ids,
            joint_attention_kwargs=self.joint_attention_kwargs,
            return_dict=False,
        )[0]

        # Decode a quick preview of the current latents with the pipeline's (tiny) VAE
        latents_for_image = self._unpack_latents(latents, height, width, self.vae_scale_factor)
        latents_for_image = (latents_for_image / self.vae.config.scaling_factor) + self.vae.config.shift_factor
        image = self.vae.decode(latents_for_image, return_dict=False)[0]
        yield self.image_processor.postprocess(image, output_type=output_type)[0]

        latents = self.scheduler.step(noise_pred, t, latents, return_dict=False)[0]
        torch.cuda.empty_cache()

    # Final decode with the full-quality VAE
    latents = self._unpack_latents(latents, height, width, self.vae_scale_factor)
    latents = (latents / good_vae.config.scaling_factor) + good_vae.config.shift_factor
    image = good_vae.decode(latents, return_dict=False)[0]
    self.maybe_free_model_hooks()
    torch.cuda.empty_cache()
    yield self.image_processor.postprocess(image, output_type=output_type)[0]

#------------------------------------------------------------------------------------------------------------------------------------------------------------#

loras = [
    # Super-Realism
    {
        "image": "https://huggingface.co/vcollos/VitorCollos/resolve/main/images/IMG_0047.WEBP",
        "title": "Vitor",
        "repo": "vcollos/VitorCollos",
        "weights": "Vitor.safetensors",
        "trigger_word": "A photo of Vitor, RAW photo, (hyperrealistic portrait:1.3) of a [man/woman], (detailed eyes:1.2), (skin texture:1.4), (natural lighting:1.1), (soft shadows:1.1), (intricate hair details:1.3), (film grain:0.8), (8k:1.2), (depth of field:1.1), (sharp focus:1.1),"
    },
    {
        "image": "https://huggingface.co/vcollos/camila/resolve/main/images/1732936378531__000003000_1.jpg",
        "title": "Camila",
        "repo": "vcollos/camila",
        "weights": "Camila.safetensors",
        "trigger_word": "A photo of Camila"
    },
    {
        "image": "https://huggingface.co/vcollos/PaulaP/resolve/main/images/image.webp",
        "title": "Paula",
        "repo": "vcollos/Paula2",
        "weights": "Paula P.safetensors",
        "trigger_word": "A photo of Paulinha"
    },
    {
        "image": "https://huggingface.co/vcollos/vivi/resolve/main/images/1732990780958__000003000_3.jpg",
        "title": "Vivi",
        "repo": "vcollos/vivi",
        "weights": "Vivi.safetensors",
        "trigger_word": "A photo of Vivi"
    },
    {
        "image": "https://huggingface.co/vcollos/caetano/resolve/main/images/02EFB2BC-0197-487C-AEDF-6DBF6EDE3531_1_105_c.jpeg",
        "title": "Caetano",
        "repo": "vcollos/caetano",
        "weights": "caetanocollos.safetensors",
        "trigger_word": "A photo of Caetano"
    },
    {
        "image": "https://huggingface.co/vcollos/JoseManoel/resolve/main/co%CC%81pia%20de%20josemanoel.jpg",
        "title": "José Manoel",
        "repo": "vcollos/JoseManoel",
        "weights": "lora.safetensors",
        "trigger_word": "A photo of José Manoel"
    },
    {
        "image": "https://huggingface.co/vcollos/Kardec/resolve/main/co%CC%81pia%20de%20kardec.jpg",
        "title": "Kardec",
        "repo": "vcollos/Kardec",
        "weights": "lora.safetensors",
        "trigger_word": "a photo of Kardec"
    },
    {
        "image": "https://huggingface.co/vcollos/Ditinha/resolve/main/co%CC%81pia%20de%20ditinha.jpg",
        "title": "Ditinha",
        "repo": "vcollos/Ditinha",
        "weights": "lora.safetensors",
        "trigger_word": "A photo of Ditinha"
    }
    # add new
]

# Initialize the base model
use_auth_token = True
dtype = torch.bfloat16

# Check whether a GPU is available
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Usando dispositivo: {device}")

base_model = "black-forest-labs/FLUX.1-dev"

# TAEF1 is a very tiny autoencoder that uses the same "latent API" as FLUX.1's VAE.
# It is useful for real-time previewing of the FLUX.1 generation process.
taef1 = AutoencoderTiny.from_pretrained("madebyollin/taef1", torch_dtype=dtype).to(device)
good_vae = AutoencoderKL.from_pretrained(base_model, subfolder="vae", torch_dtype=dtype).to(device)
pipe = DiffusionPipeline.from_pretrained(base_model, torch_dtype=dtype, vae=taef1).to(device)
pipe_i2i = AutoPipelineForImage2Image.from_pretrained(
    base_model,
    vae=good_vae,
    transformer=pipe.transformer,
    text_encoder=pipe.text_encoder,
    tokenizer=pipe.tokenizer,
    text_encoder_2=pipe.text_encoder_2,
    tokenizer_2=pipe.tokenizer_2,
    torch_dtype=dtype
)
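# pipe (text-to-image, TAEF1 preview VAE) and pipe_i2i (image-to-image, full VAE) share the
# same transformer and text encoders, so the FLUX.1 weights are only kept in memory once.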
MAX_SEED = 2**32 - 1

# Bind the streaming generator defined above as a method on the text-to-image pipeline
pipe.flux_pipe_call_that_returns_an_iterable_of_images = flux_pipe_call_that_returns_an_iterable_of_images.__get__(pipe)


class calculateDuration:
    def __init__(self, activity_name=""):
        self.activity_name = activity_name

    def __enter__(self):
        self.start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.end_time = time.time()
        self.elapsed_time = self.end_time - self.start_time
        if self.activity_name:
            print(f"tempo passado para {self.activity_name}: {self.elapsed_time:.6f} segundos")
        else:
            print(f"tempo passado: {self.elapsed_time:.6f} segundos")


def update_selection(evt: gr.SelectData, width, height):
    selected_lora = loras[evt.index]
    new_placeholder = f"Digite o prompt para {selected_lora['title']}, de preferência em inglês."
    lora_repo = selected_lora["repo"]
    updated_text = f"### Selecionado: [{lora_repo}](https://huggingface.co/{lora_repo}) ✅"
    if "aspect" in selected_lora:
        if selected_lora["aspect"] == "retrato":
            width = 768
            height = 1024
        elif selected_lora["aspect"] == "paisagem":
            width = 1024
            height = 768
        else:
            width = 1024
            height = 1024
    return (
        gr.update(placeholder=new_placeholder),
        updated_text,
        evt.index,
        width,
        height,
    )


@spaces.GPU(duration=100)
def generate_image(prompt_mash, steps, seed, cfg_scale, width, height, lora_scale, progress):
    pipe.to("cuda")
    generator = torch.Generator(device="cuda").manual_seed(seed)
    with calculateDuration("Generating image"):
        # Generate image, yielding one preview per denoising step
        for img in pipe.flux_pipe_call_that_returns_an_iterable_of_images(
            prompt=prompt_mash,
            num_inference_steps=steps,
            guidance_scale=cfg_scale,
            width=width,
            height=height,
            generator=generator,
            joint_attention_kwargs={"scale": lora_scale},
            output_type="pil",
            good_vae=good_vae,
        ):
            yield img


def generate_image_to_image(prompt_mash, image_input_path, image_strength, steps, cfg_scale, width, height, lora_scale, seed):
    generator = torch.Generator(device="cuda").manual_seed(seed)
    pipe_i2i.to("cuda")
    image_input = load_image(image_input_path)
    final_image = pipe_i2i(
        prompt=prompt_mash,
        image=image_input,
        strength=image_strength,
        num_inference_steps=steps,
        guidance_scale=cfg_scale,
        width=width,
        height=height,
        generator=generator,
        joint_attention_kwargs={"scale": lora_scale},
        output_type="pil",
    ).images[0]
    return final_image


# run_lora is the Gradio entry point: it builds the final prompt from the trigger word,
# swaps in the selected LoRA weights, picks a seed, then streams previews (text-to-image)
# or returns a single result (image-to-image), and logs the finished image to the dataset.
@spaces.GPU(duration=100)
def run_lora(prompt, image_input, image_strength, cfg_scale, steps, selected_index, randomize_seed, seed, width, height, lora_scale, progress=gr.Progress(track_tqdm=True)):
    if selected_index is None:
        raise gr.Error("Selecione um modelo para continuar.")
    selected_lora = loras[selected_index]
    lora_path = selected_lora["repo"]
    trigger_word = selected_lora["trigger_word"]
    qualidade = ""

    # Compose the final prompt: the trigger word is prepended by default,
    # or appended when the LoRA entry requests it via "trigger_position"
    if trigger_word:
        if "trigger_position" in selected_lora:
            if selected_lora["trigger_position"] == "prepend":
                prompt_mash = f"{trigger_word} {prompt} {qualidade}"
            else:
                prompt_mash = f"{prompt} {trigger_word} {qualidade}"
        else:
            prompt_mash = f"{trigger_word} {prompt} {qualidade}"
    else:
        prompt_mash = prompt

    with calculateDuration("Carregando Modelo"):
        pipe.unload_lora_weights()
        pipe_i2i.unload_lora_weights()

    # LoRA weights flow
    with calculateDuration(f"Carregando modelo para {selected_lora['title']}"):
        pipe_to_use = pipe_i2i if image_input is not None else pipe
        weight_name = selected_lora.get("weights", None)
        pipe_to_use.load_lora_weights(
            lora_path,
            weight_name=weight_name,
            low_cpu_mem_usage=True
        )

    with calculateDuration("Gerando fontes"):
        if randomize_seed:
            seed = random.randint(0, MAX_SEED)
    progress_bar = ""
    if image_input is not None:
        final_image = generate_image_to_image(prompt_mash, image_input, image_strength, steps, cfg_scale, width, height, lora_scale, seed)
        yield final_image, seed, gr.update(visible=False)
    else:
        image_generator = generate_image(prompt_mash, steps, seed, cfg_scale, width, height, lora_scale, progress)
        final_image = None
        step_counter = 0
        for image in image_generator:
            step_counter += 1
            final_image = image
            # Plain-text progress indicator streamed to the Markdown progress component
            progress_bar = f"Gerando... passo {step_counter} de {steps}"
            yield image, seed, gr.update(value=progress_bar, visible=True)
        yield final_image, seed, gr.update(value=progress_bar, visible=False)

    # Save the image immediately after generation finishes
    salvar_inferencia_no_dataset(prompt, final_image, seed, selected_lora)
    yield final_image, seed, gr.update(value=progress_bar, visible=False)
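# Custom LoRA support: check_custom_model() normalises a pasted Hugging Face URL or bare repo id,
# get_huggingface_safetensors() reads the model card to find the base model, trigger word, preview
# image and *.safetensors file, and add_custom_lora() appends the result to `loras` and updates the UI.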
def get_huggingface_safetensors(link):
    split_link = link.split("/")
    if len(split_link) == 2:
        model_card = ModelCard.load(link)
        base_model = model_card.data.get("base_model")
        print(base_model)

        # Allows both FLUX.1-dev and FLUX.1-schnell LoRAs
        if (base_model != "black-forest-labs/FLUX.1-dev") and (base_model != "black-forest-labs/FLUX.1-schnell"):
            raise Exception("Flux LoRA Not Found!")

        # Only allow "black-forest-labs/FLUX.1-dev"
        #if base_model != "black-forest-labs/FLUX.1-dev":
        #    raise Exception("Only FLUX.1-dev is supported, other LoRA models are not allowed!")

        image_path = model_card.data.get("widget", [{}])[0].get("output", {}).get("url", None)
        trigger_word = model_card.data.get("instance_prompt", "")
        image_url = f"https://huggingface.co/{link}/resolve/main/{image_path}" if image_path else None

        fs = HfFileSystem()
        try:
            list_of_files = fs.ls(link, detail=False)
            for file in list_of_files:
                if file.endswith(".safetensors"):
                    safetensors_name = file.split("/")[-1]
                if not image_url and file.lower().endswith((".jpg", ".jpeg", ".png", ".webp")):
                    image_elements = file.split("/")
                    image_url = f"https://huggingface.co/{link}/resolve/main/{image_elements[-1]}"
        except Exception as e:
            print(e)
            gr.Warning("You didn't include a link nor a valid Hugging Face repository with a *.safetensors LoRA")
            raise Exception("You didn't include a link nor a valid Hugging Face repository with a *.safetensors LoRA")
        return split_link[1], link, safetensors_name, trigger_word, image_url


def check_custom_model(link):
    if link.startswith("https://"):
        if link.startswith("https://huggingface.co") or link.startswith("https://www.huggingface.co"):
            link_split = link.split("huggingface.co/")
            return get_huggingface_safetensors(link_split[1])
    else:
        return get_huggingface_safetensors(link)


def add_custom_lora(custom_lora):
    global loras
    if custom_lora:
        try:
            title, repo, path, trigger_word, image = check_custom_model(custom_lora)
            print(f"Modelo Externo: {repo}")
            card = f'''
            <span>Loaded custom LoRA:</span>
            <h3>{title}</h3>
            <small>{"Usando: " + trigger_word + " como palavra-chave" if trigger_word else "Não encontramos a palavra-chave, se tiver, coloque-a no prompt."}</small>
            '''
            existing_item_index = next((index for (index, item) in enumerate(loras) if item['repo'] == repo), None)
            if existing_item_index is None:
                new_item = {
                    "image": image,
                    "title": title,
                    "repo": repo,
                    "weights": path,
                    "trigger_word": trigger_word,
                }
                print(new_item)
                existing_item_index = len(loras)
                loras.append(new_item)

            return (
                gr.update(visible=True, value=card),
                gr.update(visible=True),
                gr.Gallery(selected_index=None),
                f"Custom: {path}",
                existing_item_index,
                trigger_word,
            )
        except Exception as e:
            gr.Warning("Modelo Inválido: ou o link está errado ou não é um FLUX")
            return (
                gr.update(visible=True, value="Modelo Inválido: ou o link está errado ou não é um FLUX"),
                gr.update(visible=False),
                gr.update(),
                "",
                None,
                "",
            )
    else:
        return gr.update(visible=False), gr.update(visible=False), gr.update(), "", None, ""


def remove_custom_lora():
    return gr.update(visible=False), gr.update(visible=False), gr.update(), "", None, ""


run_lora.zerogpu = True

collos = gr.themes.Soft(
    primary_hue="gray",
    secondary_hue="stone",
    neutral_hue="slate",
    radius_size=gr.themes.Size(lg="15px", md="8px", sm="6px", xl="16px", xs="4px", xxl="24px", xxs="2px")
).set(
    body_background_fill='*primary_100',
    embed_radius='*radius_lg',
    shadow_drop='0 1px 2px rgba(0, 0, 0, 0.1)',
    shadow_drop_lg='0 1px 2px rgba(0, 0, 0, 0.1)',
    shadow_inset='0 1px 2px rgba(0, 0, 0, 0.1)',
    shadow_spread='0 1px 2px rgba(0, 0, 0, 0.1)',
    shadow_spread_dark='0 1px 2px rgba(0, 0, 0, 0.1)',
    block_radius='*radius_lg',
    block_shadow='*shadow_drop',
    container_radius='*radius_lg'
)

# Custom CSS is applied via gr.Blocks(css=...) below
css = """
#group_with_padding {
    padding: 20px;
    background-color: #f5f5f5;
    border: 1px solid #ccc;
}

#padded_text {
    padding: 10px;
    background-color: #eef;
    border-radius: 5px;
    font-size: 16px;
}
"""

with gr.Blocks(theme=collos, css=css, delete_cache=(60, 60)) as app:
    title = gr.HTML(
        """Logo""",
        elem_id="title",
    )
    selected_index = gr.State(None)

    with gr.Row():
        with gr.Column(scale=3):
            prompt = gr.Textbox(label="Prompt", lines=1, placeholder=":/ Selecione o modelo ")
        with gr.Column(scale=1):
            generate_button = gr.Button("Gerar Imagem", variant="primary", elem_id="cta")

    with gr.Row():
        with gr.Column():
            selected_info = gr.Markdown("")
            gallery = gr.Gallery(
                label="Galeria",
                value=[(item["image"], item["title"]) for item in loras],  # passed as the named 'value' argument
                allow_preview=False,
                columns=3,
                show_share_button=False
            )
            with gr.Group():
                custom_lora = gr.Textbox(label="Selecione um Modelo Externo", placeholder="black-forest-labs/FLUX.1-dev")
                gr.Markdown("[Cheque a lista de modelos do Huggingface](https://huggingface.co/models?other=base_model:adapter:black-forest-labs/FLUX.1-dev)", elem_id="lora_list")
            custom_lora_info = gr.HTML(visible=False)
            custom_lora_button = gr.Button("Remova modelo Externo", visible=False)

        with gr.Column():
            progress_bar = gr.Markdown(elem_id="progress", visible=False)
            result = gr.Image(label="Imagem Gerada")

    with gr.Row():
        with gr.Accordion("Configurações Avançadas", open=False):
            with gr.Row():
                input_image = gr.Image(label="Insira uma Imagem", type="filepath")
                image_strength = gr.Slider(label="Remoção de ruído", info="Valores mais baixos significam maior influência da imagem.", minimum=0.1, maximum=1.0, step=0.01, value=0.75)
            with gr.Column():
                with gr.Row():
                    cfg_scale = gr.Slider(label="Aumentar Escala", minimum=1, maximum=20, step=0.5, value=3.0)
                    steps = gr.Slider(label="Passos", minimum=1, maximum=50, step=1, value=32)
                with gr.Row():
                    width = gr.Slider(label="Largura", minimum=256, maximum=1536, step=64, value=1024)
                    height = gr.Slider(label="Altura", minimum=256, maximum=1536, step=64, value=1024)
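                # Seed controls and LoRA strength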
                with gr.Row():
                    randomize_seed = gr.Checkbox(True, label="Fonte Randomizada")
                    seed = gr.Slider(label="Fontes", minimum=0, maximum=MAX_SEED, step=1, value=0, randomize=True)
                    lora_scale = gr.Slider(label="Escala do Modelo", minimum=0, maximum=3, step=0.01, value=1.20)

    # Event wiring
    gallery.select(
        update_selection,
        inputs=[width, height],
        outputs=[prompt, selected_info, selected_index, width, height]
    )
    custom_lora.input(
        add_custom_lora,
        inputs=[custom_lora],
        outputs=[custom_lora_info, custom_lora_button, gallery, selected_info, selected_index, prompt]
    )
    custom_lora_button.click(
        remove_custom_lora,
        outputs=[custom_lora_info, custom_lora_button, gallery, selected_info, selected_index, custom_lora]
    )
    gr.on(
        triggers=[generate_button.click, prompt.submit],
        fn=run_lora,
        inputs=[prompt, input_image, image_strength, cfg_scale, steps, selected_index, randomize_seed, seed, width, height, lora_scale],
        outputs=[result, seed, progress_bar]
    )

app.queue()
app.launch()