| import os |
| import subprocess |
| import sys |
|
|
| |
def install(package):
    """Install *package* with pip, using the interpreter running this script.

    Raises ``subprocess.CalledProcessError`` if pip exits with a non-zero
    status.
    """
    # Going through ``sys.executable -m pip`` targets the current environment.
    pip_command = [sys.executable, "-m", "pip", "install", package]
    subprocess.check_call(pip_command)
|
|
| |
# pip distribution names this app needs at runtime.
required_packages = [
    "streamlit",
    "moviepy",
    "diffusers",
    "torch",
    "Pillow",
    "opencv-python-headless",
]

# Distributions whose importable module name differs from their pip name.
# Probing the pip name would always raise ImportError and re-run pip on
# every execution of the script.
_IMPORT_NAMES = {
    "Pillow": "PIL",
    "opencv-python-headless": "cv2",
}

# Install any dependency that cannot be imported yet.
for package in required_packages:
    try:
        __import__(_IMPORT_NAMES.get(package, package))
    except ImportError:
        install(package)
|
|
| |
| import streamlit as st |
| import cv2 |
| import numpy as np |
| from moviepy.editor import VideoFileClip, ImageSequenceClip |
| from diffusers import StableDiffusionInstructPix2PixPipeline |
| import torch |
| from PIL import Image, ImageOps |
| import math |
|
|
| |
# Pick the device first so the weights are loaded in a dtype it can run.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Half precision is only requested on GPU; on CPU fall back to float32,
# since float16 inference is not reliably supported there.
_pipe_dtype = torch.float16 if device == "cuda" else torch.float32

# InstructPix2Pix pipeline shared by every call to pix2pix(); the safety
# checker is explicitly disabled.
pipe = StableDiffusionInstructPix2PixPipeline.from_pretrained(
    "timbrooks/instruct-pix2pix",
    torch_dtype=_pipe_dtype,
    safety_checker=None,
)
pipe = pipe.to(device)
|
|
|
|
def pix2pix(input_image, instruction, steps, seed, text_cfg_scale, image_cfg_scale):
    """Edit one image with the module-level InstructPix2Pix pipeline.

    The image is first fitted (via ``ImageOps.fit``) to a size derived from
    scaling its longer side towards 512 pixels, with both dimensions snapped
    to multiples of 64.

    Args:
        input_image: PIL image to edit.
        instruction: Text prompt describing the edit. An empty string skips
            the model and returns the resized image as-is.
        steps: Number of inference steps.
        seed: Seed passed to ``torch.manual_seed`` for reproducibility.
        text_cfg_scale: Guidance scale for the text prompt.
        image_cfg_scale: Guidance scale for the input image.

    Returns:
        The edited PIL image, or the resized input when ``instruction`` is
        empty.
    """
    src_w, src_h = input_image.size
    short_side = min(src_w, src_h)

    # Scale the long side to 512, then adjust so that the short side lands
    # on a multiple of 64 (rounded up).
    scale = 512 / max(src_w, src_h)
    scale = math.ceil(short_side * scale / 64) * 64 / short_side

    # Floor both scaled dimensions to multiples of 64.
    target_size = (
        int((src_w * scale) // 64) * 64,
        int((src_h * scale) // 64) * 64,
    )
    resized = ImageOps.fit(input_image, target_size, method=Image.Resampling.LANCZOS)

    if instruction == "":
        return resized

    # Seeds torch's global RNG and reuses the returned generator for sampling.
    generator = torch.manual_seed(seed)
    output = pipe(
        instruction,
        image=resized,
        guidance_scale=text_cfg_scale,
        image_guidance_scale=image_cfg_scale,
        num_inference_steps=steps,
        generator=generator,
    )
    return output.images[0]
|
|
|
|
def get_frames(video_path):
    """Resize a video to 512 px height and dump its frames as JPEG files.

    The resized copy is written to ``video_resized.mp4`` with its frame rate
    capped at 30 fps, then every frame is saved as ``frame_<index>.jpg`` in
    the current working directory.

    Args:
        video_path: Path of the source video file.

    Returns:
        A ``(frames, fps)`` tuple: the list of written frame paths in
        playback order, and the frame rate OpenCV reports for the resized
        video.
    """
    resized_path = "video_resized.mp4"

    clip = VideoFileClip(video_path)
    try:
        clip_resized = clip.resize(height=512)
        # Keep the source frame rate unless it exceeds 30 fps.
        clip_resized.write_videofile(resized_path, fps=min(clip.fps, 30))
    finally:
        # Release the reader held by the clip even if encoding fails.
        clip.close()

    frames = []
    cap = cv2.VideoCapture(resized_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_path = f'frame_{len(frames)}.jpg'
            cv2.imwrite(frame_path, frame)
            frames.append(frame_path)
    finally:
        # No GUI windows are ever created here, so releasing the capture is
        # the only cleanup needed.
        cap.release()

    return frames, fps
|
|
|
|
def create_video(frames, fps):
    """Encode a sequence of frame images into ``output_video.mp4``.

    Args:
        frames: Frame images (as accepted by ``ImageSequenceClip``) in
            playback order.
        fps: Frame rate used both for the clip and for the written file.

    Returns:
        The path of the written video file.
    """
    output_path = "output_video.mp4"
    sequence = ImageSequenceClip(frames, fps=fps)
    sequence.write_videofile(output_path, fps=fps)
    return output_path
|
|
|
|
def main():
    """Render the Streamlit UI and run the frame-by-frame Pix2Pix edit.

    The uploaded video is saved to ``input_video.mp4``, split into frames,
    trimmed to the selected number of seconds, edited frame by frame with
    ``pix2pix`` and re-assembled into a video that is shown in the page.
    """
    # Fixed inference settings passed to pix2pix for every frame.
    inference_steps = 50
    text_cfg_scale = 7.5
    image_cfg_scale = 1.5

    st.title("Pix2Pix Video")

    video_file = st.file_uploader("Upload a video", type=["mp4", "mov", "avi", "mkv"])
    prompt = st.text_input("Enter your prompt")
    seed = st.slider("Seed", 0, 2147483647, 123456)
    trim_value = st.slider("Cut video at (s)", 1, 5, 1)

    if not st.button("Generate Pix2Pix video"):
        return
    if not video_file:
        # Tell the user why nothing happens instead of silently ignoring
        # the click.
        st.warning("Please upload a video first.")
        return

    with st.spinner("Processing video..."):
        with open("input_video.mp4", "wb") as f:
            f.write(video_file.getbuffer())

        frames, fps = get_frames("input_video.mp4")
        n_frame = min(int(trim_value * fps), len(frames))
        if n_frame <= 0:
            # An unreadable video (no frames, or a reported fps of 0) would
            # otherwise pass an empty frame list to create_video.
            st.error("No frames could be read from the uploaded video.")
            return

        result_frames = []
        for frame_path in frames[:n_frame]:
            # convert() returns a loaded copy, so the file handle can be
            # closed right away.
            with Image.open(frame_path) as frame_file:
                pil_image = frame_file.convert("RGB")
            pix2pix_image = pix2pix(
                pil_image,
                prompt,
                inference_steps,
                seed,
                text_cfg_scale,
                image_cfg_scale,
            )
            result_frame_path = f'result_{os.path.basename(frame_path)}'
            pix2pix_image.save(result_frame_path)
            result_frames.append(result_frame_path)

        final_video_path = create_video(result_frames, fps)

        st.video(final_video_path)
        st.success("Video generated successfully!")


if __name__ == "__main__":
    main()
|
|
|
|