|
|
|
import numpy as np |
|
from scipy.optimize import curve_fit |
|
from sympy import symbols, sympify, lambdify |
|
import warnings |
|
from sklearn.metrics import mean_squared_error |
|
|
|
class BioprocessModel: |
|
def __init__(self): |
|
self.params = {} |
|
self.models = {} |
|
self.r2 = {} |
|
self.rmse = {} |
|
print("DEBUG (models.py): BioprocessModel instanciado.") |
|
|
|
def set_model(self, model_type, equation_str, param_str): |
|
print(f"\nDEBUG (models.py): set_model llamado para tipo='{model_type}'") |
|
print(f" Equation str (raw): '{equation_str}'") |
|
print(f" Param str (raw): '{param_str}'") |
|
|
|
equation_str_cleaned = str(equation_str).strip() |
|
if '=' in equation_str_cleaned: |
|
equation_str_cleaned = equation_str_cleaned.split('=', 1)[1].strip() |
|
|
|
if not equation_str_cleaned: |
|
print(f"ERROR (models.py): Ecuación vacía para {model_type}.") |
|
raise ValueError(f"La cadena de la ecuación para '{model_type}' no puede estar vacía.") |
|
if not param_str: |
|
print(f"ERROR (models.py): Cadena de parámetros vacía para {model_type}.") |
|
raise ValueError(f"La cadena de parámetros para '{model_type}' no puede estar vacía.") |
|
|
|
params_list = [param.strip() for param in param_str.split(',')] |
|
if not all(params_list): |
|
print(f"ERROR (models.py): Algún nombre de parámetro está vacío en '{param_str}' para {model_type}.") |
|
raise ValueError(f"Los nombres de los parámetros no pueden ser vacíos para '{model_type}'.") |
|
|
|
print(f" Equation (cleaned): '{equation_str_cleaned}'") |
|
print(f" Params list: {params_list}") |
|
|
|
self.models[model_type] = { |
|
'equation_str': equation_str_cleaned, |
|
'params': params_list |
|
} |
|
|
|
try: |
|
|
|
t_sym = symbols('t') |
|
|
|
current_param_syms = [] |
|
for p_name in params_list: |
|
if not p_name.isidentifier(): |
|
raise ValueError(f"Nombre de parámetro '{p_name}' no es un identificador Python válido para sympy.") |
|
current_param_syms.append(symbols(p_name)) |
|
|
|
|
|
X_val_sym = symbols('X_val') |
|
|
|
|
|
sympy_expr = sympify(equation_str_cleaned) |
|
print(f" Sympy expression: {sympy_expr}") |
|
print(f" Free symbols in expr: {sympy_expr.free_symbols}") |
|
|
|
|
|
self.models[model_type]['sympy_expr'] = sympy_expr |
|
self.models[model_type]['param_symbols'] = tuple(current_param_syms) |
|
self.models[model_type]['time_symbol'] = t_sym |
|
self.models[model_type]['X_val_symbol'] = X_val_sym |
|
print(f" Modelo '{model_type}' configurado exitosamente.") |
|
|
|
except Exception as e: |
|
print(f"ERROR (models.py): Fallo al procesar con sympy para '{model_type}': {e}") |
|
raise ValueError(f"Error en la ecuación o parámetros para '{model_type}': {e}") |
|
|
|
|
|
def fit_model(self, model_type, time, data, bounds, biomass_params_fitted=None): |
|
print(f"\nDEBUG (models.py): fit_model llamado para tipo='{model_type}'") |
|
if model_type not in self.models: |
|
print(f"ERROR (models.py): Modelo para '{model_type}' no configurado.") |
|
raise ValueError(f"Modelo para '{model_type}' no configurado. Llama a set_model primero.") |
|
|
|
model_config = self.models[model_type] |
|
equation_expr = model_config['sympy_expr'] |
|
current_param_names = model_config['params'] |
|
current_param_syms = model_config['param_symbols'] |
|
t_sym = model_config['time_symbol'] |
|
X_val_sym = model_config['X_val_symbol'] |
|
|
|
print(f" Ajustando con ecuación: {equation_expr}") |
|
print(f" Parámetros a ajustar: {current_param_names}") |
|
print(f" Datos de tiempo (primeros 5): {time[:5]}") |
|
print(f" Datos experimentales (primeros 5): {data[:5]}") |
|
print(f" Límites: {bounds}") |
|
if biomass_params_fitted: |
|
print(f" Parámetros de biomasa ajustados (para S/P): {biomass_params_fitted}") |
|
|
|
|
|
def fit_model_wrapper(t_data_wrapper, *current_p_values_wrapper): |
|
|
|
|
|
|
|
|
|
subs_dict_wrapper = {sym: val for sym, val in zip(current_param_syms, current_p_values_wrapper)} |
|
|
|
|
|
lambdify_args_wrapper = [t_sym] + list(current_param_syms) |
|
expr_to_lambdify = equation_expr |
|
|
|
|
|
if model_type in ['substrate', 'product'] and X_val_sym in equation_expr.free_symbols: |
|
if biomass_params_fitted is None or 'biomass' not in self.models or 'sympy_expr' not in self.models['biomass']: |
|
print("ERROR (models.py fit_model_wrapper): Falta config/params de biomasa para modelo S/P dependiente.") |
|
|
|
return np.full_like(t_data_wrapper, np.nan) |
|
|
|
biomass_model_config = self.models['biomass'] |
|
biomass_expr = biomass_model_config['sympy_expr'] |
|
biomass_p_syms = biomass_model_config['param_symbols'] |
|
biomass_t_sym = biomass_model_config['time_symbol'] |
|
|
|
biomass_subs_for_calc = {sym: biomass_params_fitted[name] for sym, name in zip(biomass_p_syms, biomass_model_config['params'])} |
|
|
|
|
|
|
|
try: |
|
|
|
if 'biomass_func_lambdified' not in biomass_model_config: |
|
biomass_model_config['biomass_func_lambdified'] = lambdify( |
|
[biomass_t_sym] + list(biomass_p_syms), |
|
biomass_expr, |
|
'numpy' |
|
) |
|
|
|
biomass_p_values_for_calc = [biomass_params_fitted[p_name] for p_name in biomass_model_config['params']] |
|
X_t_values_wrapper = biomass_model_config['biomass_func_lambdified'](t_data_wrapper, *biomass_p_values_for_calc) |
|
|
|
except Exception as e_biomass_calc: |
|
print(f"ERROR (models.py fit_model_wrapper): Calculando X(t) para S/P: {e_biomass_calc}") |
|
return np.full_like(t_data_wrapper, np.nan) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if X_val_sym not in current_param_syms: |
|
lambdify_args_wrapper.append(X_val_sym) |
|
|
|
|
|
|
|
func_compiled = lambdify(lambdify_args_wrapper, expr_to_lambdify, 'numpy') |
|
|
|
|
|
try: |
|
|
|
call_args = [t_data_wrapper] + list(current_p_values_wrapper) |
|
if X_val_sym in lambdify_args_wrapper[-1:]: |
|
call_args.append(X_t_values_wrapper) |
|
return func_compiled(*call_args) |
|
except Exception as e_sp_eval: |
|
print(f"ERROR (models.py fit_model_wrapper): Evaluando S/P con X_val: {e_sp_eval}") |
|
return np.full_like(t_data_wrapper, np.nan) |
|
|
|
else: |
|
func_compiled = lambdify(lambdify_args_wrapper, expr_to_lambdify, 'numpy') |
|
try: |
|
return func_compiled(t_data_wrapper, *current_p_values_wrapper) |
|
except Exception as e_bio_eval: |
|
print(f"ERROR (models.py fit_model_wrapper): Evaluando biomasa: {e_bio_eval}") |
|
return np.full_like(t_data_wrapper, np.nan) |
|
|
|
|
|
p0 = np.ones(len(current_param_names)) |
|
lower_bounds, upper_bounds = bounds |
|
lower_bounds = np.array(lower_bounds if len(lower_bounds) == len(p0) else [-np.inf] * len(p0)) |
|
upper_bounds = np.array(upper_bounds if len(upper_bounds) == len(p0) else [np.inf] * len(p0)) |
|
|
|
print(f" Estimaciones iniciales p0: {p0}") |
|
print(f" Límites para curve_fit: L={lower_bounds}, U={upper_bounds}") |
|
|
|
popt, pcov = None, None |
|
with warnings.catch_warnings(): |
|
warnings.simplefilter("ignore", RuntimeWarning) |
|
warnings.simplefilter("ignore", UserWarning) |
|
try: |
|
popt, pcov = curve_fit(fit_model_wrapper, time, data, p0=p0, bounds=(lower_bounds, upper_bounds), maxfev=50000, method='trf') |
|
print(f" curve_fit completado. Parámetros optimizados (popt): {popt}") |
|
except RuntimeError as e_curvefit: |
|
print(f"ERROR (models.py): curve_fit falló para {model_type} con RuntimeError: {e_curvefit}") |
|
self.params[model_type] = {p: np.nan for p in current_param_names} |
|
self.r2[model_type] = np.nan |
|
self.rmse[model_type] = np.nan |
|
return np.full_like(data, np.nan), None |
|
except ValueError as e_val_curvefit: |
|
print(f"ERROR (models.py): curve_fit falló para {model_type} con ValueError: {e_val_curvefit}") |
|
self.params[model_type] = {p: np.nan for p in current_param_names} |
|
self.r2[model_type] = np.nan |
|
self.rmse[model_type] = np.nan |
|
return np.full_like(data, np.nan), None |
|
except Exception as e_gen_curvefit: |
|
print(f"ERROR (models.py): curve_fit falló inesperadamente para {model_type}: {e_gen_curvefit}") |
|
self.params[model_type] = {p: np.nan for p in current_param_names} |
|
self.r2[model_type] = np.nan |
|
self.rmse[model_type] = np.nan |
|
return np.full_like(data, np.nan), None |
|
|
|
|
|
if popt is None: |
|
return np.full_like(data, np.nan), None |
|
|
|
self.params[model_type] = dict(zip(current_param_names, popt)) |
|
|
|
|
|
try: |
|
y_pred = fit_model_wrapper(time, *popt) |
|
if np.any(np.isnan(y_pred)): |
|
print(f"ADVERTENCIA (models.py): y_pred contiene NaNs después del ajuste para {model_type}.") |
|
self.r2[model_type] = np.nan |
|
self.rmse[model_type] = np.nan |
|
|
|
else: |
|
ss_res = np.sum((data - y_pred) ** 2) |
|
ss_tot = np.sum((data - np.mean(data)) ** 2) |
|
if ss_tot == 0: |
|
self.r2[model_type] = 1.0 if ss_res < 1e-9 else 0.0 |
|
else: |
|
self.r2[model_type] = 1 - (ss_res / ss_tot) |
|
self.rmse[model_type] = np.sqrt(mean_squared_error(data, y_pred)) |
|
except Exception as e_ypred: |
|
print(f"ERROR (models.py): Calculando y_pred final para {model_type}: {e_ypred}") |
|
y_pred = np.full_like(data, np.nan) |
|
self.r2[model_type] = np.nan |
|
self.rmse[model_type] = np.nan |
|
|
|
|
|
print(f" Ajuste para {model_type} completado. R2: {self.r2.get(model_type)}, RMSE: {self.rmse.get(model_type)}") |
|
return y_pred, popt |