import json
import os

import numpy as np
import pandas as pd

from steps.data_retrieving import retrieve_data
from steps.data_preprocessing import preprocess_data
from steps.evaluation import evaluate_model
from steps.data_ingesting import ingest_data
from steps.model_train import train_model
from zenml import pipeline, step
from zenml.config import DockerSettings
from zenml.constants import DEFAULT_SERVICE_START_STOP_TIMEOUT
from zenml.integrations.constants import MLFLOW
from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
    MLFlowModelDeployer,
)
from zenml.integrations.mlflow.services import MLFlowDeploymentService
from zenml.integrations.mlflow.steps import mlflow_model_deployer_step
from zenml.steps import BaseParameters

from .utils import get_data_for_test

docker_settings = DockerSettings(required_integrations=[MLFLOW])

# Resolve the dataset path relative to this file so the pipeline can be run
# from any working directory.
current_directory = os.path.dirname(os.path.abspath(__file__))
parent_path = os.path.dirname(current_directory)
data_path_cur = os.path.join(parent_path, "data", "olist_customers_dataset.csv")


@step(enable_cache=False)
def dynamic_importer() -> str:
    """Downloads the latest data from a mock API."""
    data = get_data_for_test()
    return data


class DeploymentTriggerConfig(BaseParameters):
    """Parameters that are used to trigger the deployment."""

    min_accuracy: float = 0.9


@step
def deployment_trigger(
    accuracy: float,
    config: DeploymentTriggerConfig = DeploymentTriggerConfig(),
) -> bool:
    """Implements a simple model deployment trigger that looks at the input
    model accuracy and decides if it is good enough to deploy."""
    return accuracy >= config.min_accuracy


class MLFlowDeploymentLoaderStepParameters(BaseParameters):
    """MLflow deployment getter parameters.

    Attributes:
        pipeline_name: name of the pipeline that deployed the MLflow
            prediction server
        step_name: the name of the step that deployed the MLflow
            prediction server
        running: when this flag is set, the step only returns a running
            service
        model_name: the name of the model that is deployed
    """

    pipeline_name: str
    step_name: str
    running: bool = True


@step(enable_cache=False)
def prediction_service_loader(
    pipeline_name: str,
    pipeline_step_name: str,
    running: bool = True,
    model_name: str = "model",
) -> MLFlowDeploymentService:
    """Get the prediction service started by the deployment pipeline.

    Args:
        pipeline_name: name of the pipeline that deployed the MLflow
            prediction server
        pipeline_step_name: the name of the step that deployed the MLflow
            prediction server
        running: when this flag is set, the step only returns a running
            service
        model_name: the name of the model that is deployed
    """
    # Get the MLflow model deployer stack component.
    model_deployer = MLFlowModelDeployer.get_active_model_deployer()

    # Fetch existing services with the same pipeline name, step name, and
    # model name.
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=pipeline_step_name,
        model_name=model_name,
        running=running,
    )

    if not existing_services:
        raise RuntimeError(
            f"No MLflow prediction service deployed by the "
            f"{pipeline_step_name} step in the {pipeline_name} "
            f"pipeline for the '{model_name}' model is currently "
            f"running."
        )

    print(existing_services)
    print(type(existing_services))
    return existing_services[0]


@step
def predictor(
    service: MLFlowDeploymentService,
    data: str,
) -> np.ndarray:
    """Run an inference request against a prediction service."""
    service.start(timeout=10)  # should be a NOP if already started
    # The incoming payload is a pandas "split"-oriented JSON string; drop the
    # metadata keys and keep only the raw rows.
    data = json.loads(data)
    data.pop("columns")
    data.pop("index")
    # Column names follow the Olist dataset schema, including its original
    # misspellings ("lenght").
    columns_for_df = [
        "payment_sequential",
        "payment_installments",
        "payment_value",
        "price",
        "freight_value",
        "product_name_lenght",
        "product_description_lenght",
        "product_photos_qty",
        "product_weight_g",
        "product_length_cm",
        "product_height_cm",
        "product_width_cm",
    ]
    df = pd.DataFrame(data["data"], columns=columns_for_df)
    # Round-trip through JSON to turn the frame into plain Python records,
    # then hand them to the prediction service as an array.
    json_list = json.loads(json.dumps(list(df.T.to_dict().values())))
    data = np.array(json_list)
    prediction = service.predict(data)
    return prediction


@pipeline(enable_cache=False, settings={"docker": docker_settings})
def continuous_deployment_pipeline(
    min_accuracy: float = 0.9,
    workers: int = 1,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
):
    # Link all the step artifacts together: retrieve, ingest, preprocess,
    # train, evaluate, and conditionally deploy.
    df_movie, df_user, df_rating = retrieve_data()  # outputs currently unused
    df = ingest_data(data_path_cur)
    x_train, x_test, y_train, y_test = preprocess_data(df)
    model = train_model(x_train, x_test, y_train, y_test)
    mse, rmse = evaluate_model(model, x_test, y_test)
    deployment_decision = deployment_trigger(accuracy=mse)
    mlflow_deployer = mlflow_model_deployer_step(
        model=model,
        deploy_decision=deployment_decision,
        workers=workers,
        timeout=timeout,
    )
    print(mlflow_deployer)


@pipeline(enable_cache=False, settings={"docker": docker_settings})
def inference_pipeline(
    pipeline_name: str, pipeline_step_name: str, running: bool = True
):
    # Link all the step artifacts together: load batch data, locate the
    # running prediction service, and run inference against it.
    batch_data = dynamic_importer()
    model_deployment_service = prediction_service_loader(
        pipeline_name=pipeline_name,
        pipeline_step_name=pipeline_step_name,
        running=running,
    )
    predictor(service=model_deployment_service, data=batch_data)
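

# A minimal usage sketch, assuming a separate runner script imports this
# module (the relative `.utils` import means it cannot be executed directly).
# The module path `pipelines.deployment_pipeline` and the step name passed to
# `pipeline_step_name` (the default name of `mlflow_model_deployer_step`) are
# assumptions, not values confirmed by the project:
#
#     from pipelines.deployment_pipeline import (
#         continuous_deployment_pipeline,
#         inference_pipeline,
#     )
#
#     # Train, evaluate, and (if the trigger fires) deploy the model.
#     continuous_deployment_pipeline(min_accuracy=0.9, workers=1)
#
#     # Then run batch inference against the deployed MLflow server.
#     inference_pipeline(
#         pipeline_name="continuous_deployment_pipeline",
#         pipeline_step_name="mlflow_model_deployer_step",
#         running=True,
#     )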