# Kubeflow Pipelines

{% hint style="info" %}
紀錄時間為 2020/04/09，內容可能過時，請斟酌參考
{% endhint %}

一個完全不懂 AI 的工程師被迫研究 Kubeflow，**內容不保證正確，看看就好**XD\
研究完也沒有要使用，所以純粹紀錄

## 簡單介紹

說明如何撰寫 pipeline 的 components，並串起 components，組成一個 workflow

下圖為一個簡單的 pipeline 範例\
[![](http://10.50.0.12/wiki/lib/exe/fetch.php?w=200\&tok=2f100b\&media=project:openstack:container:pipeline.jpg)](http://10.50.0.12/wiki/lib/exe/detail.php?id=project%3Aopenstack%3Acontainer%3Akubeflow_pipelines\&media=project:openstack:container:pipeline.jpg)

![](https://1123223527-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-L_BW8-pFxbu7PCB6NXf%2Fuploads%2Fgit-blob-dad49854fffbd4ac01fa209346a365e1dcc36e23%2Fpipeline.jpg?alt=media)

* 每一格方框叫做 component，每一個 component 都是 container
* component 可以輸出資料跟輸入資料
* 寫 component 的方法有很多種:
  1. function to component
  2. container to component
  3. yaml to component
* 以下範例都是在 kubeflow 的 notebook server 中使用

## Function to component

* 寫 python function ，將 function 變成 component
* 在 notebook 環境中 安裝 kfp

  ```
  !pip3 install kfp
  ```
* 載入 kfp 相關套件

  ```
  import kfp
  from kfp.components import InputPath, OutputPath
  from kfp.components import func_to_container_op
  ```

### 第一個 component

* 給定 一個(count)數字，產出 count 個數的 1到99隨機數字，寫入到file，並存到minio
* output\_text\_path: 將結果儲存至 minio，並輸出可以存取 minio 物件的路徑(字串)

  ```
  @func_to_container_op
  def random_number(count: int,  output_text_path: OutputPath(str)):
      import random
      with open(output_text_path, 'w') as writer:
          for i in range(count):
              writer.write(str(random.randrange(0,100)) + '\n')
  ```

### 第二個 component

* 取得 input 檔案的路徑，從minio下載後，開啟檔案，計算檔案內所有的數字總和，再寫入到file，並存到minio
* text\_path: InputPath(str) 這參數會自動從 minio 下載檔案，所以 text\_path 就是下載檔案後的路徑
* InputPath 可以指定類型，像是CSV檔案， InputPath('CSV') [說明](https://tinyurl.com/qlq9anb)

  ```
  @func_to_container_op
  def calcu_number(text_path: InputPath(str),  output_text_path: OutputPath(str)):
      sum = 0
      with open(text_path, 'r') as reader:
          for line in reader:
              sum = sum + int(line)
      with open(output_text_path, 'w') as writer:
           writer.write(str(sum))
  ```

### 第三個 component

* 取得 input 檔案的路徑，從minio下載後，開啟檔案，讀取資料後，print出資料

  ```
  @func_to_container_op    
  def print_sum(text_path: InputPath(str)):
       with open(text_path, 'r') as reader:
          for line in reader:
              print(line)
  ```

### 組合 component 成 pipeline

* 定義 pipeline，也就是串接 component

  ```
  def sum_random_number():
      # 第一個 component 為 random_number，傳入數字50參數
      random_number_task = random_number(50)
    
      # 第二個 component 接收第一個 component的輸出
      # 這裡要使用 random_number_task.outputs['output_text'] 取得輸出，output_text的來由是 random_number的 output_text_path，但是 kubeflow 會省略 _path 和 _file，所以變成 output_text
      calcu_number_task = calcu_number(random_number_task.outputs['output_text'])
    
      #  第三個 component 接收第二個 component的輸出，print出總和
      print_sum_task = print_sum(calcu_number_task.outputs['output_text'])
  ```
* 啟動 pipline，arguments 可以帶入參數

  ```
  kfp.Client().create_run_from_pipeline_func(sum_random_number, arguments={})
  ```
* 若沒有錯誤可以看到 輸出 Experiment link here 跟 Run link here，點here可以看到結果

## Component yaml

* 此方式利用 yaml 檔來定義 component
* 需要先寫完程式，並包成 container
* 以下範例有 2個 component
  1. 產生出N個 0\~100的數字
  2. 計算上面產生出的N個數字和

### 第一個 Component 程式

* 產生出N個 0\~100的數字
* 接受 count 的參數，產生出 count 個數的 0\~100數字
* 接受 output\_path，將產生出的數字寫到此路徑
* 請使用 argparse 接受參數
* 以下為程式碼

  ```
  import argparse
  import os
  import random
  # Parse agruments.
  parser = argparse.ArgumentParser()
  parser.add_argument('--count', type=int, required=True, help='Random count of 0~100.')
  parser.add_argument('--output_path', type=str, required=True)
  args = parser.parse_args()
  count = args.count
  output_path = args.output_path
  print(output_path)

  os.makedirs(os.path.dirname(output_path), exist_ok=True)
  with open(output_path, 'w') as writer:
      for i in range(count):
          writer.write(str(random.randrange(0,100)) + '\n')
  ```
* 程式寫完後，打包成 container

### 第一個 Component yaml

* 指定 input output
* 指定 container 的 image
* 指定 要接收的參數
* 輸出的檔案路徑

  ```
  name: random_number
  description: Random count of 0~100

  inputs:
    # 設定要接受的參數
    # 接收 Count，為整數
    - name: Count
      type: int

  outputs:
    # 設定 output 名稱
    - name: Show Number

  implementation:
    container:
      # 指定包好的 container
      image: iservcloud/random_number:latest
      command: [python, /app/app.py]
      # 這裡就會接收上面的 input 參數
      args: [
          --count, {inputValue: Count},
          --output_path, {outputPath: Show Number}
      ]
      # fileOutputs 會輸出檔案到 minio
      # 所以這裡的 Show Number 要跟 outputs 的 Show Number名稱一致
      # 冒號後面就是 container 內部的檔案路徑，就會在 pipline的 Output artifacts 看到你的輸出
      fileOutputs:
        Show Number: /blobs.txt
  ```

### 第二個 Component 程式

* 指定 input path (這裡要注意)
* 指定 output path
* 指定 container 的 image
* 指定 要接收的參數
* 輸出的檔案路徑

  ```
  import argparse
  # Parse agruments.
  parser = argparse.ArgumentParser()
  parser.add_argument('--input_path', type=str, required=True, help='Get number file')
  parser.add_argument('--output_path', type=str, required=True)
  args = parser.parse_args()
  input_path = args.input_path
  output_path = args.output_path
  print(input_path)
  sum = 0

  # 這裡得到 input_path 的參數，也就是讀取這路徑的檔案
  with open(input_path, 'r') as reader:
      line = reader.read()
      num_list = line.split('\n')

  print(num_list)
  for i in range((len(num_list)-1)):
      sum = sum + int(num_list[i])

  with open(output_path, 'w') as writer:
      writer.write(str(sum) + '\n')
  ```

### 第二個 Component yaml

* 指定 inputs outputs

  ```
  name: calcu_number
  description: sum of number

  inputs:
    - name: Input path
      type: String
    - name: Output path
      type: String

  outputs:
    - name: Show Number

  implementation:
    container:
      image: iservcloud/calcu_number:latest
      command: [python, /app/app.py]
      args: [
          --input_path, {inputPath: Input path},
          --output_path, {inputValue: Output path}
      ]
      fileOutputs:
        Show Number: /blobs.txt
  ```

yaml 檔的 args 這裡要注意，input有2種參數\
\* inputPath: 會傳一個路徑的字串\
\* inputValue: 會傳一個字串參數如果程式是需要讀檔的方式，yaml檔的 input 就要用 inputPath 的類型，這樣程式接收到的 input\_path 參數就會是 檔案的路徑， 因此可以使用讀檔編輯此段

### 組合 component 成 pipeline

* 載入 kfp

  ```
  import kfp
  from kfp.components import load_component_from_file
  ```
* 讀取 yaml 檔

  ```
  random_number_op = load_component_from_file('./random_number_component.yaml')
  calcu_number_op = load_component_from_file('./calcu_number_component.yaml')
  ```
* 建立 pipeline

  ```
  def sum_random_number():
      # 建立 component
      random_number_task = random_number_op(count=10, output_path='/blobs.txt')
      # 取得 component 的 output，outputs['show_number'] 的 show_number 來自上面 yaml 檔的 outputs name 的 Show Number，kubeflow 會自動變為小寫，並把空格換成_(底線)
      inputPath = random_number_task.outputs['show_number']
      print(inputPath)
      calcu_number_task = calcu_number_op(input_path=inputPath, output_path='/blobs.txt')
    
  kfp.Client().create_run_from_pipeline_func(sum_random_number, arguments={})
  ```

## 建立可重複使用的 pipeline

* 上面的方式只能在程式內執行 pipeline，如果要在 kubeflow UI 上重複使用，必須匯出pipeline的壓縮檔

  ```
  import kfp
  # 透過 kfp.dsl.pipeline 定義 pipeline
  @kfp.dsl.pipeline(
    name='random_number',
    description='Random number and calcu sum'
  )

  # 定義 pipeline 所組成的 component，這裡的 count='Enter count of number'，就會在 kubeflow pipeline UI 上看到的 Input 變數
  def sum_random_number(count='Enter count of number'):
      random_number_task = random_number_op(count)
      inputPath = random_number_task.outputs['show_number']
      calcu_number_task = calcu_number_op(input_path=inputPath)

  # 將pipeline 匯出成壓縮檔，再從 kubeflow UI 上傳，即可重複使用    
  import kfp.compiler as compiler
  import datetime
  # Compile the pipeline to a file.
  filename = 'random_number{dt:%Y%m%d_%H%M%S}.pipeline.tar.gz'.format(
      dt=datetime.datetime.now())
  compiler.Compiler().compile(sum_random_number, filename)
  ```
* 上傳後，可以建立此 pipeline 的 run ，可以看到有 input 的欄位，就是上面的 count='Enter count of number'，這裡就可以設計成輸入 dataset

![](https://1123223527-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-L_BW8-pFxbu7PCB6NXf%2Fuploads%2Fgit-blob-6e0efbd7f03eedeca252fa04ba1040348b55aa00%2Fpipeline-run-input.jpg?alt=media)

## Kubeflow Pipeline Metadata 可視化 <a href="#kubeflowpipelinemetadata-ke-shi-hua" id="kubeflowpipelinemetadata-ke-shi-hua"></a>

* pipeline 的 component 除了傳遞 metadata 之外，也可以將 metadata 輸出成可視化的狀態，方便觀察
* 目前支援的型態有:
  1. table (用於 csv 檔)
  2. tensorboard 格式
  3. confusion matrix (實際上是 csv)
  4. markdown 文件
  5. html 網頁
  6. ROC curve (實際上是 csv)
* 以上型態目前支援使用 local minio S3的有:
  1. csv 檔
  2. markdown 文件
  3. html 網頁
  4. ROC curve
* 其餘的支援 GCS

### 使用方式 <a href="#shi-yong-fang-shi" id="shi-yong-fang-shi"></a>

* 在 container 內部寫入 /mlpipeline-ui-metadata.json 檔案
* 根據不同的形態，寫入的內容有所不同

### Table

* 必填欄位:
  * format
  * header
  * source
* source: 如果來源是 GCS 請填 gs\://，來源是 kubeflow 內部的 minio，請填 minio://
* 如果來源是 minio，請把檔案壓縮成 tar 再上傳至 minio，如果是 gsc，就不用壓縮

  ```
  metadata = {
    'outputs' : [{
      'type': 'table',
      'format': 'csv',
      'header': ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species',],
      'source': 'minio://test/iris.tar'
    }]
  }

  with open('/mlpipeline-ui-metadata.json', 'w') as f:
      json.dump(metadata, f)
  ```
* 結果

![](https://1123223527-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-L_BW8-pFxbu7PCB6NXf%2Fuploads%2Fgit-blob-06da58800b9dd48e2dd21ea385c700bda976b56e%2Ftable.jpg?alt=media)

### Tensorboard

* 必填欄位:
  * source
* 目前只支援 gcs

  ```
  metadata = {
        'outputs' : [{
          'type': 'tensorboard',
          'source': 'gs://ml-pipeline-playground/mnist',
        }]
      }

  with open('/mlpipeline-ui-metadata.json', 'w') as f:
      json.dump(metadata, f)
  ```
* 結果 [![](http://10.50.0.12/wiki/lib/exe/fetch.php?w=600\&tok=faae13\&media=project:openstack:container:board1.jpg)](http://10.50.0.12/wiki/lib/exe/detail.php?id=project%3Aopenstack%3Acontainer%3Akubeflow_pipelines\&media=project:openstack:container:board1.jpg)

![](https://1123223527-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-L_BW8-pFxbu7PCB6NXf%2Fuploads%2Fgit-blob-c02145dca3f84301de7a5c292a4e3e4c2c18ba10%2Fboard1.jpg?alt=media)

### Confusion matrix

* 必填欄位
  * format
  * labels
  * schema
  * source
* 使用schema解析數據
* labels 為x和y軸

  ```
   metadata = {
      'outputs' : [{
        'type': 'confusion_matrix',
        'format': 'csv',
        'schema': [
          {'name': 'target', 'type': 'CATEGORY'},
          {'name': 'predicted', 'type': 'CATEGORY'},
          {'name': 'count', 'type': 'NUMBER'},
        ],
        'source': <CONFUSION_MATRIX_CSV_FILE>,
        # Convert vocab to string because for bealean values we want "True|False" to match csv data.
        'labels': list(map(str, vocab)),
      }]
    }

  with open('/mlpipeline-ui-metadata.json', 'w') as f:
      json.dump(metadata, f)
  ```
* 結果

![](https://1123223527-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-L_BW8-pFxbu7PCB6NXf%2Fuploads%2Fgit-blob-50cb2416a2e98876087d938f161d5bd43103e0f7%2Ftaxi-tip-confusion-matrix-step-output.png?alt=media)

### Markdown

* 必填欄位:
  * source
* storage 為選填，若填入 inline 的值，source 的內容可以直接為　markdown 語法
* source 為 minio，請將檔案壓縮成 tar檔

  ```
  metadata = {
      'outputs' : [
      # Markdown that is hardcoded inline
      {
        'storage': 'inline',
        'source': '# Inline Markdown\n[A link](https://www.kubeflow.org/)',
        'type': 'markdown',
      },
      # Markdown that is read from a file
      {
        'source': 'minio://test/testmarkdown.tar',
        'type': 'markdown',
      }]
  }
  with open('/mlpipeline-ui-metadata.json', 'w') as f:
      json.dump(metadata, f)
  ```
* 結果

![](https://1123223527-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-L_BW8-pFxbu7PCB6NXf%2Fuploads%2Fgit-blob-65143fe5847ef6c98cd1aece25655cf7df1ada2f%2Fmark.jpg?alt=media)

### Html

* 必填欄位:
  * source
* storage 為選填，若填入 inline 的值，source 的內容可以直接為　html 語法
* source 為 minio，請將檔案壓縮成 tar檔

  ```
  metadata = {
      'outputs': [{
        'type': 'web-app',
        'source': 'minio://test/hello.tar',
      }]
  }
  ```
* 結果

![](https://1123223527-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-L_BW8-pFxbu7PCB6NXf%2Fuploads%2Fgit-blob-0cca06edc9e23339bbc2133d043e8b33b7c899a8%2Fhtml.jpg?alt=media)

### ROC

* 必填欄位:
  * format
  * schema
  * source
* source 為 minio，請將檔案壓縮成 tar檔

  ```
  metadata = {
      'outputs': [{
        'type': 'roc',
        'format': 'csv',
        'schema': [
          {'name': 'fpr', 'type': 'NUMBER'},
          {'name': 'tpr', 'type': 'NUMBER'},
          {'name': 'thresholds', 'type': 'NUMBER'},
        ],
        'source': 'minio://test/roc.tar'
      }]
  }
  ```
* 結果

![](https://1123223527-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-L_BW8-pFxbu7PCB6NXf%2Fuploads%2Fgit-blob-aa552b5ad397134fa777e37e06dbdfe1dca3640d%2Froc.jpg?alt=media)

## 指定 output

* 除了在 container 內部寫入mlpipeline-ui-metadata.json 之外， 還需要在 component 定義 output
* 如果使用 kfp.dsl.ContainerOp，請加入output\_artifact\_paths參數，並指定值為\
  {'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'}

  ```
  def list_gcs_blobs_op(name, bucket):
    return kfp.dsl.ContainerOp(
        name=name,
        image='iservcloud/list-minio:latest',
        command=['python', '/app/app.py'],
        arguments=['--bucket', bucket],
        file_outputs={'blobs': '/blobs.txt'},
        output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'},
    )
  ```
* 如果是用 load yaml 檔的話，請在 yaml 檔指定以下
* outputs，名為 MLPipeline UI metadata ， type 為 UI metadata
* fileOutputs，值為 MLPipeline UI metadata: /mlpipeline-ui-metadata.json

  ```
  name: random_number
  description: Random count of 0~100

  inputs:
    - name: Count
      type: int

  outputs:
    - name: Show Number
    - name: MLPipeline UI metadata
      type: UI metadata

  implementation:
    container:
      image: iservcloud/random_number:latest
      command: [python, /app/app.py]
      args: [
          --count, {inputValue: Count},
          --output_path, {outputPath: Show Number}
      ]
      fileOutputs:
         MLPipeline UI metadata: /mlpipeline-ui-metadata.json
  ```

## 監控輸出值 <a href="#jian-kong-shu-chu-zhi" id="jian-kong-shu-chu-zhi"></a>

* 除了 metadata 之外，也可以輸出值，像是準確率等等，以方便觀察
* 將內容寫入至 /mlpipeline-metrics.json
* 內容如下

  ```
  metrics = {
      'metrics': [{
        # 監測的名稱，須符合正規式 ^[a-z]([-a-z0-9]{0,62}[a-z0-9])?$
        'name': 'accuracy-score', 
        # 監測的值，須為數字，可以用變數帶入
        'numberValue':  accuracy, 
        # format 為可選， 值只支援 PERCENTAGE 和 RAW
        'format': "PERCENTAGE",
      }]
    }
  ```
* 寫入 container 以外，也要指定output
* 和上面的 指定 output 段落一樣，只是依據使用方式將內容改成
* 使用 kfp.dsl.ContainerOp ，output\_artifact\_paths 的值為 'mlpipeline-metrics': '/mlpipeline-metrics.json'

  ```
  output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json', 'mlpipeline-metrics': '/mlpipeline-metrics.json'}
  ```
* 使用 load yaml 修改以下
* outputs，名為 MLPipeline Metrics ， type 為 Metrics
* fileOutputs，值為 MLPipeline Metrics: /mlpipeline-metrics.json

  ```
  name: random_number
  description: Random count of 0~100

  inputs:
    - name: Count
      type: int

  outputs:
    - name: Show Number
    - name: MLPipeline UI metadata
      type: UI metadata
    - name: MLPipeline Metrics
      type: Metrics
  implementation:
    container:
      image: iservcloud/random_number:latest
      command: [python, /app/app.py]
      args: [
          --count, {inputValue: Count},
          --output_path, {outputPath: Show Number}
      ]
      fileOutputs:
         MLPipeline UI metadata: /mlpipeline-ui-metadata.json
         MLPipeline Metrics:     /mlpipeline-metrics.json
  ```
* 結果 根據 key 的值和數量不同，會有變化

![](https://1123223527-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-L_BW8-pFxbu7PCB6NXf%2Fuploads%2Fgit-blob-d4d842163161dddadb1916c2d3aa6f87efb95366%2Fmetric.jpg?alt=media)
