# Captured from a Hugging Face Spaces page (Space status shown as "Paused").
import datetime
import os
import shutil
import subprocess
import sys

import cv2
from PIL import Image
from moviepy.editor import *
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
from gradio_client import Client
import gradio as gr

from share_btn import community_icon_html, loading_icon_html, share_js
# Client for the remote video-matting Space; get_matte() calls its
# "/go_matte" endpoint through this object.
# NOTE(review): built at import time, so importing this module presumably
# contacts the remote Space -- confirm against gradio_client's behaviour.
matte_client = Client("https://fffiloni-video-matting-anything.hf.space/")
def execute_command(command: "list[str]") -> None:
    """Run an external command and block until it finishes.

    Args:
        command: The program followed by its arguments, one list item each
            (for example ``["python", "script.py", "--flag=value"]``). The
            list is passed to ``subprocess.run`` as-is, without a shell.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status (``check=True``).
    """
    subprocess.run(command, check=True)
def infer(video_frames, masks_frames, project_name):
    """Inpaint a video from user-uploaded frame and mask images ("Manual" tab).

    The uploads are copied into fresh working directories, the ProPainter
    inference script is run on them as a subprocess, and the two videos it
    produces are returned for display.

    Args:
        video_frames: Uploaded frame images; each item must expose its file
            path as ``.name``.
        masks_frames: Uploaded mask images, same kind of objects as
            ``video_frames``.
        project_name: Base name for the working directories. If a directory
            with that name already exists, a timestamp suffix is appended.

    Returns:
        A 3-tuple ``(masked_video, inpainted_video, group_update)``: the paths
        of the two result videos and a ``gr.Group`` update that makes the
        share-button group visible.

    Raises:
        gr.Error: If either upload list is empty, or fewer than two result
            files were produced.
        subprocess.CalledProcessError: If the inference script fails.
    """
    if not video_frames or not masks_frames:
        raise gr.Error("Please upload both the video frames and the mask frames.")

    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")

    # Never reuse an existing directory: leftovers from an earlier run with
    # the same project name would be fed to the model along with the new files.
    my_video_directory = f"{project_name}"
    if os.path.exists(my_video_directory):
        my_video_directory = f"{project_name}_{timestamp}"
    os.makedirs(my_video_directory)

    my_masks_directory = f"{project_name}_masks"
    if os.path.exists(my_masks_directory):
        my_masks_directory = f"{project_name}_masks_{timestamp}"
    os.makedirs(my_masks_directory)

    # Copy every upload into its working directory under its original name.
    for uploads, target_directory in (
        (video_frames, my_video_directory),
        (masks_frames, my_masks_directory),
    ):
        for idx, image in enumerate(uploads, start=1):
            image_name = os.path.basename(image.name)
            destination_path = os.path.join(target_directory, image_name)
            shutil.copy(image.name, destination_path)
            print(f"Image {idx}: {image_name} copied to {destination_path}")

    output_folder = "results"
    os.makedirs(output_folder, exist_ok=True)

    # sys.executable keeps the subprocess on the interpreter (and installed
    # packages) that is running this app.
    command = [
        sys.executable,
        "inference_propainter.py",
        f"--video={my_video_directory}",
        f"--mask={my_masks_directory}",
        f"--output={output_folder}",
    ]
    execute_command(command)

    print(f"Contents of the {output_folder} folder:")
    for item in os.listdir(output_folder):
        print(item)

    # Results are looked up under the directory name actually used for this
    # run (which may carry the timestamp suffix), as infer_auto() does.
    results_folder = os.path.join(output_folder, my_video_directory)
    print(f"Contents of the {results_folder} folder:")
    for item in os.listdir(results_folder):
        print(os.path.join(results_folder, item))

    # File names the ProPainter inference script is expected to write
    # (TODO confirm against the bundled inference_propainter.py). Picking them
    # by name avoids depending on os.listdir() order, which is arbitrary.
    masked_video = os.path.join(results_folder, "masked_in.mp4")
    inpainted_video = os.path.join(results_folder, "inpaint_out.mp4")
    if not (os.path.isfile(masked_video) and os.path.isfile(inpainted_video)):
        # Unknown naming: fall back to the first two entries, sorted so the
        # choice is at least deterministic.
        results_folder_content = sorted(
            os.path.join(results_folder, item) for item in os.listdir(results_folder)
        )
        if len(results_folder_content) < 2:
            raise gr.Error("The inpainting step did not produce the expected result videos.")
        masked_video, inpainted_video = results_folder_content[:2]

    return masked_video, inpainted_video, gr.Group.update(visible=True)
def get_frames(video_in, img_type):
    """Resize a video to 512 px height and split it into numbered image files.

    The resized copy is written to ``{img_type}_video_resized.mp4`` and the
    frames to ``00000.<ext>``, ``00001.<ext>``, ... in the current working
    directory.

    Args:
        video_in: Path of the video to split.
        img_type: ``"source"`` (frames saved as ``.jpg``) or ``"mask"``
            (frames saved as ``.png``).

    Returns:
        A tuple ``(frames, fps)``: the written file names in frame order, and
        the frame rate OpenCV reports for the resized video.

    Raises:
        ValueError: If ``img_type`` is neither ``"source"`` nor ``"mask"``.
    """
    extensions = {"source": "jpg", "mask": "png"}
    if img_type not in extensions:
        # Fail before the expensive re-encode instead of decoding the whole
        # video and silently writing nothing.
        raise ValueError(f"img_type must be 'source' or 'mask', got {img_type!r}")
    extension = extensions[img_type]
    resized_path = f"{img_type}_video_resized.mp4"

    clip = VideoFileClip(video_in)
    try:
        # Cap the frame rate at 30 fps; lower rates are kept unchanged.
        if clip.fps > 30:
            print("vide rate is over 30, resetting to 30")
            target_fps = 30
        else:
            print("video rate is OK")
            target_fps = clip.fps
        clip_resized = clip.resize(height=512)
        clip_resized.write_videofile(resized_path, fps=target_fps)
    finally:
        # Release the reader held by the clip even if encoding fails.
        clip.close()
    print("video resized to 512 height")

    frames = []
    cap = cv2.VideoCapture(resized_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        print("video fps: " + str(fps))
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            # Zero-padded index so the files sort in frame order.
            filename = f"{len(frames):05d}.{extension}"
            cv2.imwrite(filename, frame)
            frames.append(filename)
    finally:
        cap.release()
    print("broke the video into frames")
    return frames, fps
def get_matte(video_in, subject_to_remove, cut_at_seconds=10):
    """Ask the remote video-matting Space for a matte of the given subject.

    Args:
        video_in: File path (or URL) of the source video.
        subject_to_remove: Text prompt naming the subject to matte.
        cut_at_seconds: Value for the endpoint's "Cut video at (s)" slider,
            which accepts 0 to 10. Defaults to 10, the value that was
            previously hard-coded.

    Returns:
        The third element of the endpoint's response. infer_auto() treats it
        as the path of the matte video -- confirm against the remote API.
    """
    print("Trying to call video matting")
    result = matte_client.predict(
        f"{video_in}",           # video file path or URL
        cut_at_seconds,          # "Cut video at (s)" slider
        f"{subject_to_remove}",  # "Text prompt" textbox
        "",                      # "Background prompt" textbox (unused)
        api_name="/go_matte"
    )
    print(result)
    return result[2]
def infer_auto(project_name, video_in, subject_to_remove):
    """Remove a subject from a video with automatically generated masks ("Auto" tab).

    A matte video for ``subject_to_remove`` is requested from the remote
    matting Space, source and matte are both split into frames, and the
    ProPainter inference script is run on the resulting frame/mask folders.

    Args:
        project_name: Base name for the working directories. If a directory
            with that name already exists, a timestamp suffix is appended.
        video_in: Path of the uploaded source video.
        subject_to_remove: Text prompt describing what to remove.

    Returns:
        A 3-tuple ``(masked_video, inpainted_video, group_update)``: the paths
        of the two result videos and a ``gr.Group`` update that makes the
        share-button group visible.

    Raises:
        gr.Error: If no video was given, no frames could be extracted, or
            fewer than two result files were produced.
        subprocess.CalledProcessError: If the inference script fails.
    """
    if not video_in:
        raise gr.Error("Please provide a source video.")

    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    print(video_in)

    matte_video = get_matte(video_in, subject_to_remove)

    frames_list, source_fps = get_frames(video_in, "source")
    print(frames_list)
    masks_list, _ = get_frames(matte_video, "mask")
    print(masks_list)

    # Source and matte can differ in length, so keep only the frames that
    # have a counterpart in the other list.
    common_length = min(len(frames_list), len(masks_list))
    if common_length == 0:
        raise gr.Error("No frames could be extracted from the video or its matte.")
    frames_list = frames_list[:common_length]
    masks_list = masks_list[:common_length]

    # Working directories: fall back to a timestamped name rather than
    # reusing a directory left over from an earlier run.
    my_video_directory = f"{project_name}"
    if os.path.exists(my_video_directory):
        my_video_directory = f"{project_name}_{timestamp}"
    os.makedirs(my_video_directory)
    print(f"Created the dir: {my_video_directory}")

    my_masks_directory = f"{project_name}_masks"
    if os.path.exists(my_masks_directory):
        my_masks_directory = f"{project_name}_masks_{timestamp}"
    os.makedirs(my_masks_directory)
    print(f"Created the dir: {my_masks_directory}")

    # Copy the extracted images into their working directories.
    for images, target_directory in (
        (frames_list, my_video_directory),
        (masks_list, my_masks_directory),
    ):
        for idx, image in enumerate(images, start=1):
            image_name = os.path.basename(image)
            destination_path = os.path.join(target_directory, image_name)
            shutil.copy(image, destination_path)
            print(f"Image {idx}: {image_name} copied to {destination_path}")

    output_folder = f"results_{timestamp}"
    os.makedirs(output_folder, exist_ok=True)

    # The script takes an integer frame rate; round instead of truncating so
    # that e.g. 29.97 fps is saved as 30 rather than 29.
    needed_fps = round(source_fps)

    # sys.executable keeps the subprocess on the interpreter (and installed
    # packages) that is running this app.
    command_auto = [
        sys.executable,
        "inference_propainter.py",
        f"--video={my_video_directory}",
        f"--mask={my_masks_directory}",
        f"--output={output_folder}",
        f"--save_fps={needed_fps}",
        #f"--fp16"
    ]
    execute_command(command_auto)

    print(f"Contents of the {output_folder} folder:")
    for item in os.listdir(output_folder):
        print(item)

    results_folder = os.path.join(output_folder, my_video_directory)
    print(f"Contents of the {results_folder} folder:")
    for item in os.listdir(results_folder):
        print(os.path.join(results_folder, item))

    # File names the ProPainter inference script is expected to write
    # (TODO confirm against the bundled inference_propainter.py). Picking them
    # by name avoids depending on os.listdir() order, which is arbitrary.
    masked_video = os.path.join(results_folder, "masked_in.mp4")
    inpainted_video = os.path.join(results_folder, "inpaint_out.mp4")
    if not (os.path.isfile(masked_video) and os.path.isfile(inpainted_video)):
        # Unknown naming: fall back to the first two entries, sorted so the
        # choice is at least deterministic.
        results_folder_content = sorted(
            os.path.join(results_folder, item) for item in os.listdir(results_folder)
        )
        if len(results_folder_content) < 2:
            raise gr.Error("The inpainting step did not produce the expected result videos.")
        masked_video, inpainted_video = results_folder_content[:2]

    return masked_video, inpainted_video, gr.Group.update(visible=True)
# Custom stylesheet handed to gr.Blocks(css=...). It styles the centred page
# column (#col-container), the spinner animation (.animate-spin), the
# "Share with Community" button group (#share-btn-container / #share-btn) and
# images whose URL contains "#center".
# NOTE(review): "div#component-25" targets an auto-numbered component id and
# presumably breaks if components are added or reordered -- confirm.
css="""
#col-container{
margin: 0 auto;
max-width: 840px;
text-align: left;
}
.animate-spin {
animation: spin 1s linear infinite;
}
@keyframes spin {
from {
transform: rotate(0deg);
}
to {
transform: rotate(360deg);
}
}
#share-btn-container {
display: flex;
padding-left: 0.5rem !important;
padding-right: 0.5rem !important;
background-color: #000000;
justify-content: center;
align-items: center;
border-radius: 9999px !important;
max-width: 15rem;
height: 32px;
}
div#share-btn-container > div {
flex-direction: row;
background: black;
align-items: center;
}
#share-btn-container:hover {
background-color: #060606;
}
#share-btn {
all: initial;
color: #ffffff;
font-weight: 600;
font-size: 1em;
cursor:pointer;
font-family: 'IBM Plex Sans', sans-serif;
margin-left: 0.5rem !important;
padding-top: 0.5rem !important;
padding-bottom: 0.5rem !important;
right:0;
}
#share-btn * {
all: unset;
}
#share-btn-container div:nth-child(-n+2){
width: auto !important;
min-height: 0px !important;
}
#share-btn-container .wrap {
display: none !important;
}
#share-btn-container.hidden {
display: none!important;
}
img[src*='#center'] {
display: block;
margin: unset;
margin-top: 6px;
}
div#component-25 {
align-items: center;
}
"""
# User interface: a "Manual" tab (upload frames and masks yourself) and an
# "Auto" tab (upload a video and name the subject to remove), plus a results
# column shared by both.
# NOTE(review): the indentation of this section was lost in the captured
# source; the nesting of the `with` blocks below is a reconstruction and
# should be confirmed against the original layout.
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        # Page header with links to the ProPainter code and project page.
        gr.HTML("""
<h2 style="text-align: center;">ProPainter</h2>
<p style="text-align: center;">
[ICCV 2023] ProPainter: Improving Propagation and Transformer for Video Inpainting <br />
<a href="https://github.com/sczhou/ProPainter" target="_blank">code</a> | <a href="https://shangchenzhou.com/projects/ProPainter/" target="_blank">project page</a>
</p>
""")
        with gr.Row():
            # Manual mode: inputs for infer().
            with gr.Tab("Manual"):
                with gr.Column():
                    project_name = gr.Textbox(label="Name your project", info="no spaces nor special characters", value="my-new-project")
                    video_frames = gr.File(label="Video frames", file_types=["image"], file_count="multiple")
                    masks_frames = gr.File(label="Masks frames", file_types=["image"], file_count="multiple")
                    submit_btn = gr.Button("Submit")
                    # One example row: frame files, mask files, project name.
                    gr.Examples(
                        examples = [
                            [
                                [
                                    "hf-examples/manual/00000.jpg",
                                    "hf-examples/manual/00001.jpg",
                                    "hf-examples/manual/00002.jpg"
                                ],
                                [
                                    "hf-examples/manual/00000.png",
                                    "hf-examples/manual/00001.png",
                                    "hf-examples/manual/00002.png"
                                ],
                                "manual_example"
                            ]
                        ],
                        fn = infer,
                        inputs=[video_frames, masks_frames, project_name],
                        #outputs=[res_masked, res_files]
                    )
            # Auto mode: inputs for infer_auto().
            with gr.Tab("Auto"):
                with gr.Column():
                    project_name_2 = gr.Textbox(label="Name your project", info="no spaces nor special characters", value="my-new-project")
                    video_in = gr.Video(label="Source video", source="upload", format="mp4")
                    subject_to_remove = gr.Textbox(label="Subject to remove")
                    submit_auto_btn = gr.Button("Submit")
                    # Example rows: project name, video file, subject to remove.
                    gr.Examples(
                        examples = [
                            [
                                "example_1",
                                "hf-examples/knight.mp4",
                                "knight"
                            ],
                            [
                                "example_2",
                                "hf-examples/knight.mp4",
                                "horse"
                            ],
                            [
                                "example_3",
                                "hf-examples/knight.mp4",
                                "tail"
                            ]
                        ],
                        fn = infer_auto,
                        inputs=[project_name_2, video_in, subject_to_remove],
                        #outputs=[res_masked, res_files]
                    )
            # Results shared by both tabs.
            with gr.Column():
                gr.Markdown("""
### Results
""")
                res_masked = gr.Video(label="Masked video", elem_id="res-masked")
                res_files = gr.Video(label="Final result", elem_id="res_cleaned")
                with gr.Row():
                    # Hidden until infer()/infer_auto() return an update with visible=True.
                    with gr.Group(elem_id="share-btn-container", visible=False) as share_group:
                        community_icon = gr.HTML(community_icon_html)
                        loading_icon = gr.HTML(loading_icon_html)
                        share_button = gr.Button("Share with Community", elem_id="share-btn")
                gr.Markdown("""
[![Duplicate this Space](https://huggingface.co/datasets/huggingface/badges/raw/main/duplicate-this-space-lg.svg#center)](https://huggingface.co/spaces/fffiloni/ProPainter?duplicate=true)
""")
    # Event wiring: both submit buttons fill the same three outputs.
    submit_btn.click(fn=infer, inputs=[video_frames, masks_frames, project_name], outputs=[res_masked, res_files, share_group])
    submit_auto_btn.click(fn=infer_auto, inputs=[project_name_2, video_in, subject_to_remove], outputs=[res_masked, res_files, share_group])
    # The share button has no Python handler; it only runs the share_js snippet.
    share_button.click(None, [], [], _js=share_js)
# Start the server only when this file is run as a script, so importing the
# module (for tooling or reuse of `demo`) does not launch it as a side effect.
# The request queue holds at most 12 pending jobs.
if __name__ == "__main__":
    demo.queue(max_size=12).launch()