|
|
""" |
|
|
Module pour les IA de prédiction linéaire |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
from typing import List, Tuple, Optional |
|
|
from .neuron import SitiNEUR |
|
|
|
|
|
|
|
|
class LinearAI: |
|
|
""" |
|
|
IA de prédiction linéaire avec réseau de neurones |
|
|
|
|
|
Args: |
|
|
input_size: Nombre de features en entrée |
|
|
output_size: Nombre de sorties (1 pour régression simple) |
|
|
hidden_layers: Liste des tailles des couches cachées |
|
|
|
|
|
Example: |
|
|
>>> from sitiai import create |
|
|
>>> ai = create.ai('linear', input_size=3, output_size=1, hidden_layers=[10, 5]) |
|
|
>>> ai.train(X_train, y_train, epochs=100) |
|
|
>>> predictions = ai.predict(X_test) |
|
|
""" |
|
|
|
|
|
def __init__(self, input_size: int, output_size: int = 1, hidden_layers: Optional[List[int]] = None): |
|
|
self.input_size = input_size |
|
|
self.output_size = output_size |
|
|
self.hidden_layers = hidden_layers or [10] |
|
|
|
|
|
|
|
|
self.layers: List[SitiNEUR] = [] |
|
|
|
|
|
|
|
|
layer_sizes = [input_size] + self.hidden_layers + [output_size] |
|
|
|
|
|
for i in range(len(layer_sizes) - 1): |
|
|
|
|
|
activation = 'relu' if i < len(layer_sizes) - 2 else 'linear' |
|
|
layer = SitiNEUR( |
|
|
input_size=layer_sizes[i], |
|
|
output_size=layer_sizes[i + 1], |
|
|
activation=activation |
|
|
) |
|
|
self.layers.append(layer) |
|
|
|
|
|
self.is_trained = False |
|
|
self.loss_history = [] |
|
|
|
|
|
|
|
|
self.x_mean = None |
|
|
self.x_std = None |
|
|
self.y_mean = None |
|
|
self.y_std = None |
|
|
|
|
|
def _normalize_X(self, X: np.ndarray, fit: bool = False) -> np.ndarray: |
|
|
"""Normalise les entrées""" |
|
|
if fit: |
|
|
self.x_mean = np.mean(X, axis=0) |
|
|
self.x_std = np.std(X, axis=0) + 1e-8 |
|
|
|
|
|
return (X - self.x_mean) / self.x_std |
|
|
|
|
|
def _normalize_y(self, y: np.ndarray, fit: bool = False) -> np.ndarray: |
|
|
"""Normalise les sorties""" |
|
|
if fit: |
|
|
self.y_mean = np.mean(y, axis=0) |
|
|
self.y_std = np.std(y, axis=0) + 1e-8 |
|
|
|
|
|
return (y - self.y_mean) / self.y_std |
|
|
|
|
|
def _denormalize_y(self, y: np.ndarray) -> np.ndarray: |
|
|
"""Dénormalise les prédictions""" |
|
|
if self.y_mean is not None and self.y_std is not None: |
|
|
return y * self.y_std + self.y_mean |
|
|
return y |
|
|
|
|
|
def forward(self, x: np.ndarray) -> np.ndarray: |
|
|
""" |
|
|
Propagation avant à travers tout le réseau |
|
|
|
|
|
Args: |
|
|
x: Données d'entrée |
|
|
|
|
|
Returns: |
|
|
Prédictions |
|
|
""" |
|
|
output = x |
|
|
for layer in self.layers: |
|
|
output = layer.forward(output) |
|
|
return output |
|
|
|
|
|
def train(self, X: np.ndarray, y: np.ndarray, epochs: int = 100, |
|
|
learning_rate: float = 0.01, batch_size: Optional[int] = None, |
|
|
verbose: bool = True): |
|
|
""" |
|
|
Entraîne le modèle sur les données |
|
|
|
|
|
Args: |
|
|
X: Données d'entrée (shape: [n_samples, input_size]) |
|
|
y: Cibles (shape: [n_samples, output_size]) |
|
|
epochs: Nombre d'époques d'entraînement |
|
|
learning_rate: Taux d'apprentissage |
|
|
batch_size: Taille des mini-batches (None = batch complet) |
|
|
verbose: Afficher les logs d'entraînement |
|
|
""" |
|
|
|
|
|
X = np.array(X, dtype=np.float64) |
|
|
y = np.array(y, dtype=np.float64) |
|
|
|
|
|
|
|
|
if y.ndim == 1: |
|
|
y = y.reshape(-1, 1) |
|
|
|
|
|
|
|
|
if np.any(np.isnan(X)) or np.any(np.isnan(y)): |
|
|
raise ValueError("Les données contiennent des valeurs NaN") |
|
|
|
|
|
|
|
|
X_norm = self._normalize_X(X, fit=True) |
|
|
y_norm = self._normalize_y(y, fit=True) |
|
|
|
|
|
n_samples = X_norm.shape[0] |
|
|
batch_size = batch_size or min(32, n_samples) |
|
|
|
|
|
self.loss_history = [] |
|
|
|
|
|
for epoch in range(epochs): |
|
|
|
|
|
indices = np.random.permutation(n_samples) |
|
|
X_shuffled = X_norm[indices] |
|
|
y_shuffled = y_norm[indices] |
|
|
|
|
|
epoch_loss = 0 |
|
|
n_batches = 0 |
|
|
|
|
|
|
|
|
for i in range(0, n_samples, batch_size): |
|
|
batch_X = X_shuffled[i:i + batch_size] |
|
|
batch_y = y_shuffled[i:i + batch_size] |
|
|
|
|
|
|
|
|
predictions = self.forward(batch_X) |
|
|
|
|
|
|
|
|
if np.any(np.isnan(predictions)): |
|
|
if verbose: |
|
|
print(f"⚠️ NaN détecté à l'époque {epoch + 1}, arrêt de l'entraînement") |
|
|
break |
|
|
|
|
|
|
|
|
loss = np.mean((predictions - batch_y) ** 2) |
|
|
epoch_loss += loss |
|
|
n_batches += 1 |
|
|
|
|
|
|
|
|
grad = 2 * (predictions - batch_y) / batch_y.shape[0] |
|
|
|
|
|
for layer in reversed(self.layers): |
|
|
grad = layer.backward(grad, learning_rate) |
|
|
|
|
|
if n_batches == 0: |
|
|
break |
|
|
|
|
|
avg_loss = epoch_loss / n_batches |
|
|
self.loss_history.append(avg_loss) |
|
|
|
|
|
if verbose and (epoch + 1) % 10 == 0: |
|
|
print(f"Époque {epoch + 1}/{epochs}, Perte: {avg_loss:.6f}") |
|
|
|
|
|
self.is_trained = True |
|
|
|
|
|
if verbose: |
|
|
print(f"\n✓ Entraînement terminé! Perte finale: {self.loss_history[-1]:.6f}") |
|
|
|
|
|
def predict(self, X: np.ndarray) -> np.ndarray: |
|
|
""" |
|
|
Fait des prédictions sur de nouvelles données |
|
|
|
|
|
Args: |
|
|
X: Données d'entrée |
|
|
|
|
|
Returns: |
|
|
Prédictions |
|
|
""" |
|
|
if not self.is_trained: |
|
|
print("⚠️ Attention: Le modèle n'est pas entraîné.") |
|
|
|
|
|
X = np.array(X, dtype=np.float64) |
|
|
|
|
|
|
|
|
if self.x_mean is not None: |
|
|
X_norm = self._normalize_X(X, fit=False) |
|
|
predictions_norm = self.forward(X_norm) |
|
|
return self._denormalize_y(predictions_norm) |
|
|
|
|
|
return self.forward(X) |
|
|
|
|
|
def evaluate(self, X: np.ndarray, y: np.ndarray) -> Tuple[float, float]: |
|
|
""" |
|
|
Évalue le modèle sur des données de test |
|
|
|
|
|
Args: |
|
|
X: Données d'entrée |
|
|
y: Vraies valeurs |
|
|
|
|
|
Returns: |
|
|
(mse, r2_score) |
|
|
""" |
|
|
predictions = self.predict(X) |
|
|
y = np.array(y, dtype=np.float32) |
|
|
|
|
|
if y.ndim == 1: |
|
|
y = y.reshape(-1, 1) |
|
|
|
|
|
|
|
|
mse = np.mean((predictions - y) ** 2) |
|
|
|
|
|
|
|
|
ss_res = np.sum((y - predictions) ** 2) |
|
|
ss_tot = np.sum((y - np.mean(y)) ** 2) |
|
|
r2 = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0 |
|
|
|
|
|
return mse, r2 |
|
|
|
|
|
def save_weights(self, filepath: str): |
|
|
""" |
|
|
Sauvegarde les poids du modèle dans un fichier |
|
|
|
|
|
Args: |
|
|
filepath: Chemin du fichier de sauvegarde (.npz) |
|
|
|
|
|
Example: |
|
|
>>> ai.save_weights('my_model.npz') |
|
|
""" |
|
|
if not filepath.endswith('.npz'): |
|
|
filepath += '.npz' |
|
|
|
|
|
weights_data = {} |
|
|
|
|
|
|
|
|
for i, layer in enumerate(self.layers): |
|
|
layer_weights = layer.get_weights() |
|
|
weights_data[f'layer_{i}_weights'] = layer_weights['weights'] |
|
|
weights_data[f'layer_{i}_bias'] = layer_weights['bias'] |
|
|
|
|
|
|
|
|
if self.x_mean is not None: |
|
|
weights_data['x_mean'] = self.x_mean |
|
|
weights_data['x_std'] = self.x_std |
|
|
weights_data['y_mean'] = self.y_mean |
|
|
weights_data['y_std'] = self.y_std |
|
|
|
|
|
|
|
|
weights_data['input_size'] = np.array([self.input_size]) |
|
|
weights_data['output_size'] = np.array([self.output_size]) |
|
|
weights_data['hidden_layers'] = np.array(self.hidden_layers) |
|
|
weights_data['is_trained'] = np.array([self.is_trained]) |
|
|
|
|
|
np.savez(filepath, **weights_data) |
|
|
print(f"✓ Modèle sauvegardé dans '{filepath}'") |
|
|
|
|
|
def load_weights(self, filepath: str): |
|
|
""" |
|
|
Charge les poids du modèle depuis un fichier |
|
|
|
|
|
Args: |
|
|
filepath: Chemin du fichier de sauvegarde (.npz) |
|
|
|
|
|
Example: |
|
|
>>> ai = sitiai.create.ai('linear', input_size=3, output_size=1) |
|
|
>>> ai.load_weights('my_model.npz') |
|
|
""" |
|
|
if not filepath.endswith('.npz'): |
|
|
filepath += '.npz' |
|
|
|
|
|
data = np.load(filepath, allow_pickle=True) |
|
|
|
|
|
|
|
|
for i, layer in enumerate(self.layers): |
|
|
weights_dict = { |
|
|
'weights': data[f'layer_{i}_weights'], |
|
|
'bias': data[f'layer_{i}_bias'] |
|
|
} |
|
|
layer.set_weights(weights_dict) |
|
|
|
|
|
|
|
|
if 'x_mean' in data: |
|
|
self.x_mean = data['x_mean'] |
|
|
self.x_std = data['x_std'] |
|
|
self.y_mean = data['y_mean'] |
|
|
self.y_std = data['y_std'] |
|
|
|
|
|
|
|
|
if 'is_trained' in data: |
|
|
self.is_trained = bool(data['is_trained'][0]) |
|
|
|
|
|
print(f"✓ Modèle chargé depuis '{filepath}'") |
|
|
|
|
|
def __repr__(self): |
|
|
status = "entraîné" if self.is_trained else "non entraîné" |
|
|
return f"LinearAI(architecture={[self.input_size] + self.hidden_layers + [self.output_size]}, status='{status}')" |
|
|
|