StupidGame's picture
Upload 1941 files
baa8e90
raw
history blame contribute delete
No virus
12.9 kB
# -*- coding: utf-8 -*-
import os
import shutil
import subprocess
import tempfile
from functools import lru_cache
from PIL import Image as PilImage
from .categories import NodeCategories
from .err import on_error
from .shared import DreamConfig
#from .shared import MpegEncoderUtility
from .dreamtypes import *
CONFIG = DreamConfig()
@lru_cache(5)
def _load_image_cached(filename):
return PilImage.open(filename)
class TempFileSet:
def __init__(self):
self._files = dict()
def add(self, temppath, finalpath):
self._files[temppath] = finalpath
def remove(self):
for f in self._files.keys():
os.unlink(f)
def finalize(self):
for a, b in self._files.items():
shutil.move(a, b)
self._files = dict()
class AnimationSeqProcessor:
def __init__(self, sequence: AnimationSequence):
self._sequence = sequence
self._input_cache = {}
self._inputs = {}
self._output_dirs = {}
for b in self._sequence.batches:
self._inputs[b] = list(self._sequence.get_image_files_of_batch(b))
self._output_dirs[b] = os.path.dirname(os.path.abspath(self._inputs[b][0]))
self._ext = os.path.splitext(self._inputs[0][0])[1].lower()
self._length = len(self._inputs[0])
def _load_input(self, batch_id, index) -> DreamImage:
files = self._inputs[batch_id]
index = min(max(0, index), len(files) - 1)
filename = files[index]
return DreamImage(pil_image=_load_image_cached(filename))
def _process_single_batch(self, batch_id, indices, index_offsets: List[int], fun, output_dir) -> List[str]:
all_indices = list(indices)
last_index = max(all_indices)
workset = TempFileSet()
rnd = random.randint(0, 1000000)
result_files = list()
try:
for index in all_indices:
images = list(map(lambda offset: self._load_input(batch_id, index + offset), index_offsets))
result: Dict[int, DreamImage] = fun(index, last_index, images)
for (result_index, img) in result.items():
filepath = os.path.join(output_dir,
"tmp_" + str(rnd) + "_" + (str(result_index).zfill(8)) + self._ext)
filepath_final = os.path.join(output_dir, "seq_" + (str(result_index).zfill(8)) + self._ext)
if self._ext == ".png":
img.save_png(filepath)
else:
img.save_jpg(filepath, quality=CONFIG.get("encoding.jpeg_quality", 98))
workset.add(filepath, filepath_final)
result_files.append(filepath_final)
# all done with batch - remove input files
for oldfile in self._inputs[batch_id]:
os.unlink(oldfile)
workset.finalize()
return result_files
finally:
workset.remove()
def process(self, index_offsets: List[int], fun):
results = dict()
new_length = 0
for batch_id in self._sequence.batches:
resulting_filenames = self._process_single_batch(batch_id, range(len(self._inputs[batch_id])),
index_offsets, fun,
self._output_dirs[batch_id])
for (index, filename) in enumerate(resulting_filenames):
l = results.get(index, [])
l.append(filename)
results[index] = l
new_length = len(resulting_filenames)
new_fps = self._sequence.frame_counter.frames_per_second * (float(new_length) / self._length)
counter = FrameCounter(new_length - 1, new_length, new_fps)
return AnimationSequence(counter, results)
def _ffmpeg(config, filenames, fps, output):
fps = float(fps)
duration = 1.0 / fps
tmp = tempfile.NamedTemporaryFile(delete=False, mode="wb")
tempfilepath = tmp.name
try:
for filename in filenames:
filename = filename.replace("\\", "/")
tmp.write(f"file '{filename}'\n".encode())
tmp.write(f"duration {duration}\n".encode())
finally:
tmp.close()
try:
cmd = [config.get("ffmpeg.path", "ffmpeg")]
cmd.extend(config.get("ffmpeg.arguments"))
replacements = {"%FPS%": str(fps), "%FRAMES%": tempfilepath, "%OUTPUT%": output}
for (key, value) in replacements.items():
cmd = list(map(lambda s: s.replace(key, value), cmd))
subprocess.check_output(cmd, shell=True)
finally:
os.unlink(tempfilepath)
def _make_video_filename(name, file_ext):
(b, _) = os.path.splitext(name)
return b + "." + file_ext.strip(".")
#
# class DreamVideoEncoderMpegCoder:
# NODE_NAME = "Video Encoder (mpegCoder)"
# ICON = "🎬"
# CATEGORY = NodeCategories.ANIMATION_POSTPROCESSING
# RETURN_TYPES = (LogEntry.ID,)
# RETURN_NAMES = ("log_entry",)
# OUTPUT_NODE = True
# FUNCTION = "encode"
#
# @classmethod
# def INPUT_TYPES(cls):
# return {
# "required": SharedTypes.sequence | {
# "name": ("STRING", {"default": 'video', "multiline": False}),
# "framerate_factor": ("FLOAT", {"default": 1.0, "min": 0.01, "max": 100.0}),
# "remove_images": ("BOOLEAN", {"default": True})
# },
# }
#
# def _find_free_filename(self, filename, defaultdir):
# if os.path.basename(filename) == filename:
# filename = os.path.join(defaultdir, filename)
# n = 1
# tested = filename
# while os.path.exists(tested):
# n += 1
# (b, ext) = os.path.splitext(filename)
# tested = b + "_" + str(n) + ext
# return tested
#
# def encode(self, sequence, name, framerate_factor, remove_images):
# if not sequence.is_defined:
# return (LogEntry([]),)
# config = DreamConfig()
# filename = _make_video_filename(name, config.get("mpeg_coder.file_extension", "mp4"))
# log_entry = LogEntry([])
# for batch_num in sequence.batches:
# try:
# images = list(sequence.get_image_files_of_batch(batch_num))
# filename = self._find_free_filename(filename, os.path.dirname(images[0]))
# first_image = DreamImage.from_file(images[0])
# enc = MpegEncoderUtility(video_path=filename,
# bit_rate_factor=float(config.get("mpeg_coder.bitrate_factor", 1.0)),
# encoding_threads=int(config.get("mpeg_coder.encoding_threads", 4)),
# max_b_frame=int(config.get("mpeg_coder.max_b_frame", 2)),
# width=first_image.width,
# height=first_image.height,
# files=images,
# fps=sequence.fps * framerate_factor,
# codec_name=config.get("mpeg_coder.codec_name", "libx265"))
# enc.encode()
# log_entry = log_entry.add("Generated video '{}'".format(filename))
# if remove_images:
# for imagepath in images:
# if os.path.isfile(imagepath):
# os.unlink(imagepath)
# except Exception as e:
# on_error(self.__class__, str(e))
# return (log_entry,)
#
class DreamVideoEncoder:
NODE_NAME = "FFMPEG Video Encoder"
DISPLAY_NAME = "Video Encoder (FFMPEG)"
ICON = "🎬"
@classmethod
def INPUT_TYPES(cls):
return {
"required": SharedTypes.sequence | {
"name": ("STRING", {"default": 'video', "multiline": False}),
"framerate_factor": ("FLOAT", {"default": 1.0, "min": 0.01, "max": 100.0}),
"remove_images": ("BOOLEAN", {"default": True})
},
}
CATEGORY = NodeCategories.ANIMATION_POSTPROCESSING
RETURN_TYPES = (LogEntry.ID,)
RETURN_NAMES = ("log_entry",)
OUTPUT_NODE = True
FUNCTION = "encode"
@classmethod
def IS_CHANGED(cls, sequence: AnimationSequence, **kwargs):
return sequence.is_defined
def _find_free_filename(self, filename, defaultdir):
if os.path.basename(filename) == filename:
filename = os.path.join(defaultdir, filename)
n = 1
tested = filename
while os.path.exists(tested):
n += 1
(b, ext) = os.path.splitext(filename)
tested = b + "_" + str(n) + ext
return tested
def generate_video(self, files, fps, filename, config):
filename = self._find_free_filename(filename, os.path.dirname(files[0]))
_ffmpeg(config, files, fps, filename)
return filename
def encode(self, sequence: AnimationSequence, name: str, remove_images, framerate_factor):
if not sequence.is_defined:
return (LogEntry([]),)
config = DreamConfig()
filename = _make_video_filename(name, config.get("ffmpeg.file_extension", "mp4"))
log_entry = LogEntry([])
for batch_num in sequence.batches:
try:
images = list(sequence.get_image_files_of_batch(batch_num))
actual_filename = self.generate_video(images, sequence.fps * framerate_factor, filename, config)
log_entry = log_entry.add("Generated video '{}'".format(actual_filename))
if remove_images:
for imagepath in images:
if os.path.isfile(imagepath):
os.unlink(imagepath)
except Exception as e:
on_error(self.__class__, str(e))
return (log_entry,)
class DreamSequenceTweening:
NODE_NAME = "Image Sequence Tweening"
@classmethod
def INPUT_TYPES(cls):
return {
"required": SharedTypes.sequence | {
"multiplier": ("INT", {"default": 2, "min": 2, "max": 10}),
},
}
CATEGORY = NodeCategories.ANIMATION_POSTPROCESSING
RETURN_TYPES = (AnimationSequence.ID,)
RETURN_NAMES = ("sequence",)
OUTPUT_NODE = False
FUNCTION = "process"
@classmethod
def IS_CHANGED(cls, sequence: AnimationSequence, **kwargs):
return sequence.is_defined
def process(self, sequence: AnimationSequence, multiplier):
if not sequence.is_defined:
return (sequence,)
def _generate_extra_frames(input_index, last_index, images):
results = {}
if input_index == last_index:
# special case
for i in range(multiplier):
results[input_index * multiplier + i] = images[0]
return results
# normal case
current_frame = images[0]
next_frame = images[1]
for i in range(multiplier):
alpha = float(i + 1) / multiplier
results[multiplier * input_index + i] = current_frame.blend(next_frame, 1.0 - alpha, alpha)
return results
proc = AnimationSeqProcessor(sequence)
return (proc.process([0, 1], _generate_extra_frames),)
class DreamSequenceBlend:
NODE_NAME = "Image Sequence Blend"
@classmethod
def INPUT_TYPES(cls):
return {
"required": SharedTypes.sequence | {
"fade_in": ("FLOAT", {"default": 0.1, "min": 0.01, "max": 0.5}),
"fade_out": ("FLOAT", {"default": 0.1, "min": 0.01, "max": 0.5}),
"iterations": ("INT", {"default": 1, "min": 1, "max": 10}),
},
}
CATEGORY = NodeCategories.ANIMATION_POSTPROCESSING
RETURN_TYPES = (AnimationSequence.ID,)
RETURN_NAMES = ("sequence",)
OUTPUT_NODE = False
FUNCTION = "process"
@classmethod
def IS_CHANGED(cls, sequence: AnimationSequence, **kwargs):
return sequence.is_defined
def process(self, sequence: AnimationSequence, fade_in, fade_out, iterations):
if not sequence.is_defined:
return (sequence,)
current_sequence = sequence
for i in range(iterations):
proc = AnimationSeqProcessor(current_sequence)
def _blur(index: int, last_index: int, images: List[DreamImage]):
pre_frame = images[0].blend(images[1], fade_in, 1.0)
post_frame = images[2].blend(images[1], fade_out, 1.0)
return {index: pre_frame.blend(post_frame)}
current_sequence = proc.process([-1, 0, 1], _blur)
return (current_sequence,)