from utils_sig import * import joblib import numpy as np from lightgbm import LGBMRegressor import heartpy as hp import scipy.signal as sig class PhysiologicalIndicators: def __init__(self): self.heart_rate = 0 self.respiratory_rate = 0 self.heart_rate_variability = 0 self.SpO2 = 0 self.blood_pressure = 0 def calculate_heart_rate(self, ippg_data, fps): # 计算心率的代码 print("HR processing") self.heart_rate, self.respiratory_rate = hr_fft_2(ippg_data, fps) # ippg = butter_bandpass(ippg_data, lowcut=0.6, highcut=4, fs=fps) # self.heart_rate, psd_y, psd_x = hr_fft(ippg, fs=fps) print("HR done") return self.heart_rate, self.respiratory_rate def calculate_heart_rate_variability(self, ippg_data, fps): # 计算心率变异性的代码 # TODO: 实现心率变异性计算 self.heart_rate_variability = calculate_hrv(ippg_data, fps) return self.heart_rate_variability def calculate_SpO2(self, ROI_list, ROI2_list): # 计算血氧饱和度的代码 # TODO: 实现血氧饱和度计算 ROI1_SpO2 = RGB_SpO2(ROI_list) ROI2_SpO2 = RGB_SpO2(ROI2_list) self.SpO2 = (ROI1_SpO2 + ROI2_SpO2) / 2 return self.SpO2 def calculate_blood_pressure(self, ippg_data): # 计算血压的代码 ippg_data = np.array(ippg_data).reshape(len(ippg_data),1) bp_pred = [] model_list = joblib.load( './code/model_weight/lgb_model_ppg2bp.pkl') for model in model_list: result = model.predict(ippg_data) bp_pred.append(result+10) bp_list = np.mean(bp_pred, axis=0) return bp_list,np.max(bp_list),np.min(bp_list)-15 def calculate_HR(self, ROI_list, ROI2_list): # 计算HR的代码 ROI1_HR = RGB_HR(ROI_list) ROI2_HR = RGB_HR(ROI2_list) print("ROI1_HR",ROI1_HR,"ROI2_HR",ROI2_HR) HR = (ROI1_HR + ROI2_HR) / 2 return HR # 定义一个函数来计算心率 def calculate_heart_rate_2(self,ppg_signal, sampling_rate): # 使用巴特沃斯滤波器处理信号,去除噪声 nyquist_frequency = sampling_rate / 2.0 low_cutoff_frequency = 0.5 high_cutoff_frequency = 5.0 filter_order = 2 b, a = sig.butter(filter_order, [low_cutoff_frequency/nyquist_frequency, high_cutoff_frequency/nyquist_frequency], btype='band') filtered_signal = sig.filtfilt(b, a, ppg_signal) # 计算心率 window_length = int(sampling_rate * 0.75) step_size = int(sampling_rate * 0.1) threshold = 0.4 # 使用峰值检测算法来找到脉冲峰值 peak_indexes, _ = sig.find_peaks(filtered_signal, distance=10) print("============================",peak_indexes,sampling_rate) # 计算时间间隔并计算心率 # time_intervals = np.diff(peak_indexes) / float(sampling_rate) time_intervals = np.diff(peak_indexes) * 0.045 heart_rate = 60.0 / np.mean(time_intervals) return heart_rate # 定义一个函数来从rppg信号中计算心率 def calculate_heart_rate_3(self,signal): wd, m = hp.process(signal, sample_rate = 100.0) return wd, m # def calculate_SPO2(self, ippg_chanel_data): # # 计算血氧的代码 # ippg_chanel_data = np.array(ippg_chanel_data).reshape(len(ippg_chanel_data),6) # SPO2_pred = [] # model_list = joblib.load( './model_weight/lgb_model_threechanel2spo2.pkl') # for model in model_list: # result = model.predict(ippg_chanel_data) # SPO2_pred.append(result) # SPO2_list = np.mean(SPO2_pred, axis=0) # return SPO2_list