import os
import random
import sys
from typing import Sequence, Mapping, Any, Union

import torch
import gradio as gr
import spaces
from huggingface_hub import hf_hub_download
from comfy import model_management
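
# Fetch the models this workflow depends on: the fp8 T5-XXL text encoder for
# CogVideoX, the inswapper ONNX model that ReActor uses for face swapping,
# GFPGAN for face restoration, and the 4x NMKD Superscale upscaler. The
# local_dir values assume the script runs from the ComfyUI root, so each file
# lands in the folder the corresponding loader node searches.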
hf_hub_download(
    repo_id="Madespace/clip",
    filename="google_t5-v1_1-xxl_encoderonly-fp8_e4m3fn.safetensors",
    local_dir="models/clip",
)
hf_hub_download(
    repo_id="ezioruan/inswapper_128.onnx",
    filename="inswapper_128.onnx",
    local_dir="models/insightface",
)
hf_hub_download(
    repo_id="gmk123/GFPGAN",
    filename="GFPGANv1.4.pth",
    local_dir="models/facerestore_models",
)
hf_hub_download(
    repo_id="gemasai/4x_NMKD-Superscale-SP_178000_G",
    filename="4x_NMKD-Superscale-SP_178000_G.pth",
    local_dir="models/upscale_models",
)
def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any:
    """Returns the value at the given index of a sequence or mapping.

    If the object is a sequence (like a list or string), returns the value at the given index.
    If the object is a mapping (like a dictionary), returns the value at the index-th key.
    Some ComfyUI nodes return a dictionary; in those cases, we look under the "result" key.

    Args:
        obj (Union[Sequence, Mapping]): The object to retrieve the value from.
        index (int): The index of the value to retrieve.

    Returns:
        Any: The value at the given index.

    Raises:
        IndexError: If the index is out of bounds for the object and the object is not a mapping.
    """
    try:
        return obj[index]
    except KeyError:
        return obj["result"][index]
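
# Example: ComfyUI nodes usually return tuples, e.g. LoadImage returns
# (IMAGE, MASK), so get_value_at_index(result, 0) picks the image tensor;
# nodes that instead return {"result": (...)} are unwrapped via the "result" key.
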
def find_path(name: str, path: str = None) -> str:
    """
    Recursively looks at parent folders starting from the given path until it finds the given name.
    Returns the path as a string if found, or None otherwise.
    """
    # If no path is given, use the current working directory
    if path is None:
        path = os.getcwd()

    # Check if the current directory contains the name
    if name in os.listdir(path):
        path_name = os.path.join(path, name)
        print(f"{name} found: {path_name}")
        return path_name

    # Get the parent directory
    parent_directory = os.path.dirname(path)

    # If the parent directory is the same as the current directory, we've reached the root and stop the search
    if parent_directory == path:
        return None

    # Recursively call the function with the parent directory
    return find_path(name, parent_directory)
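
# Example (hypothetical layout): if this script lives in /app and ComfyUI was
# cloned to /app/ComfyUI, find_path("ComfyUI") checks /app first and returns
# "/app/ComfyUI"; the actual location depends on how the Space repo is laid out.
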
def add_comfyui_directory_to_sys_path() -> None:
    """
    Add 'ComfyUI' to the sys.path
    """
    comfyui_path = find_path("ComfyUI")
    if comfyui_path is not None and os.path.isdir(comfyui_path):
        sys.path.append(comfyui_path)
        print(f"'{comfyui_path}' added to sys.path")
def add_extra_model_paths() -> None:
    """
    Parse the optional extra_model_paths.yaml file and add the parsed paths to ComfyUI's model search paths.
    """
    try:
        from main import load_extra_path_config
    except ImportError:
        print(
            "Could not import load_extra_path_config from main.py. Looking in utils.extra_config instead."
        )
        from utils.extra_config import load_extra_path_config

    extra_model_paths = find_path("extra_model_paths.yaml")
    if extra_model_paths is not None:
        load_extra_path_config(extra_model_paths)
    else:
        print("Could not find the extra_model_paths config file.")
add_comfyui_directory_to_sys_path()
add_extra_model_paths()
def import_custom_nodes() -> None:
    """Find all custom nodes in the custom_nodes folder and add those node objects to NODE_CLASS_MAPPINGS.

    This function sets up a new asyncio event loop, initializes the PromptServer,
    creates a PromptQueue, and initializes the custom nodes.
    """
    import asyncio

    import execution
    import server
    from nodes import init_extra_nodes

    # Create a new event loop and set it as the default loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Create an instance of PromptServer with the loop
    server_instance = server.PromptServer(loop)
    execution.PromptQueue(server_instance)

    # Initialize the custom nodes
    init_extra_nodes()


from nodes import NODE_CLASS_MAPPINGS
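
# NODE_CLASS_MAPPINGS is ComfyUI's global registry of node classes. Built-in
# nodes are registered at import time; import_custom_nodes() extends it with
# the custom node packs used below (CogVideoX wrapper, ReActor, CR, VHS).
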
# To be added to "model_loaders" since it loads a model:
# downloadandloadcogvideomodel = NODE_CLASS_MAPPINGS[
#     "DownloadAndLoadCogVideoModel"
# ]()
# downloadandloadcogvideomodel_1 = downloadandloadcogvideomodel.loadmodel(
#     model="THUDM/CogVideoX-5b",
#     precision="bf16",
#     quantization="disabled",
#     enable_sequential_cpu_offload=True,
#     attention_mode="sdpa",
#     load_device="main_device",
# )
# loadimage = NODE_CLASS_MAPPINGS["LoadImage"]()
# cliploader = NODE_CLASS_MAPPINGS["CLIPLoader"]()
# cliploader_20 = cliploader.load_clip(
#     clip_name="t5/google_t5-v1_1-xxl_encoderonly-fp8_e4m3fn.safetensors",
#     type="sd3",
#     device="default",
# )
# emptylatentimage = NODE_CLASS_MAPPINGS["EmptyLatentImage"]()
# cogvideotextencode = NODE_CLASS_MAPPINGS["CogVideoTextEncode"]()
# cogvideosampler = NODE_CLASS_MAPPINGS["CogVideoSampler"]()
# cogvideodecode = NODE_CLASS_MAPPINGS["CogVideoDecode"]()
# reactorfaceswap = NODE_CLASS_MAPPINGS["ReActorFaceSwap"]()
# cr_upscale_image = NODE_CLASS_MAPPINGS["CR Upscale Image"]()
# vhs_videocombine = NODE_CLASS_MAPPINGS["VHS_VideoCombine"]()

# # Add all the models that load a safetensors file
# model_loaders = [downloadandloadcogvideomodel_1, cliploader_20]

# # Check which models are valid and how to best load them
# valid_models = [
#     getattr(loader[0], 'patcher', loader[0])
#     for loader in model_loaders
#     if not isinstance(loader[0], dict) and not isinstance(getattr(loader[0], 'patcher', None), dict)
# ]

# # Finally load the models onto the GPU
# model_management.load_models_gpu(valid_models)
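
# The commented block above sketches how the loader nodes could be instantiated
# once at startup and pushed to the GPU via model_management.load_models_gpu()
# for warm starts. It stays disabled here because on ZeroGPU a CUDA device is
# only available inside the @spaces.GPU-decorated function, so all loading
# happens in generate_video() instead.
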
# Run the ComfyUI workflow
@spaces.GPU  # ZeroGPU: a GPU is attached only while this function runs
def generate_video(
    positive_prompt,
    num_frames,
    input_image,
    progress=gr.Progress(track_tqdm=True),  # as a default arg so Gradio tracks tqdm progress
):
    print("Positive Prompt:", positive_prompt)
    print("Number of Frames:", num_frames)
    print("Input Image:", input_image)
    import_custom_nodes()
    with torch.inference_mode():
        downloadandloadcogvideomodel = NODE_CLASS_MAPPINGS[
            "DownloadAndLoadCogVideoModel"
        ]()
        downloadandloadcogvideomodel_1 = downloadandloadcogvideomodel.loadmodel(
            model="THUDM/CogVideoX-5b",
            precision="bf16",
            quantization="disabled",
            enable_sequential_cpu_offload=True,
            attention_mode="sdpa",
            load_device="main_device",
        )
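        # loadmodel returns a tuple whose index 0 (the CogVideoX pipeline) feeds
        # the sampler and whose index 1 (the VAE) feeds the decode node below.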
        loadimage = NODE_CLASS_MAPPINGS["LoadImage"]()
        loadimage_8 = loadimage.load_image(image=input_image)

        cliploader = NODE_CLASS_MAPPINGS["CLIPLoader"]()
        cliploader_20 = cliploader.load_clip(
            clip_name="google_t5-v1_1-xxl_encoderonly-fp8_e4m3fn.safetensors",
            type="sd3",
            device="default",
        )

        emptylatentimage = NODE_CLASS_MAPPINGS["EmptyLatentImage"]()
        emptylatentimage_161 = emptylatentimage.generate(
            width=480,  # reduce this to avoid OOM errors
            height=480,  # reduce this to avoid OOM errors
            batch_size=1,  # reduce this to avoid OOM errors
        )

        cogvideotextencode = NODE_CLASS_MAPPINGS["CogVideoTextEncode"]()
        cogvideosampler = NODE_CLASS_MAPPINGS["CogVideoSampler"]()
        cogvideodecode = NODE_CLASS_MAPPINGS["CogVideoDecode"]()
        reactorfaceswap = NODE_CLASS_MAPPINGS["ReActorFaceSwap"]()
        cr_upscale_image = NODE_CLASS_MAPPINGS["CR Upscale Image"]()
        vhs_videocombine = NODE_CLASS_MAPPINGS["VHS_VideoCombine"]()
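
        # Pipeline: encode the prompts with T5, sample CogVideoX latents, VAE-decode
        # them into frames, swap the face from the input image onto every frame with
        # ReActor, upscale 4x, then mux the frames into an H.264 MP4.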
        for q in range(1):
            cogvideotextencode_30 = cogvideotextencode.process(
                prompt=positive_prompt,
                strength=1,
                force_offload=True,
                clip=get_value_at_index(cliploader_20, 0),
            )
            cogvideotextencode_31 = cogvideotextencode.process(
                prompt="",
                strength=1,
                force_offload=True,
                clip=get_value_at_index(cogvideotextencode_30, 1),
            )
            cogvideosampler_155 = cogvideosampler.process(
                num_frames=num_frames,
                steps=30,  # reduce this to avoid OOM errors
                cfg=6,
                seed=random.randint(1, 2**64 - 1),  # randint is inclusive, so cap at 2**64 - 1
                scheduler="CogVideoXDDIM",
                denoise_strength=1,
                model=get_value_at_index(downloadandloadcogvideomodel_1, 0),
                positive=get_value_at_index(cogvideotextencode_30, 0),
                negative=get_value_at_index(cogvideotextencode_31, 0),
                samples=get_value_at_index(emptylatentimage_161, 0),
            )
            cogvideodecode_11 = cogvideodecode.decode(
                enable_vae_tiling=False,
                tile_sample_min_height=240,  # reduce this to avoid OOM errors
                tile_sample_min_width=240,  # reduce this to avoid OOM errors
                tile_overlap_factor_height=0.2,
                tile_overlap_factor_width=0.2,
                auto_tile_size=True,
                vae=get_value_at_index(downloadandloadcogvideomodel_1, 1),
                samples=get_value_at_index(cogvideosampler_155, 0),
            )
            reactorfaceswap_3 = reactorfaceswap.execute(
                enabled=True,
                swap_model="inswapper_128.onnx",
                facedetection="retinaface_resnet50",
                face_restore_model="GFPGANv1.4.pth",
                face_restore_visibility=1,
                codeformer_weight=0.75,
                detect_gender_input="no",
                detect_gender_source="no",
                input_faces_index="0",
                source_faces_index="0",
                console_log_level=1,
                input_image=get_value_at_index(cogvideodecode_11, 0),
                source_image=get_value_at_index(loadimage_8, 0),
            )
            cr_upscale_image_151 = cr_upscale_image.upscale(
                upscale_model="4x_NMKD-Superscale-SP_178000_G.pth",
                mode="rescale",
                rescale_factor=4,
                resize_width=720,
                resampling_method="lanczos",
                supersample="true",
                rounding_modulus=16,
                image=get_value_at_index(reactorfaceswap_3, 0),
            )
            vhs_videocombine_154 = vhs_videocombine.combine_video(
                frame_rate=8,
                loop_count=0,
                filename_prefix="AnimateDiff",
                format="video/h264-mp4",
                pix_fmt="yuv420p",
                crf=19,
                save_metadata=True,
                trim_to_audio=False,
                pingpong=True,
                save_output=True,
                images=get_value_at_index(cr_upscale_image_151, 0),
                unique_id=7214086815220268849,
            )
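        # VHS_VideoCombine reports its outputs both in the "ui" preview dict and in
        # the "result" tuple; the paths below assume ComfyUI's default output/
        # directory relative to the working directory.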
        video_path = f"output/{vhs_videocombine_154['ui']['gifs'][0]['filename']}"
        image_path = f"output/{vhs_videocombine_154['result'][0][1][0].split('/')[-1]}"
        print(vhs_videocombine_154)
        print(video_path, image_path)
        return video_path, image_path

if __name__ == "__main__":
    with gr.Blocks() as app:
        with gr.Row():
            positive_prompt = gr.Textbox(
                label="Positive Prompt",
                value="A young Asian man with shoulder-length black hair, wearing a stylish black outfit, playing an acoustic guitar on a dimly lit stage. His full face is visible, showing a calm and focused expression as he strums the guitar. A microphone stand is positioned near him, and a music stand with sheet music is in front of him. The stage lighting casts a soft, warm glow on his face, and the background features an intimate live music setting with visible metal beams and soft blue ambient lighting. The scene captures the artistic mood of a live performance, emphasizing the details of the guitar, the musician’s fingers on the strings, and the relaxed yet passionate vibe of the moment.",
                lines=2,
            )
        with gr.Row():
            num_frames = gr.Number(label="Number of Frames", value=10, precision=0)  # precision=0 so the sampler receives an int
        with gr.Row():
            input_image = gr.Image(label="Input Image", type="filepath")

        submit = gr.Button("Submit")
        output_video = gr.Video(label="Output Video")
        output_image = gr.Image(label="Output Image")

        submit.click(
            fn=generate_video,
            inputs=[positive_prompt, num_frames, input_image],
            outputs=[output_video, output_image],
        )

    app.launch(share=True)