Working with Pipelines
In this module we will be working with pipelines. We will be creating a pipeline that will train a model and then deploy it to a serving runtime.
Develop a KFP pipeline to retrain a model
In the parasol-insurance repository, there is a python notebook that re-trains a model: 04-03-model-retraining.ipynb. We will be using this notebook to create a pipeline that will retrain the model.
-
In RHOAI, open the standard-workbench.
-
Create a new notebook and name it
model-retraining-pipeline
. -
Start the notebook with a cell to import the packages neccesary to work with
kfp
pipelines sdk:Solution
import kfp.compiler from kfp import dsl
-
Add a cell to define the kubeflow endpoint, and the image that will be used to create containers that will execute each task:
Solution
KUBEFLOW_ENDPOINT = 'https://ds-pipeline-dspa.parasol-insurance.svc.cluster.local:8443' PYTHON_IMAGE = 'image-registry.openshift-image-registry.svc:5000/openshift/python:latest'
-
Add a cell to define the task that will download the dataset (see the
retrieve_dataset
function defined at 04-03-model-retraining.ipynb)Solution
@dsl.component( base_image=PYTHON_IMAGE, packages_to_install=["requests", "zipp"], ) def download_data(dataset_type: str, datasets: dsl.Output[dsl.Dataset]): import requests import zipfile URL = f"https://rhods-public.s3.amazonaws.com/sample-data/accident-data/accident-{dataset_type}.zip" print("Downloading file...") response = requests.get(URL, stream=True) block_size = 1024 with open(f'./accident-{dataset_type}.zip', 'wb') as f: for data in response.iter_content(block_size): f.write(data) print("Unzipping file...") with zipfile.ZipFile(f'./accident-{dataset_type}.zip', 'r') as zip_ref: zip_ref.extractall(path=datasets.path) print("Done!")
Notice the output dataset parameter (
datasets: dsl.Output[dsl.Dataset]
), and how we use the path of such dataset to unzip the contents of the downloaded dataset. -
Create a cell to define the pipeline, and use the previously defined task.
Solution
@kfp.dsl.pipeline( name="Accident Detection", ) def accident_detection_pipeline(model_obc: str = "accident-detection"): download_data(dataset_type="sample")
-
Create a cell to connect to the
KUBEFLOW_ENDPOINT
, and create your pipeline run:Solution
print(f"Connecting to kfp: {KUBEFLOW_ENDPOINT}") import os bearer_token = "sha256~P0wEh46fxWa4uzPKR-b3fhcnsyXvCju4GovRd2YNNKM" sa_ca_cert = "/run/secrets/kubernetes.io/serviceaccount/service-ca.crt" if os.path.isfile(sa_ca_cert) and "svc" in KUBEFLOW_ENDPOINT: ssl_ca_cert = sa_ca_cert else: ssl_ca_cert = None client = kfp.Client( host=KUBEFLOW_ENDPOINT, existing_token=bearer_token, ssl_ca_cert=ssl_ca_cert, ) result = client.create_run_from_pipeline_func( accident_detection_pipeline, arguments={}, experiment_name="accident-detection")
Provide your own token, you can find the token in the OpenShift Web Console by clicking on your username in the top right corner and selecting
Copy Login Command
. -
Save and run your notebook.
-
Validate that the pipeline is running, using the RHOAI dashboard, navigate to the pipeline runs of the parasol-insurance data science project: You should find a run with a name starting with accident-detection
-
Create a cell to train the model, organize this cell to appear before the cell that defines the pipeline (
@kfp.dsl.pipeline
). The contents of this cell were crafted after from a combination of functions from 04-03-model-retraining.ipynb; we recommend you to compare this cell with the original notebook contents:Solution
@dsl.component( base_image=PYTHON_IMAGE, packages_to_install=["ultralytics", "opencv-contrib-python-headless"], ) def train_model(datasets: dsl.Input[dsl.Dataset], model_onnx: dsl.Output[dsl.Model]): import os import shutil import datetime from ultralytics import YOLO print("setting the symlink for the datasets") os.symlink(datasets.path, "/opt/app-root/src/datasets") # Load model print("using a base model to start the training") model = YOLO('yolov8m.pt') # load a pretrained model (recommended for training) print("training the model") model.train(data=f'{datasets.path}/accident-sample/data.yaml', epochs=1, imgsz=640, batch=2) print("saving the file as onnx") # create runs/detect/train/weights/best.onnx YOLO("/opt/app-root/src/runs/detect/train/weights/best.pt").export(format="onnx") # save runs/detect/train/weights/best.onnx as {model_onnx.path}/accident-detection_{timestamp}.onnx timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M") os.makedirs(model_onnx.path, exist_ok=True) shutil.copy('/opt/app-root/src/runs/detect/train/weights/best.onnx', f'{model_onnx.path}/accident-detection_{timestamp}.onnx')
Notice here the use of the |
-
Add the
train_model
task to the pipeline:Solution
@kfp.dsl.pipeline( name="Accident Detection", ) def accident_detection_pipeline(model_obc: str = "accident-detection"): download_data_task = download_data(dataset_type="sample") train_model(datasets=download_data_task.output)
Notice how we use the output of the
download_data
task as the input of thetrain_model
task. -
Save and run your notebook.
-
Validate that the pipeline is running, using the RHOAI dashboard, navigate to the Experiments → Experiments and runs: You should find a run with a name starting with accident-detection
-
Create a cell to upload the model to s3 using the
boto3
package, organize this cell to appear before the cell that defines the pipeline (@kfp.dsl.pipeline
):Solution
@dsl.component( base_image=PYTHON_IMAGE, packages_to_install=["boto3"], ) def upload_to_s3(model_onnx: dsl.Input[dsl.Model]): import os import boto3 from botocore.client import Config print("configuring s3 instance") # Configuration minio_url = "http://minio.object-datastore.svc.cluster.local:9000" access_key = "minio" secret_key = "minio123" # Setting up the MinIO client s3 = boto3.client( 's3', endpoint_url=minio_url, aws_access_key_id=access_key, aws_secret_access_key=secret_key, config=Config(signature_version='s3v4'), ) for (dirpath, dirnames, filenames) in os.walk(model_onnx.path): for file in filenames: print(f"uploading file {dirpath}/{file}") s3.upload_file(f"{dirpath}/{file}", "models", f"accident_model/{file}")
-
Add the
upload_to_s3
task to the pipeline.Solution
@kfp.dsl.pipeline( name="Accident Detection", ) def accident_detection_pipeline(model_obc: str = "accident-detection"): download_data_task = download_data(dataset_type="sample") train_model_task = train_model(datasets=download_data_task.output) upload_to_s3(model_onnx=train_model_task.outputs["model_onnx"])
-
Save and run your notebook.
-
Validate that the pipeline is running, using the RHOAI dashboard, navigate to Experiments → Experiments and runs: You should find a run with a name starting with accident-detection
-
Validate that the model is uploaded to the s3 bucket, by navigating to the s3 bucket in the OpenShift console.
-
Validate the resulting script: The script should look like this: train-car-rekon.py
Create a pipeline to train a model
-
Create a
model-training-pipeline
directory in theparasol-insurance
tenand directory. -
Create the
base
andoverlays
directories in themodel-training-pipeline
directory. -
In the
base
directory, create akustomization.yaml
file with the following content:Solution
apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization namespace: parasol-insurance resources: - model-retrain-imagestream.yaml - model-retrain-pipeline.yaml - model-retrain-pipelinerun.yaml - model-retrain-rbac.yaml - execute-kfp-task.yaml
-
Create file
tenants/parasol-insurance/model-training-pipeline/base/model-retrain-imagestream.yaml
.Solution
tenants/parasol-insurance/model-training-pipeline/base/model-retrain-imagestream.yamlapiVersion: image.openshift.io/v1 kind: ImageStream metadata: name: model-retrain
-
Create file
tenants/parasol-insurance/model-training-pipeline/base/model-retrain-pipeline.yaml
.Solution
tenants/parasol-insurance/model-training-pipeline/base/model-retrain-pipeline.yamlapiVersion: tekton.dev/v1beta1 kind: Pipeline metadata: name: model-retrain spec: params: - default: 'https://github.com/redhat-ai-services/ai-accelerator-bootcamp.git' description: Repo URL name: GIT_URL type: string - default: 'source_code/40_pipelines' description: Repo URL name: GIT_CONTEXT type: string - default: 'train-car-rekon.py' name: PIPELINE_SCRIPT type: string - default: main name: GIT_REVISION type: string - default: 3.11-ubi9 name: PYTHON_IMAGE type: string - default: 'image-registry.openshift-image-registry.svc:5000/parasol-insurance/model-retrain' name: TARGET_IMAGE type: string - default: 'https://ds-pipeline-dspa.parasol-insurance.svc.cluster.local:8443' name: KUBEFLOW_ENDPOINT type: string tasks: - name: git-clone params: - name: url value: $(params.GIT_URL) - name: revision value: $(params.GIT_REVISION) - name: gitInitImage value: 'registry.redhat.io/openshift-pipelines/pipelines-git-init-rhel8@sha256:868966ef9d4b54952d8a74eb83bba40eb1f52c0148994fa704efd0e3797c61c5' taskRef: kind: ClusterTask name: git-clone workspaces: - name: output workspace: source - name: s2i-python params: - name: VERSION value: $(params.PYTHON_IMAGE) - name: PATH_CONTEXT value: $(params.GIT_CONTEXT) - name: IMAGE value: $(params.TARGET_IMAGE) runAfter: - git-clone taskRef: kind: ClusterTask name: s2i-python workspaces: - name: source workspace: source - name: execute-kubeflow-pipeline params: - name: IMAGE value: $(params.TARGET_IMAGE) - name: TAG value: latest - name: SCRIPT value: $(params.PIPELINE_SCRIPT) - name: KUBEFLOW_ENDPOINT value: $(params.KUBEFLOW_ENDPOINT) runAfter: - s2i-python taskRef: kind: Task name: execute-kubeflow-pipeline workspaces: - name: source
-
Create file
tenants/parasol-insurance/model-training-pipeline/base/model-retrain-pipelinerun.yaml
.Solution
tenants/parasol-insurance/model-training-pipeline/base/model-retrain-pipelinerun.yamlapiVersion: tekton.dev/v1 kind: PipelineRun metadata: name: model-retrain-init annotations: argocd.argoproj.io/sync-wave: "10" spec: pipelineRef: name: model-retrain taskRunTemplate: serviceAccountName: pipeline timeouts: pipeline: 1h0m0s workspaces: - name: source volumeClaimTemplate: metadata: creationTimestamp: null spec: accessModes: - ReadWriteOnce resources: requests: storage: 1Gi volumeMode: Filesystem
-
Create file
tenants/parasol-insurance/model-training-pipeline/base/model-retrain-rbac.yaml
.Solution
tenants/parasol-insurance/model-training-pipeline/base/model-retrain-rbac.yamlkind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 metadata: name: pipelines-dsp-access subjects: - kind: ServiceAccount name: pipelines roleRef: apiGroup: rbac.authorization.k8s.io kind: Role name: dsp-access --- kind: Role apiVersion: rbac.authorization.k8s.io/v1 metadata: name: dsp-access rules: - verbs: - get apiGroups: - '' - route.openshift.io resources: - routes
-
Create file
tenants/parasol-insurance/model-training-pipeline/base/execute-kfp-task.yaml
.Solution
tenants/parasol-insurance/model-training-pipeline/base/execute-kfp-task.yamlapiVersion: tekton.dev/v1beta1 kind: Task metadata: name: execute-kubeflow-pipeline spec: description: >- This task will execute a python script, triggering a kubeflow pipeline params: - name: IMAGE description: The image used to execute the script type: string - name: TAG description: The tag for the image type: string default: "latest" - name: SCRIPT description: The location of the script to be executed - name: KUBEFLOW_ENDPOINT description: The endpoint URL for Kubeflow default: "https://ds-pipeline-dspa:8443" steps: - name: execute-python image: $(inputs.params.IMAGE):$(inputs.params.TAG) env: - name: KUBEFLOW_ENDPOINT value: $(inputs.params.KUBEFLOW_ENDPOINT) script: | python $(inputs.params.SCRIPT)
-
In the
overlays
directory, create aparasol-insurance-dev
directory. -
In the
parasol-insurance-dev
directory, create akustomization.yaml
file.Solution
tenants/parasol-insurance/model-training-pipeline/overlays/parasol-insurance-dev/kustomization.yamlapiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization resources: - ../../base
-
Commit and push the changes to the Git repository.
-
Wait for ArgoCD to sync the changes.
-
Navigate to the OpenShift console, and validate that the
model-retrain
pipeline is available in theparasol-insurance
namespace. -
Click on the
model-retrain
pipeline, and validate that there is a pipeline run, wait the pipeline run to complete -
Navigate to the RHOAI dashboard → Experiments → Experiments and runs: You should find a run with a name starting with accident-detection
Validate your code against Branch for model_retrain pipeline config
Experiments
A pipeline experiment is a workspace where you can try different configurations of your pipelines. You can use experiments to organize your runs into logical groups.
Experiments and Runs
-
Navigate to RHOAI Dasboard, click on Experiments > Experiments and Runs. Validate that new experiment
accident_detection
is created
-
Click on the experiment
accident_detection
to view pipeline runs.
-
Click on each pipeline run to view more details.
-
We can schedule periodic pipeline runs for an experiment. Click on 'Schedules'. Click on 'Create Schedule'. Please fill following details:
-
Experiment: We can choose an existing Experiment or create a new Experiment.
-
Name: Name for the schedule
-
Trigger Type: Periodic
-
Run Every: 1 hour
-
Start Date: Start date for the schedule
-
End Date: End Date for the schedule
-
Pipeline: Name of the pipeline
-
Pipeline version: Version of the pipeline
-