diff --git a/ddi_training_runpod.yaml b/ddi_training_runpod.yaml new file mode 100644 index 0000000..816e228 --- /dev/null +++ b/ddi_training_runpod.yaml @@ -0,0 +1,371 @@ +# PIPELINE DEFINITION +# Name: ddi-training-runpod +# Description: Train DDI detection model using RunPod serverless GPU +# Inputs: +# epochs: int [Default: 3.0] +# learning_rate: float [Default: 2e-05] +# minio_endpoint: str [Default: 'http://minio.minio.svc.cluster.local:9000'] +# model_name: str [Default: 'emilyalsentzer/Bio_ClinicalBERT'] +# model_version: str [Default: 'v1'] +# runpod_endpoint_id: str [Default: 'YOUR_ENDPOINT_ID'] +components: + comp-create-sample-dataset: + executorLabel: exec-create-sample-dataset + inputDefinitions: + parameters: + minio_access_key: + parameterType: STRING + minio_endpoint: + parameterType: STRING + minio_secret_key: + parameterType: STRING + output_path: + defaultValue: ddi_train.json + isOptional: true + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-register-model: + executorLabel: exec-register-model + inputDefinitions: + parameters: + minio_access_key: + parameterType: STRING + minio_endpoint: + parameterType: STRING + minio_secret_key: + parameterType: STRING + model_name: + defaultValue: ddi-detector + isOptional: true + parameterType: STRING + model_path: + parameterType: STRING + version: + defaultValue: v1 + isOptional: true + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-trigger-runpod-training: + executorLabel: exec-trigger-runpod-training + inputDefinitions: + parameters: + dataset_path: + parameterType: STRING + epochs: + defaultValue: 3.0 + isOptional: true + parameterType: NUMBER_INTEGER + learning_rate: + defaultValue: 2.0e-05 + isOptional: true + parameterType: NUMBER_DOUBLE + minio_access_key: + parameterType: STRING + minio_endpoint: + parameterType: STRING + minio_secret_key: + parameterType: STRING + model_name: + defaultValue: emilyalsentzer/Bio_ClinicalBERT + isOptional: true + parameterType: STRING + output_model_path: + defaultValue: ddi_model_v1 + isOptional: true + parameterType: STRING + runpod_api_key: + parameterType: STRING + runpod_endpoint_id: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING +deploymentSpec: + executors: + exec-create-sample-dataset: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - create_sample_dataset + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'boto3' 'requests'\ + \ && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef create_sample_dataset(\n minio_endpoint: str,\n minio_access_key:\ + \ str,\n minio_secret_key: str,\n output_path: str = \"ddi_train.json\"\ + \n) -> str:\n \"\"\"Create a sample DDI training dataset for testing.\"\ + \"\"\n import json\n import boto3\n\n # Sample DDI training data\ + \ (drug pairs with interaction labels)\n # Labels: 0=none, 1=minor, 2=moderate,\ + \ 3=major, 4=contraindicated\n sample_data = [\n {\"text\": \"\ + Patient taking warfarin and aspirin together\", \"label\": 3},\n \ + \ {\"text\": \"Metformin administered with lisinopril\", \"label\": 0},\n\ + \ {\"text\": \"Concurrent use of simvastatin and amiodarone\", \"\ + label\": 3},\n {\"text\": \"Patient prescribed omeprazole with clopidogrel\"\ + , \"label\": 2},\n {\"text\": \"Fluoxetine and tramadol co-administration\"\ + , \"label\": 4},\n {\"text\": \"Atorvastatin given with diltiazem\"\ + , \"label\": 2},\n {\"text\": \"Methotrexate and NSAIDs used together\"\ + , \"label\": 3},\n {\"text\": \"Levothyroxine taken with calcium\ + \ supplements\", \"label\": 1},\n {\"text\": \"Ciprofloxacin and\ + \ theophylline interaction\", \"label\": 3},\n {\"text\": \"ACE inhibitor\ + \ with potassium supplement\", \"label\": 2},\n # Add more samples\ + \ for better training\n {\"text\": \"Digoxin and amiodarone combination\ + \ therapy\", \"label\": 3},\n {\"text\": \"SSRIs with MAO inhibitors\"\ + , \"label\": 4},\n {\"text\": \"Lithium and ACE inhibitors together\"\ + , \"label\": 3},\n {\"text\": \"Benzodiazepines with opioids\", \"\ + label\": 4},\n {\"text\": \"Metronidazole and alcohol consumption\"\ + , \"label\": 4},\n ]\n\n # Upload to MinIO\n s3 = boto3.client(\n\ + \ 's3',\n endpoint_url=minio_endpoint,\n aws_access_key_id=minio_access_key,\n\ + \ aws_secret_access_key=minio_secret_key,\n region_name='us-east-1'\n\ + \ )\n\n data_json = json.dumps(sample_data)\n s3.put_object(\n\ + \ Bucket='datasets',\n Key=output_path,\n Body=data_json.encode('utf-8'),\n\ + \ ContentType='application/json'\n )\n\n print(f\"Uploaded\ + \ sample dataset to datasets/{output_path}\")\n return output_path\n\n" + image: python:3.11-slim + exec-register-model: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - register_model + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'boto3' && \ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef register_model(\n model_path: str,\n minio_endpoint: str,\n\ + \ minio_access_key: str,\n minio_secret_key: str,\n model_name:\ + \ str = \"ddi-detector\",\n version: str = \"v1\"\n) -> str:\n \"\"\ + \"Register the trained model in the model registry.\"\"\"\n import boto3\n\ + \ import json\n from datetime import datetime\n\n s3 = boto3.client(\n\ + \ 's3',\n endpoint_url=minio_endpoint,\n aws_access_key_id=minio_access_key,\n\ + \ aws_secret_access_key=minio_secret_key,\n region_name='us-east-1'\n\ + \ )\n\n # Create model registry entry\n registry_entry = {\n \ + \ \"name\": model_name,\n \"version\": version,\n \"\ + path\": model_path,\n \"created_at\": datetime.utcnow().isoformat(),\n\ + \ \"framework\": \"transformers\",\n \"task\": \"sequence-classification\"\ + ,\n \"labels\": [\"none\", \"minor\", \"moderate\", \"major\", \"\ + contraindicated\"]\n }\n\n registry_key = f\"registry/{model_name}/{version}/metadata.json\"\ + \n s3.put_object(\n Bucket='models',\n Key=registry_key,\n\ + \ Body=json.dumps(registry_entry).encode('utf-8'),\n ContentType='application/json'\n\ + \ )\n\n print(f\"Model registered: {model_name} v{version}\")\n \ + \ print(f\"Registry path: models/{registry_key}\")\n\n return f\"models/{registry_key}\"\ + \n\n" + image: python:3.11-slim + exec-trigger-runpod-training: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - trigger_runpod_training + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'requests' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef trigger_runpod_training(\n runpod_api_key: str,\n runpod_endpoint_id:\ + \ str,\n minio_endpoint: str,\n minio_access_key: str,\n minio_secret_key:\ + \ str,\n dataset_path: str,\n model_name: str = \"emilyalsentzer/Bio_ClinicalBERT\"\ + ,\n epochs: int = 3,\n learning_rate: float = 2e-5,\n output_model_path:\ + \ str = \"ddi_model_v1\"\n) -> str:\n \"\"\"Trigger RunPod serverless\ + \ training job.\"\"\"\n import requests\n import json\n import\ + \ time\n\n # RunPod API endpoint\n url = f\"https://api.runpod.ai/v2/{runpod_endpoint_id}/runsync\"\ + \n\n headers = {\n \"Authorization\": f\"Bearer {runpod_api_key}\"\ + ,\n \"Content-Type\": \"application/json\"\n }\n\n payload\ + \ = {\n \"input\": {\n \"model_name\": model_name,\n \ + \ \"dataset_path\": dataset_path,\n \"epochs\": epochs,\n\ + \ \"learning_rate\": learning_rate,\n \"batch_size\"\ + : 16,\n \"output_path\": output_model_path,\n # MinIO\ + \ credentials for the worker\n \"minio_endpoint\": minio_endpoint,\n\ + \ \"minio_access_key\": minio_access_key,\n \"minio_secret_key\"\ + : minio_secret_key\n }\n }\n\n print(f\"Triggering RunPod training\ + \ job...\")\n print(f\"Model: {model_name}\")\n print(f\"Dataset:\ + \ {dataset_path}\")\n print(f\"Epochs: {epochs}\")\n\n response =\ + \ requests.post(url, headers=headers, json=payload, timeout=3600)\n result\ + \ = response.json()\n\n if response.status_code != 200:\n raise\ + \ Exception(f\"RunPod API error: {result}\")\n\n if result.get('status')\ + \ == 'FAILED':\n raise Exception(f\"Training failed: {result.get('error')}\"\ + )\n\n output = result.get('output', {})\n print(f\"Training complete!\"\ + )\n print(f\"Model path: {output.get('model_path')}\")\n print(f\"\ + Metrics: {output.get('metrics')}\")\n\n return output.get('model_path',\ + \ f\"s3://models/{output_model_path}\")\n\n" + image: python:3.11-slim +pipelineInfo: + description: Train DDI detection model using RunPod serverless GPU + name: ddi-training-runpod +root: + dag: + tasks: + create-sample-dataset: + cachingOptions: + enableCache: true + componentRef: + name: comp-create-sample-dataset + inputs: + parameters: + minio_access_key: + runtimeValue: + constant: minioadmin + minio_endpoint: + componentInputParameter: minio_endpoint + minio_secret_key: + runtimeValue: + constant: minioadmin123! + output_path: + runtimeValue: + constant: ddi_train_{{$.inputs.parameters['pipelinechannel--model_version']}}.json + pipelinechannel--model_version: + componentInputParameter: model_version + taskInfo: + name: create-sample-dataset + register-model: + cachingOptions: + enableCache: true + componentRef: + name: comp-register-model + dependentTasks: + - trigger-runpod-training + inputs: + parameters: + minio_access_key: + runtimeValue: + constant: minioadmin + minio_endpoint: + componentInputParameter: minio_endpoint + minio_secret_key: + runtimeValue: + constant: minioadmin123! + model_name: + runtimeValue: + constant: ddi-detector + model_path: + taskOutputParameter: + outputParameterKey: Output + producerTask: trigger-runpod-training + version: + componentInputParameter: model_version + taskInfo: + name: register-model + trigger-runpod-training: + cachingOptions: + enableCache: true + componentRef: + name: comp-trigger-runpod-training + dependentTasks: + - create-sample-dataset + inputs: + parameters: + dataset_path: + taskOutputParameter: + outputParameterKey: Output + producerTask: create-sample-dataset + epochs: + componentInputParameter: epochs + learning_rate: + componentInputParameter: learning_rate + minio_access_key: + runtimeValue: + constant: minioadmin + minio_endpoint: + componentInputParameter: minio_endpoint + minio_secret_key: + runtimeValue: + constant: minioadmin123! + model_name: + componentInputParameter: model_name + output_model_path: + runtimeValue: + constant: ddi_model_{{$.inputs.parameters['pipelinechannel--model_version']}} + pipelinechannel--model_version: + componentInputParameter: model_version + runpod_api_key: + runtimeValue: + constant: '' + runpod_endpoint_id: + componentInputParameter: runpod_endpoint_id + taskInfo: + name: trigger-runpod-training + inputDefinitions: + parameters: + epochs: + defaultValue: 3.0 + isOptional: true + parameterType: NUMBER_INTEGER + learning_rate: + defaultValue: 2.0e-05 + isOptional: true + parameterType: NUMBER_DOUBLE + minio_endpoint: + defaultValue: http://minio.minio.svc.cluster.local:9000 + isOptional: true + parameterType: STRING + model_name: + defaultValue: emilyalsentzer/Bio_ClinicalBERT + isOptional: true + parameterType: STRING + model_version: + defaultValue: v1 + isOptional: true + parameterType: STRING + runpod_endpoint_id: + defaultValue: YOUR_ENDPOINT_ID + isOptional: true + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.15.2 diff --git a/pipelines/ddi_training_runpod.py b/pipelines/ddi_training_runpod.py index b34742a..8535705 100644 --- a/pipelines/ddi_training_runpod.py +++ b/pipelines/ddi_training_runpod.py @@ -198,8 +198,8 @@ def ddi_training_pipeline( learning_rate: float = 2e-5, model_version: str = "v1", - # MinIO settings (these will be injected from secrets) - minio_endpoint: str = "https://minio.walleye-frog.ts.net", + # MinIO settings - use internal cluster service URL + minio_endpoint: str = "http://minio.minio.svc.cluster.local:9000", ): """ Full DDI training pipeline: