import os
import random
import sys
from typing import Sequence, Mapping, Any, Union
import torch
import gradio as gr
from huggingface_hub import hf_hub_download
import subprocess
import logging
import spaces
# Initialize logging
logging.basicConfig(level=logging.INFO)
# List of GitHub repositories providing the required ComfyUI custom nodes
custom_nodes = [
    "https://github.com/rgthree/rgthree-comfy",
    "https://github.com/yolain/ComfyUI-Easy-Use",
    "https://github.com/Kosinkadink/ComfyUI-VideoHelperSuite",
    "https://github.com/chrisgoringe/cg-use-everywhere",
    "https://github.com/alt-key-project/comfyui-dream-project",
    "https://github.com/giriss/comfy-image-saver",
    "https://github.com/facok/ComfyUI-HunyuanVideoMultiLora",
]

custom_nodes_dir = "models/custom_nodes"
os.makedirs(custom_nodes_dir, exist_ok=True)
# Clone or update repositories
for repo in custom_nodes:
    repo_name = repo.split("/")[-1]
    repo_path = os.path.join(custom_nodes_dir, repo_name)
    if os.path.exists(repo_path):
        logging.info(f"Updating {repo_name}...")
        subprocess.run(["git", "-C", repo_path, "pull"], check=True)
    else:
        logging.info(f"Cloning {repo_name}...")
        subprocess.run(["git", "clone", repo, repo_path], check=True)

logging.info("All custom nodes downloaded successfully!")
# Create a symlink if missing, so ComfyUI finds the nodes at ./custom_nodes
if not os.path.exists("custom_nodes"):
    os.symlink(custom_nodes_dir, "custom_nodes")
    logging.info("Symlink created: custom_nodes -> models/custom_nodes")
os.makedirs("models/diffusion_models", exist_ok=True) | |
os.makedirs("models/text_encoders", exist_ok=True) | |
os.makedirs("models/vae", exist_ok=True) | |
os.makedirs("models/lora", exist_ok=True) | |
import shutil
from pathlib import Path
# Download a file from the Hub and move it out of hf_hub_download's nested
# "split_files/..." layout into the flat directory ComfyUI expects
def download_and_move(repo_id, filename, local_dir):
    # Download the file (lands at local_dir/<filename's relative path>)
    file_path = hf_hub_download(repo_id=repo_id, filename=filename, local_dir=local_dir)

    # Calculate the target path without the intermediate folders
    src_path = Path(file_path)
    dest_path = Path(local_dir) / Path(filename).name

    # Move the file to the correct directory
    shutil.move(src_path, dest_path)
    print(f"Model moved to: {dest_path}")
# Download VAE model
download_and_move(
    repo_id="Comfy-Org/HunyuanVideo_repackaged",
    filename="split_files/vae/hunyuan_video_vae_bf16.safetensors",
    local_dir="models/vae",
)

# Download diffusion model
download_and_move(
    repo_id="Comfy-Org/HunyuanVideo_repackaged",
    filename="split_files/diffusion_models/hunyuan_video_t2v_720p_bf16.safetensors",
    local_dir="models/diffusion_models",
)

# Download text encoder - CLIP
download_and_move(
    repo_id="Comfy-Org/HunyuanVideo_repackaged",
    filename="split_files/text_encoders/clip_l.safetensors",
    local_dir="models/text_encoders",
)

# Download text encoder - LLaVA
download_and_move(
    repo_id="Comfy-Org/HunyuanVideo_repackaged",
    filename="split_files/text_encoders/llava_llama3_fp8_scaled.safetensors",
    local_dir="models/text_encoders",
)

# Download LoRA model
download_and_move(
    repo_id="alexShangeeth/huny_lora",
    filename="boreal-hl-v1.safetensors",
    local_dir="models/loras",
)
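# Sanity check (an illustrative addition): fail fast if any expected model file
# is missing, so a bad download surfaces here rather than deep inside the
# ComfyUI graph. File names are taken from the downloads above.
expected_files = [
    "models/vae/hunyuan_video_vae_bf16.safetensors",
    "models/diffusion_models/hunyuan_video_t2v_720p_bf16.safetensors",
    "models/text_encoders/clip_l.safetensors",
    "models/text_encoders/llava_llama3_fp8_scaled.safetensors",
    "models/loras/boreal-hl-v1.safetensors",
]
missing = [f for f in expected_files if not os.path.exists(f)]
if missing:
    raise FileNotFoundError(f"Missing model files after download: {missing}")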
def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any:
    """Returns the value at the given index of a sequence or mapping.

    If the object is a sequence (like a list or string), returns the value at the given index.
    If the object is a mapping (like a dictionary), returns the value at the index-th key.
    Some nodes return a dictionary; in those cases we look under the "result" key.

    Args:
        obj (Union[Sequence, Mapping]): The object to retrieve the value from.
        index (int): The index of the value to retrieve.

    Returns:
        Any: The value at the given index.

    Raises:
        IndexError: If the index is out of bounds for the object and the object is not a mapping.
    """
    try:
        return obj[index]
    except KeyError:
        return obj["result"][index]
def find_path(name: str, path: str = None) -> str:
    """
    Recursively looks at parent folders starting from the given path until it finds the given name.
    Returns the path as a string if found, or None otherwise.
    """
    # If no path is given, use the current working directory
    if path is None:
        path = os.getcwd()

    # Check if the current directory contains the name
    if name in os.listdir(path):
        path_name = os.path.join(path, name)
        print(f"{name} found: {path_name}")
        return path_name

    # Get the parent directory
    parent_directory = os.path.dirname(path)

    # If the parent directory is the same as the current directory, we've reached the root and stop
    if parent_directory == path:
        return None

    # Recursively call the function with the parent directory
    return find_path(name, parent_directory)
def add_comfyui_directory_to_sys_path() -> None:
    """
    Add 'ComfyUI' to sys.path so its modules can be imported.
    """
    comfyui_path = find_path("ComfyUI")
    if comfyui_path is not None and os.path.isdir(comfyui_path):
        sys.path.append(comfyui_path)
        print(f"'{comfyui_path}' added to sys.path")
def add_extra_model_paths() -> None:
    """
    Parse the optional extra_model_paths.yaml file and register the parsed model paths.
    """
    try:
        from main import load_extra_path_config
    except ImportError:
        print(
            "Could not import load_extra_path_config from main.py. Looking in utils.extra_config instead."
        )
        from utils.extra_config import load_extra_path_config

    extra_model_paths = find_path("extra_model_paths.yaml")

    if extra_model_paths is not None:
        load_extra_path_config(extra_model_paths)
    else:
        print("Could not find the extra_model_paths config file.")
add_comfyui_directory_to_sys_path()
add_extra_model_paths()
def import_custom_nodes() -> None:
    """Find all custom nodes in the custom_nodes folder and add those node objects to NODE_CLASS_MAPPINGS.

    This function sets up a new asyncio event loop, initializes the PromptServer,
    creates a PromptQueue, and initializes the custom nodes.
    """
    import asyncio
    import execution
    from nodes import init_extra_nodes
    import server

    # Creating a new event loop and setting it as the default loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Creating an instance of PromptServer with the loop
    server_instance = server.PromptServer(loop)
    execution.PromptQueue(server_instance)

    # Initializing custom nodes
    init_extra_nodes()


from nodes import NODE_CLASS_MAPPINGS
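# Illustrative sketch (not in the original workflow): generate_image() below
# re-runs import_custom_nodes() on every request. A guard like this would make
# repeat calls cheap if you chose to wire it in instead:
_nodes_initialized = False

def import_custom_nodes_once() -> None:
    global _nodes_initialized
    if not _nodes_initialized:
        import_custom_nodes()
        _nodes_initialized = True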
# `spaces` is imported above but was never used; on ZeroGPU Spaces the GPU is
# only available inside functions wrapped with this decorator, which is the
# presumed intent of that import.
@spaces.GPU
def generate_image(prompt, frames, lora_strength, width, height, frame_rate):
    import_custom_nodes()
    with torch.inference_mode():
        vaeloader = NODE_CLASS_MAPPINGS["VAELoader"]()
        vaeloader_10 = vaeloader.load_vae(vae_name="hunyuan_video_vae_bf16.safetensors")

        dualcliploader = NODE_CLASS_MAPPINGS["DualCLIPLoader"]()
        dualcliploader_11 = dualcliploader.load_clip(
            clip_name1="clip_l.safetensors",
            clip_name2="llava_llama3_fp8_scaled.safetensors",
            type="hunyuan_video",
            device="default",
        )

        unetloader = NODE_CLASS_MAPPINGS["UNETLoader"]()
        unetloader_12 = unetloader.load_unet(
            unet_name="hunyuan_video_t2v_720p_bf16.safetensors",
            weight_dtype="fp8_e4m3fn",
        )

        ksamplerselect = NODE_CLASS_MAPPINGS["KSamplerSelect"]()
        ksamplerselect_16 = ksamplerselect.get_sampler(
            sampler_name="gradient_estimation"
        )

        randomnoise = NODE_CLASS_MAPPINGS["RandomNoise"]()
        randomnoise_25 = randomnoise.get_noise(noise_seed=random.randint(1, 2**64))

        cliptextencode = NODE_CLASS_MAPPINGS["CLIPTextEncode"]()
        cliptextencode_44 = cliptextencode.encode(
            text=prompt,
            clip=get_value_at_index(dualcliploader_11, 0),
        )

        int_literal = NODE_CLASS_MAPPINGS["Int Literal"]()
        int_literal_295 = int_literal.get_int(int=frames)

        hunyuanvideoloraloader = NODE_CLASS_MAPPINGS["HunyuanVideoLoraLoader"]()
        modelsamplingsd3 = NODE_CLASS_MAPPINGS["ModelSamplingSD3"]()
        fluxguidance = NODE_CLASS_MAPPINGS["FluxGuidance"]()
        basicguider = NODE_CLASS_MAPPINGS["BasicGuider"]()
        basicscheduler = NODE_CLASS_MAPPINGS["BasicScheduler"]()
        emptyhunyuanlatentvideo = NODE_CLASS_MAPPINGS["EmptyHunyuanLatentVideo"]()
        samplercustomadvanced = NODE_CLASS_MAPPINGS["SamplerCustomAdvanced"]()
        big_latent_switch_dream = NODE_CLASS_MAPPINGS["Big Latent Switch [Dream]"]()
        vaedecodetiled = NODE_CLASS_MAPPINGS["VAEDecodeTiled"]()
        imagesharpen = NODE_CLASS_MAPPINGS["ImageSharpen"]()
        vhs_videocombine = NODE_CLASS_MAPPINGS["VHS_VideoCombine"]()
        anything_everywhere3 = NODE_CLASS_MAPPINGS["Anything Everywhere3"]()
        easy_cleangpuused = NODE_CLASS_MAPPINGS["easy cleanGpuUsed"]()

        for q in range(1):
            hunyuanvideoloraloader_255 = hunyuanvideoloraloader.load_lora(
                lora_name="boreal-hl-v1.safetensors",
                strength=lora_strength,
                blocks_type="all",
                model=get_value_at_index(unetloader_12, 0),
            )

            modelsamplingsd3_67 = modelsamplingsd3.patch(
                shift=9, model=get_value_at_index(hunyuanvideoloraloader_255, 0)
            )

            fluxguidance_26 = fluxguidance.append(
                guidance=12, conditioning=get_value_at_index(cliptextencode_44, 0)
            )

            basicguider_22 = basicguider.get_guider(
                model=get_value_at_index(modelsamplingsd3_67, 0),
                conditioning=get_value_at_index(fluxguidance_26, 0),
            )

            basicscheduler_17 = basicscheduler.get_sigmas(
                scheduler="simple",
                steps=40,
                denoise=1,
                model=get_value_at_index(hunyuanvideoloraloader_255, 0),
            )

            emptyhunyuanlatentvideo_232 = emptyhunyuanlatentvideo.generate(
                width=width,
                height=height,
                length=get_value_at_index(int_literal_295, 0),
                batch_size=1,
            )

            samplercustomadvanced_13 = samplercustomadvanced.sample(
                noise=get_value_at_index(randomnoise_25, 0),
                guider=get_value_at_index(basicguider_22, 0),
                sampler=get_value_at_index(ksamplerselect_16, 0),
                sigmas=get_value_at_index(basicscheduler_17, 0),
                latent_image=get_value_at_index(emptyhunyuanlatentvideo_232, 0),
            )

            big_latent_switch_dream_243 = big_latent_switch_dream.pick(
                select=0,
                on_missing="next",
                input_2=get_value_at_index(samplercustomadvanced_13, 1),
            )

            vaedecodetiled_73 = vaedecodetiled.decode(
                tile_size=128,
                overlap=64,
                temporal_size=64,
                temporal_overlap=8,
                samples=get_value_at_index(big_latent_switch_dream_243, 0),
                vae=get_value_at_index(vaeloader_10, 0),
            )

            imagesharpen_106 = imagesharpen.sharpen(
                sharpen_radius=1,
                sigma=0.43,
                alpha=0.5,
                image=get_value_at_index(vaedecodetiled_73, 0),
            )

            vhs_videocombine_82 = vhs_videocombine.combine_video(
                frame_rate=frame_rate,
                loop_count=0,
                filename_prefix="HunyuanVideo",
                format="video/h264-mp4",
                pix_fmt="yuv420p",
                crf=10,
                save_metadata=True,
                trim_to_audio=False,
                pingpong=False,
                save_output=True,
                images=get_value_at_index(imagesharpen_106, 0),
                vae=get_value_at_index(vaeloader_10, 0),
                unique_id=3348895206324303610,
            )

            anything_everywhere3_180 = anything_everywhere3.func(
                CLIP=get_value_at_index(dualcliploader_11, 0),
                VAE=get_value_at_index(vaeloader_10, 0),
            )

            easy_cleangpuused_182 = easy_cleangpuused.empty_cache(
                anything=get_value_at_index(big_latent_switch_dream_243, 0),
                unique_id=16583500820061639415,
            )
    # Pick the most recently written .mp4 from ComfyUI's output directory
    def get_latest_video(directory="output"):
        files = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith(".mp4")]
        if not files:
            raise FileNotFoundError("No video files found in the output directory.")
        # Get the file with the latest modification time
        latest_file = max(files, key=os.path.getmtime)
        return latest_file

    # Get the latest video file based on modification time
    saved_path = get_latest_video()
    return saved_path
with gr.Blocks(theme="soft") as app:
    gr.Markdown("# HunyuanVideo Text-to-Video")
    with gr.Row():
        with gr.Column():
            prompt_input = gr.Textbox(label="Prompt", placeholder="Enter your prompt here...")
            frames_input = gr.Number(label="Frames", value=30, minimum=1)
            lora_strength_input = gr.Slider(label="LoRA Strength", minimum=0.0, maximum=1.0, value=0.5, step=0.01)
            width_input = gr.Number(label="Width", value=512, minimum=256, step=64)
            height_input = gr.Number(label="Height", value=512, minimum=256, step=64)
            frame_rate_input = gr.Number(label="Frame Rate", value=24, minimum=1)
            generate_btn = gr.Button("Generate Video")
        with gr.Column():
            output_video = gr.Video(label="Generated Video", interactive=False)

    # Wire the button to the ComfyUI pipeline defined above
    generate_btn.click(
        fn=generate_image,
        inputs=[prompt_input, frames_input, lora_strength_input, width_input, height_input, frame_rate_input],
        outputs=[output_video],
    )
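# Suggested addition (not in the original): video generation takes minutes, so
# enabling Gradio's request queue avoids HTTP timeouts on long renders.
app.queue()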
app.launch(share=True)