# Continuous-deployment and inference pipelines for the MLflow model deployer.
import json | |
# from .utils import get_data_for_test | |
import os | |
import numpy as np | |
import pandas as pd | |
#from materializer.custom_materializer import cs_materializer | |
from steps.data_retrieving import retrieve_data | |
from steps.data_preprocessing import preprocess_data | |
from steps.evaluation import evaluate_model | |
from steps.data_ingesting import ingest_data | |
from steps.model_train import train_model | |
from zenml import pipeline, step | |
from zenml.config import DockerSettings | |
from zenml.constants import DEFAULT_SERVICE_START_STOP_TIMEOUT | |
from zenml.integrations.constants import MLFLOW, TENSORFLOW | |
from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import ( | |
MLFlowModelDeployer, | |
) | |
from zenml.integrations.mlflow.services import MLFlowDeploymentService | |
from zenml.integrations.mlflow.steps import mlflow_model_deployer_step | |
from zenml.steps import BaseParameters, Output | |
from .utils import get_data_for_test | |
# Build the pipeline's Docker image with the MLflow integration installed.
docker_settings = DockerSettings(required_integrations=[MLFLOW])

# Resolve the training CSV relative to this file so the pipeline can be
# launched from any working directory.
current_directory = os.path.dirname(os.path.abspath(__file__))
parent_path = os.path.dirname(current_directory)
data_path_cur = os.path.join(parent_path, "data", "olist_customers_dataset.csv")
def dynamic_importer() -> str:
    """Fetch the latest batch of data from the mock API and return it as a string."""
    return get_data_for_test()
class DeploymentTriggerConfig(BaseParameters):
    """Configuration for the deployment-trigger step.

    Attributes:
        min_accuracy: minimum model accuracy required before a newly
            trained model is deployed.
    """

    # Threshold below which the trained model should not be deployed.
    min_accuracy: float = 0.9
def deployment_trigger(
    accuracy: float,
    config: "DeploymentTriggerConfig" = None,
) -> bool:
    """Decide whether a trained model is good enough to deploy.

    The original implementation ignored both arguments and returned True
    unconditionally, so every model was deployed regardless of quality.

    Args:
        accuracy: evaluation score of the freshly trained model.
        config: holds the ``min_accuracy`` threshold; when omitted (the
            pipeline calls this step with ``accuracy`` only), the class
            default threshold of 0.9 is used.

    Returns:
        True when the score meets or exceeds the configured threshold.
    """
    threshold = config.min_accuracy if config is not None else 0.9
    return accuracy >= threshold
class MLFlowDeploymentLoaderStepParameters(BaseParameters):
    """Parameters for looking up an MLflow deployment service.

    Attributes:
        pipeline_name: name of the pipeline that deployed the MLflow
            prediction server.
        step_name: name of the step that deployed the MLflow prediction
            server.
        running: when set, only a currently running service is returned.
    """

    pipeline_name: str
    step_name: str
    # Default to filtering for live services only.
    running: bool = True
def prediction_service_loader(
    pipeline_name: str,
    pipeline_step_name: str,
    running: bool = True,
    model_name: str = "model",
) -> "MLFlowDeploymentService":
    """Get the prediction service started by the deployment pipeline.

    Args:
        pipeline_name: name of the pipeline that deployed the MLflow
            prediction server.
        pipeline_step_name: the name of the step that deployed the MLflow
            prediction server.
        running: when this flag is set, only a running service is returned.
        model_name: the name of the model that is deployed.

    Returns:
        The first matching MLflow deployment service.

    Raises:
        RuntimeError: if no matching prediction service is found.
    """
    # Get the MLflow model deployer stack component.
    model_deployer = MLFlowModelDeployer.get_active_model_deployer()

    # Fetch existing services with the same pipeline name, step name and
    # model name.
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=pipeline_step_name,
        model_name=model_name,
        running=running,
    )
    if not existing_services:
        raise RuntimeError(
            f"No MLflow prediction service deployed by the "
            f"{pipeline_step_name} step in the {pipeline_name} "
            f"pipeline for the '{model_name}' model is currently "
            f"running."
        )
    # NOTE: leftover debug prints of the service list were removed here.
    return existing_services[0]
def predictor(
    service: "MLFlowDeploymentService",
    data: str,
) -> np.ndarray:
    """Run an inference request against a prediction service.

    This function was defined twice, byte-for-byte identical except for the
    ``data`` annotation; the duplicate has been removed and the ``str``
    annotation kept, since ``data`` is parsed with ``json.loads``.

    Args:
        service: the MLflow deployment service to query.
        data: JSON string in pandas "split" orientation (keys "columns",
            "index", "data").

    Returns:
        The prediction returned by the service.
    """
    service.start(timeout=10)  # should be a NOP if already started
    payload = json.loads(data)
    # Drop the split-orientation metadata; only the raw rows are needed.
    payload.pop("columns")
    payload.pop("index")
    # Feature order expected by the trained model. The misspellings
    # ("lenght") match the source dataset's column names — do not "fix" them.
    columns_for_df = [
        "payment_sequential",
        "payment_installments",
        "payment_value",
        "price",
        "freight_value",
        "product_name_lenght",
        "product_description_lenght",
        "product_photos_qty",
        "product_weight_g",
        "product_length_cm",
        "product_height_cm",
        "product_width_cm",
    ]
    df = pd.DataFrame(payload["data"], columns=columns_for_df)
    # Re-serialize as a list of {column: value} dicts (one per row) and wrap
    # it in an ndarray, which is the shape service.predict() expects here.
    json_list = json.loads(json.dumps(list(df.T.to_dict().values())))
    prediction = service.predict(np.array(json_list))
    return prediction
def continuous_deployment_pipeline(
    min_accuracy: float = 0.9,
    workers: int = 1,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
):
    """Train, evaluate, and conditionally deploy the model with MLflow.

    Args:
        min_accuracy: NOTE(review): currently unused — deployment_trigger
            is called below without a config, so it applies its own default.
        workers: number of workers for the MLflow deployer step.
        timeout: start/stop timeout forwarded to the MLflow deployer step.

    NOTE(review): this looks like a ZenML pipeline (steps are chained and
    mlflow_model_deployer_step is used) but it carries no @pipeline
    decorator — confirm whether it was dropped by mistake.
    """
    # Link all the steps' artifacts together.
    # NOTE(review): the movie/user/rating frames are never used in this
    # pipeline — presumably retrieve_data has needed side effects; verify.
    df_movie, df_user, df_rating = retrieve_data()
    df = ingest_data(data_path_cur)
    x_train, x_test, y_train, y_test = preprocess_data(df)
    model = train_model(x_train, x_test, y_train, y_test)
    mse, rmse = evaluate_model(model, x_test, y_test)
    # The trigger is fed the evaluation MSE and decides whether to deploy.
    deployment_decision = deployment_trigger(accuracy=mse)
    mlflow_deployer = mlflow_model_deployer_step(
        model=model,
        deploy_decision=deployment_decision,
        workers=workers,
        timeout=timeout,
    )
    print(mlflow_deployer)
def inference_pipeline(pipeline_name: str, pipeline_step_name: str, running: bool = True):
    """Load fresh data and run it through the deployed prediction service.

    Args:
        pipeline_name: name of the pipeline that deployed the prediction
            server.
        pipeline_step_name: name of the step that deployed the prediction
            server.
        running: when set, only a currently running service is used.
            (Was annotated ``str``; prediction_service_loader expects a
            bool, so the annotation and a matching default were fixed.)

    NOTE(review): like continuous_deployment_pipeline, this has no
    @pipeline decorator — confirm whether it was dropped by mistake.
    """
    # Link all the steps' artifacts together.
    batch_data = dynamic_importer()
    model_deployment_service = prediction_service_loader(
        pipeline_name=pipeline_name,
        pipeline_step_name=pipeline_step_name,
        running=running,
    )
    predictor(service=model_deployment_service, data=batch_data)