import os
import random
import sys
from typing import Sequence, Mapping, Any, Union

import torch
import gradio as gr
import spaces
from huggingface_hub import hf_hub_download
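
# Download the model weights this workflow relies on into the local folders
# where ComfyUI's loaders expect to find them.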
hf_hub_download(
    repo_id="Madespace/clip",
    filename="google_t5-v1_1-xxl_encoderonly-fp8_e4m3fn.safetensors",
    local_dir="models/clip"
)
hf_hub_download(
    repo_id="ezioruan/inswapper_128.onnx",
    filename="inswapper_128.onnx",
    local_dir="models/insightface"
)
hf_hub_download(
    repo_id="gmk123/GFPGAN",
    filename="GFPGANv1.4.pth",
    local_dir="models/facerestore_models"
)
hf_hub_download(
    repo_id="gemasai/4x_NMKD-Superscale-SP_178000_G",
    filename="4x_NMKD-Superscale-SP_178000_G.pth",
    local_dir="models/upscale_models"
)


def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any:
    """Returns the value at the given index of a sequence or mapping.

    If the object is a sequence (like a list or string), returns the value at the given index.
    If the object is a mapping (like a dictionary), returns the value at the index-th key.
    Some nodes return a dictionary; in those cases, we look under the "result" key.

    Args:
        obj (Union[Sequence, Mapping]): The object to retrieve the value from.
        index (int): The index of the value to retrieve.

    Returns:
        Any: The value at the given index.

    Raises:
        IndexError: If the index is out of bounds for the object and the object is not a mapping.
    """
    try:
        return obj[index]
    except KeyError:
        return obj["result"][index]


def find_path(name: str, path: str = None) -> str:
    """
    Recursively looks at parent folders starting from the given path until it finds the given name.
    Returns the path as a string if found, or None otherwise.
    """
    # If no path is given, use the current working directory
    if path is None:
        path = os.getcwd()

    # Check if the current directory contains the name
    if name in os.listdir(path):
        path_name = os.path.join(path, name)
        print(f"{name} found: {path_name}")
        return path_name

    # Get the parent directory
    parent_directory = os.path.dirname(path)

    # If the parent directory is the same as the current directory, we've reached the root and stop the search
    if parent_directory == path:
        return None

    # Recursively call the function with the parent directory
    return find_path(name, parent_directory)


def add_comfyui_directory_to_sys_path() -> None:
    """
    Add 'ComfyUI' to the sys.path
    """
    comfyui_path = find_path("ComfyUI")
    if comfyui_path is not None and os.path.isdir(comfyui_path):
        sys.path.append(comfyui_path)
        print(f"'{comfyui_path}' added to sys.path")


def add_extra_model_paths() -> None:
    """
    Parse the optional extra_model_paths.yaml file and add the parsed paths to the sys.path.
    """
    try:
        from main import load_extra_path_config
    except ImportError:
        print(
            "Could not import load_extra_path_config from main.py. Looking in utils.extra_config instead."
        )
        from utils.extra_config import load_extra_path_config

    extra_model_paths = find_path("extra_model_paths.yaml")
    if extra_model_paths is not None:
        load_extra_path_config(extra_model_paths)
    else:
        print("Could not find the extra_model_paths config file.")


add_comfyui_directory_to_sys_path()
add_extra_model_paths()

# Import model_management only after ComfyUI has been added to sys.path,
# otherwise the `comfy` package cannot be resolved.
from comfy import model_management


def import_custom_nodes() -> None:
    """Find all custom nodes in the custom_nodes folder and add those node objects to NODE_CLASS_MAPPINGS

    This function sets up a new asyncio event loop, initializes the PromptServer,
    creates a PromptQueue, and initializes the custom nodes.
    """
    import asyncio
    import execution
    from nodes import init_extra_nodes
    import server

    # Creating a new event loop and setting it as the default loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Creating an instance of PromptServer with the loop
    server_instance = server.PromptServer(loop)
    execution.PromptQueue(server_instance)

    # Initializing custom nodes
    init_extra_nodes()


from nodes import NODE_CLASS_MAPPINGS

# To be added to "model_loaders" as they load models
# downloadandloadcogvideomodel = NODE_CLASS_MAPPINGS[
#     "DownloadAndLoadCogVideoModel"
# ]()
# downloadandloadcogvideomodel_1 = downloadandloadcogvideomodel.loadmodel(
#     model="THUDM/CogVideoX-5b",
#     precision="bf16",
#     quantization="disabled",
#     enable_sequential_cpu_offload=True,
#     attention_mode="sdpa",
#     load_device="main_device",
# )
# loadimage = NODE_CLASS_MAPPINGS["LoadImage"]()
# cliploader = NODE_CLASS_MAPPINGS["CLIPLoader"]()
# cliploader_20 = cliploader.load_clip(
#     clip_name="t5/google_t5-v1_1-xxl_encoderonly-fp8_e4m3fn.safetensors",
#     type="sd3",
#     device="default",
# )
# emptylatentimage = NODE_CLASS_MAPPINGS["EmptyLatentImage"]()
# cogvideotextencode = NODE_CLASS_MAPPINGS["CogVideoTextEncode"]()
# cogvideosampler = NODE_CLASS_MAPPINGS["CogVideoSampler"]()
# cogvideodecode = NODE_CLASS_MAPPINGS["CogVideoDecode"]()
# reactorfaceswap = NODE_CLASS_MAPPINGS["ReActorFaceSwap"]()
# cr_upscale_image = NODE_CLASS_MAPPINGS["CR Upscale Image"]()
# vhs_videocombine = NODE_CLASS_MAPPINGS["VHS_VideoCombine"]()
#
# # Add all the models that load a safetensors file
# model_loaders = [downloadandloadcogvideomodel_1, cliploader_20]
#
# # Check which models are valid and how to best load them
# valid_models = [
#     getattr(loader[0], 'patcher', loader[0])
#     for loader in model_loaders
#     if not isinstance(loader[0], dict) and not isinstance(getattr(loader[0], 'patcher', None), dict)
# ]
#
# # Finally loads the models
# model_management.load_models_gpu(valid_models)


# Run the ComfyUI workflow
@spaces.GPU  # the otherwise unused `spaces` import suggests a ZeroGPU Space; this requests a GPU for each call
def generate_video(positive_prompt, num_frames, input_image):
    print("Positive Prompt:", positive_prompt)
    print("Number of Frames:", num_frames)
    print("Input Image:", input_image)
    progress = gr.Progress(track_tqdm=True)
    import_custom_nodes()
    with torch.inference_mode():
        downloadandloadcogvideomodel = NODE_CLASS_MAPPINGS[
            "DownloadAndLoadCogVideoModel"
        ]()
        downloadandloadcogvideomodel_1 = downloadandloadcogvideomodel.loadmodel(
            model="THUDM/CogVideoX-5b",
            precision="bf16",
            quantization="disabled",
            enable_sequential_cpu_offload=True,
            attention_mode="sdpa",
            load_device="main_device",
        )

        loadimage = NODE_CLASS_MAPPINGS["LoadImage"]()
        loadimage_8 = loadimage.load_image(image=input_image)

        cliploader = NODE_CLASS_MAPPINGS["CLIPLoader"]()
        cliploader_20 = cliploader.load_clip(
            clip_name="google_t5-v1_1-xxl_encoderonly-fp8_e4m3fn.safetensors",
            type="sd3",
            device="default",
        )

        emptylatentimage = NODE_CLASS_MAPPINGS["EmptyLatentImage"]()
        emptylatentimage_161 = emptylatentimage.generate(
            width=480,  # reduce this to avoid OOM error
            height=480,  # reduce this to avoid OOM error
            batch_size=1,  # reduce this to avoid OOM error
        )

        cogvideotextencode = NODE_CLASS_MAPPINGS["CogVideoTextEncode"]()
        cogvideosampler = NODE_CLASS_MAPPINGS["CogVideoSampler"]()
        cogvideodecode = NODE_CLASS_MAPPINGS["CogVideoDecode"]()
        reactorfaceswap = NODE_CLASS_MAPPINGS["ReActorFaceSwap"]()
        cr_upscale_image = NODE_CLASS_MAPPINGS["CR Upscale Image"]()
        vhs_videocombine = NODE_CLASS_MAPPINGS["VHS_VideoCombine"]()
        for q in range(1):
            cogvideotextencode_30 = cogvideotextencode.process(
                prompt=positive_prompt,
                strength=1,
                force_offload=True,
                clip=get_value_at_index(cliploader_20, 0),
            )
            cogvideotextencode_31 = cogvideotextencode.process(
                prompt="",
                strength=1,
                force_offload=True,
                clip=get_value_at_index(cogvideotextencode_30, 1),
            )
            cogvideosampler_155 = cogvideosampler.process(
                num_frames=num_frames,
                steps=30,  # reduce this to avoid OOM error
                cfg=6,
                seed=random.randint(1, 2**64),
                scheduler="CogVideoXDDIM",
                denoise_strength=1,
                model=get_value_at_index(downloadandloadcogvideomodel_1, 0),
                positive=get_value_at_index(cogvideotextencode_30, 0),
                negative=get_value_at_index(cogvideotextencode_31, 0),
                samples=get_value_at_index(emptylatentimage_161, 0),
            )
            cogvideodecode_11 = cogvideodecode.decode(
                enable_vae_tiling=False,
                tile_sample_min_height=240,  # reduce this to avoid OOM error
                tile_sample_min_width=240,  # reduce this to avoid OOM error
                tile_overlap_factor_height=0.2,
                tile_overlap_factor_width=0.2,
                auto_tile_size=True,
                vae=get_value_at_index(downloadandloadcogvideomodel_1, 1),
                samples=get_value_at_index(cogvideosampler_155, 0),
            )
            reactorfaceswap_3 = reactorfaceswap.execute(
                enabled=True,
                swap_model="inswapper_128.onnx",
                facedetection="retinaface_resnet50",
                face_restore_model="GFPGANv1.4.pth",
                face_restore_visibility=1,
                codeformer_weight=0.75,
                detect_gender_input="no",
                detect_gender_source="no",
                input_faces_index="0",
                source_faces_index="0",
                console_log_level=1,
                input_image=get_value_at_index(cogvideodecode_11, 0),
                source_image=get_value_at_index(loadimage_8, 0),
            )
            cr_upscale_image_151 = cr_upscale_image.upscale(
                upscale_model="4x_NMKD-Superscale-SP_178000_G.pth",
                mode="rescale",
                rescale_factor=4,
                resize_width=720,
                resampling_method="lanczos",
                supersample="true",
                rounding_modulus=16,
                image=get_value_at_index(reactorfaceswap_3, 0),
            )
            vhs_videocombine_154 = vhs_videocombine.combine_video(
                frame_rate=8,
                loop_count=0,
                filename_prefix="AnimateDiff",
                format="video/h264-mp4",
                pix_fmt="yuv420p",
                crf=19,
                save_metadata=True,
                trim_to_audio=False,
                pingpong=True,
                save_output=True,
                images=get_value_at_index(cr_upscale_image_151, 0),
                unique_id=7214086815220268849,
            )

            video_path = f"output/{vhs_videocombine_154['ui']['gifs'][0]['filename']}"
            image_path = f"output/{vhs_videocombine_154['result'][0][1][0].split('/')[-1]}"
            print(vhs_videocombine_154)
            print(video_path, image_path)
            return video_path, image_path


if __name__ == "__main__":
    with gr.Blocks() as app:
        with gr.Row():
            positive_prompt = gr.Textbox(
                label="Positive Prompt",
                value="A young Asian man with shoulder-length black hair, wearing a stylish black outfit, playing an acoustic guitar on a dimly lit stage. His full face is visible, showing a calm and focused expression as he strums the guitar. A microphone stand is positioned near him, and a music stand with sheet music is in front of him. The stage lighting casts a soft, warm glow on his face, and the background features an intimate live music setting with visible metal beams and soft blue ambient lighting. The scene captures the artistic mood of a live performance, emphasizing the details of the guitar, the musician’s fingers on the strings, and the relaxed yet passionate vibe of the moment.",
                lines=2,
            )
        with gr.Row():
            num_frames = gr.Number(label="Number of Frames", value=10)
        with gr.Row():
            input_image = gr.Image(label="Input Image", type="filepath")
        submit = gr.Button("Submit")
        output_video = gr.Video(label="Output Video")
        output_image = gr.Image(label="Output Image")

        submit.click(
            fn=generate_video,
            inputs=[positive_prompt, num_frames, input_image],
            outputs=[output_video, output_image],
        )

    app.launch(share=True)