Spaces:
Runtime error
Runtime error
import hopsworks | |
import pandas as pd | |
import os | |
from datetime import datetime, timedelta | |
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score | |
from sklearn.model_selection import train_test_split | |
import joblib | |
from pathlib import Path | |
import hsfs | |
import hsml | |
# Define the base directory as the project root | |
BASE_DIR = Path(__file__).resolve().parent.parent.parent | |
class Trainer: | |
def __init__(self, project_name, feature_group_name, model_registry_name, api_key): | |
self.project_name = project_name | |
self.feature_group_name = feature_group_name | |
self.model_registry_name = model_registry_name | |
self.api_key = api_key | |
self.project = hopsworks.login(api_key_value=self.api_key) | |
self.fs = self.project.get_feature_store() | |
self.model_registry = self.project.get_model_registry() | |
self.feature_view = None | |
self.deployment = None | |
def create_feature_view(self): | |
"""Select features from the feature group and create a feature view.""" | |
selected_features = self.fs.get_or_create_feature_group( | |
name=self.feature_group_name, | |
version=1 | |
).select_all() | |
print("Feature group selected successfully......... --->>") | |
"""Create or get a feature view for the last 30 days of data.""" | |
try: | |
self.feature_view = self.fs.get_or_create_feature_view( | |
name=f"{self.feature_group_name}_view", | |
version=1, | |
description="Feature view with last 30 days of data for model training", | |
query=selected_features, | |
) | |
print("Feature view created or retrieved successfully.") | |
except hsfs.client.exceptions.RestAPIError as e: | |
print(f"Error creating feature view: {e}") | |
def delete_feature_view(self): | |
"""Delete the feature view.""" | |
try: | |
self.feature_view.delete() | |
print("Feature view deleted successfully.") | |
except hsfs.client.exceptions.RestAPIError as e: | |
print(f"Error deleting feature view: {e}") | |
def get_retrain_data_from_feature_view(self): | |
"""Pull the last 30 days of data from the feature view till today.""" | |
start_time = datetime.now() - timedelta(days=30) | |
end_time = datetime.now() | |
# Get the data as a DataFrame from the feature view | |
df = self.feature_view.get_batch_data( | |
start_time=start_time, end_time=end_time) | |
# sort by datetime | |
df = df.sort_values(by='datetime', ascending=False) | |
print("Data pulled from feature view for retraining successfully.") | |
return df | |
def get_plot_data_from_feature_view(self, hours): | |
# get last 12 hours of data starting from current hour to plot | |
start_time = datetime.now() - timedelta(hours=hours) | |
end_time = datetime.now() | |
# Get the data as a DataFrame from the feature view | |
df = self.feature_view.get_batch_data( | |
start_time=start_time, end_time=end_time) | |
# sort by datetime | |
df = df.sort_values(by='datetime', ascending=False) | |
print("Data pulled from feature view for plotting successfully.") | |
return df | |
def train_test_split(self, df, test_size=0.2): | |
"""Split data into training and test sets.""" | |
# Define feature columns based on lagged features | |
feature_columns = [ | |
f"{prefix}_lag_{i}" for i in range(0, 13) for prefix in ["open", "high", "low", "close"] | |
] | |
# Separate features and target | |
X = df[feature_columns] | |
y = df['target'] | |
# Split into train and test sets | |
X_train, X_test, y_train, y_test = train_test_split( | |
X, y, test_size=test_size, random_state=42) | |
print("Data split into train and test sets.") | |
return X_train, X_test, y_train, y_test | |
def get_features_labels(self, df): | |
"""Split data into features and labels.""" | |
# Define feature columns based on lagged features | |
feature_columns = [ | |
f"{prefix}_lag_{i}" for i in range(0, 13) for prefix in ["open", "high", "low", "close"] | |
] | |
# Separate features and target | |
X = df[feature_columns] | |
y = df['target'] | |
return X, y | |
def train_model(self, model, X_train, y_train): | |
"""Train the model on training data.""" | |
model.fit(X_train, y_train) | |
print("Model training completed.") | |
return model | |
def evaluate_model(self, model, X_test, y_test, **kwargs): | |
"""Evaluate the model on the hold-out test set.""" | |
y_pred = model.predict(X_test) | |
# if show_pred in kwargs is true, print the predictions | |
if "show_pred" in kwargs: | |
print(f"Predictions: {y_pred}") | |
mse = mean_squared_error(y_test, y_pred) | |
mae = mean_absolute_error(y_test, y_pred) | |
r2 = r2_score(y_test, y_pred) | |
print(f"Model Evaluation:\nMSE: {mse}\nMAE: {mae}\nR2 Score: {r2}") | |
return {"mse": mse, "mae": mae, "r2": r2} | |
def save_model_to_registry(self, model, metrics, model_schema, X_train): | |
"""Save the trained model to Hopsworks Model Registry.""" | |
# Use BASE_DIR to define the model directory and path | |
model_dir = BASE_DIR / "models" | |
# Ensure the directory exists | |
if not model_dir.exists(): | |
model_dir.mkdir(parents=True, exist_ok=True) | |
model_path = model_dir / f"{self.model_registry_name}.pkl" | |
joblib.dump(model, model_path) | |
new_model = self.model_registry.sklearn.create_model( | |
name=self.model_registry_name, | |
metrics=metrics, | |
model_schema=model_schema, | |
input_example=X_train.sample(), | |
description="Trained model with 30-day feature view data", | |
) | |
# Register the model and serve as endpoint | |
new_model.save(str(model_path)) | |
# new_model.deploy() | |
print("Model saved to registry successfully.") | |
def model_deploy(self): | |
model = self.model_registry.get_model( | |
self.model_registry_name) | |
# strip all _ from self.model_registry_name and keep only alphanumeric characters | |
deploy_name = self.model_registry_name.replace("_", "") | |
# Get the dataset API for the project | |
dataset_api = self.project.get_dataset_api() | |
# Upload the file "predict_example.py" to the "Models" dataset | |
# If a file with the same name already exists, overwrite it | |
predictor_local_path = BASE_DIR / "src" / \ | |
"training_pipeline" / "kserve_predict_script.py" | |
uploaded_file_path = dataset_api.upload( | |
predictor_local_path, "Models", overwrite=True) | |
# Construct the full path to the uploaded predictor script | |
predictor_script_path = os.path.join( | |
"/Projects", self.project_name, uploaded_file_path) | |
self.deployment = model.deploy( | |
name=deploy_name, | |
script_file=predictor_script_path,) | |
# start the deployment | |
self.deployment.start() | |
def predict_with_hopsworks_api(self, X): | |
"""Use the deployed model to make predictions via the Hopsworks API.""" | |
# Get model serving handle from the project | |
model_serving = self.project.get_model_serving() | |
model = self.model_registry.get_model( | |
self.model_registry_name, version=1) | |
# Ensure the deployment name follows the required regex pattern | |
deploy_name = self.model_registry_name.replace("_", "") | |
try: | |
# Get the deployment | |
deployment = model_serving.get_deployment(name=deploy_name) | |
# Make predictions | |
predictions = deployment.predict(inputs=X.values.tolist()) | |
print("Predictions made via Hopsworks model API.") | |
return predictions | |
except hsml.client.exceptions.RestAPIError as e: | |
print(f"Error making predictions: {e}") | |
return None | |
except Exception as e: | |
print(f"Unexpected error: {e}") | |
return None | |
def stop_model_deployment(self): | |
model = self.model_registry.get_model( | |
self.model_registry_name, version=1) | |
# Ensure the deployment name follows the required regex pattern | |
deploy_name = self.model_registry_name.replace("_", "") | |
# Get model serving handle | |
model_serving = self.project.get_model_serving() | |
try: | |
# List deployments | |
deployments = model_serving.get_deployments(model) | |
for deployment in deployments: | |
if deployment.name == deploy_name: | |
# deployment.stop() | |
deployment.delete(force=True) | |
print( | |
f"Deployment {deploy_name} stopped and deleted successfully.") | |
break | |
else: | |
print(f"No deployment found with name: {deploy_name}") | |
except hsml.client.exceptions.RestAPIError as e: | |
print(f"Error stopping or deleting deployment: {e}") | |
return model | |