import os
import json
import copy
import time
import random
import logging
import numpy as np
from typing import Any, Dict, List, Optional, Union
import gradio as gr
import torch
from PIL import Image
import spaces
import requests
import pandas as pd
from transformers.utils import move_cache
move_cache()
from diffusers import (
    DiffusionPipeline,
    AutoencoderTiny,
    AutoencoderKL,
    AutoPipelineForImage2Image,
    FluxPipeline,
    FlowMatchEulerDiscreteScheduler,
)
from diffusers.utils import load_image
from huggingface_hub import (
    hf_hub_download,
    HfFileSystem,
    ModelCard,
    snapshot_download,
    HfApi,
)
token = os.getenv("HF_TOKEN")
# --- if running locally or on Colab ---
# Authenticate with Hugging Face by logging in with your token:
# from huggingface_hub import login
# hf_token = 'hf-token-authentication'
# login(hf_token)
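# Compute the dynamic "shift" (mu) for the FlowMatch scheduler: a linear
# interpolation between base_shift and max_shift according to the latent
# image sequence length.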
def calculate_shift(
image_seq_len,
base_seq_len: int = 256,
max_seq_len: int = 4096,
base_shift: float = 0.5,
max_shift: float = 1.16,
):
m = (max_shift - base_shift) / (max_seq_len - base_seq_len)
b = base_shift - m * base_seq_len
mu = image_seq_len * m + b
return mu
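# Set up scheduler timesteps, honoring custom `timesteps` or `sigmas` when
# provided (mirrors the diffusers `retrieve_timesteps` helper).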
def retrieve_timesteps(
scheduler,
num_inference_steps: Optional[int] = None,
device: Optional[Union[str, torch.device]] = None,
timesteps: Optional[List[int]] = None,
sigmas: Optional[List[float]] = None,
**kwargs,
):
    if timesteps is not None and sigmas is not None:
        raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
    if timesteps is not None:
        scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
        timesteps = scheduler.timesteps
        num_inference_steps = len(timesteps)
    elif sigmas is not None:
        scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
        timesteps = scheduler.timesteps
        num_inference_steps = len(timesteps)
    else:
        scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
        timesteps = scheduler.timesteps
    return timesteps, num_inference_steps
# FLUX pipeline
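# Custom pipeline __call__ that yields a preview image after each denoising
# step (decoded with the tiny TAEF1 VAE attached to `pipe`) and, at the end,
# the full-quality image decoded with `good_vae`.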
@torch.inference_mode()
def flux_pipe_call_that_returns_an_iterable_of_images(
self,
prompt: Union[str, List[str]] = None,
prompt_2: Optional[Union[str, List[str]]] = None,
height: Optional[int] = None,
width: Optional[int] = None,
num_inference_steps: int = 28,
timesteps: List[int] = None,
guidance_scale: float = 3.5,
num_images_per_prompt: Optional[int] = 1,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.FloatTensor] = None,
prompt_embeds: Optional[torch.FloatTensor] = None,
pooled_prompt_embeds: Optional[torch.FloatTensor] = None,
output_type: Optional[str] = "pil",
return_dict: bool = True,
joint_attention_kwargs: Optional[Dict[str, Any]] = None,
max_sequence_length: int = 512,
good_vae: Optional[Any] = None,
):
height = height or self.default_sample_size * self.vae_scale_factor
width = width or self.default_sample_size * self.vae_scale_factor
self.check_inputs(
prompt,
prompt_2,
height,
width,
prompt_embeds=prompt_embeds,
pooled_prompt_embeds=pooled_prompt_embeds,
max_sequence_length=max_sequence_length,
)
self._guidance_scale = guidance_scale
self._joint_attention_kwargs = joint_attention_kwargs
self._interrupt = False
batch_size = 1 if isinstance(prompt, str) else len(prompt)
device = self._execution_device
lora_scale = joint_attention_kwargs.get("scale", None) if joint_attention_kwargs is not None else None
prompt_embeds, pooled_prompt_embeds, text_ids = self.encode_prompt(
prompt=prompt,
prompt_2=prompt_2,
prompt_embeds=prompt_embeds,
pooled_prompt_embeds=pooled_prompt_embeds,
device=device,
num_images_per_prompt=num_images_per_prompt,
max_sequence_length=max_sequence_length,
lora_scale=lora_scale,
)
num_channels_latents = self.transformer.config.in_channels // 4
latents, latent_image_ids = self.prepare_latents(
batch_size * num_images_per_prompt,
num_channels_latents,
height,
width,
prompt_embeds.dtype,
device,
generator,
latents,
)
sigmas = np.linspace(1.0, 1 / num_inference_steps, num_inference_steps)
image_seq_len = latents.shape[1]
mu = calculate_shift(
image_seq_len,
self.scheduler.config.base_image_seq_len,
self.scheduler.config.max_image_seq_len,
self.scheduler.config.base_shift,
self.scheduler.config.max_shift,
)
timesteps, num_inference_steps = retrieve_timesteps(
self.scheduler,
num_inference_steps,
device,
timesteps,
sigmas,
mu=mu,
)
self._num_timesteps = len(timesteps)
guidance = torch.full([1], guidance_scale, device=device, dtype=torch.float32).expand(latents.shape[0]) if self.transformer.config.guidance_embeds else None
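    # Denoising loop: decode and yield a preview after every scheduler step.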
for i, t in enumerate(timesteps):
if self.interrupt:
continue
timestep = t.expand(latents.shape[0]).to(latents.dtype)
noise_pred = self.transformer(
hidden_states=latents,
timestep=timestep / 1000,
guidance=guidance,
pooled_projections=pooled_prompt_embeds,
encoder_hidden_states=prompt_embeds,
txt_ids=text_ids,
img_ids=latent_image_ids,
joint_attention_kwargs=self.joint_attention_kwargs,
return_dict=False,
)[0]
latents_for_image = self._unpack_latents(latents, height, width, self.vae_scale_factor)
latents_for_image = (latents_for_image / self.vae.config.scaling_factor) + self.vae.config.shift_factor
image = self.vae.decode(latents_for_image, return_dict=False)[0]
yield self.image_processor.postprocess(image, output_type=output_type)[0]
latents = self.scheduler.step(noise_pred, t, latents, return_dict=False)[0]
torch.cuda.empty_cache()
latents = self._unpack_latents(latents, height, width, self.vae_scale_factor)
latents = (latents / good_vae.config.scaling_factor) + good_vae.config.shift_factor
image = good_vae.decode(latents, return_dict=False)[0]
self.maybe_free_model_hooks()
torch.cuda.empty_cache()
yield self.image_processor.postprocess(image, output_type=output_type)[0]
#------------------------------------------------------------------------------------------------------------------------------------------------------------#
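# Built-in LoRA catalog: each entry points at a Hugging Face repo with its
# weight file, a preview image, and the trigger word to use in prompts.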
loras = [
#Super-Realism
{
"image": "https://huggingface.co/Collos/Jalves/resolve/main/images/jose.webp",
"title": "Jose Alves",
"repo": "Collos/Jalves",
"weights": "Jalves.safetensors",
"trigger_word": "José Alves"
},
{
"image": "https://huggingface.co/Collos/JulioCesar/resolve/main/images/WhatsApp%20Image%202024-12-10%20at%2009.33.50.jpeg",
"title": "Júlio César",
"repo": "Collos/JulioCesar",
"weights": "julio.safetensorss",
"trigger_word": "Júlio"
},
{
"image": "https://huggingface.co/Collos/PedroJr/resolve/main/images/WhatsApp%20Image%202024-12-10%20at%2009.34.01.jpeg",
"title": "Pedro Jr.",
"repo": "Collos/PedroJr",
"weights": "pedrojr.safetensors",
"trigger_word": "Pedro"
},
{
"image": "https://huggingface.co/Collos/JoseClovis/resolve/main/images/WhatsApp%20Image%202024-12-10%20at%2009.38.50.jpeg",
"title": "José Clóvis",
"repo": "Collos/JoseClovis",
"weights": "clovis.safetensors",
"trigger_word": "Clóvis"
}
#add new
]
# Initialize the base model
dtype = torch.bfloat16
device = "cuda" if torch.cuda.is_available() else "cpu"
base_model = "black-forest-labs/FLUX.1-dev"
# TAEF1 is a very tiny autoencoder that uses the same "latent API" as FLUX.1's
# VAE; it is useful for real-time previewing of the FLUX.1 generation process.
taef1 = AutoencoderTiny.from_pretrained("madebyollin/taef1", torch_dtype=dtype).to(device)
good_vae = AutoencoderKL.from_pretrained(base_model, subfolder="vae", torch_dtype=dtype).to(device)
pipe = DiffusionPipeline.from_pretrained(base_model, torch_dtype=dtype, vae=taef1).to(device)
pipe_i2i = AutoPipelineForImage2Image.from_pretrained(
base_model,
vae=good_vae,
transformer=pipe.transformer,
text_encoder=pipe.text_encoder,
tokenizer=pipe.tokenizer,
text_encoder_2=pipe.text_encoder_2,
tokenizer_2=pipe.tokenizer_2,
torch_dtype=dtype
)
MAX_SEED = 2**32-1
pipe.flux_pipe_call_that_returns_an_iterable_of_images = flux_pipe_call_that_returns_an_iterable_of_images.__get__(pipe)
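# Simple context manager that prints the wall-clock duration of a labeled activity.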
class calculateDuration:
def __init__(self, activity_name=""):
self.activity_name = activity_name
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.end_time = time.time()
self.elapsed_time = self.end_time - self.start_time
if self.activity_name:
print(f"tempo passado para {self.activity_name}: {self.elapsed_time:.6f} segundos")
else:
print(f"tempo passado: {self.elapsed_time:.6f} segundos")
def update_selection(evt: gr.SelectData, width, height):
selected_lora = loras[evt.index]
new_placeholder = f"Digite o prompt para {selected_lora['title']}, de preferência em inglês."
lora_repo = selected_lora["repo"]
updated_text = f"### Selecionado: [{lora_repo}](https://huggingface.co/{lora_repo}) ✅"
if "aspect" in selected_lora:
if selected_lora["aspect"] == "retrato":
width = 768
height = 1024
elif selected_lora["aspect"] == "paisagem":
width = 1024
height = 768
else:
width = 1024
height = 1024
return (
gr.update(placeholder=new_placeholder),
updated_text,
evt.index,
width,
height,
)
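# Text-to-image generation on GPU; yields intermediate previews and then the final image.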
@spaces.GPU(duration=100)
def generate_image(prompt_mash, steps, seed, cfg_scale, width, height, lora_scale, progress):
pipe.to("cuda")
generator = torch.Generator(device="cuda").manual_seed(seed)
with calculateDuration("Generating image"):
# Generate image
for img in pipe.flux_pipe_call_that_returns_an_iterable_of_images(
prompt=prompt_mash,
num_inference_steps=steps,
guidance_scale=cfg_scale,
width=width,
height=height,
generator=generator,
joint_attention_kwargs={"scale": lora_scale},
output_type="pil",
good_vae=good_vae,
):
yield img
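# Image-to-image generation: `image_strength` sets the denoising strength
# (lower values preserve more of the input image).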
def generate_image_to_image(prompt_mash, image_input_path, image_strength, steps, cfg_scale, width, height, lora_scale, seed):
generator = torch.Generator(device="cuda").manual_seed(seed)
pipe_i2i.to("cuda")
image_input = load_image(image_input_path)
final_image = pipe_i2i(
prompt=prompt_mash,
image=image_input,
strength=image_strength,
num_inference_steps=steps,
guidance_scale=cfg_scale,
width=width,
height=height,
generator=generator,
joint_attention_kwargs={"scale": lora_scale},
output_type="pil",
).images[0]
return final_image
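# Main handler wired to the UI: builds the prompt with the trigger word,
# loads the selected LoRA into the right pipeline, and streams the results.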
@spaces.GPU(duration=100)
def run_lora(prompt, image_input, image_strength, cfg_scale, steps, selected_index, randomize_seed, seed, width, height, lora_scale, progress=gr.Progress(track_tqdm=True)):
if selected_index is None:
raise gr.Error("Selecione um modelo para continuar.")
selected_lora = loras[selected_index]
lora_path = selected_lora["repo"]
trigger_word = selected_lora["trigger_word"]
if(trigger_word):
if "trigger_position" in selected_lora:
if selected_lora["trigger_position"] == "prepend":
prompt_mash = f"{trigger_word} {prompt}"
else:
prompt_mash = f"{prompt} {trigger_word}"
else:
prompt_mash = f"{trigger_word} {prompt}"
else:
prompt_mash = prompt
with calculateDuration("Carregando Modelo"):
pipe.unload_lora_weights()
pipe_i2i.unload_lora_weights()
    # Load the selected LoRA weights into the appropriate pipeline
    with calculateDuration(f"Loading LoRA weights for {selected_lora['title']}"):
pipe_to_use = pipe_i2i if image_input is not None else pipe
weight_name = selected_lora.get("Pesos", None)
pipe_to_use.load_lora_weights(
lora_path,
weight_name=weight_name,
low_cpu_mem_usage=True
)
with calculateDuration("Gerando fontes"):
if randomize_seed:
seed = random.randint(0, MAX_SEED)
if(image_input is not None):
final_image = generate_image_to_image(prompt_mash, image_input, image_strength, steps, cfg_scale, width, height, lora_scale, seed)
yield final_image, seed, gr.update(visible=False)
else:
image_generator = generate_image(prompt_mash, steps, seed, cfg_scale, width, height, lora_scale, progress)
final_image = None
step_counter = 0
for image in image_generator:
step_counter+=1
final_image = image
progress_bar = f'<div class="progress-container"><div class="progress-bar" style="--current: {step_counter}; --total: {steps};"></div></div>'
yield image, seed, gr.update(value=progress_bar, visible=True)
yield final_image, seed, gr.update(value=progress_bar, visible=False)
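# Resolve a "user/repo" Hugging Face LoRA: validate its base model and locate
# a *.safetensors weight file plus a preview image in the repo.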
def get_huggingface_safetensors(link):
split_link = link.split("/")
if(len(split_link) == 2):
model_card = ModelCard.load(link)
base_model = model_card.data.get("base_model")
print(base_model)
        # Allow both FLUX.1-dev and FLUX.1-schnell base models
        if base_model not in ("black-forest-labs/FLUX.1-dev", "black-forest-labs/FLUX.1-schnell"):
            raise Exception("Flux LoRA Not Found!")
        # To allow only "black-forest-labs/FLUX.1-dev", use instead:
        # if base_model != "black-forest-labs/FLUX.1-dev":
        #     raise Exception("Only FLUX.1-dev is supported, other LoRA models are not allowed!")
image_path = model_card.data.get("widget", [{}])[0].get("output", {}).get("url", None)
trigger_word = model_card.data.get("instance_prompt", "")
image_url = f"https://huggingface.co/{link}/resolve/main/{image_path}" if image_path else None
        fs = HfFileSystem()
        safetensors_name = None
        try:
            list_of_files = fs.ls(link, detail=False)
            for file in list_of_files:
                if file.endswith(".safetensors"):
                    safetensors_name = file.split("/")[-1]
                if not image_url and file.lower().endswith((".jpg", ".jpeg", ".png", ".webp")):
                    image_elements = file.split("/")
                    image_url = f"https://huggingface.co/{link}/resolve/main/{image_elements[-1]}"
            if safetensors_name is None:
                raise Exception("No *.safetensors file found in the repository")
        except Exception as e:
            print(e)
            gr.Warning("You didn't include a link or a valid Hugging Face repository with a *.safetensors LoRA")
            raise Exception("You didn't include a link or a valid Hugging Face repository with a *.safetensors LoRA")
        return split_link[1], link, safetensors_name, trigger_word, image_url
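# Accept either a full huggingface.co URL or a bare "user/repo" identifier.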
def check_custom_model(link):
if(link.startswith("https://")):
if(link.startswith("https://huggingface.co") or link.startswith("https://www.huggingface.co")):
link_split = link.split("huggingface.co/")
return get_huggingface_safetensors(link_split[1])
else:
return get_huggingface_safetensors(link)
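# Add a user-supplied LoRA to the catalog (or reselect it if already listed)
# and render an info card for the UI.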
def add_custom_lora(custom_lora):
global loras
if custom_lora:
try:
title, repo, path, trigger_word, image = check_custom_model(custom_lora)
print(f"Modelo Externo: {repo}")
card = f'''
<div class="custom_lora_card">
<span>Loaded custom LoRA:</span>
<div class="card_internal">
<img src="{image}" />
<div>
<h3>{title}</h3>
<small>{"Usando: <code><b>"+trigger_word+"</code></b> como palavra-chave" if trigger_word else "Não encontramos a palavra-chave, se tiver, coloque-a no prompt."}<br></small>
</div>
</div>
</div>
'''
existing_item_index = next((index for (index, item) in enumerate(loras) if item['repo'] == repo), None)
            if existing_item_index is None:
new_item = {
"image": image,
"title": title,
"repo": repo,
"weights": path,
"trigger_word": trigger_word,
}
print(new_item)
existing_item_index = len(loras)
loras.append(new_item)
return (
gr.update(visible=True, value=card),
gr.update(visible=True),
gr.Gallery(selected_index=None),
f"Custom: {path}",
existing_item_index,
trigger_word,
)
except Exception as e:
gr.Warning(
f"Modelo Inválido: ou o link está errado ou não é um FLUX"
)
return (
gr.update(visible=True, value=f"Modelo Inválido: ou o link está errado ou não é um FLUX"),
gr.update(visible=False),
gr.update(),
"",
None,
"",
)
else:
return gr.update(visible=False), gr.update(visible=False), gr.update(), "", None, ""
def remove_custom_lora():
return gr.update(visible=False), gr.update(visible=False), gr.update(), "", None, ""
run_lora.zerogpu = True
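# Custom Gradio theme and CSS for the app.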
collos = gr.themes.Soft(
primary_hue="gray",
secondary_hue="stone",
neutral_hue="slate",
radius_size=gr.themes.Size(lg="15px", md="8px", sm="6px", xl="16px", xs="4px", xxl="24px", xxs="2px")
).set(
body_background_fill='*primary_100',
embed_radius='*radius_lg',
shadow_drop='0 1px 2px rgba(0, 0, 0, 0.1)',
shadow_drop_lg='0 1px 2px rgba(0, 0, 0, 0.1)',
shadow_inset='0 1px 2px rgba(0, 0, 0, 0.1)',
shadow_spread='0 1px 2px rgba(0, 0, 0, 0.1)',
shadow_spread_dark='0 1px 2px rgba(0, 0, 0, 0.1)',
block_radius='*radius_lg',
block_shadow='*shadow_drop',
container_radius='*radius_lg'
)
collos.css = """
#group_with_padding {
padding: 20px;
background-color: #f5f5f5;
border: 1px solid #ccc;
}
#padded_text {
padding: 10px;
background-color: #eef;
border-radius: 5px;
font-size: 16px;
}
"""
with gr.Blocks(theme=collos, delete_cache=(60, 60)) as app:
title = gr.HTML(
"""<img src="https://huggingface.co/spaces/vcollos/Uniodonto/resolve/main/logo/logo_collos_3.png" alt="Logo" style="display: block; margin: 0 auto; padding: 5px 0px 20px 0px; width: 200px;" />""",
elem_id="title",
)
selected_index = gr.State(None)
with gr.Row():
with gr.Column(scale=3):
            prompt = gr.Textbox(label="Prompt", lines=1, placeholder="Selecione o modelo e digite o prompt")
with gr.Column(scale=1):
generate_button = gr.Button("Gerar Imagem", variant="primary", elem_id="cta")
with gr.Row():
with gr.Column():
selected_info = gr.Markdown("")
gallery = gr.Gallery(
label="Galeria",
value=[(item["image"], item["title"]) for item in loras], # Argumento nomeado como 'value'
allow_preview=False,
columns=3,
show_share_button=False
)
with gr.Group():
custom_lora = gr.Textbox(label="Selecione um Modelo Externo", placeholder="black-forest-labs/FLUX.1-dev")
gr.Markdown("[Cheque a lista de modelos do Huggingface](https://huggingface.co/models?other=base_model:adapter:black-forest-labs/FLUX.1-dev)", elem_id="lora_list")
custom_lora_info = gr.HTML(visible=False)
custom_lora_button = gr.Button("Remova modelo Externo", visible=False)
with gr.Group():
with gr.Group():
gr.Text(value="Dica: Sempre digite a Palavra-chave referente ao modelo que são: José Alves, Júlio, Pedro e Clóvis, com os devidos acentos.", elem_id="padded_text")
with gr.Column():
progress_bar = gr.Markdown(elem_id="progress", visible=False)
result = gr.Image(label="Imagem Gerada")
with gr.Row():
with gr.Accordion("Configurações Avançadas", open=False):
with gr.Row():
input_image = gr.Image(label="Insira uma Imagem", type="filepath")
image_strength = gr.Slider(label="Remossão de ruído", info="Valores mais baixos significam maior influência da imagem.", minimum=0.1, maximum=1.0, step=0.01, value=0.75)
with gr.Column():
with gr.Row():
                cfg_scale = gr.Slider(label="Escala de orientação (CFG)", minimum=1, maximum=20, step=0.5, value=3.5)
steps = gr.Slider(label="Passos", minimum=1, maximum=50, step=1, value=28)
with gr.Row():
width = gr.Slider(label="Largura", minimum=256, maximum=1536, step=64, value=1024)
height = gr.Slider(label="Altura", minimum=256, maximum=1536, step=64, value=1024)
with gr.Row():
                randomize_seed = gr.Checkbox(True, label="Semente aleatória")
                seed = gr.Slider(label="Semente", minimum=0, maximum=MAX_SEED, step=1, value=0, randomize=True)
lora_scale = gr.Slider(label="Escala do Modelo", minimum=0, maximum=3, step=0.01, value=0.95)
gallery.select(
update_selection,
inputs=[width, height],
outputs=[prompt, selected_info, selected_index, width, height]
)
custom_lora.input(
add_custom_lora,
inputs=[custom_lora],
outputs=[custom_lora_info, custom_lora_button, gallery, selected_info, selected_index, prompt]
)
custom_lora_button.click(
remove_custom_lora,
outputs=[custom_lora_info, custom_lora_button, gallery, selected_info, selected_index, custom_lora]
)
gr.on(
triggers=[generate_button.click, prompt.submit],
fn=run_lora,
inputs=[prompt, input_image, image_strength, cfg_scale, steps, selected_index, randomize_seed, seed, width, height, lora_scale],
outputs=[result, seed, progress_bar]
)
app.queue()
app.launch()