cabasus / funcs /tools.py
arcan3's picture
added program
a5bd089
raw
history blame
1.64 kB
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from scipy.signal import savgol_filter, correlate, find_peaks
def numpy_to_native(data):
    """Recursively convert NumPy scalars and arrays into built-in Python types.

    Useful before handing data to code that only understands plain Python
    objects (e.g. JSON encoders).

    Args:
        data: Any value. NumPy booleans, integers and floats of any width,
            NumPy arrays, and dicts / lists / tuples containing them are
            converted. Everything else is returned unchanged.

    Returns:
        The same structure with NumPy booleans as ``bool``, NumPy integers as
        ``int``, NumPy floats as ``float`` and arrays as (nested) lists.
        Dict keys are converted as well as dict values.
    """
    # np.bool_ is not a subclass of np.integer, so it needs its own branch.
    if isinstance(data, np.bool_):
        return bool(data)
    # The abstract bases cover every width (int8..uint64, float16..longdouble),
    # not only the 32/64-bit variants.
    if isinstance(data, np.integer):
        return int(data)
    if isinstance(data, np.floating):
        return float(data)
    if isinstance(data, np.ndarray):
        return data.tolist()
    if isinstance(data, dict):
        # Keys are converted too: a NumPy scalar hashes and compares equal to
        # its native counterpart, so lookups keep working.
        return {numpy_to_native(k): numpy_to_native(v) for k, v in data.items()}
    if isinstance(data, list):
        return [numpy_to_native(v) for v in data]
    if isinstance(data, tuple):
        converted = [numpy_to_native(v) for v in data]
        # namedtuples take their fields positionally; plain tuples take an
        # iterable.
        if hasattr(data, "_fields"):
            return type(data)(*converted)
        return tuple(converted)
    return data
def process_signals(gz_signal, upsample_factor, window_size=40, poly_order=2, peak_distance=2, peak_prominence=1):
    """Smooth and upsample a signal, then locate peaks in its autocorrelation.

    Args:
        gz_signal: 1-D signal passed to ``savgol_filter``.
        upsample_factor: Forwarded to ``upsample_signal``.
        window_size: ``window_length`` for the Savitzky-Golay filter.
        poly_order: ``polyorder`` for the Savitzky-Golay filter.
        peak_distance: ``distance`` argument for ``find_peaks``.
        peak_prominence: ``prominence`` argument for ``find_peaks``.

    Returns:
        A ``(signal, peaks)`` tuple: the smoothed and upsampled signal, and
        the indices of the peaks found in the non-negative-lag half of its
        autocorrelation.
    """
    # NOTE(review): some SciPy releases require an odd window_length and the
    # default here is even -- confirm against the SciPy version in use.
    dense = upsample_signal(
        savgol_filter(gz_signal, window_size, poly_order),
        upsample_factor,
    )

    full_autocorr = correlate(dense, dense, mode='full')
    # The 'full' output is symmetric around its middle sample (zero lag), so
    # only the second half is searched.
    zero_lag = full_autocorr.size // 2
    peak_indices = find_peaks(
        full_autocorr[zero_lag:],
        distance=peak_distance,
        prominence=peak_prominence,
    )[0]

    return dense, peak_indices
def fill_missing_values(data, window_size=10):
    """Fill NaN entries of each column with a centred rolling median.

    Only the missing positions are replaced; existing values are kept as they
    are. Columns without any NaN are not touched.

    Args:
        data: pandas ``DataFrame``. It is modified in place.
        window_size: Size of the centred rolling window used to compute the
            replacement median.

    Returns:
        The same ``DataFrame`` object that was passed in.
    """
    for col in data.columns:
        column = data[col]
        if column.isna().any():
            # min_periods=1 lets the median be computed from whatever valid
            # neighbours fall inside the window.
            rolling_median = column.rolling(
                window=window_size, min_periods=1, center=True
            ).median()
            # fillna replaces only the NaN slots; assigning the rolling median
            # directly would also overwrite valid samples.
            data[col] = column.fillna(rolling_median)
    return data
def upsample_signal(signal, upsample_factor):
    """Upsample a 1-D signal by quadratic interpolation.

    Args:
        signal: 1-D array-like of samples (ndarray, list or tuple). Quadratic
            interpolation needs at least three samples.
        upsample_factor: Multiplier for the number of samples. Non-integer
            factors are accepted; the output length is rounded to the nearest
            integer.

    Returns:
        ``numpy.ndarray`` of ``round(len(signal) * upsample_factor)`` values
        evenly spaced over the original index range, so the first and last
        samples coincide with those of the input.
    """
    # Coerce so that plain Python sequences work as well as ndarrays.
    samples = np.asarray(signal)
    n_samples = samples.size

    interpolator = interp1d(np.arange(n_samples), samples, kind='quadratic')

    # np.linspace needs an integer sample count.
    n_upsampled = int(round(n_samples * upsample_factor))
    x_upsampled = np.linspace(0, n_samples - 1, n_upsampled)
    return interpolator(x_upsampled)