# film_mlmodule/pipelines/deployment_pipeline.py
import json
# from .utils import get_data_for_test
import os
import numpy as np
import pandas as pd
#from materializer.custom_materializer import cs_materializer
from steps.data_retrieving import retrieve_data
from steps.data_preprocessing import preprocess_data
from steps.evaluation import evaluate_model
from steps.data_ingesting import ingest_data
from steps.model_train import train_model
from zenml import pipeline, step
from zenml.config import DockerSettings
from zenml.constants import DEFAULT_SERVICE_START_STOP_TIMEOUT
from zenml.integrations.constants import MLFLOW, TENSORFLOW
from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
MLFlowModelDeployer,
)
from zenml.integrations.mlflow.services import MLFlowDeploymentService
from zenml.integrations.mlflow.steps import mlflow_model_deployer_step
from zenml.steps import BaseParameters, Output
from .utils import get_data_for_test
# Build the step container images with the MLflow integration installed.
docker_settings = DockerSettings(required_integrations=[MLFLOW])
import pandas as pd
# import os
# from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
# MLFlowModelDeployer,
# )
# from zenml.integrations.mlflow.services import MLFlowDeploymentService
# from zenml.pipelines import pipeline
# from zenml.steps import BaseParameters, Output, step
import os
# Resolve the path to the raw Olist customers CSV relative to this file's
# parent package, so the pipeline works regardless of the current working
# directory it is launched from.
current_directory = os.path.dirname(os.path.abspath(__file__))
parent_path = os.path.dirname(current_directory)
data_path_cur = os.path.join(parent_path, "data", "olist_customers_dataset.csv")
@step(enable_cache=False)
def dynamic_importer() -> str:
    """Fetch the latest inference payload (mock API) as a JSON string.

    Caching is disabled so every pipeline run pulls fresh data.
    """
    return get_data_for_test()
class DeploymentTriggerConfig(BaseParameters):
    """Parameters that are used to trigger the deployment"""
    # Minimum evaluation score a trained model must reach to be deployed.
    min_accuracy: float = 0.9
@step
def deployment_trigger(
    accuracy: float,
    config: DeploymentTriggerConfig,
) -> bool:
    """Decide whether the trained model is good enough to deploy.

    Args:
        accuracy: evaluation score produced by the training pipeline.
            NOTE(review): the caller passes MSE here, for which lower is
            better — confirm the intended comparison direction.
        config: carries the ``min_accuracy`` threshold (default 0.9).

    Returns:
        True when the score clears the configured threshold.
    """
    # Bug fix: the original unconditionally returned True, ignoring both the
    # score and the configured threshold, so every run was deployed.
    return accuracy >= config.min_accuracy
class MLFlowDeploymentLoaderStepParameters(BaseParameters):
    """MLflow deployment getter parameters

    Attributes:
        pipeline_name: name of the pipeline that deployed the MLflow prediction
            server
        step_name: the name of the step that deployed the MLflow prediction
            server
        running: when this flag is set, the step only returns a running service

    NOTE(review): the original docstring also described a ``model_name``
    attribute, but no such field is declared here. This class is not
    referenced by ``prediction_service_loader`` below (which takes plain
    arguments) — confirm whether it is still needed.
    """
    pipeline_name: str
    step_name: str
    running: bool = True
@step(enable_cache=False)
def prediction_service_loader(
    pipeline_name: str,
    pipeline_step_name: str,
    running: bool = True,
    model_name: str = "model",
) -> MLFlowDeploymentService:
    """Get the prediction service started by the deployment pipeline.

    Args:
        pipeline_name: name of the pipeline that deployed the MLflow
            prediction server.
        pipeline_step_name: the name of the step that deployed the MLflow
            prediction server.
        running: when this flag is set, only a currently running service
            is matched.
        model_name: the name of the model that is deployed.

    Returns:
        The first matching MLflow deployment service.

    Raises:
        RuntimeError: if no matching MLflow prediction service is found.
    """
    # Look up the MLflow model deployer registered in the active ZenML stack.
    model_deployer = MLFlowModelDeployer.get_active_model_deployer()
    # fetch existing services with same pipeline name, step name and model name
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=pipeline_step_name,
        model_name=model_name,
        running=running,
    )
    if not existing_services:
        raise RuntimeError(
            f"No MLflow prediction service deployed by the "
            f"{pipeline_step_name} step in the {pipeline_name} "
            f"pipeline for the '{model_name}' model is currently "
            f"running."
        )
    # Removed leftover debug prints of the service list; return the first hit.
    return existing_services[0]
@step
def predictor(
    service: MLFlowDeploymentService,
    data: np.ndarray,
) -> np.ndarray:
    """Run an inference request against a prediction service"""
    # NOTE(review): this definition is dead code — it is immediately shadowed
    # by the identical `predictor` step defined right below, whose `data`
    # parameter is annotated `str` (which matches the `json.loads(data)` call
    # here better than `np.ndarray`). Consider deleting this copy.
    service.start(timeout=10)  # should be a NOP if already started
    # Parse the pandas "split"-orientation JSON payload; pop() raises KeyError
    # if the metadata keys are absent.
    data = json.loads(data)
    data.pop("columns")
    data.pop("index")
    # Feature columns expected by the model (the "lenght" misspellings come
    # from the Olist dataset itself — do not "fix" them).
    columns_for_df = [
        "payment_sequential",
        "payment_installments",
        "payment_value",
        "price",
        "freight_value",
        "product_name_lenght",
        "product_description_lenght",
        "product_photos_qty",
        "product_weight_g",
        "product_length_cm",
        "product_height_cm",
        "product_width_cm",
    ]
    df = pd.DataFrame(data["data"], columns=columns_for_df)
    # JSON round-trip yields a plain list of per-row dicts before converting
    # to an ndarray for the prediction request.
    json_list = json.loads(json.dumps(list(df.T.to_dict().values())))
    data = np.array(json_list)
    prediction = service.predict(data)
    return prediction
@step
def predictor(
    service: MLFlowDeploymentService,
    data: str,
) -> np.ndarray:
    """Send one inference request to a running MLflow prediction service.

    ``data`` is a pandas "split"-orientation JSON string; the prediction is
    returned as produced by the service.
    """
    # Ensure the server is up — a no-op when it is already running.
    service.start(timeout=10)
    payload = json.loads(data)
    # Drop the split-orientation metadata; only the rows are needed.
    for metadata_key in ("columns", "index"):
        payload.pop(metadata_key)
    # Feature columns expected by the model ("lenght" spellings come from the
    # Olist dataset itself).
    feature_columns = [
        "payment_sequential",
        "payment_installments",
        "payment_value",
        "price",
        "freight_value",
        "product_name_lenght",
        "product_description_lenght",
        "product_photos_qty",
        "product_weight_g",
        "product_length_cm",
        "product_height_cm",
        "product_width_cm",
    ]
    frame = pd.DataFrame(payload["data"], columns=feature_columns)
    # Round-trip through JSON to get a plain list of per-row dicts, then hand
    # the resulting array to the service.
    record_list = json.loads(json.dumps(list(frame.T.to_dict().values())))
    prediction = service.predict(np.array(record_list))
    return prediction
@pipeline(enable_cache=False, settings={"docker": docker_settings})
def continuous_deployment_pipeline(
    min_accuracy: float = 0.9,
    workers: int = 1,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
):
    """Train, evaluate and (conditionally) deploy the model via MLflow.

    Args:
        min_accuracy: NOTE(review): not used anywhere in this body — the
            trigger step falls back to its own default threshold.
        workers: number of workers for the MLflow prediction server.
        timeout: seconds to wait for the prediction server to start/stop.
    """
    # Link all the steps artifacts together
    # NOTE(review): these three outputs are never consumed below — confirm
    # whether the retrieve_data step is still needed here.
    df_movie, df_user, df_rating = retrieve_data()
    df = ingest_data(data_path_cur)
    x_train, x_test, y_train, y_test = preprocess_data(df)
    model = train_model(x_train, x_test, y_train, y_test)
    mse, rmse = evaluate_model(model, x_test, y_test)
    # NOTE(review): MSE is passed as "accuracy" — for MSE lower is better,
    # so verify the trigger's comparison direction.
    deployment_decision = deployment_trigger(accuracy=mse)
    mlflow_deployer = mlflow_model_deployer_step(
        model=model,
        deploy_decision=deployment_decision,
        workers=workers,
        timeout=timeout,
    )
    # Leftover debug output of the deployer step handle.
    print(mlflow_deployer)
@pipeline(enable_cache=False, settings={"docker": docker_settings})
def inference_pipeline(pipeline_name: str, pipeline_step_name: str, running: bool = True):
    """Load the deployed MLflow service and run batch inference against it.

    Args:
        pipeline_name: name of the pipeline that deployed the prediction server.
        pipeline_step_name: name of the step that deployed the prediction server.
        running: when set, only a currently running service is used.
            (Annotation fixed from ``str`` to ``bool`` to match
            ``prediction_service_loader``; a default is added so existing
            positional/keyword callers are unaffected.)
    """
    # Link all the steps artifacts together
    batch_data = dynamic_importer()
    model_deployment_service = prediction_service_loader(
        pipeline_name=pipeline_name,
        pipeline_step_name=pipeline_step_name,
        running=running,
    )
    predictor(service=model_deployment_service, data=batch_data)