Working with Pipelines
In this module we will work with pipelines: we will create a pipeline that trains a model and then deploys it to a serving runtime.
Develop a KFP pipeline to retrain a model
In the parasol-insurance repository, there is a Python notebook that retrains a model: 04-03-model-retraining.ipynb. We will use this notebook to create a pipeline that retrains the model.
- In RHOAI, open the standard-workbench.
- Create a new notebook and name it `model-retraining-pipeline`.
- Start the notebook with a cell to import the packages necessary to work with the `kfp` pipelines SDK:

Solution:

```python
import kfp.compiler
from kfp import dsl
```
- Add a cell to define the kubeflow endpoint, and the image that will be used to create containers that will execute each task:

Solution:

```python
KUBEFLOW_ENDPOINT = 'https://ds-pipeline-dspa.parasol-insurance.svc.cluster.local:8443'
PYTHON_IMAGE = 'image-registry.openshift-image-registry.svc:5000/openshift/python:latest'
```
- Add a cell to define the task that will download the dataset (see the `retrieve_dataset` function defined in 04-03-model-retraining.ipynb):

Solution:

```python
@dsl.component(
    base_image=PYTHON_IMAGE,
    packages_to_install=["requests", "zipp"],
)
def download_data(dataset_type: str, datasets: dsl.Output[dsl.Dataset]):
    import requests
    import zipfile

    URL = f"https://rhods-public.s3.amazonaws.com/sample-data/accident-data/accident-{dataset_type}.zip"

    print("Downloading file...")
    response = requests.get(URL, stream=True)
    block_size = 1024
    with open(f'./accident-{dataset_type}.zip', 'wb') as f:
        for data in response.iter_content(block_size):
            f.write(data)

    print("Unzipping file...")
    with zipfile.ZipFile(f'./accident-{dataset_type}.zip', 'r') as zip_ref:
        zip_ref.extractall(path=datasets.path)
    print("Done!")
```

Notice the output dataset parameter (`datasets: dsl.Output[dsl.Dataset]`), and how we use the path of that dataset to unzip the contents of the downloaded archive.
- Create a cell to define the pipeline, and use the previously defined task.

Solution:

```python
@kfp.dsl.pipeline(
    name="Accident Detection",
)
def accident_detection_pipeline(model_obc: str = "accident-detection"):
    download_data(dataset_type="sample")
```
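The notebook imports `kfp.compiler` even though this walkthrough starts runs directly from the pipeline function. If you also want the pipeline as a YAML package (for example, to upload it through the RHOAI dashboard), a minimal sketch using the KFP v2 compiler follows; the output file name is an arbitrary choice, not part of the original notebook:

```python
# Optional: compile the pipeline definition to an IR YAML package.
# "accident_detection_pipeline.yaml" is a hypothetical file name.
kfp.compiler.Compiler().compile(
    accident_detection_pipeline,
    package_path="accident_detection_pipeline.yaml",
)
```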
- Create a cell to connect to the `KUBEFLOW_ENDPOINT` and create your pipeline run:

Solution:

```python
print(f"Connecting to kfp: {KUBEFLOW_ENDPOINT}")

import os

bearer_token = "sha256~P0wEh46fxWa4uzPKR-b3fhcnsyXvCju4GovRd2YNNKM"

sa_ca_cert = "/run/secrets/kubernetes.io/serviceaccount/service-ca.crt"
if os.path.isfile(sa_ca_cert) and "svc" in KUBEFLOW_ENDPOINT:
    ssl_ca_cert = sa_ca_cert
else:
    ssl_ca_cert = None

client = kfp.Client(
    host=KUBEFLOW_ENDPOINT,
    existing_token=bearer_token,
    ssl_ca_cert=ssl_ca_cert,
)

result = client.create_run_from_pipeline_func(
    accident_detection_pipeline,
    arguments={},
    experiment_name="accident-detection")
```

Provide your own token. You can find the token in the OpenShift Web Console by clicking on your username in the top right corner and selecting Copy Login Command.
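Hard-coding the token works for this exercise, but it is easy to leave a stale token behind in the notebook. As an alternative, here is a small sketch that reads the token from an environment variable and only falls back to a pasted value; the `BEARER_TOKEN` variable name is an assumption, not something the lab sets up for you:

```python
import os

# Prefer a token injected through an environment variable (hypothetical name),
# falling back to the value copied from "Copy Login Command".
bearer_token = os.environ.get("BEARER_TOKEN", "sha256~REPLACE_WITH_YOUR_TOKEN")
```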
- Save and run your notebook.
- Validate that the pipeline is running: in the RHOAI dashboard, navigate to the pipeline runs of the parasol-insurance data science project. You should find a run with a name starting with accident-detection. If you prefer to check from the notebook itself, see the optional sketch below.
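As an optional check, the run can also be monitored from the notebook with the KFP client created above; this is a hedged sketch, and attribute names can differ slightly between KFP SDK versions:

```python
# `result` is the value returned by create_run_from_pipeline_func above.
run = client.wait_for_run_completion(result.run_id, timeout=3600)
print(run.state)  # expected to end as SUCCEEDED
```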
- Create a cell to train the model, and organize this cell to appear before the cell that defines the pipeline (`@kfp.dsl.pipeline`). The contents of this cell were crafted from a combination of functions in 04-03-model-retraining.ipynb; we recommend comparing this cell with the original notebook contents:

Solution:

```python
@dsl.component(
    base_image=PYTHON_IMAGE,
    packages_to_install=["ultralytics", "opencv-contrib-python-headless"],
)
def train_model(datasets: dsl.Input[dsl.Dataset], model_onnx: dsl.Output[dsl.Model]):
    import os
    import shutil
    import datetime
    from ultralytics import YOLO

    print("setting the symlink for the datasets")
    os.symlink(datasets.path, "/opt/app-root/src/datasets")

    # Load model
    print("using a base model to start the training")
    model = YOLO('yolov8m.pt')  # load a pretrained model (recommended for training)

    print("training the model")
    model.train(data=f'{datasets.path}/accident-sample/data.yaml', epochs=1, imgsz=640, batch=2)

    print("saving the file as onnx")

    # create runs/detect/train/weights/best.onnx
    YOLO("/opt/app-root/src/runs/detect/train/weights/best.pt").export(format="onnx")

    # save runs/detect/train/weights/best.onnx as {model_onnx.path}/accident-detection_{timestamp}.onnx
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")
    os.makedirs(model_onnx.path, exist_ok=True)
    shutil.copy('/opt/app-root/src/runs/detect/train/weights/best.onnx',
                f'{model_onnx.path}/accident-detection_{timestamp}.onnx')
```

Notice the use of the `shutil` package to copy the model from the default output directory of the YOLO package to the output path of our `model_onnx` output parameter.
- Add the `train_model` task to the pipeline:

Solution:

```python
@kfp.dsl.pipeline(
    name="Accident Detection",
)
def accident_detection_pipeline(model_obc: str = "accident-detection"):
    download_data_task = download_data(dataset_type="sample")
    train_model(datasets=download_data_task.output)
```

Notice how we use the output of the `download_data` task as the input of the `train_model` task.
- Save and run your notebook.
- Validate that the pipeline is running: in the RHOAI dashboard, navigate to Experiments → Experiments and runs. You should find a run with a name starting with accident-detection.
- Create a cell to upload the model to S3 using the `boto3` package, and organize this cell to appear before the cell that defines the pipeline (`@kfp.dsl.pipeline`):

Solution:

```python
@dsl.component(
    base_image=PYTHON_IMAGE,
    packages_to_install=["boto3"],
)
def upload_to_s3(model_onnx: dsl.Input[dsl.Model]):
    import os
    import boto3
    from botocore.client import Config

    print("configuring s3 instance")
    # Configuration
    minio_url = "http://minio.object-datastore.svc.cluster.local:9000"
    access_key = "minio"
    secret_key = "minio123"

    # Setting up the MinIO client
    s3 = boto3.client(
        's3',
        endpoint_url=minio_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        config=Config(signature_version='s3v4'),
    )

    for (dirpath, dirnames, filenames) in os.walk(model_onnx.path):
        for file in filenames:
            print(f"uploading file {dirpath}/{file}")
            s3.upload_file(f"{dirpath}/{file}", "models", f"accident_model/{file}")
```
- Add the `upload_to_s3` task to the pipeline:

Solution:

```python
@kfp.dsl.pipeline(
    name="Accident Detection",
)
def accident_detection_pipeline(model_obc: str = "accident-detection"):
    download_data_task = download_data(dataset_type="sample")
    train_model_task = train_model(datasets=download_data_task.output)
    upload_to_s3(model_onnx=train_model_task.outputs["model_onnx"])
```

Notice that, because `train_model` declares a named output, we reference it as `train_model_task.outputs["model_onnx"]` rather than `.output`.
- Save and run your notebook.
- Validate that the pipeline is running: in the RHOAI dashboard, navigate to Experiments → Experiments and runs. You should find a run with a name starting with accident-detection.
- Validate that the model was uploaded to the S3 bucket by navigating to the bucket in the MinIO console (or list the bucket contents from the notebook, as sketched below).
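If you would rather verify the upload from the notebook than from the MinIO console, a minimal `boto3` sketch follows; it simply reuses the connection details hard-coded in the `upload_to_s3` component:

```python
import boto3
from botocore.client import Config

# Same MinIO connection details as the upload_to_s3 component.
s3 = boto3.client(
    's3',
    endpoint_url="http://minio.object-datastore.svc.cluster.local:9000",
    aws_access_key_id="minio",
    aws_secret_access_key="minio123",
    config=Config(signature_version='s3v4'),
)

# List everything uploaded under the accident_model/ prefix of the models bucket.
response = s3.list_objects_v2(Bucket="models", Prefix="accident_model/")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])
```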
- Validate the resulting script. The complete script should look like this: train-car-rekon.py
Create a pipeline to train a model
- Create a `model-training-pipeline` directory in the `parasol-insurance` tenants directory.
- Create the `base` and `overlays` directories in the `model-training-pipeline` directory.
- In the `base` directory, create a `kustomization.yaml` file with the following content:

Solution:

```yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: parasol-insurance
resources:
  - model-retrain-imagestream.yaml
  - model-retrain-pipeline.yaml
  - model-retrain-pipelinerun.yaml
  - execute-kfp-task.yaml
```
- Create file `tenants/parasol-insurance/model-training-pipeline/base/model-retrain-imagestream.yaml`.

Solution:

tenants/parasol-insurance/model-training-pipeline/base/model-retrain-imagestream.yaml

```yaml
apiVersion: image.openshift.io/v1
kind: ImageStream
metadata:
  name: model-retrain
```
- Create file `tenants/parasol-insurance/model-training-pipeline/base/model-retrain-pipeline.yaml`.

Solution:

tenants/parasol-insurance/model-training-pipeline/base/model-retrain-pipeline.yaml

```yaml
apiVersion: tekton.dev/v1
kind: Pipeline
metadata:
  name: model-retrain
spec:
  params:
    - default: 'https://github.com/redhat-ai-services/ai-accelerator-bootcamp.git'
      description: Repo URL
      name: GIT_URL
      type: string
    - default: 'source_code/40_pipelines'
      description: Repo URL
      name: CONTEXT
      type: string
    - default: 'train-car-rekon.py'
      name: PIPELINE_SCRIPT
      type: string
    - default: main
      name: GIT_REVISION
      type: string
    - default: 3.11-ubi9
      name: PYTHON_IMAGE
      type: string
    - default: 'image-registry.openshift-image-registry.svc:5000/parasol-insurance/model-retrain'
      name: TARGET_IMAGE
      type: string
    - default: 'https://ds-pipeline-dspa.parasol-insurance.svc.cluster.local:8443'
      name: KUBEFLOW_ENDPOINT
      type: string
  tasks:
    - name: git-clone
      params:
        - name: URL
          value: $(params.GIT_URL)
        - name: REVISION
          value: $(params.GIT_REVISION)
      taskRef:
        resolver: cluster
        params:
          - name: kind
            value: task
          - name: name
            value: git-clone
          - name: namespace
            value: openshift-pipelines
      workspaces:
        - name: output
          workspace: source
    - name: s2i-python
      params:
        - name: VERSION
          value: $(params.PYTHON_IMAGE)
        - name: CONTEXT
          value: $(params.CONTEXT)
        - name: IMAGE
          value: $(params.TARGET_IMAGE)
      runAfter:
        - git-clone
      taskRef:
        resolver: cluster
        params:
          - name: kind
            value: task
          - name: name
            value: s2i-python
          - name: namespace
            value: openshift-pipelines
      workspaces:
        - name: source
          workspace: source
    - name: execute-kubeflow-pipeline
      params:
        - name: IMAGE
          value: $(params.TARGET_IMAGE)
        - name: TAG
          value: latest
        - name: SCRIPT
          value: $(params.PIPELINE_SCRIPT)
        - name: KUBEFLOW_ENDPOINT
          value: $(params.KUBEFLOW_ENDPOINT)
      runAfter:
        - s2i-python
      taskRef:
        kind: Task
        name: execute-kubeflow-pipeline
  workspaces:
    - name: source
```
- Create file `tenants/parasol-insurance/model-training-pipeline/base/model-retrain-pipelinerun.yaml`.

Solution:

tenants/parasol-insurance/model-training-pipeline/base/model-retrain-pipelinerun.yaml

```yaml
apiVersion: tekton.dev/v1
kind: PipelineRun
metadata:
  name: model-retrain-init
  annotations:
    argocd.argoproj.io/sync-wave: "10"
spec:
  pipelineRef:
    name: model-retrain
  taskRunTemplate:
    serviceAccountName: pipeline
  timeouts:
    pipeline: 1h0m0s
  workspaces:
    - name: source
      volumeClaimTemplate:
        spec:
          accessModes:
            - ReadWriteOnce
          resources:
            requests:
              storage: 1Gi
          volumeMode: Filesystem
```
- Create file `tenants/parasol-insurance/model-training-pipeline/base/execute-kfp-task.yaml`.

Solution:

tenants/parasol-insurance/model-training-pipeline/base/execute-kfp-task.yaml

```yaml
apiVersion: tekton.dev/v1
kind: Task
metadata:
  name: execute-kubeflow-pipeline
spec:
  description: >-
    This task will execute a python script, triggering a kubeflow pipeline
  params:
    - name: IMAGE
      description: The image used to execute the script
      type: string
    - name: TAG
      description: The tag for the image
      type: string
      default: "latest"
    - name: SCRIPT
      description: The name of the script to be executed
    - name: KUBEFLOW_ENDPOINT
      description: The endpoint URL for Kubeflow
      default: "https://ds-pipeline-dspa:8443"
  steps:
    - name: execute-python
      image: $(inputs.params.IMAGE):$(inputs.params.TAG)
      env:
        - name: KUBEFLOW_ENDPOINT
          value: $(inputs.params.KUBEFLOW_ENDPOINT)
      script: |
        python $(inputs.params.SCRIPT)
```
- In the `overlays` directory, create a `parasol-insurance-dev` directory.
- In the `parasol-insurance-dev` directory, create a `kustomization.yaml` file.

Solution:

tenants/parasol-insurance/model-training-pipeline/overlays/parasol-insurance-dev/kustomization.yaml

```yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
  - ../../base
```
- Commit and push the changes to the Git repository.
- Wait for ArgoCD to sync the changes.
- Navigate to the OpenShift console, and validate that the `model-retrain` pipeline is available in the `parasol-insurance` namespace.
- Click on the `model-retrain` pipeline and validate that there is a pipeline run; wait for the pipeline run to complete.
- Navigate to the RHOAI dashboard → Experiments → Experiments and runs. You should find a run with a name starting with accident-detection.
Validate your code against the Branch for model_retrain pipeline config.
The `model-retrain-pipeline.yaml` refers to the previously mentioned train-car-rekon.py, which uploads the model file to S3 and expects that a bucket named `models` already exists. You can verify this in the MinIO web interface and, if necessary, create the missing bucket from there (or from Python, as sketched below).
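If the bucket is missing, it can also be created with a few lines of `boto3` instead of the MinIO web interface; this sketch reuses the lab's MinIO endpoint and credentials and is only one possible approach:

```python
import boto3
from botocore.client import Config
from botocore.exceptions import ClientError

# Same MinIO connection details as the upload_to_s3 component.
s3 = boto3.client(
    's3',
    endpoint_url="http://minio.object-datastore.svc.cluster.local:9000",
    aws_access_key_id="minio",
    aws_secret_access_key="minio123",
    config=Config(signature_version='s3v4'),
)

# Create the models bucket only if it does not already exist.
try:
    s3.head_bucket(Bucket="models")
    print("Bucket 'models' already exists")
except ClientError:
    s3.create_bucket(Bucket="models")
    print("Created bucket 'models'")
```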
Experiments
A pipeline experiment is a workspace where you can try different configurations of your pipelines. You can use experiments to organize your runs into logical groups.
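Experiments and their runs can also be inspected from the KFP SDK instead of the dashboard. Below is a small sketch that reuses the `client` connected to `KUBEFLOW_ENDPOINT` earlier in this module; exact attribute names can vary between KFP SDK versions:

```python
# Reuses the `client` created when connecting to KUBEFLOW_ENDPOINT.
experiments = client.list_experiments()
for exp in experiments.experiments or []:
    print(exp.display_name, exp.experiment_id)

# Runs recorded under the first experiment, if any exist.
if experiments.experiments:
    runs = client.list_runs(experiment_id=experiments.experiments[0].experiment_id)
    for run in runs.runs or []:
        print(run.display_name, run.state)
```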
Experiments and Runs
- Navigate to the RHOAI Dashboard and click on Experiments > Experiments and Runs. Validate that a new experiment named accident-detection has been created.
- Click on the accident-detection experiment to view its pipeline runs.
- Click on each pipeline run to view more details.
- We can schedule periodic pipeline runs for an experiment. Click on 'Schedules', then on 'Create Schedule', and fill in the following details (an SDK-based alternative is sketched after this list):
  - Experiment: choose an existing experiment or create a new one.
  - Name: name for the schedule.
  - Trigger Type: Periodic.
  - Run Every: 1 hour.
  - Start Date: start date for the schedule.
  - End Date: end date for the schedule.
  - Pipeline: name of the pipeline.
  - Pipeline version: version of the pipeline.