StupidGame's picture
Upload 1941 files
baa8e90
raw
history blame contribute delete
No virus
16.8 kB
# -*- coding: utf-8 -*-
import csv
import functools
import math
import os
from scipy.io.wavfile import read as wav_read
from .categories import NodeCategories
from .shared import hashed_as_strings
from .dreamtypes import SharedTypes, FrameCounter
def _linear_value_calc(x, x_start, x_end, y_start, y_end):
    """Linearly map ``x`` from ``[x_start, x_end]`` onto ``[y_start, y_end]``.

    Values of ``x`` at or before ``x_start`` yield ``y_start``; values at or
    after ``x_end`` yield ``y_end``. In between, the result is interpolated
    linearly. The x-span used for the division is never smaller than 0.0001.
    """
    if x <= x_start:
        result = y_start
    elif x >= x_end:
        result = y_end
    else:
        # Clamp the span so a tiny interval cannot blow up the division.
        span = max(x_end - x_start, 0.0001)
        fraction = (x - x_start) / span
        result = (y_end - y_start) * fraction + y_start
    return result
def _curve_result(f: float):
    """Package a curve value as the ``(FLOAT, INT)`` output pair.

    The second element is ``f`` rounded with the built-in ``round`` and
    converted to ``int``.
    """
    rounded = int(round(f))
    return f, rounded
class DreamSineWave:
    """Node that outputs a sine wave evaluated at the frame counter's current time.

    The wave oscillates between ``min_value`` and ``max_value`` with a period of
    ``periodicity_seconds``. ``phase`` is added directly to the time value
    before the sine is evaluated.
    """

    NODE_NAME = "Sine Curve"
    CATEGORY = NodeCategories.ANIMATION_CURVES
    RETURN_TYPES = ("FLOAT", "INT")
    RETURN_NAMES = ("FLOAT", "INT")
    FUNCTION = "result"

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: the shared frame counter plus the wave settings."""
        wave_settings = {
            "max_value": ("FLOAT", {"default": 1.0, "multiline": False}),
            "min_value": ("FLOAT", {"default": 0.0, "multiline": False}),
            "periodicity_seconds": ("FLOAT", {"default": 10.0, "multiline": False, "min": 0.01}),
            "phase": ("FLOAT", {"default": 0.0, "multiline": False, "min": -1, "max": 1}),
        }
        return {"required": SharedTypes.frame_counter | wave_settings}

    @classmethod
    def IS_CHANGED(cls, *values):
        """Return ``hashed_as_strings`` applied to all input values."""
        return hashed_as_strings(*values)

    def result(self, frame_counter: FrameCounter, max_value, min_value, periodicity_seconds, phase):
        """Evaluate the sine wave and return it as a ``(float, int)`` pair."""
        now = frame_counter.current_time_in_seconds
        amplitude = (max_value - min_value) * 0.5
        angular_frequency = 2 * math.pi / periodicity_seconds
        midpoint = (max_value + min_value) / 2
        # Phase is an offset on the time axis, applied before frequency scaling.
        return _curve_result(amplitude * math.sin(angular_frequency * (now + phase)) + midpoint)
class DreamSawWave:
    """Node that outputs a sawtooth wave evaluated at the frame counter's current time.

    Within each period the value rises linearly from ``min_value`` towards
    ``max_value`` and then wraps. ``phase`` shifts the wave by that fraction of
    ``periodicity_seconds``.
    """

    NODE_NAME = "Saw Curve"
    CATEGORY = NodeCategories.ANIMATION_CURVES
    RETURN_TYPES = ("FLOAT", "INT")
    RETURN_NAMES = ("FLOAT", "INT")
    FUNCTION = "result"

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: the shared frame counter plus the wave settings."""
        wave_settings = {
            "max_value": ("FLOAT", {"default": 1.0, "multiline": False}),
            "min_value": ("FLOAT", {"default": 0.0, "multiline": False}),
            "periodicity_seconds": ("FLOAT", {"default": 10.0, "multiline": False, "min": 0.01}),
            "phase": ("FLOAT", {"default": 0.0, "multiline": False, "min": -1, "max": 1}),
        }
        return {"required": SharedTypes.frame_counter | wave_settings}

    @classmethod
    def IS_CHANGED(cls, *values):
        """Return ``hashed_as_strings`` applied to all input values."""
        return hashed_as_strings(*values)

    def result(self, frame_counter: FrameCounter, max_value, min_value, periodicity_seconds, phase):
        """Evaluate the sawtooth and return it as a ``(float, int)`` pair."""
        shifted_time = frame_counter.current_time_in_seconds + periodicity_seconds * phase
        # Position inside the current period, normalised by the period length.
        position = (shifted_time % periodicity_seconds) / periodicity_seconds
        return _curve_result(position * (max_value - min_value) + min_value)
class DreamTriangleWave:
    """Node that outputs a triangle wave evaluated at the frame counter's current time.

    The first half of each period ramps from ``min_value`` up to ``max_value``;
    the second half ramps back down. ``phase`` shifts the wave by that fraction
    of ``periodicity_seconds``.
    """

    NODE_NAME = "Triangle Curve"
    CATEGORY = NodeCategories.ANIMATION_CURVES
    RETURN_TYPES = ("FLOAT", "INT")
    RETURN_NAMES = ("FLOAT", "INT")
    FUNCTION = "result"

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: the shared frame counter plus the wave settings."""
        wave_settings = {
            "max_value": ("FLOAT", {"default": 1.0, "multiline": False}),
            "min_value": ("FLOAT", {"default": 0.0, "multiline": False}),
            "periodicity_seconds": ("FLOAT", {"default": 10.0, "multiline": False, "min": 0.01}),
            "phase": ("FLOAT", {"default": 0.0, "multiline": False, "min": -1, "max": 1}),
        }
        return {"required": SharedTypes.frame_counter | wave_settings}

    @classmethod
    def IS_CHANGED(cls, *values):
        """Return ``hashed_as_strings`` applied to all input values."""
        return hashed_as_strings(*values)

    def result(self, frame_counter: FrameCounter, max_value, min_value, periodicity_seconds, phase):
        """Evaluate the triangle wave and return it as a ``(float, int)`` pair."""
        shifted_time = frame_counter.current_time_in_seconds + periodicity_seconds * phase
        position = (shifted_time % periodicity_seconds) / periodicity_seconds
        value_range = max_value - min_value
        if position <= 0.5:
            # Rising half: stretch [0, 0.5] to [0, 1] and climb from the minimum.
            value = (position * 2) * value_range + min_value
        else:
            # Falling half: stretch (0.5, 1) to (0, 1) and descend from the maximum.
            value = max_value - ((position - 0.5) * 2) * value_range
        return _curve_result(value)
class WavData:
    """Normalised loudness envelope of one audio channel, looked up by time.

    The samples are split into three buckets per video frame. Each bucket
    stores the sum of the absolute sample values it covers, divided by the
    largest bucket sum, so stored values lie in ``[0, 1]``.
    """

    def __init__(self, sampling_rate: float, single_channel_samples, fps: float):
        """Build the envelope.

        Args:
            sampling_rate: Samples per second of the audio data.
            single_channel_samples: Sequence of numeric samples for one channel
                (must support ``len`` and slicing).
            fps: Video frames per second; determines the bucket resolution.
        """
        sample_count = len(single_channel_samples)
        self._rate = sampling_rate
        self._length_in_seconds = sample_count / sampling_rate
        self._num_buckets = round(self._length_in_seconds * fps * 3)
        # Empty or extremely short audio yields zero buckets; avoid dividing by it.
        if self._num_buckets > 0:
            self._bucket_size = sample_count / float(self._num_buckets)
        else:
            self._bucket_size = 0.0

        totals = []
        for i in range(self._num_buckets):
            start_index = round(i * self._bucket_size)
            # Slice ends are exclusive, so the next bucket's start is the right bound;
            # subtracting one here would drop the last sample of every bucket.
            end_index = round((i + 1) * self._bucket_size)
            # Convert to float before abs()/sum() so fixed-width integer samples
            # (e.g. NumPy int16 scalars, as a WAV reader may produce) cannot wrap.
            totals.append(sum((abs(float(n)) for n in single_channel_samples[start_index:end_index]), 0.0))

        self._max_bucket_value = max(totals, default=0.0)
        if self._max_bucket_value > 0:
            self._buckets = [total / self._max_bucket_value for total in totals]
        else:
            # Completely silent audio: every bucket is zero; skip the 0/0 normalisation.
            self._buckets = [0.0] * len(totals)

    def value_at_time(self, second: float) -> float:
        """Return the normalised bucket value at ``second``.

        Returns 0.0 for times outside the audio or when there are no buckets.
        """
        if not self._buckets or second < 0.0 or second > self._length_in_seconds:
            return 0.0
        nsample = second * self._rate
        nbucket = min(max(0, round(nsample / self._bucket_size)), self._num_buckets - 1)
        return self._buckets[nbucket]
@functools.lru_cache(4)
def _wav_loader(filepath, fps):
    """Read a WAV file and return a ``WavData`` envelope of its first channel.

    Results are cached for the four most recently used ``(filepath, fps)``
    pairs, so a file changed on disk is not re-read while its entry is cached.
    """
    sampling_rate, samples = wav_read(filepath)
    # scipy returns a 1-D array for mono files and (n_samples, n_channels)
    # otherwise; indexing [:, 0] on mono data would raise IndexError.
    single_channel = samples if samples.ndim == 1 else samples[:, 0]
    return WavData(sampling_rate, single_channel, fps)
class DreamWavCurve:
    """Node that outputs the loudness envelope of a WAV file at the current time.

    The envelope value for the frame counter's current time is multiplied by
    ``scale``. If ``wav_path`` is not an existing file the node outputs zero.
    """

    NODE_NAME = "WAV Curve"
    CATEGORY = NodeCategories.ANIMATION_CURVES
    RETURN_TYPES = ("FLOAT", "INT")
    RETURN_NAMES = ("FLOAT", "INT")
    FUNCTION = "result"
    ICON = "∿"

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: the shared frame counter, a WAV path and a scale."""
        return {
            "required": SharedTypes.frame_counter | {
                "wav_path": ("STRING", {"default": "audio.wav"}),
                "scale": ("FLOAT", {"default": 1.0, "multiline": False})
            },
        }

    @classmethod
    def IS_CHANGED(cls, *values):
        """Return ``hashed_as_strings`` applied to all input values."""
        return hashed_as_strings(*values)

    def result(self, frame_counter: FrameCounter, wav_path, scale):
        """Return ``(value, rounded value)`` of the scaled envelope at the current time."""
        if not os.path.isfile(wav_path):
            # Missing audio is treated as silence rather than an error.
            return (0.0, 0)
        data = _wav_loader(wav_path, frame_counter.frames_per_second)
        scaled = data.value_at_time(frame_counter.current_time_in_seconds) * scale
        return (scaled, int(round(scaled)))
class DreamTriangleEvent:
    """Node that outputs a single triangular peak centred on a point in time.

    The value is ``min_value`` outside the window of ``width_seconds`` around
    ``center_seconds``, rises linearly to ``max_value`` at the centre and falls
    back linearly afterwards.
    """

    NODE_NAME = "Triangle Event Curve"
    CATEGORY = NodeCategories.ANIMATION_CURVES
    RETURN_TYPES = ("FLOAT", "INT")
    RETURN_NAMES = ("FLOAT", "INT")
    FUNCTION = "result"

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: the shared frame counter plus the event settings."""
        event_settings = {
            "max_value": ("FLOAT", {"default": 1.0, "multiline": False}),
            "min_value": ("FLOAT", {"default": 0.0, "multiline": False}),
            "width_seconds": ("FLOAT", {"default": 1.0, "multiline": False, "min": 0.1}),
            "center_seconds": ("FLOAT", {"default": 10.0, "multiline": False, "min": 0.0}),
        }
        return {"required": SharedTypes.frame_counter | event_settings}

    @classmethod
    def IS_CHANGED(cls, *values):
        """Return ``hashed_as_strings`` applied to all input values."""
        return hashed_as_strings(*values)

    def result(self, frame_counter: FrameCounter, max_value, min_value, width_seconds, center_seconds):
        """Evaluate the peak at the current time and return a ``(float, int)`` pair."""
        now = frame_counter.current_time_in_seconds
        window_start = center_seconds - width_seconds * 0.5
        window_end = center_seconds + width_seconds * 0.5
        if window_start <= now <= center_seconds:
            # Rising edge of the triangle.
            return _curve_result(_linear_value_calc(now, window_start, center_seconds, min_value, max_value))
        if center_seconds < now <= window_end:
            # Falling edge of the triangle.
            return _curve_result(_linear_value_calc(now, center_seconds, window_end, max_value, min_value))
        return _curve_result(min_value)
class DreamSmoothEvent:
    """Node that outputs a single eased peak centred on a point in time.

    A triangular 0..1 ramp over the window of ``width_seconds`` around
    ``center_seconds`` is reshaped by an easing function and then mapped onto
    ``[min_value, max_value]``.
    """

    NODE_NAME = "Smooth Event Curve"
    CATEGORY = NodeCategories.ANIMATION_CURVES
    RETURN_TYPES = ("FLOAT", "INT")
    RETURN_NAMES = ("FLOAT", "INT")
    FUNCTION = "result"

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: the shared frame counter plus the event settings."""
        event_settings = {
            "max_value": ("FLOAT", {"default": 1.0, "multiline": False}),
            "min_value": ("FLOAT", {"default": 0.0, "multiline": False}),
            "width_seconds": ("FLOAT", {"default": 1.0, "multiline": False, "min": 0.1}),
            "center_seconds": ("FLOAT", {"default": 10.0, "multiline": False, "min": 0.0}),
        }
        return {"required": SharedTypes.frame_counter | event_settings}

    @classmethod
    def IS_CHANGED(cls, *values):
        """Return ``hashed_as_strings`` applied to all input values."""
        return hashed_as_strings(*values)

    def result(self, frame_counter: FrameCounter, max_value, min_value, width_seconds, center_seconds):
        """Evaluate the eased peak at the current time and return a ``(float, int)`` pair."""
        now = frame_counter.current_time_in_seconds
        window_start = center_seconds - width_seconds * 0.5
        window_end = center_seconds + width_seconds * 0.5

        # Unit-height triangle: 0 outside the window, 1 at the centre.
        if window_start <= now <= center_seconds:
            ramp = _linear_value_calc(now, window_start, center_seconds, 0.0, 1.0)
        elif center_seconds < now <= window_end:
            ramp = _linear_value_calc(now, center_seconds, window_end, 1.0, 0.0)
        else:
            ramp = 0.0

        if ramp < 0.5:
            # Lower half: quadratic ease, (2 * ramp) squared, halved.
            doubled = ramp + ramp
            eased = (doubled * doubled) * 0.5
        else:
            # Upper half: fourth-root ease mapped onto [0.5, 1].
            upper = (ramp - 0.5) * 2
            eased = math.pow(upper, 0.25) * 0.5 + 0.5
        return _curve_result(eased * (max_value - min_value) + min_value)
class DreamBeatCurve:
    """Node that outputs a decaying pulse on selected beats of a repeating measure.

    Each accent names a beat number (starting at 1) within a measure of
    ``measure_length`` beats. During an accented beat the pulse falls linearly
    from 1 to 0, is raised to ``power``, optionally inverted, and finally
    mapped onto ``[low_value, high_value]``.
    """

    NODE_NAME = "Beat Curve"
    CATEGORY = NodeCategories.ANIMATION_CURVES
    RETURN_TYPES = ("FLOAT", "INT")
    RETURN_NAMES = ("FLOAT", "INT")
    FUNCTION = "result"

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: frame counter, tempo/shape settings and accents."""
        required_settings = {
            "bpm": ("FLOAT", {"default": 100.0, "multiline": False}),
            "time_offset": ("FLOAT", {"default": 0.0, "multiline": False}),
            "measure_length": ("INT", {"default": 4, "min": 1}),
            "low_value": ("FLOAT", {"default": 0.0}),
            "high_value": ("FLOAT", {"default": 1.0}),
            "invert": (["no", "yes"],),
            "power": ("FLOAT", {"default": 2.0, "min": 0.25, "max": 4}),
            "accent_1": ("INT", {"default": 1, "min": 1, "max": 24}),
        }
        optional_accents = {
            "accent_2": ("INT", {"default": 3, "min": 1, "max": 24}),
            "accent_3": ("INT", {"default": 0}),
            "accent_4": ("INT", {"default": 0}),
        }
        return {
            "required": SharedTypes.frame_counter | required_settings,
            "optional": optional_accents,
        }

    @classmethod
    def IS_CHANGED(cls, *values):
        """Return ``hashed_as_strings`` applied to all input values."""
        return hashed_as_strings(*values)

    def _get_value_for_accent(self, accent, measure_length, bpm, frame_counter: FrameCounter, frame_offset):
        """Return the pulse strength contributed by one accented beat.

        The result is 1.0 at the first frame of the accented beat, decreasing
        linearly across the beat, and 0 whenever the current position in the
        measure lies outside that beat.
        """
        frames_per_beat = (frame_counter.frames_per_second * 60.0) / bpm
        frames_per_measure = frames_per_beat * measure_length
        # Position of the (offset) current frame inside the repeating measure.
        position = (frame_counter.current_frame + frame_offset) % frames_per_measure
        beat_start = (accent - 1) * frames_per_beat
        beat_end = accent * frames_per_beat
        if beat_start <= position < beat_end:
            return 1.0 - ((position - beat_start) / frames_per_beat)
        return 0

    def result(self, bpm, frame_counter: FrameCounter, measure_length, low_value, high_value, power, invert,
               time_offset, **accents):
        """Evaluate the beat curve at the current frame and return a ``(float, int)`` pair.

        Accents are read from keyword arguments named ``accent_<n>``; values
        outside ``1..measure_length`` are ignored.
        """
        frame_offset = int(round(time_offset * frame_counter.frames_per_second))
        requested = (accents.get("accent_" + str(index), -1) for index in range(30))
        # A set collapses duplicate accents so a beat is only counted once.
        active_accents = {beat for beat in requested if 1 <= beat <= measure_length}
        strength = sum(
            (math.pow(self._get_value_for_accent(beat, measure_length, bpm, frame_counter, frame_offset), power)
             for beat in active_accents),
            0.0,
        )
        if invert == "yes":
            strength = 1.0 - strength
        return _curve_result(low_value + strength * (high_value - low_value))
class DreamLinear:
    """Node that blends from ``initial_value`` to ``final_value`` by the frame counter's progress."""

    NODE_NAME = "Linear Curve"
    CATEGORY = NodeCategories.ANIMATION_CURVES
    RETURN_TYPES = ("FLOAT", "INT")
    RETURN_NAMES = ("FLOAT", "INT")
    FUNCTION = "result"

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: the shared frame counter plus the two end values."""
        end_values = {
            "initial_value": ("FLOAT", {"default": 0.0, "multiline": False}),
            "final_value": ("FLOAT", {"default": 100.0, "multiline": False}),
        }
        return {"required": SharedTypes.frame_counter | end_values}

    @classmethod
    def IS_CHANGED(cls, *values):
        """Return ``hashed_as_strings`` applied to all input values."""
        return hashed_as_strings(*values)

    def result(self, initial_value, final_value, frame_counter: FrameCounter):
        """Return ``(value, rounded value)`` at the frame counter's current progress."""
        total_change = final_value - initial_value
        value = initial_value + frame_counter.progress * total_change
        rounded = int(round(value))
        return (value, rounded)
def _is_as_float(s: str):
    """Return True when ``float(s)`` succeeds, False when it raises ``ValueError``."""
    try:
        float(s)
    except ValueError:
        return False
    else:
        return True
class DreamCSVGenerator:
    """Output node that records the incoming value for each frame in a CSV file.

    On the first frame the file is (re)created with a ``Frame``/``Value``
    header; on every later frame a row is appended. Nothing is written when no
    file path is given.
    """

    NODE_NAME = "CSV Generator"
    ICON = "⌗"
    CATEGORY = NodeCategories.ANIMATION_CURVES
    RETURN_TYPES = ()
    RETURN_NAMES = ()
    FUNCTION = "write"
    OUTPUT_NODE = True

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: frame counter, value, target path and CSV dialect."""
        return {
            "required": SharedTypes.frame_counter | {
                "value": ("FLOAT", {"forceInput": True, "default": 0.0}),
                "csvfile": ("STRING", {"default": "", "multiline": False}),
                "csv_dialect": (csv.list_dialects(),)
            },
        }

    @classmethod
    def IS_CHANGED(cls, *values):
        """Return ``hashed_as_strings`` applied to all input values."""
        return hashed_as_strings(*values)

    def write(self, csvfile, frame_counter: FrameCounter, value, csv_dialect):
        """Write the current frame number and value as one CSV row.

        Args:
            csvfile: Path of the CSV file; an empty path disables writing.
            frame_counter: Supplies ``is_first_frame`` and ``current_frame``.
            value: Value to record; stored via ``str(value)``.
            csv_dialect: Name of the ``csv`` dialect used for writing.

        Returns:
            An empty tuple (the node has no outputs).
        """
        if not csvfile:
            # The default path is empty; previously this fell through to
            # open('', 'a') and raised instead of simply doing nothing.
            return ()
        is_first = frame_counter.is_first_frame
        # Truncate on the first frame so each run starts a fresh file.
        with open(csvfile, 'w' if is_first else 'a', newline='') as out:
            csvwriter = csv.writer(out, dialect=csv_dialect)
            if is_first:
                csvwriter.writerow(['Frame', 'Value'])
            csvwriter.writerow([frame_counter.current_frame, str(value)])
        return ()
class DreamCSVCurve:
    """Node that reads a two-column CSV file and outputs its value at the current frame.

    The first column is a position (seconds or frames, per
    ``first_column_type``) and the second the value at that position. Rows that
    do not consist of exactly two float-parsable fields (e.g. a header) are
    skipped.
    """

    NODE_NAME = "CSV Curve"
    CATEGORY = NodeCategories.ANIMATION_CURVES
    RETURN_TYPES = ("FLOAT", "INT")
    RETURN_NAMES = ("FLOAT", "INT")
    FUNCTION = "result"

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input spec: frame counter, CSV path, column type, interpolation, dialect."""
        return {
            "required": SharedTypes.frame_counter | {
                "csvfile": ("STRING", {"default": "", "multiline": False}),
                "first_column_type": (["seconds", "frames"],),
                "interpolate": (["true", "false"],),
                "csv_dialect": (csv.list_dialects(),)
            },
        }

    @classmethod
    def IS_CHANGED(cls, *values):
        """Return ``hashed_as_strings`` applied to all input values."""
        return hashed_as_strings(*values)

    def _row_yield(self, file, csv_dialect):
        """Yield ``(previous_row, row)`` pairs of numeric rows from ``file``.

        The first pair has ``None`` as previous row; after the last numeric row
        a final ``(last_row, None)`` pair is yielded. Nothing is yielded when
        the file contains no numeric rows.
        """
        prev_row = None
        for row in csv.reader(file, dialect=csv_dialect):
            if len(row) == 2 and _is_as_float(row[0]) and _is_as_float(row[1]):
                row = list(map(float, row))
                yield (prev_row, row)
                prev_row = row
        if prev_row is not None:
            yield (prev_row, None)

    def result(self, csvfile, frame_counter: FrameCounter, first_column_type, interpolate, csv_dialect):
        """Return ``(value, rounded value)`` of the CSV curve at the current frame.

        Before the first row the first value is returned; from the last row
        onwards the last value. In between, the value is either interpolated
        linearly between the surrounding rows (``interpolate == "true"``) or
        held at the earlier row's value. ``(0.0, 0)`` is returned when the file
        has no numeric rows.

        Raises:
            OSError: If ``csvfile`` cannot be opened.
        """
        interpolate = interpolate == "true"
        current_frame = frame_counter.current_frame

        def _first_col_to_frame(v: float):
            # Convert a first-column entry to a frame number.
            if first_column_type == "frames":
                return round(v)
            return round(v * frame_counter.frames_per_second)

        # newline='' is what the csv module requires of files handed to csv.reader.
        with open(csvfile, newline='') as f:
            for (prev, current) in self._row_yield(f, csv_dialect):
                if current is None:
                    # after last row
                    return (prev[1], int(round(prev[1])))
                if prev is None:
                    if current_frame < _first_col_to_frame(current[0]):
                        # before first row
                        return (current[1], int(round(current[1])))
                    continue
                frame1 = _first_col_to_frame(prev[0])
                value1 = prev[1]
                frame2 = _first_col_to_frame(current[0])
                value2 = current[1]
                # frame1 < frame2 is implied here, so the division below is safe.
                if frame1 <= current_frame < frame2:
                    if interpolate:
                        offset = (current_frame - frame1) / float(frame2 - frame1)
                        v = value1 * (1.0 - offset) + value2 * offset
                        return (v, int(round(v)))
                    return (value1, int(round(value1)))
        return (0.0, 0)