human_health_gradio / code /physiological_indicators.py
Juliojuse's picture
add requirement
d15a19c
raw
history blame contribute delete
No virus
3.83 kB
from utils_sig import *
import joblib
import numpy as np
from lightgbm import LGBMRegressor
import heartpy as hp
import scipy.signal as sig
class PhysiologicalIndicators:
    """Compute physiological indicators from camera-derived pulse (iPPG) data.

    Each ``calculate_*`` method wraps one estimator.  The helpers
    ``hr_fft_2``, ``calculate_hrv``, ``RGB_SpO2`` and ``RGB_HR`` are not
    defined in this class; they are presumably provided by the
    ``from utils_sig import *`` star import -- confirm against that module.
    """

    # Path of the pickled list of blood-pressure regressors.  It is relative,
    # so it is resolved against the process working directory.
    BP_MODEL_PATH = './code/model_weight/lgb_model_ppg2bp.pkl'

    def __init__(self):
        """Initialise every stored indicator to 0 and clear the model cache."""
        self.heart_rate = 0
        self.respiratory_rate = 0
        self.heart_rate_variability = 0
        self.SpO2 = 0
        # NOTE(review): no method in this class assigns blood_pressure;
        # calculate_blood_pressure only returns its results.
        self.blood_pressure = 0
        # Filled on the first calculate_blood_pressure call.
        self._bp_models = None

    def calculate_heart_rate(self, ippg_data, fps):
        """Estimate heart rate and respiratory rate with ``hr_fft_2``.

        Args:
            ippg_data: Pulse signal, passed to ``hr_fft_2`` unchanged.
            fps: Sampling rate of ``ippg_data``, passed to ``hr_fft_2``.

        Returns:
            The pair ``(heart_rate, respiratory_rate)``; both values are also
            stored on the instance.
        """
        print("HR processing")
        self.heart_rate, self.respiratory_rate = hr_fft_2(ippg_data, fps)
        # Earlier approach, kept for reference:
        # ippg = butter_bandpass(ippg_data, lowcut=0.6, highcut=4, fs=fps)
        # self.heart_rate, psd_y, psd_x = hr_fft(ippg, fs=fps)
        print("HR done")
        return self.heart_rate, self.respiratory_rate

    def calculate_heart_rate_variability(self, ippg_data, fps):
        """Compute heart-rate variability with ``calculate_hrv``.

        The result is stored in ``self.heart_rate_variability`` and returned.
        """
        self.heart_rate_variability = calculate_hrv(ippg_data, fps)
        return self.heart_rate_variability

    def calculate_SpO2(self, ROI_list, ROI2_list):
        """Average the ``RGB_SpO2`` estimates of two regions of interest.

        The mean of the two estimates is stored in ``self.SpO2`` and returned.
        """
        ROI1_SpO2 = RGB_SpO2(ROI_list)
        ROI2_SpO2 = RGB_SpO2(ROI2_list)
        self.SpO2 = (ROI1_SpO2 + ROI2_SpO2) / 2
        return self.SpO2

    def calculate_blood_pressure(self, ippg_data):
        """Predict blood pressure from a pulse signal with a model ensemble.

        Every sample becomes one single-feature row.  Each model loaded from
        ``BP_MODEL_PATH`` predicts on all rows, 10 is added to every
        prediction, and the predictions are averaged across models.

        Args:
            ippg_data: Sequence of pulse samples (must support ``len``).

        Returns:
            ``(bp_list, np.max(bp_list), np.min(bp_list) - 15)`` where
            ``bp_list`` is the per-sample ensemble mean.
            NOTE(review): the max / min-15 values look like systolic and
            diastolic estimates, and +10 / -15 like calibration offsets --
            confirm with the model's author.
        """
        samples = np.array(ippg_data).reshape(len(ippg_data), 1)

        # Load the ensemble once per instance instead of on every call.
        # joblib.load unpickles the file, so the path must point at a trusted
        # artefact.
        models = getattr(self, "_bp_models", None)
        if models is None:
            models = joblib.load(self.BP_MODEL_PATH)
            self._bp_models = models

        bp_pred = [model.predict(samples) + 10 for model in models]
        bp_list = np.mean(bp_pred, axis=0)
        return bp_list, np.max(bp_list), np.min(bp_list) - 15

    def calculate_HR(self, ROI_list, ROI2_list):
        """Return the mean of the ``RGB_HR`` estimates of two regions."""
        ROI1_HR = RGB_HR(ROI_list)
        ROI2_HR = RGB_HR(ROI2_list)
        print("ROI1_HR",ROI1_HR,"ROI2_HR",ROI2_HR)
        return (ROI1_HR + ROI2_HR) / 2

    def calculate_heart_rate_2(self, ppg_signal, sampling_rate, sample_period=0.045):
        """Estimate heart rate (beats per minute) from peak-to-peak spacing.

        The signal is band-pass filtered (0.5-5 Hz, second-order Butterworth,
        applied forwards and backwards), peaks at least 10 samples apart are
        located, and the mean peak spacing is converted to beats per minute.

        Args:
            ppg_signal: One-dimensional pulse signal.
            sampling_rate: Sampling rate in Hz, used for the filter design.
                The 5 Hz upper cutoff must lie below ``sampling_rate / 2``.
            sample_period: Seconds per sample used to turn peak spacing into
                time.  The default 0.045 preserves the historical behaviour.
                NOTE(review): that default is independent of
                ``sampling_rate``; pass ``1.0 / sampling_rate`` when the two
                should agree.

        Returns:
            The heart rate in beats per minute, or NaN when fewer than two
            peaks are found.
        """
        nyquist_frequency = sampling_rate / 2.0
        low_cutoff_frequency = 0.5
        high_cutoff_frequency = 5.0
        filter_order = 2
        b, a = sig.butter(
            filter_order,
            [
                low_cutoff_frequency / nyquist_frequency,
                high_cutoff_frequency / nyquist_frequency,
            ],
            btype='band',
        )
        filtered_signal = sig.filtfilt(b, a, ppg_signal)

        peak_indexes, _ = sig.find_peaks(filtered_signal, distance=10)
        print("============================",peak_indexes,sampling_rate)

        # With fewer than two peaks there is no interval to average; return
        # NaN directly instead of letting np.mean warn on an empty array.
        if len(peak_indexes) < 2:
            return float("nan")

        time_intervals = np.diff(peak_indexes) * sample_period
        return 60.0 / np.mean(time_intervals)

    def calculate_heart_rate_3(self, signal, sample_rate=100.0):
        """Run HeartPy's ``process`` on an rPPG signal.

        Args:
            signal: Pulse signal handed to ``hp.process``.
            sample_rate: Sampling rate in Hz; defaults to the previously
                hard-coded 100.0.

        Returns:
            The two values returned by ``hp.process``, unpacked here as
            ``(wd, m)``.
        """
        wd, m = hp.process(signal, sample_rate=sample_rate)
        return wd, m
# def calculate_SPO2(self, ippg_chanel_data):
# # Code for computing blood oxygen (SpO2)
# ippg_chanel_data = np.array(ippg_chanel_data).reshape(len(ippg_chanel_data),6)
# SPO2_pred = []
# model_list = joblib.load( './model_weight/lgb_model_threechanel2spo2.pkl')
# for model in model_list:
# result = model.predict(ippg_chanel_data)
# SPO2_pred.append(result)
# SPO2_list = np.mean(SPO2_pred, axis=0)
# return SPO2_list