# PIPELINE DEFINITION # Name: ddi-training-runpod # Description: Train DDI detection model using RunPod serverless GPU # Inputs: # epochs: int [Default: 3.0] # learning_rate: float [Default: 2e-05] # minio_endpoint: str [Default: 'http://minio.minio.svc.cluster.local:9000'] # model_name: str [Default: 'emilyalsentzer/Bio_ClinicalBERT'] # model_version: str [Default: 'v1'] # runpod_endpoint_id: str [Default: 'YOUR_ENDPOINT_ID'] components: comp-create-sample-dataset: executorLabel: exec-create-sample-dataset inputDefinitions: parameters: minio_access_key: parameterType: STRING minio_endpoint: parameterType: STRING minio_secret_key: parameterType: STRING output_path: defaultValue: ddi_train.json isOptional: true parameterType: STRING outputDefinitions: parameters: Output: parameterType: STRING comp-register-model: executorLabel: exec-register-model inputDefinitions: parameters: minio_access_key: parameterType: STRING minio_endpoint: parameterType: STRING minio_secret_key: parameterType: STRING model_name: defaultValue: ddi-detector isOptional: true parameterType: STRING model_path: parameterType: STRING version: defaultValue: v1 isOptional: true parameterType: STRING outputDefinitions: parameters: Output: parameterType: STRING comp-trigger-runpod-training: executorLabel: exec-trigger-runpod-training inputDefinitions: parameters: dataset_path: parameterType: STRING epochs: defaultValue: 3.0 isOptional: true parameterType: NUMBER_INTEGER learning_rate: defaultValue: 2.0e-05 isOptional: true parameterType: NUMBER_DOUBLE minio_access_key: parameterType: STRING minio_endpoint: parameterType: STRING minio_secret_key: parameterType: STRING model_name: defaultValue: emilyalsentzer/Bio_ClinicalBERT isOptional: true parameterType: STRING output_model_path: defaultValue: ddi_model_v1 isOptional: true parameterType: STRING runpod_api_key: parameterType: STRING runpod_endpoint_id: parameterType: STRING outputDefinitions: parameters: Output: parameterType: STRING deploymentSpec: executors: exec-create-sample-dataset: container: args: - --executor_input - '{{$}}' - --function_to_execute - create_sample_dataset command: - sh - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ \ python3 -m pip install --quiet --no-warn-script-location 'boto3' 'requests'\ \ && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) printf "%s" "$0" > "$program_path/ephemeral_component.py" _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ \ *\n\ndef create_sample_dataset(\n minio_endpoint: str,\n minio_access_key:\ \ str,\n minio_secret_key: str,\n output_path: str = \"ddi_train.json\"\ \n) -> str:\n \"\"\"Create a sample DDI training dataset for testing.\"\ \"\"\n import json\n import boto3\n\n # Sample DDI training data\ \ (drug pairs with interaction labels)\n # Labels: 0=none, 1=minor, 2=moderate,\ \ 3=major, 4=contraindicated\n sample_data = [\n {\"text\": \"\ Patient taking warfarin and aspirin together\", \"label\": 3},\n \ \ {\"text\": \"Metformin administered with lisinopril\", \"label\": 0},\n\ \ {\"text\": \"Concurrent use of simvastatin and amiodarone\", \"\ label\": 3},\n {\"text\": \"Patient prescribed omeprazole with clopidogrel\"\ , \"label\": 2},\n {\"text\": \"Fluoxetine and tramadol co-administration\"\ , \"label\": 4},\n {\"text\": \"Atorvastatin given with diltiazem\"\ , \"label\": 2},\n {\"text\": \"Methotrexate and NSAIDs used together\"\ , \"label\": 3},\n {\"text\": \"Levothyroxine taken with calcium\ \ supplements\", \"label\": 1},\n {\"text\": \"Ciprofloxacin and\ \ theophylline interaction\", \"label\": 3},\n {\"text\": \"ACE inhibitor\ \ with potassium supplement\", \"label\": 2},\n # Add more samples\ \ for better training\n {\"text\": \"Digoxin and amiodarone combination\ \ therapy\", \"label\": 3},\n {\"text\": \"SSRIs with MAO inhibitors\"\ , \"label\": 4},\n {\"text\": \"Lithium and ACE inhibitors together\"\ , \"label\": 3},\n {\"text\": \"Benzodiazepines with opioids\", \"\ label\": 4},\n {\"text\": \"Metronidazole and alcohol consumption\"\ , \"label\": 4},\n ]\n\n # Upload to MinIO\n s3 = boto3.client(\n\ \ 's3',\n endpoint_url=minio_endpoint,\n aws_access_key_id=minio_access_key,\n\ \ aws_secret_access_key=minio_secret_key,\n region_name='us-east-1'\n\ \ )\n\n data_json = json.dumps(sample_data)\n s3.put_object(\n\ \ Bucket='datasets',\n Key=output_path,\n Body=data_json.encode('utf-8'),\n\ \ ContentType='application/json'\n )\n\n print(f\"Uploaded\ \ sample dataset to datasets/{output_path}\")\n return output_path\n\n" image: python:3.11-slim exec-register-model: container: args: - --executor_input - '{{$}}' - --function_to_execute - register_model command: - sh - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ \ python3 -m pip install --quiet --no-warn-script-location 'boto3' && \ \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) printf "%s" "$0" > "$program_path/ephemeral_component.py" _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ \ *\n\ndef register_model(\n model_path: str,\n minio_endpoint: str,\n\ \ minio_access_key: str,\n minio_secret_key: str,\n model_name:\ \ str = \"ddi-detector\",\n version: str = \"v1\"\n) -> str:\n \"\"\ \"Register the trained model in the model registry.\"\"\"\n import boto3\n\ \ import json\n from datetime import datetime\n\n s3 = boto3.client(\n\ \ 's3',\n endpoint_url=minio_endpoint,\n aws_access_key_id=minio_access_key,\n\ \ aws_secret_access_key=minio_secret_key,\n region_name='us-east-1'\n\ \ )\n\n # Create model registry entry\n registry_entry = {\n \ \ \"name\": model_name,\n \"version\": version,\n \"\ path\": model_path,\n \"created_at\": datetime.utcnow().isoformat(),\n\ \ \"framework\": \"transformers\",\n \"task\": \"sequence-classification\"\ ,\n \"labels\": [\"none\", \"minor\", \"moderate\", \"major\", \"\ contraindicated\"]\n }\n\n registry_key = f\"registry/{model_name}/{version}/metadata.json\"\ \n s3.put_object(\n Bucket='models',\n Key=registry_key,\n\ \ Body=json.dumps(registry_entry).encode('utf-8'),\n ContentType='application/json'\n\ \ )\n\n print(f\"Model registered: {model_name} v{version}\")\n \ \ print(f\"Registry path: models/{registry_key}\")\n\n return f\"models/{registry_key}\"\ \n\n" image: python:3.11-slim exec-trigger-runpod-training: container: args: - --executor_input - '{{$}}' - --function_to_execute - trigger_runpod_training command: - sh - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ \ python3 -m pip install --quiet --no-warn-script-location 'requests' &&\ \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) printf "%s" "$0" > "$program_path/ephemeral_component.py" _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ \ *\n\ndef trigger_runpod_training(\n runpod_api_key: str,\n runpod_endpoint_id:\ \ str,\n minio_endpoint: str,\n minio_access_key: str,\n minio_secret_key:\ \ str,\n dataset_path: str,\n model_name: str = \"emilyalsentzer/Bio_ClinicalBERT\"\ ,\n epochs: int = 3,\n learning_rate: float = 2e-5,\n output_model_path:\ \ str = \"ddi_model_v1\"\n) -> str:\n \"\"\"Trigger RunPod serverless\ \ training job.\"\"\"\n import requests\n import json\n import\ \ time\n\n # RunPod API endpoint\n url = f\"https://api.runpod.ai/v2/{runpod_endpoint_id}/runsync\"\ \n\n headers = {\n \"Authorization\": f\"Bearer {runpod_api_key}\"\ ,\n \"Content-Type\": \"application/json\"\n }\n\n payload\ \ = {\n \"input\": {\n \"model_name\": model_name,\n \ \ \"dataset_path\": dataset_path,\n \"epochs\": epochs,\n\ \ \"learning_rate\": learning_rate,\n \"batch_size\"\ : 16,\n \"output_path\": output_model_path,\n # MinIO\ \ credentials for the worker\n \"minio_endpoint\": minio_endpoint,\n\ \ \"minio_access_key\": minio_access_key,\n \"minio_secret_key\"\ : minio_secret_key\n }\n }\n\n print(f\"Triggering RunPod training\ \ job...\")\n print(f\"Model: {model_name}\")\n print(f\"Dataset:\ \ {dataset_path}\")\n print(f\"Epochs: {epochs}\")\n\n response =\ \ requests.post(url, headers=headers, json=payload, timeout=3600)\n result\ \ = response.json()\n\n if response.status_code != 200:\n raise\ \ Exception(f\"RunPod API error: {result}\")\n\n if result.get('status')\ \ == 'FAILED':\n raise Exception(f\"Training failed: {result.get('error')}\"\ )\n\n output = result.get('output', {})\n print(f\"Training complete!\"\ )\n print(f\"Model path: {output.get('model_path')}\")\n print(f\"\ Metrics: {output.get('metrics')}\")\n\n return output.get('model_path',\ \ f\"s3://models/{output_model_path}\")\n\n" image: python:3.11-slim pipelineInfo: description: Train DDI detection model using RunPod serverless GPU name: ddi-training-runpod root: dag: tasks: create-sample-dataset: cachingOptions: enableCache: true componentRef: name: comp-create-sample-dataset inputs: parameters: minio_access_key: runtimeValue: constant: minioadmin minio_endpoint: componentInputParameter: minio_endpoint minio_secret_key: runtimeValue: constant: minioadmin123! output_path: runtimeValue: constant: ddi_train_{{$.inputs.parameters['pipelinechannel--model_version']}}.json pipelinechannel--model_version: componentInputParameter: model_version taskInfo: name: create-sample-dataset register-model: cachingOptions: enableCache: true componentRef: name: comp-register-model dependentTasks: - trigger-runpod-training inputs: parameters: minio_access_key: runtimeValue: constant: minioadmin minio_endpoint: componentInputParameter: minio_endpoint minio_secret_key: runtimeValue: constant: minioadmin123! model_name: runtimeValue: constant: ddi-detector model_path: taskOutputParameter: outputParameterKey: Output producerTask: trigger-runpod-training version: componentInputParameter: model_version taskInfo: name: register-model trigger-runpod-training: cachingOptions: enableCache: true componentRef: name: comp-trigger-runpod-training dependentTasks: - create-sample-dataset inputs: parameters: dataset_path: taskOutputParameter: outputParameterKey: Output producerTask: create-sample-dataset epochs: componentInputParameter: epochs learning_rate: componentInputParameter: learning_rate minio_access_key: runtimeValue: constant: minioadmin minio_endpoint: componentInputParameter: minio_endpoint minio_secret_key: runtimeValue: constant: minioadmin123! model_name: componentInputParameter: model_name output_model_path: runtimeValue: constant: ddi_model_{{$.inputs.parameters['pipelinechannel--model_version']}} pipelinechannel--model_version: componentInputParameter: model_version runpod_api_key: runtimeValue: constant: '' runpod_endpoint_id: componentInputParameter: runpod_endpoint_id taskInfo: name: trigger-runpod-training inputDefinitions: parameters: epochs: defaultValue: 3.0 isOptional: true parameterType: NUMBER_INTEGER learning_rate: defaultValue: 2.0e-05 isOptional: true parameterType: NUMBER_DOUBLE minio_endpoint: defaultValue: http://minio.minio.svc.cluster.local:9000 isOptional: true parameterType: STRING model_name: defaultValue: emilyalsentzer/Bio_ClinicalBERT isOptional: true parameterType: STRING model_version: defaultValue: v1 isOptional: true parameterType: STRING runpod_endpoint_id: defaultValue: YOUR_ENDPOINT_ID isOptional: true parameterType: STRING schemaVersion: 2.1.0 sdkVersion: kfp-2.15.2