mirror of
https://github.com/ghndrx/kubeflow-pipelines.git
synced 2026-02-10 06:45:13 +00:00
Fix MinIO endpoint to use internal cluster service
This commit is contained in:
371
ddi_training_runpod.yaml
Normal file
371
ddi_training_runpod.yaml
Normal file
@@ -0,0 +1,371 @@
|
|||||||
|
# PIPELINE DEFINITION
|
||||||
|
# Name: ddi-training-runpod
|
||||||
|
# Description: Train DDI detection model using RunPod serverless GPU
|
||||||
|
# Inputs:
|
||||||
|
# epochs: int [Default: 3.0]
|
||||||
|
# learning_rate: float [Default: 2e-05]
|
||||||
|
# minio_endpoint: str [Default: 'http://minio.minio.svc.cluster.local:9000']
|
||||||
|
# model_name: str [Default: 'emilyalsentzer/Bio_ClinicalBERT']
|
||||||
|
# model_version: str [Default: 'v1']
|
||||||
|
# runpod_endpoint_id: str [Default: 'YOUR_ENDPOINT_ID']
|
||||||
|
components:
|
||||||
|
comp-create-sample-dataset:
|
||||||
|
executorLabel: exec-create-sample-dataset
|
||||||
|
inputDefinitions:
|
||||||
|
parameters:
|
||||||
|
minio_access_key:
|
||||||
|
parameterType: STRING
|
||||||
|
minio_endpoint:
|
||||||
|
parameterType: STRING
|
||||||
|
minio_secret_key:
|
||||||
|
parameterType: STRING
|
||||||
|
output_path:
|
||||||
|
defaultValue: ddi_train.json
|
||||||
|
isOptional: true
|
||||||
|
parameterType: STRING
|
||||||
|
outputDefinitions:
|
||||||
|
parameters:
|
||||||
|
Output:
|
||||||
|
parameterType: STRING
|
||||||
|
comp-register-model:
|
||||||
|
executorLabel: exec-register-model
|
||||||
|
inputDefinitions:
|
||||||
|
parameters:
|
||||||
|
minio_access_key:
|
||||||
|
parameterType: STRING
|
||||||
|
minio_endpoint:
|
||||||
|
parameterType: STRING
|
||||||
|
minio_secret_key:
|
||||||
|
parameterType: STRING
|
||||||
|
model_name:
|
||||||
|
defaultValue: ddi-detector
|
||||||
|
isOptional: true
|
||||||
|
parameterType: STRING
|
||||||
|
model_path:
|
||||||
|
parameterType: STRING
|
||||||
|
version:
|
||||||
|
defaultValue: v1
|
||||||
|
isOptional: true
|
||||||
|
parameterType: STRING
|
||||||
|
outputDefinitions:
|
||||||
|
parameters:
|
||||||
|
Output:
|
||||||
|
parameterType: STRING
|
||||||
|
comp-trigger-runpod-training:
|
||||||
|
executorLabel: exec-trigger-runpod-training
|
||||||
|
inputDefinitions:
|
||||||
|
parameters:
|
||||||
|
dataset_path:
|
||||||
|
parameterType: STRING
|
||||||
|
epochs:
|
||||||
|
defaultValue: 3.0
|
||||||
|
isOptional: true
|
||||||
|
parameterType: NUMBER_INTEGER
|
||||||
|
learning_rate:
|
||||||
|
defaultValue: 2.0e-05
|
||||||
|
isOptional: true
|
||||||
|
parameterType: NUMBER_DOUBLE
|
||||||
|
minio_access_key:
|
||||||
|
parameterType: STRING
|
||||||
|
minio_endpoint:
|
||||||
|
parameterType: STRING
|
||||||
|
minio_secret_key:
|
||||||
|
parameterType: STRING
|
||||||
|
model_name:
|
||||||
|
defaultValue: emilyalsentzer/Bio_ClinicalBERT
|
||||||
|
isOptional: true
|
||||||
|
parameterType: STRING
|
||||||
|
output_model_path:
|
||||||
|
defaultValue: ddi_model_v1
|
||||||
|
isOptional: true
|
||||||
|
parameterType: STRING
|
||||||
|
runpod_api_key:
|
||||||
|
parameterType: STRING
|
||||||
|
runpod_endpoint_id:
|
||||||
|
parameterType: STRING
|
||||||
|
outputDefinitions:
|
||||||
|
parameters:
|
||||||
|
Output:
|
||||||
|
parameterType: STRING
|
||||||
|
deploymentSpec:
|
||||||
|
executors:
|
||||||
|
exec-create-sample-dataset:
|
||||||
|
container:
|
||||||
|
args:
|
||||||
|
- --executor_input
|
||||||
|
- '{{$}}'
|
||||||
|
- --function_to_execute
|
||||||
|
- create_sample_dataset
|
||||||
|
command:
|
||||||
|
- sh
|
||||||
|
- -c
|
||||||
|
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
|
||||||
|
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
|
||||||
|
\ python3 -m pip install --quiet --no-warn-script-location 'boto3' 'requests'\
|
||||||
|
\ && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\
|
||||||
|
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
|
||||||
|
$0\" \"$@\"\n"
|
||||||
|
- sh
|
||||||
|
- -ec
|
||||||
|
- 'program_path=$(mktemp -d)
|
||||||
|
|
||||||
|
|
||||||
|
printf "%s" "$0" > "$program_path/ephemeral_component.py"
|
||||||
|
|
||||||
|
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
|
||||||
|
|
||||||
|
'
|
||||||
|
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
|
||||||
|
\ *\n\ndef create_sample_dataset(\n minio_endpoint: str,\n minio_access_key:\
|
||||||
|
\ str,\n minio_secret_key: str,\n output_path: str = \"ddi_train.json\"\
|
||||||
|
\n) -> str:\n \"\"\"Create a sample DDI training dataset for testing.\"\
|
||||||
|
\"\"\n import json\n import boto3\n\n # Sample DDI training data\
|
||||||
|
\ (drug pairs with interaction labels)\n # Labels: 0=none, 1=minor, 2=moderate,\
|
||||||
|
\ 3=major, 4=contraindicated\n sample_data = [\n {\"text\": \"\
|
||||||
|
Patient taking warfarin and aspirin together\", \"label\": 3},\n \
|
||||||
|
\ {\"text\": \"Metformin administered with lisinopril\", \"label\": 0},\n\
|
||||||
|
\ {\"text\": \"Concurrent use of simvastatin and amiodarone\", \"\
|
||||||
|
label\": 3},\n {\"text\": \"Patient prescribed omeprazole with clopidogrel\"\
|
||||||
|
, \"label\": 2},\n {\"text\": \"Fluoxetine and tramadol co-administration\"\
|
||||||
|
, \"label\": 4},\n {\"text\": \"Atorvastatin given with diltiazem\"\
|
||||||
|
, \"label\": 2},\n {\"text\": \"Methotrexate and NSAIDs used together\"\
|
||||||
|
, \"label\": 3},\n {\"text\": \"Levothyroxine taken with calcium\
|
||||||
|
\ supplements\", \"label\": 1},\n {\"text\": \"Ciprofloxacin and\
|
||||||
|
\ theophylline interaction\", \"label\": 3},\n {\"text\": \"ACE inhibitor\
|
||||||
|
\ with potassium supplement\", \"label\": 2},\n # Add more samples\
|
||||||
|
\ for better training\n {\"text\": \"Digoxin and amiodarone combination\
|
||||||
|
\ therapy\", \"label\": 3},\n {\"text\": \"SSRIs with MAO inhibitors\"\
|
||||||
|
, \"label\": 4},\n {\"text\": \"Lithium and ACE inhibitors together\"\
|
||||||
|
, \"label\": 3},\n {\"text\": \"Benzodiazepines with opioids\", \"\
|
||||||
|
label\": 4},\n {\"text\": \"Metronidazole and alcohol consumption\"\
|
||||||
|
, \"label\": 4},\n ]\n\n # Upload to MinIO\n s3 = boto3.client(\n\
|
||||||
|
\ 's3',\n endpoint_url=minio_endpoint,\n aws_access_key_id=minio_access_key,\n\
|
||||||
|
\ aws_secret_access_key=minio_secret_key,\n region_name='us-east-1'\n\
|
||||||
|
\ )\n\n data_json = json.dumps(sample_data)\n s3.put_object(\n\
|
||||||
|
\ Bucket='datasets',\n Key=output_path,\n Body=data_json.encode('utf-8'),\n\
|
||||||
|
\ ContentType='application/json'\n )\n\n print(f\"Uploaded\
|
||||||
|
\ sample dataset to datasets/{output_path}\")\n return output_path\n\n"
|
||||||
|
image: python:3.11-slim
|
||||||
|
exec-register-model:
|
||||||
|
container:
|
||||||
|
args:
|
||||||
|
- --executor_input
|
||||||
|
- '{{$}}'
|
||||||
|
- --function_to_execute
|
||||||
|
- register_model
|
||||||
|
command:
|
||||||
|
- sh
|
||||||
|
- -c
|
||||||
|
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
|
||||||
|
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
|
||||||
|
\ python3 -m pip install --quiet --no-warn-script-location 'boto3' && \
|
||||||
|
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\
|
||||||
|
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
|
||||||
|
$0\" \"$@\"\n"
|
||||||
|
- sh
|
||||||
|
- -ec
|
||||||
|
- 'program_path=$(mktemp -d)
|
||||||
|
|
||||||
|
|
||||||
|
printf "%s" "$0" > "$program_path/ephemeral_component.py"
|
||||||
|
|
||||||
|
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
|
||||||
|
|
||||||
|
'
|
||||||
|
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
|
||||||
|
\ *\n\ndef register_model(\n model_path: str,\n minio_endpoint: str,\n\
|
||||||
|
\ minio_access_key: str,\n minio_secret_key: str,\n model_name:\
|
||||||
|
\ str = \"ddi-detector\",\n version: str = \"v1\"\n) -> str:\n \"\"\
|
||||||
|
\"Register the trained model in the model registry.\"\"\"\n import boto3\n\
|
||||||
|
\ import json\n from datetime import datetime\n\n s3 = boto3.client(\n\
|
||||||
|
\ 's3',\n endpoint_url=minio_endpoint,\n aws_access_key_id=minio_access_key,\n\
|
||||||
|
\ aws_secret_access_key=minio_secret_key,\n region_name='us-east-1'\n\
|
||||||
|
\ )\n\n # Create model registry entry\n registry_entry = {\n \
|
||||||
|
\ \"name\": model_name,\n \"version\": version,\n \"\
|
||||||
|
path\": model_path,\n \"created_at\": datetime.utcnow().isoformat(),\n\
|
||||||
|
\ \"framework\": \"transformers\",\n \"task\": \"sequence-classification\"\
|
||||||
|
,\n \"labels\": [\"none\", \"minor\", \"moderate\", \"major\", \"\
|
||||||
|
contraindicated\"]\n }\n\n registry_key = f\"registry/{model_name}/{version}/metadata.json\"\
|
||||||
|
\n s3.put_object(\n Bucket='models',\n Key=registry_key,\n\
|
||||||
|
\ Body=json.dumps(registry_entry).encode('utf-8'),\n ContentType='application/json'\n\
|
||||||
|
\ )\n\n print(f\"Model registered: {model_name} v{version}\")\n \
|
||||||
|
\ print(f\"Registry path: models/{registry_key}\")\n\n return f\"models/{registry_key}\"\
|
||||||
|
\n\n"
|
||||||
|
image: python:3.11-slim
|
||||||
|
exec-trigger-runpod-training:
|
||||||
|
container:
|
||||||
|
args:
|
||||||
|
- --executor_input
|
||||||
|
- '{{$}}'
|
||||||
|
- --function_to_execute
|
||||||
|
- trigger_runpod_training
|
||||||
|
command:
|
||||||
|
- sh
|
||||||
|
- -c
|
||||||
|
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
|
||||||
|
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
|
||||||
|
\ python3 -m pip install --quiet --no-warn-script-location 'requests' &&\
|
||||||
|
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\
|
||||||
|
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
|
||||||
|
$0\" \"$@\"\n"
|
||||||
|
- sh
|
||||||
|
- -ec
|
||||||
|
- 'program_path=$(mktemp -d)
|
||||||
|
|
||||||
|
|
||||||
|
printf "%s" "$0" > "$program_path/ephemeral_component.py"
|
||||||
|
|
||||||
|
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
|
||||||
|
|
||||||
|
'
|
||||||
|
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
|
||||||
|
\ *\n\ndef trigger_runpod_training(\n runpod_api_key: str,\n runpod_endpoint_id:\
|
||||||
|
\ str,\n minio_endpoint: str,\n minio_access_key: str,\n minio_secret_key:\
|
||||||
|
\ str,\n dataset_path: str,\n model_name: str = \"emilyalsentzer/Bio_ClinicalBERT\"\
|
||||||
|
,\n epochs: int = 3,\n learning_rate: float = 2e-5,\n output_model_path:\
|
||||||
|
\ str = \"ddi_model_v1\"\n) -> str:\n \"\"\"Trigger RunPod serverless\
|
||||||
|
\ training job.\"\"\"\n import requests\n import json\n import\
|
||||||
|
\ time\n\n # RunPod API endpoint\n url = f\"https://api.runpod.ai/v2/{runpod_endpoint_id}/runsync\"\
|
||||||
|
\n\n headers = {\n \"Authorization\": f\"Bearer {runpod_api_key}\"\
|
||||||
|
,\n \"Content-Type\": \"application/json\"\n }\n\n payload\
|
||||||
|
\ = {\n \"input\": {\n \"model_name\": model_name,\n \
|
||||||
|
\ \"dataset_path\": dataset_path,\n \"epochs\": epochs,\n\
|
||||||
|
\ \"learning_rate\": learning_rate,\n \"batch_size\"\
|
||||||
|
: 16,\n \"output_path\": output_model_path,\n # MinIO\
|
||||||
|
\ credentials for the worker\n \"minio_endpoint\": minio_endpoint,\n\
|
||||||
|
\ \"minio_access_key\": minio_access_key,\n \"minio_secret_key\"\
|
||||||
|
: minio_secret_key\n }\n }\n\n print(f\"Triggering RunPod training\
|
||||||
|
\ job...\")\n print(f\"Model: {model_name}\")\n print(f\"Dataset:\
|
||||||
|
\ {dataset_path}\")\n print(f\"Epochs: {epochs}\")\n\n response =\
|
||||||
|
\ requests.post(url, headers=headers, json=payload, timeout=3600)\n result\
|
||||||
|
\ = response.json()\n\n if response.status_code != 200:\n raise\
|
||||||
|
\ Exception(f\"RunPod API error: {result}\")\n\n if result.get('status')\
|
||||||
|
\ == 'FAILED':\n raise Exception(f\"Training failed: {result.get('error')}\"\
|
||||||
|
)\n\n output = result.get('output', {})\n print(f\"Training complete!\"\
|
||||||
|
)\n print(f\"Model path: {output.get('model_path')}\")\n print(f\"\
|
||||||
|
Metrics: {output.get('metrics')}\")\n\n return output.get('model_path',\
|
||||||
|
\ f\"s3://models/{output_model_path}\")\n\n"
|
||||||
|
image: python:3.11-slim
|
||||||
|
pipelineInfo:
|
||||||
|
description: Train DDI detection model using RunPod serverless GPU
|
||||||
|
name: ddi-training-runpod
|
||||||
|
root:
|
||||||
|
dag:
|
||||||
|
tasks:
|
||||||
|
create-sample-dataset:
|
||||||
|
cachingOptions:
|
||||||
|
enableCache: true
|
||||||
|
componentRef:
|
||||||
|
name: comp-create-sample-dataset
|
||||||
|
inputs:
|
||||||
|
parameters:
|
||||||
|
minio_access_key:
|
||||||
|
runtimeValue:
|
||||||
|
constant: minioadmin
|
||||||
|
minio_endpoint:
|
||||||
|
componentInputParameter: minio_endpoint
|
||||||
|
minio_secret_key:
|
||||||
|
runtimeValue:
|
||||||
|
constant: minioadmin123!
|
||||||
|
output_path:
|
||||||
|
runtimeValue:
|
||||||
|
constant: ddi_train_{{$.inputs.parameters['pipelinechannel--model_version']}}.json
|
||||||
|
pipelinechannel--model_version:
|
||||||
|
componentInputParameter: model_version
|
||||||
|
taskInfo:
|
||||||
|
name: create-sample-dataset
|
||||||
|
register-model:
|
||||||
|
cachingOptions:
|
||||||
|
enableCache: true
|
||||||
|
componentRef:
|
||||||
|
name: comp-register-model
|
||||||
|
dependentTasks:
|
||||||
|
- trigger-runpod-training
|
||||||
|
inputs:
|
||||||
|
parameters:
|
||||||
|
minio_access_key:
|
||||||
|
runtimeValue:
|
||||||
|
constant: minioadmin
|
||||||
|
minio_endpoint:
|
||||||
|
componentInputParameter: minio_endpoint
|
||||||
|
minio_secret_key:
|
||||||
|
runtimeValue:
|
||||||
|
constant: minioadmin123!
|
||||||
|
model_name:
|
||||||
|
runtimeValue:
|
||||||
|
constant: ddi-detector
|
||||||
|
model_path:
|
||||||
|
taskOutputParameter:
|
||||||
|
outputParameterKey: Output
|
||||||
|
producerTask: trigger-runpod-training
|
||||||
|
version:
|
||||||
|
componentInputParameter: model_version
|
||||||
|
taskInfo:
|
||||||
|
name: register-model
|
||||||
|
trigger-runpod-training:
|
||||||
|
cachingOptions:
|
||||||
|
enableCache: true
|
||||||
|
componentRef:
|
||||||
|
name: comp-trigger-runpod-training
|
||||||
|
dependentTasks:
|
||||||
|
- create-sample-dataset
|
||||||
|
inputs:
|
||||||
|
parameters:
|
||||||
|
dataset_path:
|
||||||
|
taskOutputParameter:
|
||||||
|
outputParameterKey: Output
|
||||||
|
producerTask: create-sample-dataset
|
||||||
|
epochs:
|
||||||
|
componentInputParameter: epochs
|
||||||
|
learning_rate:
|
||||||
|
componentInputParameter: learning_rate
|
||||||
|
minio_access_key:
|
||||||
|
runtimeValue:
|
||||||
|
constant: minioadmin
|
||||||
|
minio_endpoint:
|
||||||
|
componentInputParameter: minio_endpoint
|
||||||
|
minio_secret_key:
|
||||||
|
runtimeValue:
|
||||||
|
constant: minioadmin123!
|
||||||
|
model_name:
|
||||||
|
componentInputParameter: model_name
|
||||||
|
output_model_path:
|
||||||
|
runtimeValue:
|
||||||
|
constant: ddi_model_{{$.inputs.parameters['pipelinechannel--model_version']}}
|
||||||
|
pipelinechannel--model_version:
|
||||||
|
componentInputParameter: model_version
|
||||||
|
runpod_api_key:
|
||||||
|
runtimeValue:
|
||||||
|
constant: ''
|
||||||
|
runpod_endpoint_id:
|
||||||
|
componentInputParameter: runpod_endpoint_id
|
||||||
|
taskInfo:
|
||||||
|
name: trigger-runpod-training
|
||||||
|
inputDefinitions:
|
||||||
|
parameters:
|
||||||
|
epochs:
|
||||||
|
defaultValue: 3.0
|
||||||
|
isOptional: true
|
||||||
|
parameterType: NUMBER_INTEGER
|
||||||
|
learning_rate:
|
||||||
|
defaultValue: 2.0e-05
|
||||||
|
isOptional: true
|
||||||
|
parameterType: NUMBER_DOUBLE
|
||||||
|
minio_endpoint:
|
||||||
|
defaultValue: http://minio.minio.svc.cluster.local:9000
|
||||||
|
isOptional: true
|
||||||
|
parameterType: STRING
|
||||||
|
model_name:
|
||||||
|
defaultValue: emilyalsentzer/Bio_ClinicalBERT
|
||||||
|
isOptional: true
|
||||||
|
parameterType: STRING
|
||||||
|
model_version:
|
||||||
|
defaultValue: v1
|
||||||
|
isOptional: true
|
||||||
|
parameterType: STRING
|
||||||
|
runpod_endpoint_id:
|
||||||
|
defaultValue: YOUR_ENDPOINT_ID
|
||||||
|
isOptional: true
|
||||||
|
parameterType: STRING
|
||||||
|
schemaVersion: 2.1.0
|
||||||
|
sdkVersion: kfp-2.15.2
|
||||||
@@ -198,8 +198,8 @@ def ddi_training_pipeline(
|
|||||||
learning_rate: float = 2e-5,
|
learning_rate: float = 2e-5,
|
||||||
model_version: str = "v1",
|
model_version: str = "v1",
|
||||||
|
|
||||||
# MinIO settings (these will be injected from secrets)
|
# MinIO settings - use internal cluster service URL
|
||||||
minio_endpoint: str = "https://minio.walleye-frog.ts.net",
|
minio_endpoint: str = "http://minio.minio.svc.cluster.local:9000",
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Full DDI training pipeline:
|
Full DDI training pipeline:
|
||||||
|
|||||||
Reference in New Issue
Block a user