Spaces:
Runtime error
Runtime error
File size: 7,273 Bytes
6defa3d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
import json
# from .utils import get_data_for_test
import os
import numpy as np
import pandas as pd
#from materializer.custom_materializer import cs_materializer
from steps.data_retrieving import retrieve_data
from steps.data_preprocessing import preprocess_data
from steps.evaluation import evaluate_model
from steps.data_ingesting import ingest_data
from steps.model_train import train_model
from zenml import pipeline, step
from zenml.config import DockerSettings
from zenml.constants import DEFAULT_SERVICE_START_STOP_TIMEOUT
from zenml.integrations.constants import MLFLOW, TENSORFLOW
from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
MLFlowModelDeployer,
)
from zenml.integrations.mlflow.services import MLFlowDeploymentService
from zenml.integrations.mlflow.steps import mlflow_model_deployer_step
from zenml.steps import BaseParameters, Output
from .utils import get_data_for_test
docker_settings = DockerSettings(required_integrations=[MLFLOW])
import pandas as pd
# import os
# from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
# MLFlowModelDeployer,
# )
# from zenml.integrations.mlflow.services import MLFlowDeploymentService
# from zenml.pipelines import pipeline
# from zenml.steps import BaseParameters, Output, step
import os
# Resolve the customers CSV path relative to this file so the pipeline
# works regardless of the process's current working directory.
current_directory = os.path.dirname(os.path.abspath(__file__))
# Parent of this package directory — assumed to be the project root that
# contains the "data/" folder (TODO confirm against repo layout).
parent_path = os.path.dirname(current_directory)
data_path_cur = os.path.join(parent_path, "data", "olist_customers_dataset.csv")
@step(enable_cache=False)
def dynamic_importer() -> str:
    """Fetch the latest batch of inference data (mock API) as a JSON string."""
    return get_data_for_test()
class DeploymentTriggerConfig(BaseParameters):
    """Parameters that are used to trigger the deployment"""
    # Minimum model score required before a newly trained model is deployed.
    min_accuracy: float = 0.9
@step
def deployment_trigger(
    accuracy: float,
    config: DeploymentTriggerConfig,
) -> bool:
    """Decide whether the newly trained model is good enough to deploy.

    Args:
        accuracy: evaluation score of the trained model.
            NOTE(review): the continuous-deployment pipeline currently
            passes MSE here, which is "lower is better" — confirm the
            metric matches this "higher is better" threshold semantics.
        config: carries ``min_accuracy``, the deployment threshold.

    Returns:
        True when the score clears the configured threshold.
    """
    # Was hard-coded to ``return True``, which deployed every model and
    # ignored both inputs; implement the comparison the docstring promises.
    return accuracy >= config.min_accuracy
class MLFlowDeploymentLoaderStepParameters(BaseParameters):
    """MLflow deployment getter parameters.

    NOTE(review): this class appears unused in this module —
    ``prediction_service_loader`` takes these values as plain step
    arguments instead. Consider wiring it up or removing it.

    Attributes:
        pipeline_name: name of the pipeline that deployed the MLflow prediction
            server
        step_name: the name of the step that deployed the MLflow prediction
            server
        running: when this flag is set, the step only returns a running service
    """
    pipeline_name: str
    step_name: str
    running: bool = True
@step(enable_cache=False)
def prediction_service_loader(
    pipeline_name: str,
    pipeline_step_name: str,
    running: bool = True,
    model_name: str = "model",
) -> MLFlowDeploymentService:
    """Get the prediction service started by the deployment pipeline.

    Args:
        pipeline_name: name of the pipeline that deployed the MLflow
            prediction server
        pipeline_step_name: the name of the step that deployed the MLflow
            prediction server
        running: when this flag is set, the step only returns a running
            service
        model_name: the name of the model that is deployed

    Returns:
        The first matching MLflow deployment service.

    Raises:
        RuntimeError: if no matching prediction service is found.
    """
    # Get the MLflow model deployer stack component.
    model_deployer = MLFlowModelDeployer.get_active_model_deployer()
    # Fetch existing services with the same pipeline name, step name and
    # model name.
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=pipeline_step_name,
        model_name=model_name,
        running=running,
    )
    if not existing_services:
        raise RuntimeError(
            f"No MLflow prediction service deployed by the "
            f"{pipeline_step_name} step in the {pipeline_name} "
            f"pipeline for the '{model_name}' model is currently "
            f"running."
        )
    # Leftover debug prints of the service list removed.
    return existing_services[0]
@step
def predictor(
    service: MLFlowDeploymentService,
    data: str,
) -> np.ndarray:
    """Run an inference request against a prediction service.

    NOTE(review): this definition is shadowed by an identical ``predictor``
    step defined immediately below in this module — one of the two should
    be removed.

    Args:
        service: the deployed MLflow prediction service to query.
        data: JSON string in pandas "split" orientation (contains
            "columns", "index" and "data" keys — TODO confirm against the
            importer step's output).

    Returns:
        The model predictions as a NumPy array.
    """
    service.start(timeout=10)  # should be a NOP if already started
    # Annotation fixed from ``np.ndarray``: ``json.loads`` requires a
    # string payload, so the parameter must be ``str``.
    data = json.loads(data)
    data.pop("columns")
    data.pop("index")
    columns_for_df = [
        "payment_sequential",
        "payment_installments",
        "payment_value",
        "price",
        "freight_value",
        "product_name_lenght",
        "product_description_lenght",
        "product_photos_qty",
        "product_weight_g",
        "product_length_cm",
        "product_height_cm",
        "product_width_cm",
    ]
    df = pd.DataFrame(data["data"], columns=columns_for_df)
    # Round-trip through JSON to coerce values into plain Python types
    # before handing them to the prediction service.
    json_list = json.loads(json.dumps(list(df.T.to_dict().values())))
    data = np.array(json_list)
    prediction = service.predict(data)
    return prediction
@step
def predictor(
    service: MLFlowDeploymentService,
    data: str,
) -> np.ndarray:
    """Send a batch of records to the deployed model and return its output."""
    # Ensure the service is up; this is a no-op when it is already running.
    service.start(timeout=10)

    payload = json.loads(data)
    payload.pop("columns")
    payload.pop("index")

    feature_columns = [
        "payment_sequential",
        "payment_installments",
        "payment_value",
        "price",
        "freight_value",
        "product_name_lenght",
        "product_description_lenght",
        "product_photos_qty",
        "product_weight_g",
        "product_length_cm",
        "product_height_cm",
        "product_width_cm",
    ]
    frame = pd.DataFrame(payload["data"], columns=feature_columns)
    # JSON round-trip coerces the row values to plain Python types before
    # building the array that is sent to the service.
    records = json.loads(json.dumps(list(frame.T.to_dict().values())))
    return service.predict(np.array(records))
@pipeline(enable_cache=False, settings={"docker": docker_settings})
def continuous_deployment_pipeline(
    min_accuracy: float = 0.9,
    workers: int = 1,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
):
    """Train, evaluate and conditionally deploy the model via MLflow.

    Args:
        min_accuracy: NOTE(review): currently unused — the deployment
            threshold is read from ``DeploymentTriggerConfig`` instead;
            consider wiring this parameter through or removing it.
        workers: number of workers for the MLflow prediction server.
        timeout: seconds to wait for the prediction server start/stop.
    """
    # NOTE(review): ``retrieve_data`` returns movie/user/rating frames that
    # no downstream step here consumes — confirm whether this step still
    # belongs in this (customer-data) pipeline.
    df_movie, df_user, df_rating = retrieve_data()
    # Link all the steps' artifacts together.
    df = ingest_data(data_path_cur)
    x_train, x_test, y_train, y_test = preprocess_data(df)
    model = train_model(x_train, x_test, y_train, y_test)
    mse, rmse = evaluate_model(model, x_test, y_test)
    # The trigger receives MSE as its "accuracy" input (see deployment_trigger).
    deployment_decision = deployment_trigger(accuracy=mse)
    # Debug ``print`` of the deployer step invocation removed.
    mlflow_model_deployer_step(
        model=model,
        deploy_decision=deployment_decision,
        workers=workers,
        timeout=timeout,
    )
@pipeline(enable_cache=False, settings={"docker": docker_settings})
def inference_pipeline(pipeline_name: str, pipeline_step_name: str, running: bool = True):
    """Fetch a batch of data and run it against the deployed model service.

    Args:
        pipeline_name: name of the pipeline that deployed the MLflow
            prediction server.
        pipeline_step_name: name of the step that deployed it.
        running: only consider services that are currently running.
            (Annotation fixed from ``str`` — the value is forwarded to
            ``prediction_service_loader``, whose ``running`` is a bool;
            a backward-compatible default of True was added to match it.)
    """
    # Link all the steps' artifacts together.
    batch_data = dynamic_importer()
    model_deployment_service = prediction_service_loader(
        pipeline_name=pipeline_name,
        pipeline_step_name=pipeline_step_name,
        running=running,
    )
    predictor(service=model_deployment_service, data=batch_data)
|