Working with Pipelines

In this module we will create a pipeline that trains a model and then deploys it to a serving runtime.

Develop a KFP pipeline to retrain a model

In the parasol-insurance repository, there is a Python notebook that retrains a model: 04-03-model-retraining.ipynb. We will use this notebook to create a pipeline that retrains the model.

  1. In RHOAI, open the standard-workbench.

  2. Create a new notebook and name it model-retraining-pipeline.

  3. Start the notebook with a cell that imports the packages necessary to work with the KFP pipelines SDK:

    Solution
    import kfp.compiler
    from kfp import dsl
  4. Add a cell to define the Kubeflow endpoint and the image that will be used to create the containers that execute each task:

    Solution
    KUBEFLOW_ENDPOINT = 'https://ds-pipeline-dspa.parasol-insurance.svc.cluster.local:8443'
    PYTHON_IMAGE = 'image-registry.openshift-image-registry.svc:5000/openshift/python:latest'
  5. Add a cell to define the task that downloads the dataset (see the retrieve_dataset function defined in 04-03-model-retraining.ipynb):

    Solution
    @dsl.component(
        base_image=PYTHON_IMAGE,
        packages_to_install=["requests", "zipp"],
    )
    def download_data(dataset_type: str,
                      datasets: dsl.Output[dsl.Dataset]):
        import requests
        import zipfile
    
        URL = f"https://rhods-public.s3.amazonaws.com/sample-data/accident-data/accident-{dataset_type}.zip"
    
        print("Downloading file...")
        response = requests.get(URL, stream=True)
        block_size = 1024
        with open(f'./accident-{dataset_type}.zip', 'wb') as f:
            for data in response.iter_content(block_size):
                f.write(data)
    
        print("Unzipping file...")
        with zipfile.ZipFile(f'./accident-{dataset_type}.zip', 'r') as zip_ref:
            zip_ref.extractall(path=datasets.path)
        print("Done!")

    Notice the output dataset parameter (datasets: dsl.Output[dsl.Dataset]), and how its path is used as the destination when unzipping the downloaded archive.

  6. Create a cell to define the pipeline, and use the previously defined task.

    Solution
    @kfp.dsl.pipeline(
        name="Accident Detection",
    )
    def accident_detection_pipeline(model_obc: str = "accident-detection"):
        download_data(dataset_type="sample")
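
    Before submitting the pipeline, you can optionally compile it to a YAML package and inspect the generated definition. This is a minimal sketch that reuses the kfp.compiler import from the first cell; the output filename is an arbitrary choice:

    # Optional: compile the pipeline to a YAML package for inspection.
    # "accident_detection_pipeline.yaml" is an arbitrary output path.
    kfp.compiler.Compiler().compile(
        accident_detection_pipeline,
        package_path="accident_detection_pipeline.yaml",
    )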
  7. Create a cell to connect to the KUBEFLOW_ENDPOINT, and create your pipeline run:

    Solution
    print(f"Connecting to kfp: {KUBEFLOW_ENDPOINT}")
    import os
    
    bearer_token = "sha256~P0wEh46fxWa4uzPKR-b3fhcnsyXvCju4GovRd2YNNKM"
    
    sa_ca_cert = "/run/secrets/kubernetes.io/serviceaccount/service-ca.crt"
    if os.path.isfile(sa_ca_cert) and "svc" in KUBEFLOW_ENDPOINT:
        ssl_ca_cert = sa_ca_cert
    else:
        ssl_ca_cert = None
    
    client = kfp.Client(
        host=KUBEFLOW_ENDPOINT,
        existing_token=bearer_token,
        ssl_ca_cert=ssl_ca_cert,
    )
    result = client.create_run_from_pipeline_func(
        accident_detection_pipeline, arguments={}, experiment_name="accident-detection")

    Provide your own token. You can find it in the OpenShift Web Console by clicking your username in the top-right corner and selecting Copy Login Command.
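
    As an alternative to pasting the token into the cell, you can read it from an environment variable. This is a sketch under the assumption that you export a variable named BEARER_TOKEN in the workbench environment (for example, with the value printed by oc whoami --show-token) before running the notebook:

    import os

    # Assumption: BEARER_TOKEN is exported in the workbench environment;
    # otherwise fall back to the value pasted in the cell above.
    bearer_token = os.environ.get("BEARER_TOKEN", bearer_token)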

  8. Save and run your notebook.

  9. Validate that the pipeline is running: in the RHOAI dashboard, navigate to the pipeline runs of the parasol-insurance data science project. You should find a run with a name starting with accident-detection.
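
    You can also check the run from the notebook instead of the dashboard. A minimal sketch, assuming the client and result objects from the previous cell and the kfp 2.x SDK:

    # result.run_id identifies the run that was just created.
    print(f"Run ID: {result.run_id}")

    # Fetch the current state of the run (e.g. RUNNING, SUCCEEDED).
    run = client.get_run(result.run_id)
    print(run.state)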

  10. Create a cell to train the model, and place this cell before the cell that defines the pipeline (@kfp.dsl.pipeline). The contents of this cell were crafted from a combination of functions in 04-03-model-retraining.ipynb; we recommend comparing this cell with the original notebook contents:

    Solution
    @dsl.component(
        base_image=PYTHON_IMAGE,
        packages_to_install=["ultralytics", "opencv-contrib-python-headless"],
    )
    def train_model(datasets: dsl.Input[dsl.Dataset],
                    model_onnx: dsl.Output[dsl.Model]):
        import os
        import shutil
        import datetime
        from ultralytics import YOLO
    
        print("setting the symlink for the datasets")
        os.symlink(datasets.path, "/opt/app-root/src/datasets")
    
        # Load model
        print("using a base model to start the training")
        model = YOLO('yolov8m.pt')  # load a pretrained model (recommended for training)
        print("training the model")
        model.train(data=f'{datasets.path}/accident-sample/data.yaml',
                              epochs=1, imgsz=640, batch=2)
    
        print("saving the file as onnx")
    
        # create runs/detect/train/weights/best.onnx
        YOLO("/opt/app-root/src/runs/detect/train/weights/best.pt").export(format="onnx")
    
        # save runs/detect/train/weights/best.onnx as {model_onnx.path}/accident-detection_{timestamp}.onnx
        timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")
        os.makedirs(model_onnx.path, exist_ok=True)
        shutil.copy('/opt/app-root/src/runs/detect/train/weights/best.onnx',
                    f'{model_onnx.path}/accident-detection_{timestamp}.onnx')

    Notice the use of the shutil package to copy the exported model from YOLO's default output directory to the path of the model_onnx output parameter.

  11. Add the train_model task to the pipeline:

    Solution
    @kfp.dsl.pipeline(
        name="Accident Detection",
    )
    def accident_detection_pipeline(model_obc: str = "accident-detection"):
        download_data_task = download_data(dataset_type="sample")
        train_model(datasets=download_data_task.output)

    Notice how we use the output of the download_data task as the input of the train_model task.
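
    The shorthand download_data_task.output works here because download_data declares exactly one output. Outputs can also be referenced by name, which is the form the later steps use for train_model. An equivalent sketch of the same pipeline using the named reference (assuming the components defined above):

    @kfp.dsl.pipeline(
        name="Accident Detection",
    )
    def accident_detection_pipeline(model_obc: str = "accident-detection"):
        download_data_task = download_data(dataset_type="sample")
        # Named reference; equivalent to download_data_task.output because
        # download_data has a single output artifact named "datasets".
        train_model(datasets=download_data_task.outputs["datasets"])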

  12. Save and run your notebook.

  13. Validate that the pipeline is running: in the RHOAI dashboard, navigate to Experiments → Experiments and runs. You should find a run with a name starting with accident-detection.

  14. Create a cell to upload the model to S3 using the boto3 package, and place this cell before the cell that defines the pipeline (@kfp.dsl.pipeline):

    Solution
    @dsl.component(
        base_image=PYTHON_IMAGE,
        packages_to_install=["boto3"],
    )
    def upload_to_s3(model_onnx: dsl.Input[dsl.Model]):
        import os
        import boto3
        from botocore.client import Config
    
        print("configuring s3 instance")
        # Configuration
        minio_url = "http://minio.object-datastore.svc.cluster.local:9000"
        access_key = "minio"
        secret_key = "minio123"
    
        # Setting up the MinIO client
        s3 = boto3.client(
            's3',
            endpoint_url=minio_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            config=Config(signature_version='s3v4'),
        )
    
        for (dirpath, dirnames, filenames) in os.walk(model_onnx.path):
            for file in filenames:
                print(f"uploading file {dirpath}/{file}")
                s3.upload_file(f"{dirpath}/{file}", "models",
                               f"accident_model/{file}")
  15. Add the upload_to_s3 task to the pipeline.

    Solution
    @kfp.dsl.pipeline(
        name="Accident Detection",
    )
    def accident_detection_pipeline(model_obc: str = "accident-detection"):
        download_data_task = download_data(dataset_type="sample")
        train_model_task = train_model(datasets=download_data_task.output)
        upload_to_s3(model_onnx=train_model_task.outputs["model_onnx"])
  16. Save and run your notebook.

  17. Validate that the pipeline is running: in the RHOAI dashboard, navigate to Experiments → Experiments and runs. You should find a run with a name starting with accident-detection.

  18. Validate that the model is uploaded to the S3 bucket by navigating to the bucket in the OpenShift console; alternatively, list the bucket contents from the notebook as sketched below.
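
    A sketch of listing the bucket contents from the workbench, reusing the MinIO endpoint and credentials from the upload_to_s3 component (these values are assumptions and must match your environment):

    import boto3
    from botocore.client import Config

    s3 = boto3.client(
        's3',
        endpoint_url="http://minio.object-datastore.svc.cluster.local:9000",
        aws_access_key_id="minio",
        aws_secret_access_key="minio123",
        config=Config(signature_version='s3v4'),
    )

    # List the uploaded model files under the accident_model/ prefix.
    response = s3.list_objects_v2(Bucket="models", Prefix="accident_model/")
    for obj in response.get("Contents", []):
        print(obj["Key"], obj["Size"])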

  19. Validate the resulting script; it should look like this: train-car-rekon.py

Create a pipeline to train a model

  1. Create a model-training-pipeline directory in the parasol-insurance tenant directory.

  2. Create the base and overlays directories in the model-training-pipeline directory.

  3. In the base directory, create a kustomization.yaml file with the following content:

    Solution
    apiVersion: kustomize.config.k8s.io/v1beta1
    kind: Kustomization
    
    namespace: parasol-insurance
    
    resources:
      - model-retrain-imagestream.yaml
      - model-retrain-pipeline.yaml
      - model-retrain-pipelinerun.yaml
      - model-retrain-rbac.yaml
      - execute-kfp-task.yaml
  4. Create file tenants/parasol-insurance/model-training-pipeline/base/model-retrain-imagestream.yaml.

    Solution
    tenants/parasol-insurance/model-training-pipeline/base/model-retrain-imagestream.yaml
    apiVersion: image.openshift.io/v1
    kind: ImageStream
    metadata:
      name: model-retrain
  5. Create file tenants/parasol-insurance/model-training-pipeline/base/model-retrain-pipeline.yaml.

    Solution
    tenants/parasol-insurance/model-training-pipeline/base/model-retrain-pipeline.yaml
    apiVersion: tekton.dev/v1beta1
    kind: Pipeline
    metadata:
      name: model-retrain
    spec:
      params:
        - default: 'https://github.com/redhat-ai-services/ai-accelerator-bootcamp.git'
          description: Repo URL
          name: GIT_URL
          type: string
        - default: 'source_code/40_pipelines'
          description: Context directory within the repo
          name: GIT_CONTEXT
          type: string
        - default: 'train-car-rekon.py'
          name: PIPELINE_SCRIPT
          type: string
        - default: main
          name: GIT_REVISION
          type: string
        - default: 3.11-ubi9
          name: PYTHON_IMAGE
          type: string
        - default: 'image-registry.openshift-image-registry.svc:5000/parasol-insurance/model-retrain'
          name: TARGET_IMAGE
          type: string
        - default: 'https://ds-pipeline-dspa.parasol-insurance.svc.cluster.local:8443'
          name: KUBEFLOW_ENDPOINT
          type: string
      tasks:
        - name: git-clone
          params:
            - name: url
              value: $(params.GIT_URL)
            - name: revision
              value: $(params.GIT_REVISION)
            - name: gitInitImage
              value: 'registry.redhat.io/openshift-pipelines/pipelines-git-init-rhel8@sha256:868966ef9d4b54952d8a74eb83bba40eb1f52c0148994fa704efd0e3797c61c5'
          taskRef:
            kind: ClusterTask
            name: git-clone
          workspaces:
            - name: output
              workspace: source
        - name: s2i-python
          params:
            - name: VERSION
              value: $(params.PYTHON_IMAGE)
            - name: PATH_CONTEXT
              value: $(params.GIT_CONTEXT)
            - name: IMAGE
              value: $(params.TARGET_IMAGE)
          runAfter:
            - git-clone
          taskRef:
            kind: ClusterTask
            name: s2i-python
          workspaces:
            - name: source
              workspace: source
        - name: execute-kubeflow-pipeline
          params:
            - name: IMAGE
              value: $(params.TARGET_IMAGE)
            - name: TAG
              value: latest
            - name: SCRIPT
              value: $(params.PIPELINE_SCRIPT)
            - name: KUBEFLOW_ENDPOINT
              value: $(params.KUBEFLOW_ENDPOINT)
          runAfter:
            - s2i-python
          taskRef:
            kind: Task
            name: execute-kubeflow-pipeline
      workspaces:
        - name: source
  6. Create file tenants/parasol-insurance/model-training-pipeline/base/model-retrain-pipelinerun.yaml.

    Solution
    tenants/parasol-insurance/model-training-pipeline/base/model-retrain-pipelinerun.yaml
    apiVersion: tekton.dev/v1
    kind: PipelineRun
    metadata:
      name: model-retrain-init
      annotations:
        argocd.argoproj.io/sync-wave: "10"
    spec:
      pipelineRef:
        name: model-retrain
      taskRunTemplate:
        serviceAccountName: pipeline
      timeouts:
        pipeline: 1h0m0s
      workspaces:
      - name: source
        volumeClaimTemplate:
          metadata:
            creationTimestamp: null
          spec:
            accessModes:
            - ReadWriteOnce
            resources:
              requests:
                storage: 1Gi
            volumeMode: Filesystem
  7. Create file tenants/parasol-insurance/model-training-pipeline/base/model-retrain-rbac.yaml.

    Solution
    tenants/parasol-insurance/model-training-pipeline/base/model-retrain-rbac.yaml
    kind: RoleBinding
    apiVersion: rbac.authorization.k8s.io/v1
    metadata:
      name: pipelines-dsp-access
    subjects:
      - kind: ServiceAccount
        name: pipelines
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: dsp-access
    ---
    kind: Role
    apiVersion: rbac.authorization.k8s.io/v1
    metadata:
      name: dsp-access
    rules:
      - verbs:
          - get
        apiGroups:
          - ''
          - route.openshift.io
        resources:
          - routes
  8. Create file tenants/parasol-insurance/model-training-pipeline/base/execute-kfp-task.yaml.

    Solution
    tenants/parasol-insurance/model-training-pipeline/base/execute-kfp-task.yaml
    apiVersion: tekton.dev/v1beta1
    kind: Task
    metadata:
      name: execute-kubeflow-pipeline
    spec:
      description: >-
        This task will execute a python script, triggering a kubeflow pipeline
      params:
        - name: IMAGE
          description: The image used to execute the script
          type: string
        - name: TAG
          description: The tag for the image
          type: string
          default: "latest"
        - name: SCRIPT
          description: The location of the script to be executed
        - name: KUBEFLOW_ENDPOINT
          description: The endpoint URL for Kubeflow
          default: "https://ds-pipeline-dspa:8443"
      steps:
        - name: execute-python
          image: $(inputs.params.IMAGE):$(inputs.params.TAG)
          env:
            - name: KUBEFLOW_ENDPOINT
              value: $(inputs.params.KUBEFLOW_ENDPOINT)
          script: |
            python $(inputs.params.SCRIPT)
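
    Note that the task injects KUBEFLOW_ENDPOINT as an environment variable, so the script it executes should read the endpoint from the environment rather than hard-coding it. A minimal sketch of that pattern in Python (the fallback URL is an assumption matching the value used earlier in this module):

    import os

    # Prefer the endpoint injected by the Tekton task; fall back to the
    # in-cluster service URL used earlier in this module.
    KUBEFLOW_ENDPOINT = os.environ.get(
        "KUBEFLOW_ENDPOINT",
        "https://ds-pipeline-dspa.parasol-insurance.svc.cluster.local:8443",
    )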
  9. In the overlays directory, create a parasol-insurance-dev directory.

  10. In the parasol-insurance-dev directory, create a kustomization.yaml file.

    Solution
    tenants/parasol-insurance/model-training-pipeline/overlays/parasol-insurance-dev/kustomization.yaml
    apiVersion: kustomize.config.k8s.io/v1beta1
    kind: Kustomization
    
    resources:
      - ../../base
  11. Commit and push the changes to the Git repository.

  12. Wait for ArgoCD to sync the changes.

  13. Navigate to the OpenShift console, and validate that the model-retrain pipeline is available in the parasol-insurance namespace.

  14. Click on the model-retrain pipeline and validate that there is a pipeline run; wait for the pipeline run to complete.

  15. Navigate to the RHOAI dashboard → Experiments → Experiments and runs. You should find a run with a name starting with accident-detection.

    Validate your code against the Branch for model_retrain pipeline config.

Experiments

A pipeline experiment is a workspace where you can try different configurations of your pipelines. You can use experiments to organize your runs into logical groups.
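
The notebook in this module submitted its runs with experiment_name="accident-detection", so they are grouped under that experiment. A minimal sketch of inspecting the group programmatically, assuming the kfp client created earlier and the kfp 2.x SDK:

    # Look up the experiment by name and list the runs grouped under it.
    experiment = client.get_experiment(experiment_name="accident-detection")
    runs = client.list_runs(experiment_id=experiment.experiment_id, page_size=10)
    for run in runs.runs or []:
        print(run.display_name, run.state)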

Experiments and Runs

  1. Navigate to the RHOAI Dashboard and click Experiments > Experiments and Runs. Validate that a new experiment named accident-detection has been created.

experiments runs
  2. Click on the accident-detection experiment to view its pipeline runs.

pipeline runs
  3. Click on each pipeline run to view more details.

pipeline run details
  4. We can schedule periodic pipeline runs for an experiment. Click on 'Schedules', then 'Create Schedule', and fill in the following details (a programmatic alternative is sketched after the screenshots below):

    • Experiment: We can choose an existing Experiment or create a new Experiment.

    • Name: Name for the schedule

    • Trigger Type: Periodic

    • Run Every: 1 hour

    • Start Date: Start date for the schedule

    • End Date: End Date for the schedule

    • Pipeline: Name of the pipeline

    • Pipeline version: Version of the pipeline

schedule 1
schedule 2
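
The programmatic alternative mentioned above: a sketch of creating an hourly schedule with the kfp 2.x client, assuming the client from the notebook and a compiled pipeline package (the schedule name and package path are hypothetical):

    # Create a recurring run that triggers the pipeline every hour.
    experiment = client.get_experiment(experiment_name="accident-detection")
    client.create_recurring_run(
        experiment_id=experiment.experiment_id,
        job_name="accident-detection-hourly",  # hypothetical schedule name
        interval_second=3600,  # "Run Every: 1 hour"
        pipeline_package_path="accident_detection_pipeline.yaml",  # compiled earlier (assumption)
        enabled=True,
    )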

Executions

On the Executions page, you can view the details of each pipeline task execution, such as its name, status, unique ID, and execution type.

executions

Artifacts

On the Artifacts page, you can view the pipeline artifacts, which help you evaluate the performance of your pipeline runs.

artifacts
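
The accident-detection pipeline in this module does not log metrics, but as an illustration of how artifacts can carry evaluation results, a component can declare a dsl.Output[dsl.Metrics] parameter and log values to it; they then appear on the Artifacts page. A minimal hedged sketch (the component and metric values are hypothetical, not part of this module's pipeline):

    @dsl.component(base_image=PYTHON_IMAGE)
    def evaluate_model(metrics: dsl.Output[dsl.Metrics]):
        # Hypothetical evaluation results logged as a Metrics artifact.
        metrics.log_metric("epochs", 1.0)
        metrics.log_metric("map50", 0.5)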