import os |
import platform |
import numpy as np |
from tqdm import trange |
import math |
import subprocess as sp |
import string |
import random |
from functools import reduce |
import re |
import modules.scripts as scripts |
import gradio as gr |
from datetime import datetime |
from modules import processing, shared, sd_samplers, images |
from modules.processing import Processed |
from modules.sd_samplers import samplers |
from modules.shared import opts, cmd_opts, state |
import subprocess |
# A signed decimal number captured as one group, e.g. "1", "-0.5" or ".25".
_WAVE_WEIGHT_NUMBER = r'([\-]?[0-9]*\.?[0-9]+)'
# Parenthesised argument list: two numbers separated by a comma and at most
# one space.
_WAVE_WEIGHT_ARGS = r'\(' + _WAVE_WEIGHT_NUMBER + r', ?' + _WAVE_WEIGHT_NUMBER + r'\)'

# Prompt tags of the form "@wave_completed(a, b)" / "@wave_remaining(a, b)".
# Both patterns expose the two numbers as groups 1 and 2.
wave_completed_regex = r'@wave_completed' + _WAVE_WEIGHT_ARGS
wave_remaining_regex = r'@wave_remaining' + _WAVE_WEIGHT_ARGS
def run_cmd(cmd):
    """Run an external command and raise if it exits with a non-zero status.

    Args:
        cmd: Sequence of command-line arguments; every item is converted
            with ``str()`` before the process is started.

    Raises:
        IOError: If the process returns a non-zero exit code. The message is
            the text the process wrote to stderr.
    """
    cmd = [str(arg) for arg in cmd]
    print("Executing %s" % " ".join(cmd))

    # stdout is discarded; only stderr is captured for the error message.
    popen_params = {"stdout": sp.DEVNULL, "stderr": sp.PIPE, "stdin": sp.DEVNULL}
    if os.name == "nt":
        # 0x08000000 is the Windows CREATE_NO_WINDOW process creation flag.
        popen_params["creationflags"] = 0x08000000

    # The context manager closes the pipe and reaps the process even if
    # communicate() is interrupted.
    with sp.Popen(cmd, **popen_params) as proc:
        _, err = proc.communicate()

    if proc.returncode:
        # Decode leniently: stderr that is not valid UTF-8 must not replace
        # the real failure with a UnicodeDecodeError.
        raise IOError(err.decode("utf8", errors="replace"))
def encode_video(input_pattern, starting_number, output_dir, fps, quality, encoding, create_segments, segment_duration, ffmpeg_path):
    """Encode a numbered image sequence into a video by invoking ffmpeg.

    Args:
        input_pattern: ffmpeg input pattern for the frames (e.g. ".../%d.png").
        starting_number: Number of the first frame; passed as ``-start_number``
            after conversion to ``int``.
        output_dir: Output path WITHOUT extension. The video is written to
            ``output_dir + ".webm"`` or ``output_dir + ".mp4"``.
        fps: Value passed as ``-framerate``.
        quality: Value passed as ``-crf``.
        encoding: One of "VP9 (webm)", "VP8 (webm)", "H.264 (mp4)",
            "H.265 (mp4)".
        create_segments: When truthy, additionally split the finished video
            into files named ``{output_dir}.%d.{suffix}``.
        segment_duration: Value passed as ``-segment_time`` when segmenting.
        ffmpeg_path: Path to the ffmpeg binary. An empty string means
            "ffmpeg" ("ffmpeg.exe" on Windows).

    Raises:
        KeyError: If ``encoding`` is not one of the supported names.
        IOError: Propagated from ``run_cmd`` when ffmpeg exits with an error.
    """
    # Only VP9 is encoded in two passes.
    two_pass = (encoding == "VP9 (webm)")
    suffix = "webm" if "webm" in encoding else "mp4"
    output_location = output_dir + f".{suffix}"

    encoding_lib = {
        "VP9 (webm)": "libvpx-vp9",
        "VP8 (webm)": "libvpx",
        "H.264 (mp4)": "libx264",
        "H.265 (mp4)": "libx265",
    }[encoding]

    args = [
        "-framerate", fps,
        "-start_number", int(starting_number),
        "-i", input_pattern,
        "-c:v", encoding_lib,
        "-b:v", "0",
        "-crf", quality,
    ]

    if encoding_lib == "libvpx-vp9":
        args += ["-pix_fmt", "yuva420p"]

    if ffmpeg_path == "":
        ffmpeg_path = "ffmpeg"
        # platform.system is a function and must be CALLED; comparing the
        # function object itself to a string is always False.
        if platform.system() == "Windows":
            ffmpeg_path += ".exe"

    print("\n\n")
    if two_pass:
        # The first pass discards its output: no audio, null muxer, devnull.
        first_pass_args = args + [
            "-pass", "1",
            "-an",
            "-f", "null",
            os.devnull,
        ]
        second_pass_args = args + [
            "-pass", "2",
            output_location,
        ]
        print("Running first pass ffmpeg encoding")
        run_cmd([ffmpeg_path] + first_pass_args)
        print("Running second pass ffmpeg encoding. This could take awhile...")
        run_cmd([ffmpeg_path] + second_pass_args)
    else:
        print("Running ffmpeg encoding. This could take awhile...")
        run_cmd([ffmpeg_path] + args + [output_location])

    if create_segments:
        print("Segmenting video")
        # Streams are copied, so segmenting does not re-encode the video.
        run_cmd([ffmpeg_path] + [
            "-i", output_location,
            "-f", "segment",
            "-segment_time", segment_duration,
            "-vcodec", "copy",
            "-acodec", "copy",
            f"{output_dir}.%d.{suffix}",
        ])
def set_weights(match_obj, wave_progress):
    """Interpolate between the two numbers captured by a wave tag match.

    Args:
        match_obj: Regex match whose groups 1 and 2 hold the two weights as
            text; a group that did not participate counts as 0.
        wave_progress: Interpolation factor; 0 yields the smaller weight and
            1 yields the larger one.

    Returns:
        The interpolated weight formatted with ``str()``.
    """
    bounds = [
        0 if captured is None else float(captured)
        for captured in (match_obj.group(1), match_obj.group(2))
    ]
    # The result always runs from the smaller to the larger value, whatever
    # order the two weights were written in.
    lower = min(bounds)
    upper = max(bounds)
    return str(lower + (upper - lower) * wave_progress)
class Script(scripts.Script):
    """Img2img script that feeds each result back in as the next input.

    The denoising strength follows a repeating wave across the frames,
    prompts and seeds can be changed at chosen frames, and the saved frames
    can optionally be encoded into a video.
    """

    def title(self):
        """Return the display name of this script."""
        return "Loopback Wave V1.4.1"

    def show(self, is_img2img):
        """Return ``is_img2img``, i.e. the script is shown only when it is true."""
        return is_img2img

    def ui(self, is_img2img):
        """Create the Gradio controls for this script.

        Returns:
            The controls as a list whose order matches the parameters of
            ``run`` that follow ``p``.
        """
        frames = gr.Slider(minimum=1, maximum=2048, step=1, label='Frames', value=100)
        frames_per_wave = gr.Slider(minimum=0, maximum=120, step=1, label='Frames Per Wave', value=20)
        denoising_strength_change_amplitude = gr.Slider(minimum=0, maximum=1, step=0.01, label='Max additional denoise', value=0.6)
        denoising_strength_change_offset = gr.Number(minimum=0, maximum=180, step=1, label='Wave offset (ignore this if you don\'t know what it means)', value=0)
        initial_image_number = gr.Number(minimum=0, label='Initial generated image number', value=0)

        save_prompts = gr.Checkbox(label='Save prompts as text file', value=True)
        prompts = gr.Textbox(label="Prompt Changes", lines=5, value="")

        save_video = gr.Checkbox(label='Save results as video', value=True)
        output_dir = gr.Textbox(label="Video Name", lines=1, value="")
        video_fps = gr.Slider(minimum=1, maximum=120, step=1, label='Frames per second', value=10)
        video_quality = gr.Slider(minimum=0, maximum=60, step=1, label='Video Quality (crf)', value=40)
        video_encoding = gr.Dropdown(label='Video encoding', value="VP9 (webm)", choices=["VP9 (webm)", "VP8 (webm)", "H.265 (mp4)", "H.264 (mp4)"])
        ffmpeg_path = gr.Textbox(label="ffmpeg binary. Only set this if it fails otherwise.", lines=1, value="")

        segment_video = gr.Checkbox(label='Cut video in to segments', value=True)
        video_segment_duration = gr.Slider(minimum=10, maximum=60, step=1, label='Video Segment Duration (seconds)', value=20)

        return [frames, denoising_strength_change_amplitude, frames_per_wave, denoising_strength_change_offset, initial_image_number, prompts, save_prompts, save_video, output_dir, video_fps, video_quality, video_encoding, ffmpeg_path, segment_video, video_segment_duration]

    @staticmethod
    def _wave_progress(frame_index, frames_per_wave, offset_degrees):
        """Return how far ``frame_index`` is through its current wave.

        With ``offset_degrees`` 0 the value runs from 0.0 on the first frame
        of a wave to 1.0 on its last frame. A wave of fewer than two frames
        has no span to progress through, so 0.0 is returned instead of
        dividing by zero.
        """
        if frames_per_wave <= 1:
            return 0.0
        position_in_wave = float(frame_index) % float(frames_per_wave)
        offset_fraction = (float(1) / float(180)) * offset_degrees
        return float(1) / float(frames_per_wave - 1) * float(position_in_wave + offset_fraction)

    def run(
        self,
        p,
        frames,
        denoising_strength_change_amplitude,
        frames_per_wave,
        denoising_strength_change_offset,
        initial_image_number,
        prompts: str,
        save_prompts,
        save_video,
        output_dir,
        video_fps,
        video_quality,
        video_encoding,
        ffmpeg_path,
        segment_video,
        video_segment_duration,
    ):
        """Generate the loopback frames and optionally encode them as video.

        For every batch, ``frames`` images are generated one at a time; each
        result becomes the init image of the next frame. The denoising
        strength of frame ``i`` is::

            base + amplitude - |cos(i * 180 / frames_per_wave + offset)| * amplitude

        with the angle in degrees. ``frames_per_wave`` values of 0 and 1 are
        accepted: 0 keeps the angle at ``offset`` for every frame, and for
        both values the wave progress used by the prompt tags is 0.

        Args:
            p: Processing object; it is mutated (seed, prompt, init images,
                denoising strength, output path, ...).
            frames: Number of frames per batch.
            denoising_strength_change_amplitude: Maximum amount added to the
                base denoising strength.
            frames_per_wave: Number of frames one wave spans.
            denoising_strength_change_offset: Wave offset in degrees.
            initial_image_number: Number used for the first saved frame.
            prompts: Prompt changes, one per line, written as
                ``frame::prompt`` or ``frame::seed::prompt``. A seed prefixed
                with ``+`` or ``-`` makes later seeds count up or down; an
                unprefixed seed is kept constant.
            save_prompts: Whether to write the settings and prompts to a
                text file next to the frame directory.
            save_video: Whether to encode the frames with ffmpeg afterwards.
            output_dir: Optional video name; the seed is always appended.
            video_fps, video_quality, video_encoding, ffmpeg_path,
            segment_video, video_segment_duration: Forwarded to
                ``encode_video``.

        Returns:
            A ``Processed`` holding all frames (preceded by the per-batch
            grids when ``opts.return_grid`` is set).

        Raises:
            IOError: If a prompt change line does not have two or three
                ``::``-separated fields.
        """
        processing.fix_seed(p)
        batch_count = p.n_iter
        p.extra_generation_params = {
            "Max Additional Denoise": denoising_strength_change_amplitude,
            "Frames per wave": frames_per_wave,
            "Wave Offset": denoising_strength_change_offset,
        }

        # Frames are saved explicitly below, one image per processing call.
        p.do_not_save_samples = True
        p.batch_size = 1
        p.n_iter = 1

        initial_seed = None
        initial_info = None

        grids = []
        all_images = []
        original_init_image = p.init_images
        state.job_count = frames * batch_count

        initial_color_corrections = [processing.setup_color_correction(p.init_images[0])]
        initial_denoising_strength = p.denoising_strength

        if output_dir == "":
            output_dir = str(p.seed)
        else:
            output_dir = output_dir + "-" + str(p.seed)

        loopback_wave_path = os.path.join(p.outpath_samples, "loopback-wave")
        loopback_wave_images_path = os.path.join(loopback_wave_path, output_dir)

        os.makedirs(loopback_wave_images_path, exist_ok=True)

        p.outpath_samples = loopback_wave_images_path

        prompts = prompts.strip()

        if save_prompts:
            generation_settings = [
                "Generation Settings",
                f"Total Frames: {frames}",
                f"Frames Per Wave: {frames_per_wave}",
                f"Wave Offset: {denoising_strength_change_offset}",
                f"Base Denoising Strength: {initial_denoising_strength}",
                f"Max Additional Denoise: {denoising_strength_change_amplitude}",
                f"Initial Image Number: {initial_image_number}",
                "",
                "Video Encoding Settings",
                f"Save Video: {save_video}",
            ]

            if save_video:
                generation_settings += [
                    f"Framerate: {video_fps}",
                    f"Quality: {video_quality}",
                    f"Encoding: {video_encoding}",
                    f"Create Segmented Video: {segment_video}",
                ]
                if segment_video:
                    generation_settings += [f"Segment Duration: {video_segment_duration}"]

            generation_settings += [
                "",
                "Prompt Details",
                "Initial Prompt:" + p.prompt,
                "",
                "Negative Prompt:" + p.negative_prompt,
                "",
                "Frame change prompts:",
                prompts,
            ]

            # Explicit UTF-8 so prompts with non-ASCII characters can be
            # written regardless of the platform's default encoding.
            with open(loopback_wave_images_path + "-prompts.txt", "w", encoding="utf-8") as f:
                f.write('\n'.join(generation_settings))

        # Maps the frame number (as text) to the prompt/seed change for it.
        changes_dict = {}
        if prompts:
            for prompt_line in prompts.split("\n"):
                params = prompt_line.split("::")
                if len(params) == 2:
                    changes_dict[params[0]] = {"prompt": params[1]}
                elif len(params) == 3:
                    changes_dict[params[0]] = {"seed": params[1], "prompt": params[2]}
                else:
                    raise IOError(f"Invalid input in prompt line: {prompt_line}")

        raw_prompt = p.prompt

        # Degrees the wave advances per frame. Loop-invariant, and guarded so
        # that a "Frames Per Wave" of 0 does not divide by zero.
        if frames_per_wave > 0:
            denoising_strength_change_rate = 180 / frames_per_wave
        else:
            denoising_strength_change_rate = 0

        for n in range(batch_count):
            history = []

            # Every batch starts again from the original init image.
            p.init_images = original_init_image

            seed_state = "adding"

            for i in range(frames):
                state.job = ""

                change = changes_dict.get(str(i))
                if change is not None:
                    raw_prompt = change["prompt"]
                    state.job = "New prompt: %s\n" % raw_prompt

                    if "seed" in change:
                        seed_text = change["seed"]

                        if seed_text.startswith("+"):
                            seed_state = "adding"
                            seed_text = seed_text.strip("+")
                        elif seed_text.startswith("-"):
                            seed_state = "subtracting"
                            seed_text = seed_text.strip("-")
                        else:
                            seed_state = "constant"

                        p.seed = int(seed_text)

                p.n_iter = 1
                p.batch_size = 1
                p.do_not_save_grid = True

                if opts.img2img_color_correction:
                    p.color_corrections = initial_color_corrections

                wave_progress = self._wave_progress(i, frames_per_wave, denoising_strength_change_offset)
                print(wave_progress)

                # Replace the wave tags in the prompt with interpolated weights.
                new_prompt = re.sub(wave_completed_regex, lambda x: set_weights(x, wave_progress), raw_prompt)
                new_prompt = re.sub(wave_remaining_regex, lambda x: set_weights(x, 1 - wave_progress), new_prompt)
                p.prompt = new_prompt

                print(new_prompt)

                cos = abs(math.cos(math.radians(i * denoising_strength_change_rate + denoising_strength_change_offset)))
                p.denoising_strength = initial_denoising_strength + denoising_strength_change_amplitude - (cos * denoising_strength_change_amplitude)

                state.job += f"Iteration {i + 1}/{frames}, batch {n + 1}/{batch_count}. Denoising Strength: {p.denoising_strength}"

                processed = processing.process_images(p)

                if initial_seed is None:
                    initial_seed = processed.seed
                    initial_info = processed.info

                init_img = processed.images[0]

                # Loop the result back in as the next frame's input.
                p.init_images = [init_img]

                if seed_state == "adding":
                    p.seed = processed.seed + 1
                elif seed_state == "subtracting":
                    p.seed = processed.seed - 1

                image_number = int(initial_image_number + i)
                images.save_image(init_img, p.outpath_samples, "", processed.seed, processed.prompt, forced_filename=str(image_number))

                history.append(init_img)

            grid = images.image_grid(history, rows=1)
            if opts.grid_save:
                images.save_image(grid, p.outpath_grids, "grid", initial_seed, p.prompt, opts.grid_format, info=None, short_filename=not opts.grid_extended_filename, grid=True, p=p)

            grids.append(grid)
            all_images += history

        if opts.return_grid:
            all_images = grids + all_images

        if save_video:
            # NOTE(review): assumes save_image stored the frames in a
            # sub-directory named after today's date (YYYY-MM-DD) -- confirm
            # against the configured save-to-directory settings.
            date_string = datetime.now().strftime("%Y-%m-%d")
            input_pattern = os.path.join(loopback_wave_images_path, date_string, "%d.png")
            encode_video(input_pattern, initial_image_number, loopback_wave_images_path, video_fps, video_quality, video_encoding, segment_video, video_segment_duration, ffmpeg_path)

        processed = Processed(p, all_images, initial_seed, initial_info)

        return processed