from typing import cast

import click
from pipelines.deployment_pipeline import (
    continuous_deployment_pipeline,
    inference_pipeline,
)
from rich import print
from zenml.integrations.mlflow.mlflow_utils import get_tracking_uri
from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
    MLFlowModelDeployer,
)
from zenml.integrations.mlflow.services import MLFlowDeploymentService

# Accepted values of the `--config` option.
DEPLOY = "deploy"
PREDICT = "predict"
DEPLOY_AND_PREDICT = "deploy_and_predict"


@click.command()
@click.option(
    "--config",
    "-c",
    type=click.Choice([DEPLOY, PREDICT, DEPLOY_AND_PREDICT]),
    default=DEPLOY_AND_PREDICT,
    help="Optionally you can choose to only run the deployment "
    "pipeline to train and deploy a model (`deploy`), or to "
    "only run a prediction against the deployed model "
    "(`predict`). By default both will be run "
    "(`deploy_and_predict`).",
)
@click.option(
    "--min-accuracy",
    default=0.92,
    help="Minimum accuracy required to deploy the model",
)
def main(config: str, min_accuracy: float) -> None:
    """Run the MLflow example pipeline.

    Depending on ``config`` this runs the continuous deployment pipeline,
    the inference pipeline, or both. Afterwards it prints the command for
    opening the MLflow UI and reports the state of the prediction server
    found for the deployment pipeline step.

    Args:
        config: One of ``DEPLOY``, ``PREDICT`` or ``DEPLOY_AND_PREDICT``.
        min_accuracy: Forwarded to ``continuous_deployment_pipeline`` as
            its ``min_accuracy`` argument.
    """
    # Get the MLflow model deployer stack component.
    mlflow_model_deployer_component = (
        MLFlowModelDeployer.get_active_model_deployer()
    )
    print(mlflow_model_deployer_component)

    deploy = config in (DEPLOY, DEPLOY_AND_PREDICT)
    predict = config in (PREDICT, DEPLOY_AND_PREDICT)

    if deploy:
        # Initialize a continuous deployment pipeline run.
        continuous_deployment_pipeline(
            min_accuracy=min_accuracy,
            workers=3,
            timeout=60,
        )

    if predict:
        # Initialize an inference pipeline run.
        inference_pipeline(
            pipeline_name="continuous_deployment_pipeline",
            pipeline_step_name="mlflow_model_deployer_step",
            running=False,
        )

    # The tracking URI is wrapped in a balanced pair of single quotes so the
    # printed command can be pasted into a shell as-is.
    print(
        "You can run:\n "
        f"[italic green] mlflow ui --backend-store-uri '{get_tracking_uri()}'"
        "[/italic green]\n ...to inspect your experiment runs within the MLflow"
        " UI.\nYou can find your runs tracked within the "
        "`mlflow_example_pipeline` experiment. There you'll also be able to "
        "compare two or more runs.\n\n"
    )

    # Fetch existing services with same pipeline name, step name and model
    # name.
    existing_services = mlflow_model_deployer_component.find_model_server(
        pipeline_name="continuous_deployment_pipeline",
        pipeline_step_name="mlflow_model_deployer_step",
        model_name="model",
    )

    if existing_services:
        # Only the first matching service is reported.
        service = cast(MLFlowDeploymentService, existing_services[0])
        if service.is_running:
            print(
                "The MLflow prediction server is running locally as a daemon "
                "process service and accepts inference requests at:\n"
                f" {service.prediction_url}\n"
                "To stop the service, run "
                "[italic green]`zenml model-deployer models delete "
                f"{service.uuid!s}`[/italic green]."
            )
        elif service.is_failed:
            print(
                "The MLflow prediction server is in a failed state:\n"
                f" Last state: '{service.status.state.value}'\n"
                f" Last error: '{service.status.last_error}'"
            )
        # A service that is neither running nor failed prints nothing.
    else:
        # The CLI has no `--deploy` flag; deployment is selected through the
        # `--config` option.
        print(
            "No MLflow prediction server is currently running. The deployment "
            "pipeline must run first to train a model and deploy it. Execute "
            "the same command with the `--config deploy` argument to deploy a "
            "model."
        )


if __name__ == "__main__":
    main()