Introduction to Elyra Pipelines

In this section, we learn about Elyra pipelines. Elyra pipelines orchestrate and manage machine learning workflows, providing a structured way to define tasks and execute them in sequence. They can be created with a visual editor, which makes it easy to design and customize workflows.

Elyra Runtime Configuration in Jupyter Notebooks

In OpenShift AI, you can manage runtime configurations using the JupyterLab UI.

A runtime configuration provides Elyra access to the Data Science Pipelines backend for scalable pipeline execution.

A pre-configured runtime configuration for submitting pipelines to Data Science Pipelines is included automatically only when the pipeline server is created in the project before the workbench is created.
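
To verify which runtime configurations your workbench can see, you can use the Elyra metadata CLI from a terminal or a notebook cell (a minimal check; the configuration names shown will vary by environment):

    !elyra-metadata list runtimes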

Creating a Data Science Pipeline with Elyra

  1. Create a new directory named data in the root folder and upload the following files: test.csv, train.csv, and validate.csv. You can download the files by clicking the links below. Once uploaded, you can verify them with the quick check that follows.
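
To confirm the files landed where the notebooks expect them, run a quick listing from a notebook cell or terminal:

    ! ls -lh data/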

Create a notebook to train a model

  1. Open the standard-workbench in the parasol-insurance project. Click the Launcher and create a new Python notebook named 1.train_model.ipynb.

  2. Add the following content to the 1.train_model.ipynb notebook.

    !pip install onnx==1.17.0 onnxruntime==1.20.1 tf2onnx==1.16.1 keras tensorflow
  3. Add a new cell and add the following content:

    import numpy as np
    import pandas as pd
    import datetime
    from keras.models import Sequential
    from keras.layers import Dense, Dropout, BatchNormalization, Activation
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    from sklearn.utils import class_weight
    import tf2onnx
    import onnx
    import pickle
    from pathlib import Path
  4. Add a new cell and add the following content:

    # Set the input (X) and output (Y) data.
    # The only output data is whether it's fraudulent. All other fields are inputs to the model.
    
    feature_indexes = [
        1,  # distance_from_last_transaction
        2,  # ratio_to_median_purchase_price
        4,  # used_chip
        5,  # used_pin_number
        6,  # online_order
    ]
    
    label_indexes = [
        7  # fraud
    ]
    
    df = pd.read_csv('data/train.csv')
    X_train = df.iloc[:, feature_indexes].values
    y_train = df.iloc[:, label_indexes].values
    
    df = pd.read_csv('data/validate.csv')
    X_val = df.iloc[:, feature_indexes].values
    y_val = df.iloc[:, label_indexes].values
    
    df = pd.read_csv('data/test.csv')
    X_test = df.iloc[:, feature_indexes].values
    y_test = df.iloc[:, label_indexes].values
    
    
    # Scale the data to zero mean and unit variance. Standardized values stay in a small range around 0, which is much easier for the model to learn from than raw (and potentially large) values.
    # It is important to only fit the scaler to the training data, otherwise you are leaking information about the global distribution of variables (which is influenced by the test set) into the training set.
    
    scaler = StandardScaler()
    
    X_train = scaler.fit_transform(X_train)
    X_val = scaler.transform(X_val)
    X_test = scaler.transform(X_test)
    
    Path("artifact").mkdir(parents=True, exist_ok=True)
    with open("artifact/test_data.pkl", "wb") as handle:
        pickle.dump((X_test, y_test), handle)
    with open("artifact/scaler.pkl", "wb") as handle:
        pickle.dump(scaler, handle)
    
    # Since the dataset is unbalanced (it has many more non-fraud transactions than fraudulent ones), set a class weight to weight the few fraudulent transactions higher than the many non-fraud transactions.
    class_weights = class_weight.compute_class_weight('balanced', classes=np.unique(y_train), y=y_train.ravel())
    class_weights = {i : class_weights[i] for i in range(len(class_weights))}
  5. Add a new cell and add the following content:

    model = Sequential()
    model.add(Dense(32, activation='relu', input_dim=len(feature_indexes)))
    model.add(Dropout(0.2))
    model.add(Dense(32))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(Dropout(0.2))
    model.add(Dense(32))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(Dropout(0.2))
    model.add(Dense(1, activation='sigmoid'))
    
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    
    model.summary()
  6. Add a new cell and add the following content:

    # Train the model and get performance
    import os
    import time
    
    start = time.time()
    epochs = 2
    history = model.fit(
        X_train,
        y_train,
        epochs=epochs,
        validation_data=(X_val, y_val),
        verbose=True,
        class_weight=class_weights
    )
    end = time.time()
    print(f"Training of model is complete. Took {end-start} seconds")
  7. Add a new cell and add the following content:

    #Save the model
    import tensorflow as tf
    
    # Normally we would use tf2onnx.convert.from_keras;
    # this is a workaround for tf2onnx bug https://github.com/onnx/tensorflow-onnx/issues/2348
    
    # Wrap the model in a `tf.function`
    @tf.function(input_signature=[tf.TensorSpec([None, X_train.shape[1]], tf.float32, name='dense_input')])
    def model_fn(x):
        return model(x)
    
    # Convert the Keras model to ONNX
    model_proto, _ = tf2onnx.convert.from_function(
        model_fn,
        input_signature=[tf.TensorSpec([None, X_train.shape[1]], tf.float32, name='dense_input')]
    )
    
    # Save the model as ONNX for easy use of ModelMesh
    os.makedirs("models/fraud/1", exist_ok=True)
    onnx.save(model_proto, "models/fraud/1/model.onnx")
  8. Add a new cell and add the following content:

    ! ls -alRh ./models/
  9. Add a new cell and add the following content:

    #Test the model
    from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
    import numpy as np
    import pickle
    import onnxruntime as rt
  10. Add a new cell and add the following content:

    with open('artifact/scaler.pkl', 'rb') as handle:
        scaler = pickle.load(handle)
    with open('artifact/test_data.pkl', 'rb') as handle:
        (X_test, y_test) = pickle.load(handle)
  11. Add a new cell and add the following content to create an ONNX inference session and score the test set (an optional check comparing the ONNX and Keras outputs is sketched after this list):

    sess = rt.InferenceSession("models/fraud/1/model.onnx", providers=rt.get_available_providers())
    input_name = sess.get_inputs()[0].name
    output_name = sess.get_outputs()[0].name
    y_pred_temp = sess.run([output_name], {input_name: X_test.astype(np.float32)})
    y_pred_temp = np.asarray(np.squeeze(y_pred_temp[0]))
    threshold = 0.95
    y_pred = np.where(y_pred_temp > threshold, 1, 0)
  12. Add a new cell and add the following content:

    from sklearn.metrics import precision_score, recall_score, confusion_matrix, ConfusionMatrixDisplay
    import numpy as np
    
    y_test_arr = y_test.squeeze()
    correct = np.equal(y_pred, y_test_arr).sum().item()
    acc = (correct / len(y_pred)) * 100
    precision = precision_score(y_test_arr, np.round(y_pred))
    recall = recall_score(y_test_arr, np.round(y_pred))
    
    print(f"Eval Metrics: \n Accuracy: {acc:>0.1f}%, "
          f"Precision: {precision:.4f}, Recall: {recall:.4f} \n")
    
    c_matrix = confusion_matrix(y_test_arr, y_pred)
    ConfusionMatrixDisplay(c_matrix).plot()
  13. Add a new cell and add the following content:

    sally_transaction_details = [
        [0.3111400080477545,
         1.9459399775518593,
         1.0,
         0.0,
         0.0]
    ]
    
    prediction = sess.run([output_name], {input_name: scaler.transform(sally_transaction_details).astype(np.float32)})
    
    print("Is Sally's transaction predicted to be fraudulent? (true = YES, false = NO) ")
    print(np.squeeze(prediction) > threshold)
    
    print("How likely was Sally's transaction to be fraudulent? ")
    print("{:.5f}".format(100 * np.squeeze(prediction)) + "%")

Create a new notebook to save the model

  1. Create a new Jupyter notebook named 2.save_model.ipynb and add the following content:

    !pip install boto3==1.35.55 botocore==1.35.55
  2. Add a new cell and add the following content:

    import os
    import boto3
    import botocore
    
    aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
    aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
    endpoint_url = os.environ.get('AWS_S3_ENDPOINT')
    region_name = os.environ.get('AWS_DEFAULT_REGION')
    bucket_name = os.environ.get('AWS_S3_BUCKET')
    
    print(f"{aws_access_key_id}")
    
    
    if not all([aws_access_key_id, aws_secret_access_key, endpoint_url, region_name, bucket_name]):
        raise ValueError("One or more connection variables are empty.  "
                         "Please check your connection to an S3 bucket.")
    
    session = boto3.session.Session(aws_access_key_id=aws_access_key_id,
                                    aws_secret_access_key=aws_secret_access_key)
    
    s3_resource = session.resource(
        's3',
        config=botocore.client.Config(signature_version='s3v4'),
        endpoint_url=endpoint_url,
        region_name=region_name)
    
    bucket = s3_resource.Bucket(bucket_name)
    
    
    def upload_directory_to_s3(local_directory, s3_prefix):
        num_files = 0
        for root, dirs, files in os.walk(local_directory):
            for filename in files:
                file_path = os.path.join(root, filename)
                relative_path = os.path.relpath(file_path, local_directory)
                s3_key = os.path.join(s3_prefix, relative_path)
                print(f"{file_path} -> {s3_key}")
                bucket.upload_file(file_path, s3_key)
                num_files += 1
        return num_files
    
    
    def list_objects(prefix):
        filter = bucket.objects.filter(Prefix=prefix)
        for obj in filter.all():
            print(obj.key)
    Updated custom-notebook.yaml to avoid storage-related errors:
    apiVersion: kubeflow.org/v1
    kind: Notebook
    metadata:
      annotations:
        notebooks.opendatahub.io/inject-oauth: 'true'
        opendatahub.io/image-display-name: Datascience notebook
        notebooks.opendatahub.io/oauth-logout-url: ''
        opendatahub.io/accelerator-name: ''
        openshift.io/description: ''
        openshift.io/display-name: custom-workbench
        notebooks.opendatahub.io/last-image-selection: 'custom-notebook:latest'
        argocd.argoproj.io/sync-options: ServerSideApply=true
      name: custom-workbench
      namespace: parasol-insurance
    spec:
      template:
        spec:
          affinity: {}
          containers:
            - name: custom-workbench
              image: 'image-registry.openshift-image-registry.svc:5000/redhat-ods-applications/custom-notebook:latest'
              resources:
                limits:
                  cpu: '2'
                  memory: 8Gi
                requests:
                  cpu: '1'
                  memory: 8Gi
              readinessProbe:
                failureThreshold: 3
                httpGet:
                  path: /notebook/parasol-insurance/custom-workbench/api
                  port: notebook-port
                  scheme: HTTP
                initialDelaySeconds: 10
                periodSeconds: 5
                successThreshold: 1
                timeoutSeconds: 1
              livenessProbe:
                failureThreshold: 3
                httpGet:
                  path: /notebook/parasol-insurance/custom-workbench/api
                  port: notebook-port
                  scheme: HTTP
                initialDelaySeconds: 10
                periodSeconds: 5
                successThreshold: 1
                timeoutSeconds: 1
              env:
                - name: NOTEBOOK_ARGS
                  value: |-
                    --ServerApp.port=8888
                    --ServerApp.token=''
                    --ServerApp.password=''
                    --ServerApp.base_url=/notebook/parasol-insurance/custom-workbench
                    --ServerApp.quit_button=False
                    --ServerApp.tornado_settings={"user":"user1","hub_host":"","hub_prefix":"/projects/parasol-insurance"}
                - name: JUPYTER_IMAGE
                  value: 'image-registry.openshift-image-registry.svc:5000/redhat-ods-applications/custom-notebook:latest'
                - name: PIPELINES_SSL_SA_CERTS
                  value: /etc/pki/tls/custom-certs/ca-bundle.crt
                - name: GIT_SSL_CAINFO
                  value: /etc/pki/tls/custom-certs/ca-bundle.crt
                - name: PIP_CERT
                  value: /etc/pki/tls/custom-certs/ca-bundle.crt
                - name: REQUESTS_CA_BUNDLE
                  value: /etc/pki/tls/custom-certs/ca-bundle.crt
                - name: SSL_CERT_FILE
                  value: /etc/pki/tls/custom-certs/ca-bundle.crt
              ports:
                - containerPort: 8888
                  name: notebook-port
                  protocol: TCP
              imagePullPolicy: Always
              volumeMounts:
                - mountPath: /opt/app-root/src
                  name: custom-workbench
                - mountPath: /dev/shm
                  name: shm
                - mountPath: /etc/pki/tls/custom-certs/ca-bundle.crt
                  name: trusted-ca
                  readOnly: true
                  subPath: ca-bundle.crt
              workingDir: /opt/app-root/src
              envFrom:
                - secretRef:
                    name: minio-data-connection
          enableServiceLinks: false
          serviceAccountName: custom-workbench
          volumes:
            - name: custom-workbench
              persistentVolumeClaim:
                claimName: custom-workbench
            - emptyDir:
                medium: Memory
              name: shm
            - configMap:
                items:
                  - key: ca-bundle.crt
                    path: ca-bundle.crt
                name: workbench-trusted-ca-bundle
                optional: true
              name: trusted-ca
    Updated minio-data-connection.yaml to avoid storage-related errors:
    kind: Secret
    apiVersion: v1
    metadata:
      name: minio-data-connection
      labels:
        opendatahub.io/dashboard: 'true'
        opendatahub.io/managed: 'true'
      annotations:
        opendatahub.io/connection-type: s3
        openshift.io/display-name: minio-data-connection
        argocd.argoproj.io/sync-wave: "-100"
    stringData:
      AWS_ACCESS_KEY_ID: minio
      AWS_S3_ENDPOINT: http://minio.object-datastore.svc.cluster.local:9000
      AWS_SECRET_ACCESS_KEY: minio123
      AWS_DEFAULT_REGION: east-1
      AWS_S3_BUCKET: pipelines
    type: Opaque
  3. Add a new cell and add the following content:

    list_objects("models")
  4. Add a new cell and add the following content to upload the models directory (an optional existence check for the uploaded model is sketched after this list):

    local_models_directory = "models"
    
    if not os.path.isdir(local_models_directory):
        raise ValueError(f"The directory '{local_models_directory}' does not exist.  "
                         "Did you finish training the model in the previous notebook?")
    
    num_files = upload_directory_to_s3("models", "models")
    
    if num_files == 0:
        raise ValueError("No files uploaded.  Did you finish training and "
                         "saving the model to the \"models\" directory?  "
                         "Check for \"models/fraud/1/model.onnx\"")
  5. Add a new cell and add the following content:

    list_objects("models")

Create a pipeline using Elyra pipeline editor

  1. Click on Pipeline Editor and create a new pipeline. Name the pipeline train_save.pipeline.

  2. Drag and drop the two notebooks you just created (1.train_model.ipynb and 2.save_model.ipynb) into the pipeline editor.

  3. Connect the output port of node 1.train_model to the input port of node 2.save_model.

  4. Update the pipeline parameters for node 1.train_model.

  5. Update the runtime image for both nodes to the Datascience with Python 3.11 (UBI9) image created in the previous section.

  6. Update the S3 storage Kubernetes secrets for node 2.save_model:

    Solution
    AWS_ACCESS_KEY_ID: <AWS_ACCESS_KEY_ID>
    AWS_SECRET_ACCESS_KEY: <AWS_SECRET_ACCESS_KEY>
    AWS_S3_BUCKET: <AWS_S3_BUCKET>
    AWS_S3_ENDPOINT: <AWS_S3_ENDPOINT>
    AWS_DEFAULT_REGION: <AWS_DEFAULT_REGION>
  7. Click the Run icon to run the pipeline. The pipeline run can be viewed on the RHOAI dashboard. (A command-line alternative for submitting the pipeline is sketched after this list.)

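If you prefer the command line over the Run icon, Elyra also ships a CLI that can submit the same pipeline file. A minimal sketch, assuming your runtime configuration is named data-science-pipelines (check the actual name with elyra-metadata list runtimes):

    !elyra-pipeline submit train_save.pipeline --runtime-config data-science-pipelines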

Pipeline execution

Elyra now converts your pipeline definition into a YAML representation and sends it to the Data Science Pipelines backend. After a few seconds, you should see confirmation that the pipeline was submitted successfully.

To monitor the pipeline's execution, click the Run Details link, which takes you to the pipeline run view in the RHOAI dashboard. Here you can track in real time how each pipeline task is processed and whether it fails or completes successfully.

To confirm that the pipeline produced its fraud detection artifacts, view the contents of the pipeline storage bucket, as sketched below. The bucket also contains HTML files that show the status of each task execution.
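
One way to inspect the bucket without leaving JupyterLab is to reuse the list_objects helper from 2.save_model.ipynb. This is a sketch, assuming the boto3 session from that notebook is still available:

    # List everything the pipeline wrote, then narrow to the model artifacts.
    list_objects("")
    list_objects("models")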

Navigate back to the Experiments and Runs overview in the RHOAI dashboard. Click the experiment to see the history of all ongoing and previous runs of this pipeline, and compare their durations and statuses.

In the Scheduled tab, you can view runs of the pipeline that execute on a predefined schedule, either periodic (such as daily) or defined by a cron expression (for example, 0 6 * * * triggers a run every day at 06:00).

Questions for Further Consideration

  1. Will all the upstream Elyra features be available on RHOAI?

  2. How can you implement error handling strategies within your pipelines to manage failures effectively?

  3. What techniques can be employed to optimize the performance of pipelines, especially when dealing with large datasets?