Spaces:
Runtime error
Runtime error
from utils_sig import * | |
import joblib | |
import numpy as np | |
from lightgbm import LGBMRegressor | |
import heartpy as hp | |
import scipy.signal as sig | |
class PhysiologicalIndicators: | |
def __init__(self): | |
self.heart_rate = 0 | |
self.respiratory_rate = 0 | |
self.heart_rate_variability = 0 | |
self.SpO2 = 0 | |
self.blood_pressure = 0 | |
def calculate_heart_rate(self, ippg_data, fps): | |
# 计算心率的代码 | |
print("HR processing") | |
self.heart_rate, self.respiratory_rate = hr_fft_2(ippg_data, fps) | |
# ippg = butter_bandpass(ippg_data, lowcut=0.6, highcut=4, fs=fps) | |
# self.heart_rate, psd_y, psd_x = hr_fft(ippg, fs=fps) | |
print("HR done") | |
return self.heart_rate, self.respiratory_rate | |
def calculate_heart_rate_variability(self, ippg_data, fps): | |
# 计算心率变异性的代码 | |
# TODO: 实现心率变异性计算 | |
self.heart_rate_variability = calculate_hrv(ippg_data, fps) | |
return self.heart_rate_variability | |
def calculate_SpO2(self, ROI_list, ROI2_list): | |
# 计算血氧饱和度的代码 | |
# TODO: 实现血氧饱和度计算 | |
ROI1_SpO2 = RGB_SpO2(ROI_list) | |
ROI2_SpO2 = RGB_SpO2(ROI2_list) | |
self.SpO2 = (ROI1_SpO2 + ROI2_SpO2) / 2 | |
return self.SpO2 | |
def calculate_blood_pressure(self, ippg_data): | |
# 计算血压的代码 | |
ippg_data = np.array(ippg_data).reshape(len(ippg_data),1) | |
bp_pred = [] | |
model_list = joblib.load( './model_weight/lgb_model_ppg2bp.pkl') | |
for model in model_list: | |
result = model.predict(ippg_data) | |
bp_pred.append(result+10) | |
bp_list = np.mean(bp_pred, axis=0) | |
return bp_list,np.max(bp_list),np.min(bp_list)-15 | |
def calculate_HR(self, ROI_list, ROI2_list): | |
# 计算HR的代码 | |
ROI1_HR = RGB_HR(ROI_list) | |
ROI2_HR = RGB_HR(ROI2_list) | |
print("ROI1_HR",ROI1_HR,"ROI2_HR",ROI2_HR) | |
HR = (ROI1_HR + ROI2_HR) / 2 | |
return HR | |
# 定义一个函数来计算心率 | |
def calculate_heart_rate_2(self,ppg_signal, sampling_rate): | |
# 使用巴特沃斯滤波器处理信号,去除噪声 | |
nyquist_frequency = sampling_rate / 2.0 | |
low_cutoff_frequency = 0.5 | |
high_cutoff_frequency = 5.0 | |
filter_order = 2 | |
b, a = sig.butter(filter_order, [low_cutoff_frequency/nyquist_frequency, high_cutoff_frequency/nyquist_frequency], btype='band') | |
filtered_signal = sig.filtfilt(b, a, ppg_signal) | |
# 计算心率 | |
window_length = int(sampling_rate * 0.75) | |
step_size = int(sampling_rate * 0.1) | |
threshold = 0.4 | |
# 使用峰值检测算法来找到脉冲峰值 | |
peak_indexes, _ = sig.find_peaks(filtered_signal, distance=10) | |
print("============================",peak_indexes,sampling_rate) | |
# 计算时间间隔并计算心率 | |
# time_intervals = np.diff(peak_indexes) / float(sampling_rate) | |
time_intervals = np.diff(peak_indexes) * 0.045 | |
heart_rate = 60.0 / np.mean(time_intervals) | |
return heart_rate | |
# 定义一个函数来从rppg信号中计算心率 | |
def calculate_heart_rate_3(self,signal): | |
wd, m = hp.process(signal, sample_rate = 100.0) | |
return wd, m | |
# def calculate_SPO2(self, ippg_chanel_data): | |
# # 计算血氧的代码 | |
# ippg_chanel_data = np.array(ippg_chanel_data).reshape(len(ippg_chanel_data),6) | |
# SPO2_pred = [] | |
# model_list = joblib.load( './model_weight/lgb_model_threechanel2spo2.pkl') | |
# for model in model_list: | |
# result = model.predict(ippg_chanel_data) | |
# SPO2_pred.append(result) | |
# SPO2_list = np.mean(SPO2_pred, axis=0) | |
# return SPO2_list | |