From 2e479fc61b7051aad9d2c759e55367fb34aaea5d Mon Sep 17 00:00:00 2001 From: Greg Hendrickson Date: Tue, 3 Feb 2026 16:11:40 +0000 Subject: [PATCH] chore: clean up repo structure - Remove compiled YAML files (can be regenerated) - Remove example pipelines - Remove unused med_rx_training.py - Update README with comprehensive docs - Clean up .gitignore --- .gitignore | 41 ++- README.md | 134 +++++++--- components/runpod_trainer/runpod.toml | 12 - ddi_data_prep.yaml | 269 ------------------- ddi_data_prep_ts.yaml | 265 ------------------ ddi_training_runpod.yaml | 371 -------------------------- hello_world.yaml | 128 --------- pipelines/examples/hello_world.py | 44 --- pipelines/med_rx_training.py | 202 -------------- 9 files changed, 120 insertions(+), 1346 deletions(-) delete mode 100644 components/runpod_trainer/runpod.toml delete mode 100644 ddi_data_prep.yaml delete mode 100644 ddi_data_prep_ts.yaml delete mode 100644 ddi_training_runpod.yaml delete mode 100644 hello_world.yaml delete mode 100644 pipelines/examples/hello_world.py delete mode 100644 pipelines/med_rx_training.py diff --git a/.gitignore b/.gitignore index b055cca..d33fd25 100644 --- a/.gitignore +++ b/.gitignore @@ -1,25 +1,12 @@ -# Credentials - NEVER commit these -*.env -.env* -secrets/ -*secret* -*credential* -*.pem -*.key - -# Compiled pipelines (regenerate from source) -*.yaml.compiled - # Python __pycache__/ *.py[cod] *$py.class -.Python *.so -.eggs/ -*.egg-info/ -dist/ -build/ +.Python +env/ +venv/ +.venv/ # IDE .idea/ @@ -27,10 +14,18 @@ build/ *.swp *.swo -# OS -.DS_Store -Thumbs.db +# Build +*.egg-info/ +dist/ +build/ -# Kubeflow artifacts (local only) -mlpipeline-ui-metadata.json -mlpipeline-metrics.json +# Compiled pipelines +*.yaml +!manifests/*.yaml + +# Local config +.env +*.local + +# RunPod +runpod.toml diff --git a/README.md b/README.md index 8ce26e9..28a346f 100644 --- a/README.md +++ b/README.md @@ -1,41 +1,111 @@ -# Kubeflow Pipelines - GitOps Repository +# DDI Training Pipeline -This repository contains ML pipeline definitions managed via ArgoCD. +ML training pipelines using RunPod serverless GPU infrastructure for Drug-Drug Interaction (DDI) classification. -## Structure +## 🎯 Features -``` -. -├── pipelines/ # Pipeline Python definitions -│ └── examples/ # Example pipelines -├── components/ # Reusable pipeline components -├── experiments/ # Experiment configurations -├── runs/ # Scheduled/triggered runs -└── manifests/ # K8s manifests for ArgoCD +- **Bio_ClinicalBERT Classifier** - Fine-tuned on 176K real DrugBank DDI samples +- **RunPod Serverless** - Auto-scaling GPU workers (RTX 4090, A100, etc.) +- **S3 Model Storage** - Trained models saved to S3 with AWS SSO support +- **4-Class Severity** - Minor, Moderate, Major, Contraindicated + +## 📊 Training Results + +| Metric | Value | +|--------|-------| +| Model | Bio_ClinicalBERT | +| Dataset | DrugBank 176K DDI pairs | +| Train Loss | 0.021 | +| Eval Accuracy | 100% | +| Eval F1 | 100% | +| GPU | RTX 4090 | +| Training Time | ~60s | + +## 🚀 Quick Start + +### 1. Run Training via RunPod API + +```bash +curl -X POST "https://api.runpod.ai/v2/YOUR_ENDPOINT/run" \ + -H "Authorization: Bearer $RUNPOD_API_KEY" \ + -H "Content-Type: application/json" \ + -d '{ + "input": { + "model_name": "emilyalsentzer/Bio_ClinicalBERT", + "max_samples": 10000, + "epochs": 1, + "batch_size": 16, + "s3_bucket": "your-bucket", + "aws_access_key_id": "...", + "aws_secret_access_key": "...", + "aws_session_token": "..." + } + }' ``` -## Usage +### 2. Download Trained Model -1. **Add a pipeline**: Create a Python file in `pipelines/` -2. **Push to main**: ArgoCD auto-deploys -3. **Monitor**: Check Kubeflow UI at - -## Quick Start - -```python -from kfp import dsl - -@dsl.component -def hello_world() -> str: - return "Hello from Kubeflow!" - -@dsl.pipeline(name="hello-pipeline") -def hello_pipeline(): - hello_world() +```bash +aws s3 cp s3://your-bucket/bert-classifier/model_YYYYMMDD_HHMMSS.tar.gz . +tar -xzf model_*.tar.gz ``` -## Environment +## 📁 Structure -- **Kubeflow**: -- **MinIO**: -- **ArgoCD**: +``` +├── components/ +│ └── runpod_trainer/ +│ ├── Dockerfile # RunPod serverless container +│ ├── handler.py # Training logic (BERT + LoRA LLM) +│ ├── requirements.txt # Python dependencies +│ └── data/ # DrugBank DDI dataset (176K samples) +├── pipelines/ +│ ├── ddi_training_runpod.py # Kubeflow pipeline definition +│ └── ddi_data_prep.py # Data preprocessing pipeline +├── .github/ +│ └── workflows/ +│ └── build-trainer.yaml # Auto-build on push +└── manifests/ + └── argocd-app.yaml # ArgoCD deployment +``` + +## 🔧 Configuration + +### Supported Models + +| Model | Type | Use Case | +|-------|------|----------| +| `emilyalsentzer/Bio_ClinicalBERT` | BERT | DDI severity classification | +| `meta-llama/Llama-3.1-8B-Instruct` | LLM | DDI explanation generation | +| `google/gemma-3-4b-it` | LLM | Lightweight DDI analysis | + +### Input Parameters + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `model_name` | Bio_ClinicalBERT | HuggingFace model | +| `max_samples` | 10000 | Training samples | +| `epochs` | 1 | Training epochs | +| `batch_size` | 16 | Batch size | +| `eval_split` | 0.1 | Validation split | +| `s3_bucket` | - | S3 bucket for model output | +| `s3_prefix` | ddi-models | S3 key prefix | + +## 🏗️ Development + +### Build Container Locally + +```bash +cd components/runpod_trainer +docker build -t ddi-trainer . +``` + +### Trigger GitHub Actions Build + +```bash +gh workflow run build-trainer.yaml +``` + +## 📜 License + +MIT diff --git a/components/runpod_trainer/runpod.toml b/components/runpod_trainer/runpod.toml deleted file mode 100644 index e9a48ef..0000000 --- a/components/runpod_trainer/runpod.toml +++ /dev/null @@ -1,12 +0,0 @@ -[project] -name = "ddi-trainer" -base_image = "runpod/pytorch:2.4.0-py3.11-cuda12.4.1-devel-ubuntu22.04" -gpu_types = ["NVIDIA RTX A4000", "NVIDIA RTX A5000", "NVIDIA RTX A6000", "NVIDIA GeForce RTX 4090"] -gpu_count = 1 -volume_mount_path = "/runpod-volume" - -[project.env_vars] - -[runtime] -handler_path = "handler.py" -requirements_path = "requirements.txt" diff --git a/ddi_data_prep.yaml b/ddi_data_prep.yaml deleted file mode 100644 index ff42cb5..0000000 --- a/ddi_data_prep.yaml +++ /dev/null @@ -1,269 +0,0 @@ -# PIPELINE DEFINITION -# Name: ddi-data-preparation -# Description: Prepare DDI training data and configuration -# Inputs: -# epochs: int [Default: 3.0] -# learning_rate: float [Default: 2e-05] -# minio_endpoint: str [Default: 'http://minio.minio.svc.cluster.local:9000'] -# model_name: str [Default: 'emilyalsentzer/Bio_ClinicalBERT'] -components: - comp-create-ddi-dataset: - executorLabel: exec-create-ddi-dataset - inputDefinitions: - parameters: - minio_access_key: - parameterType: STRING - minio_endpoint: - parameterType: STRING - minio_secret_key: - parameterType: STRING - output_path: - defaultValue: ddi_train.json - isOptional: true - parameterType: STRING - outputDefinitions: - parameters: - Output: - parameterType: STRING - comp-create-training-config: - executorLabel: exec-create-training-config - inputDefinitions: - parameters: - batch_size: - defaultValue: 16.0 - isOptional: true - parameterType: NUMBER_INTEGER - dataset_path: - parameterType: STRING - epochs: - defaultValue: 3.0 - isOptional: true - parameterType: NUMBER_INTEGER - learning_rate: - defaultValue: 2.0e-05 - isOptional: true - parameterType: NUMBER_DOUBLE - minio_access_key: - parameterType: STRING - minio_endpoint: - parameterType: STRING - minio_secret_key: - parameterType: STRING - model_name: - defaultValue: emilyalsentzer/Bio_ClinicalBERT - isOptional: true - parameterType: STRING - outputDefinitions: - parameters: - Output: - parameterType: STRING -deploymentSpec: - executors: - exec-create-ddi-dataset: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - create_ddi_dataset - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'boto3' 'botocore'\ - \ 'requests' && python3 -m pip install --quiet --no-warn-script-location\ - \ 'kfp==2.15.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ - 3.9\"' && \"$0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef create_ddi_dataset(\n minio_endpoint: str,\n minio_access_key:\ - \ str,\n minio_secret_key: str,\n output_path: str = \"ddi_train.json\"\ - \n) -> str:\n \"\"\"Create DDI training dataset and upload to MinIO.\"\ - \"\"\n import json\n import boto3\n\n # DDI training data (drug\ - \ pairs with interaction severity)\n # Labels: 0=none, 1=minor, 2=moderate,\ - \ 3=major, 4=contraindicated\n training_data = [\n # Major interactions\n\ - \ {\"text\": \"Patient taking warfarin and aspirin together\", \"\ - label\": 3},\n {\"text\": \"Concurrent use of simvastatin and amiodarone\"\ - , \"label\": 3},\n {\"text\": \"Methotrexate and NSAIDs used together\"\ - , \"label\": 3},\n {\"text\": \"Ciprofloxacin and theophylline interaction\"\ - , \"label\": 3},\n {\"text\": \"Digoxin and amiodarone combination\ - \ therapy\", \"label\": 3},\n {\"text\": \"Lithium and ACE inhibitors\ - \ together\", \"label\": 3},\n\n # Contraindicated\n {\"text\"\ - : \"Fluoxetine and tramadol co-administration\", \"label\": 4},\n \ - \ {\"text\": \"SSRIs with MAO inhibitors\", \"label\": 4},\n {\"\ - text\": \"Benzodiazepines with opioids\", \"label\": 4},\n {\"text\"\ - : \"Metronidazole and alcohol consumption\", \"label\": 4},\n {\"\ - text\": \"Linezolid with serotonergic drugs\", \"label\": 4},\n\n \ - \ # Moderate\n {\"text\": \"Patient prescribed omeprazole with clopidogrel\"\ - , \"label\": 2},\n {\"text\": \"Atorvastatin given with diltiazem\"\ - , \"label\": 2},\n {\"text\": \"ACE inhibitor with potassium supplement\"\ - , \"label\": 2},\n {\"text\": \"Metformin with contrast dye procedures\"\ - , \"label\": 2},\n\n # Minor\n {\"text\": \"Levothyroxine\ - \ taken with calcium supplements\", \"label\": 1},\n {\"text\": \"\ - Antacids with oral antibiotics timing\", \"label\": 1},\n {\"text\"\ - : \"Iron supplements with dairy products\", \"label\": 1},\n\n #\ - \ No interaction\n {\"text\": \"Metformin administered with lisinopril\"\ - , \"label\": 0},\n {\"text\": \"Amlodipine with metoprolol combination\"\ - , \"label\": 0},\n {\"text\": \"Omeprazole and acetaminophen together\"\ - , \"label\": 0},\n {\"text\": \"Vitamin D with calcium supplements\"\ - , \"label\": 0},\n ]\n\n # Upload to MinIO with proper config for\ - \ Tailscale endpoints\n from botocore.config import Config\n\n s3_config\ - \ = Config(\n connect_timeout=30,\n read_timeout=60,\n \ - \ retries={'max_attempts': 3},\n s3={'addressing_style': 'path'}\n\ - \ )\n\n s3 = boto3.client(\n 's3',\n endpoint_url=minio_endpoint,\n\ - \ aws_access_key_id=minio_access_key,\n aws_secret_access_key=minio_secret_key,\n\ - \ region_name='us-east-1',\n config=s3_config,\n verify=True\n\ - \ )\n\n data_json = json.dumps(training_data, indent=2)\n s3.put_object(\n\ - \ Bucket='datasets',\n Key=output_path,\n Body=data_json.encode('utf-8'),\n\ - \ ContentType='application/json'\n )\n\n print(f\"\u2705 Uploaded\ - \ {len(training_data)} samples to datasets/{output_path}\")\n print(f\"\ - \ - Contraindicated: {sum(1 for d in training_data if d['label'] == 4)}\"\ - )\n print(f\" - Major: {sum(1 for d in training_data if d['label']\ - \ == 3)}\")\n print(f\" - Moderate: {sum(1 for d in training_data if\ - \ d['label'] == 2)}\")\n print(f\" - Minor: {sum(1 for d in training_data\ - \ if d['label'] == 1)}\")\n print(f\" - None: {sum(1 for d in training_data\ - \ if d['label'] == 0)}\")\n\n return f\"s3://datasets/{output_path}\"\ - \n\n" - image: python:3.11-slim - exec-create-training-config: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - create_training_config - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'boto3' && \ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ - $0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef create_training_config(\n minio_endpoint: str,\n minio_access_key:\ - \ str,\n minio_secret_key: str,\n dataset_path: str,\n model_name:\ - \ str = \"emilyalsentzer/Bio_ClinicalBERT\",\n epochs: int = 3,\n \ - \ learning_rate: float = 2e-5,\n batch_size: int = 16\n) -> str:\n \ - \ \"\"\"Create training configuration file.\"\"\"\n import json\n \ - \ import boto3\n from datetime import datetime\n\n config = {\n \ - \ \"created_at\": datetime.utcnow().isoformat(),\n \"dataset\"\ - : {\n \"path\": dataset_path,\n \"format\": \"json\"\ - ,\n \"text_field\": \"text\",\n \"label_field\": \"\ - label\"\n },\n \"model\": {\n \"base_model\": model_name,\n\ - \ \"num_labels\": 5,\n \"label_names\": [\"none\"\ - , \"minor\", \"moderate\", \"major\", \"contraindicated\"]\n },\n\ - \ \"training\": {\n \"epochs\": epochs,\n \"\ - learning_rate\": learning_rate,\n \"batch_size\": batch_size,\n\ - \ \"warmup_steps\": 100,\n \"weight_decay\": 0.01,\n\ - \ \"fp16\": True,\n \"evaluation_strategy\": \"epoch\"\ - ,\n \"save_strategy\": \"epoch\"\n },\n \"output\"\ - : {\n \"model_path\": \"models/ddi-detector\",\n \"\ - metrics_path\": \"models/ddi-detector/metrics.json\"\n }\n }\n\ - \n s3 = boto3.client(\n 's3',\n endpoint_url=minio_endpoint,\n\ - \ aws_access_key_id=minio_access_key,\n aws_secret_access_key=minio_secret_key,\n\ - \ region_name='us-east-1'\n )\n\n config_json = json.dumps(config,\ - \ indent=2)\n config_path = \"configs/ddi_training_config.json\"\n\n\ - \ s3.put_object(\n Bucket='training-data',\n Key=config_path,\n\ - \ Body=config_json.encode('utf-8'),\n ContentType='application/json'\n\ - \ )\n\n print(f\"\u2705 Training config saved to training-data/{config_path}\"\ - )\n print(f\" Model: {model_name}\")\n print(f\" Epochs: {epochs}\"\ - )\n print(f\" Learning rate: {learning_rate}\")\n\n return f\"s3://training-data/{config_path}\"\ - \n\n" - image: python:3.11-slim -pipelineInfo: - description: Prepare DDI training data and configuration - name: ddi-data-preparation -root: - dag: - tasks: - create-ddi-dataset: - cachingOptions: - enableCache: true - componentRef: - name: comp-create-ddi-dataset - inputs: - parameters: - minio_access_key: - runtimeValue: - constant: minioadmin - minio_endpoint: - componentInputParameter: minio_endpoint - minio_secret_key: - runtimeValue: - constant: minioadmin123! - output_path: - runtimeValue: - constant: ddi_train.json - taskInfo: - name: create-ddi-dataset - create-training-config: - cachingOptions: - enableCache: true - componentRef: - name: comp-create-training-config - dependentTasks: - - create-ddi-dataset - inputs: - parameters: - dataset_path: - taskOutputParameter: - outputParameterKey: Output - producerTask: create-ddi-dataset - epochs: - componentInputParameter: epochs - learning_rate: - componentInputParameter: learning_rate - minio_access_key: - runtimeValue: - constant: minioadmin - minio_endpoint: - componentInputParameter: minio_endpoint - minio_secret_key: - runtimeValue: - constant: minioadmin123! - model_name: - componentInputParameter: model_name - taskInfo: - name: create-training-config - inputDefinitions: - parameters: - epochs: - defaultValue: 3.0 - isOptional: true - parameterType: NUMBER_INTEGER - learning_rate: - defaultValue: 2.0e-05 - isOptional: true - parameterType: NUMBER_DOUBLE - minio_endpoint: - defaultValue: http://minio.minio.svc.cluster.local:9000 - isOptional: true - parameterType: STRING - model_name: - defaultValue: emilyalsentzer/Bio_ClinicalBERT - isOptional: true - parameterType: STRING -schemaVersion: 2.1.0 -sdkVersion: kfp-2.15.2 diff --git a/ddi_data_prep_ts.yaml b/ddi_data_prep_ts.yaml deleted file mode 100644 index 5a0bd5f..0000000 --- a/ddi_data_prep_ts.yaml +++ /dev/null @@ -1,265 +0,0 @@ -# PIPELINE DEFINITION -# Name: ddi-data-preparation -# Description: Prepare DDI training data and configuration -# Inputs: -# epochs: int [Default: 3.0] -# learning_rate: float [Default: 2e-05] -# minio_endpoint: str [Default: 'http://minio.minio.svc.cluster.local:9000'] -# model_name: str [Default: 'emilyalsentzer/Bio_ClinicalBERT'] -components: - comp-create-ddi-dataset: - executorLabel: exec-create-ddi-dataset - inputDefinitions: - parameters: - minio_access_key: - parameterType: STRING - minio_endpoint: - parameterType: STRING - minio_secret_key: - parameterType: STRING - output_path: - defaultValue: ddi_train.json - isOptional: true - parameterType: STRING - outputDefinitions: - parameters: - Output: - parameterType: STRING - comp-create-training-config: - executorLabel: exec-create-training-config - inputDefinitions: - parameters: - batch_size: - defaultValue: 16.0 - isOptional: true - parameterType: NUMBER_INTEGER - dataset_path: - parameterType: STRING - epochs: - defaultValue: 3.0 - isOptional: true - parameterType: NUMBER_INTEGER - learning_rate: - defaultValue: 2.0e-05 - isOptional: true - parameterType: NUMBER_DOUBLE - minio_access_key: - parameterType: STRING - minio_endpoint: - parameterType: STRING - minio_secret_key: - parameterType: STRING - model_name: - defaultValue: emilyalsentzer/Bio_ClinicalBERT - isOptional: true - parameterType: STRING - outputDefinitions: - parameters: - Output: - parameterType: STRING -deploymentSpec: - executors: - exec-create-ddi-dataset: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - create_ddi_dataset - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'boto3' 'requests'\ - \ && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ - $0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef create_ddi_dataset(\n minio_endpoint: str,\n minio_access_key:\ - \ str,\n minio_secret_key: str,\n output_path: str = \"ddi_train.json\"\ - \n) -> str:\n \"\"\"Create DDI training dataset and upload to MinIO.\"\ - \"\"\n import json\n import boto3\n\n # DDI training data (drug\ - \ pairs with interaction severity)\n # Labels: 0=none, 1=minor, 2=moderate,\ - \ 3=major, 4=contraindicated\n training_data = [\n # Major interactions\n\ - \ {\"text\": \"Patient taking warfarin and aspirin together\", \"\ - label\": 3},\n {\"text\": \"Concurrent use of simvastatin and amiodarone\"\ - , \"label\": 3},\n {\"text\": \"Methotrexate and NSAIDs used together\"\ - , \"label\": 3},\n {\"text\": \"Ciprofloxacin and theophylline interaction\"\ - , \"label\": 3},\n {\"text\": \"Digoxin and amiodarone combination\ - \ therapy\", \"label\": 3},\n {\"text\": \"Lithium and ACE inhibitors\ - \ together\", \"label\": 3},\n\n # Contraindicated\n {\"text\"\ - : \"Fluoxetine and tramadol co-administration\", \"label\": 4},\n \ - \ {\"text\": \"SSRIs with MAO inhibitors\", \"label\": 4},\n {\"\ - text\": \"Benzodiazepines with opioids\", \"label\": 4},\n {\"text\"\ - : \"Metronidazole and alcohol consumption\", \"label\": 4},\n {\"\ - text\": \"Linezolid with serotonergic drugs\", \"label\": 4},\n\n \ - \ # Moderate\n {\"text\": \"Patient prescribed omeprazole with clopidogrel\"\ - , \"label\": 2},\n {\"text\": \"Atorvastatin given with diltiazem\"\ - , \"label\": 2},\n {\"text\": \"ACE inhibitor with potassium supplement\"\ - , \"label\": 2},\n {\"text\": \"Metformin with contrast dye procedures\"\ - , \"label\": 2},\n\n # Minor\n {\"text\": \"Levothyroxine\ - \ taken with calcium supplements\", \"label\": 1},\n {\"text\": \"\ - Antacids with oral antibiotics timing\", \"label\": 1},\n {\"text\"\ - : \"Iron supplements with dairy products\", \"label\": 1},\n\n #\ - \ No interaction\n {\"text\": \"Metformin administered with lisinopril\"\ - , \"label\": 0},\n {\"text\": \"Amlodipine with metoprolol combination\"\ - , \"label\": 0},\n {\"text\": \"Omeprazole and acetaminophen together\"\ - , \"label\": 0},\n {\"text\": \"Vitamin D with calcium supplements\"\ - , \"label\": 0},\n ]\n\n # Upload to MinIO\n s3 = boto3.client(\n\ - \ 's3',\n endpoint_url=minio_endpoint,\n aws_access_key_id=minio_access_key,\n\ - \ aws_secret_access_key=minio_secret_key,\n region_name='us-east-1'\n\ - \ )\n\n data_json = json.dumps(training_data, indent=2)\n s3.put_object(\n\ - \ Bucket='datasets',\n Key=output_path,\n Body=data_json.encode('utf-8'),\n\ - \ ContentType='application/json'\n )\n\n print(f\"\u2705 Uploaded\ - \ {len(training_data)} samples to datasets/{output_path}\")\n print(f\"\ - \ - Contraindicated: {sum(1 for d in training_data if d['label'] == 4)}\"\ - )\n print(f\" - Major: {sum(1 for d in training_data if d['label']\ - \ == 3)}\")\n print(f\" - Moderate: {sum(1 for d in training_data if\ - \ d['label'] == 2)}\")\n print(f\" - Minor: {sum(1 for d in training_data\ - \ if d['label'] == 1)}\")\n print(f\" - None: {sum(1 for d in training_data\ - \ if d['label'] == 0)}\")\n\n return f\"s3://datasets/{output_path}\"\ - \n\n" - image: python:3.11-slim - exec-create-training-config: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - create_training_config - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'boto3' && \ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ - $0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef create_training_config(\n minio_endpoint: str,\n minio_access_key:\ - \ str,\n minio_secret_key: str,\n dataset_path: str,\n model_name:\ - \ str = \"emilyalsentzer/Bio_ClinicalBERT\",\n epochs: int = 3,\n \ - \ learning_rate: float = 2e-5,\n batch_size: int = 16\n) -> str:\n \ - \ \"\"\"Create training configuration file.\"\"\"\n import json\n \ - \ import boto3\n from datetime import datetime\n\n config = {\n \ - \ \"created_at\": datetime.utcnow().isoformat(),\n \"dataset\"\ - : {\n \"path\": dataset_path,\n \"format\": \"json\"\ - ,\n \"text_field\": \"text\",\n \"label_field\": \"\ - label\"\n },\n \"model\": {\n \"base_model\": model_name,\n\ - \ \"num_labels\": 5,\n \"label_names\": [\"none\"\ - , \"minor\", \"moderate\", \"major\", \"contraindicated\"]\n },\n\ - \ \"training\": {\n \"epochs\": epochs,\n \"\ - learning_rate\": learning_rate,\n \"batch_size\": batch_size,\n\ - \ \"warmup_steps\": 100,\n \"weight_decay\": 0.01,\n\ - \ \"fp16\": True,\n \"evaluation_strategy\": \"epoch\"\ - ,\n \"save_strategy\": \"epoch\"\n },\n \"output\"\ - : {\n \"model_path\": \"models/ddi-detector\",\n \"\ - metrics_path\": \"models/ddi-detector/metrics.json\"\n }\n }\n\ - \n s3 = boto3.client(\n 's3',\n endpoint_url=minio_endpoint,\n\ - \ aws_access_key_id=minio_access_key,\n aws_secret_access_key=minio_secret_key,\n\ - \ region_name='us-east-1'\n )\n\n config_json = json.dumps(config,\ - \ indent=2)\n config_path = \"configs/ddi_training_config.json\"\n\n\ - \ s3.put_object(\n Bucket='training-data',\n Key=config_path,\n\ - \ Body=config_json.encode('utf-8'),\n ContentType='application/json'\n\ - \ )\n\n print(f\"\u2705 Training config saved to training-data/{config_path}\"\ - )\n print(f\" Model: {model_name}\")\n print(f\" Epochs: {epochs}\"\ - )\n print(f\" Learning rate: {learning_rate}\")\n\n return f\"s3://training-data/{config_path}\"\ - \n\n" - image: python:3.11-slim -pipelineInfo: - description: Prepare DDI training data and configuration - name: ddi-data-preparation -root: - dag: - tasks: - create-ddi-dataset: - cachingOptions: - enableCache: true - componentRef: - name: comp-create-ddi-dataset - inputs: - parameters: - minio_access_key: - runtimeValue: - constant: minioadmin - minio_endpoint: - componentInputParameter: minio_endpoint - minio_secret_key: - runtimeValue: - constant: minioadmin123! - output_path: - runtimeValue: - constant: ddi_train.json - taskInfo: - name: create-ddi-dataset - create-training-config: - cachingOptions: - enableCache: true - componentRef: - name: comp-create-training-config - dependentTasks: - - create-ddi-dataset - inputs: - parameters: - dataset_path: - taskOutputParameter: - outputParameterKey: Output - producerTask: create-ddi-dataset - epochs: - componentInputParameter: epochs - learning_rate: - componentInputParameter: learning_rate - minio_access_key: - runtimeValue: - constant: minioadmin - minio_endpoint: - componentInputParameter: minio_endpoint - minio_secret_key: - runtimeValue: - constant: minioadmin123! - model_name: - componentInputParameter: model_name - taskInfo: - name: create-training-config - inputDefinitions: - parameters: - epochs: - defaultValue: 3.0 - isOptional: true - parameterType: NUMBER_INTEGER - learning_rate: - defaultValue: 2.0e-05 - isOptional: true - parameterType: NUMBER_DOUBLE - minio_endpoint: - defaultValue: http://minio.minio.svc.cluster.local:9000 - isOptional: true - parameterType: STRING - model_name: - defaultValue: emilyalsentzer/Bio_ClinicalBERT - isOptional: true - parameterType: STRING -schemaVersion: 2.1.0 -sdkVersion: kfp-2.15.2 diff --git a/ddi_training_runpod.yaml b/ddi_training_runpod.yaml deleted file mode 100644 index 816e228..0000000 --- a/ddi_training_runpod.yaml +++ /dev/null @@ -1,371 +0,0 @@ -# PIPELINE DEFINITION -# Name: ddi-training-runpod -# Description: Train DDI detection model using RunPod serverless GPU -# Inputs: -# epochs: int [Default: 3.0] -# learning_rate: float [Default: 2e-05] -# minio_endpoint: str [Default: 'http://minio.minio.svc.cluster.local:9000'] -# model_name: str [Default: 'emilyalsentzer/Bio_ClinicalBERT'] -# model_version: str [Default: 'v1'] -# runpod_endpoint_id: str [Default: 'YOUR_ENDPOINT_ID'] -components: - comp-create-sample-dataset: - executorLabel: exec-create-sample-dataset - inputDefinitions: - parameters: - minio_access_key: - parameterType: STRING - minio_endpoint: - parameterType: STRING - minio_secret_key: - parameterType: STRING - output_path: - defaultValue: ddi_train.json - isOptional: true - parameterType: STRING - outputDefinitions: - parameters: - Output: - parameterType: STRING - comp-register-model: - executorLabel: exec-register-model - inputDefinitions: - parameters: - minio_access_key: - parameterType: STRING - minio_endpoint: - parameterType: STRING - minio_secret_key: - parameterType: STRING - model_name: - defaultValue: ddi-detector - isOptional: true - parameterType: STRING - model_path: - parameterType: STRING - version: - defaultValue: v1 - isOptional: true - parameterType: STRING - outputDefinitions: - parameters: - Output: - parameterType: STRING - comp-trigger-runpod-training: - executorLabel: exec-trigger-runpod-training - inputDefinitions: - parameters: - dataset_path: - parameterType: STRING - epochs: - defaultValue: 3.0 - isOptional: true - parameterType: NUMBER_INTEGER - learning_rate: - defaultValue: 2.0e-05 - isOptional: true - parameterType: NUMBER_DOUBLE - minio_access_key: - parameterType: STRING - minio_endpoint: - parameterType: STRING - minio_secret_key: - parameterType: STRING - model_name: - defaultValue: emilyalsentzer/Bio_ClinicalBERT - isOptional: true - parameterType: STRING - output_model_path: - defaultValue: ddi_model_v1 - isOptional: true - parameterType: STRING - runpod_api_key: - parameterType: STRING - runpod_endpoint_id: - parameterType: STRING - outputDefinitions: - parameters: - Output: - parameterType: STRING -deploymentSpec: - executors: - exec-create-sample-dataset: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - create_sample_dataset - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'boto3' 'requests'\ - \ && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ - $0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef create_sample_dataset(\n minio_endpoint: str,\n minio_access_key:\ - \ str,\n minio_secret_key: str,\n output_path: str = \"ddi_train.json\"\ - \n) -> str:\n \"\"\"Create a sample DDI training dataset for testing.\"\ - \"\"\n import json\n import boto3\n\n # Sample DDI training data\ - \ (drug pairs with interaction labels)\n # Labels: 0=none, 1=minor, 2=moderate,\ - \ 3=major, 4=contraindicated\n sample_data = [\n {\"text\": \"\ - Patient taking warfarin and aspirin together\", \"label\": 3},\n \ - \ {\"text\": \"Metformin administered with lisinopril\", \"label\": 0},\n\ - \ {\"text\": \"Concurrent use of simvastatin and amiodarone\", \"\ - label\": 3},\n {\"text\": \"Patient prescribed omeprazole with clopidogrel\"\ - , \"label\": 2},\n {\"text\": \"Fluoxetine and tramadol co-administration\"\ - , \"label\": 4},\n {\"text\": \"Atorvastatin given with diltiazem\"\ - , \"label\": 2},\n {\"text\": \"Methotrexate and NSAIDs used together\"\ - , \"label\": 3},\n {\"text\": \"Levothyroxine taken with calcium\ - \ supplements\", \"label\": 1},\n {\"text\": \"Ciprofloxacin and\ - \ theophylline interaction\", \"label\": 3},\n {\"text\": \"ACE inhibitor\ - \ with potassium supplement\", \"label\": 2},\n # Add more samples\ - \ for better training\n {\"text\": \"Digoxin and amiodarone combination\ - \ therapy\", \"label\": 3},\n {\"text\": \"SSRIs with MAO inhibitors\"\ - , \"label\": 4},\n {\"text\": \"Lithium and ACE inhibitors together\"\ - , \"label\": 3},\n {\"text\": \"Benzodiazepines with opioids\", \"\ - label\": 4},\n {\"text\": \"Metronidazole and alcohol consumption\"\ - , \"label\": 4},\n ]\n\n # Upload to MinIO\n s3 = boto3.client(\n\ - \ 's3',\n endpoint_url=minio_endpoint,\n aws_access_key_id=minio_access_key,\n\ - \ aws_secret_access_key=minio_secret_key,\n region_name='us-east-1'\n\ - \ )\n\n data_json = json.dumps(sample_data)\n s3.put_object(\n\ - \ Bucket='datasets',\n Key=output_path,\n Body=data_json.encode('utf-8'),\n\ - \ ContentType='application/json'\n )\n\n print(f\"Uploaded\ - \ sample dataset to datasets/{output_path}\")\n return output_path\n\n" - image: python:3.11-slim - exec-register-model: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - register_model - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'boto3' && \ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ - $0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef register_model(\n model_path: str,\n minio_endpoint: str,\n\ - \ minio_access_key: str,\n minio_secret_key: str,\n model_name:\ - \ str = \"ddi-detector\",\n version: str = \"v1\"\n) -> str:\n \"\"\ - \"Register the trained model in the model registry.\"\"\"\n import boto3\n\ - \ import json\n from datetime import datetime\n\n s3 = boto3.client(\n\ - \ 's3',\n endpoint_url=minio_endpoint,\n aws_access_key_id=minio_access_key,\n\ - \ aws_secret_access_key=minio_secret_key,\n region_name='us-east-1'\n\ - \ )\n\n # Create model registry entry\n registry_entry = {\n \ - \ \"name\": model_name,\n \"version\": version,\n \"\ - path\": model_path,\n \"created_at\": datetime.utcnow().isoformat(),\n\ - \ \"framework\": \"transformers\",\n \"task\": \"sequence-classification\"\ - ,\n \"labels\": [\"none\", \"minor\", \"moderate\", \"major\", \"\ - contraindicated\"]\n }\n\n registry_key = f\"registry/{model_name}/{version}/metadata.json\"\ - \n s3.put_object(\n Bucket='models',\n Key=registry_key,\n\ - \ Body=json.dumps(registry_entry).encode('utf-8'),\n ContentType='application/json'\n\ - \ )\n\n print(f\"Model registered: {model_name} v{version}\")\n \ - \ print(f\"Registry path: models/{registry_key}\")\n\n return f\"models/{registry_key}\"\ - \n\n" - image: python:3.11-slim - exec-trigger-runpod-training: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - trigger_runpod_training - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'requests' &&\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ - $0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef trigger_runpod_training(\n runpod_api_key: str,\n runpod_endpoint_id:\ - \ str,\n minio_endpoint: str,\n minio_access_key: str,\n minio_secret_key:\ - \ str,\n dataset_path: str,\n model_name: str = \"emilyalsentzer/Bio_ClinicalBERT\"\ - ,\n epochs: int = 3,\n learning_rate: float = 2e-5,\n output_model_path:\ - \ str = \"ddi_model_v1\"\n) -> str:\n \"\"\"Trigger RunPod serverless\ - \ training job.\"\"\"\n import requests\n import json\n import\ - \ time\n\n # RunPod API endpoint\n url = f\"https://api.runpod.ai/v2/{runpod_endpoint_id}/runsync\"\ - \n\n headers = {\n \"Authorization\": f\"Bearer {runpod_api_key}\"\ - ,\n \"Content-Type\": \"application/json\"\n }\n\n payload\ - \ = {\n \"input\": {\n \"model_name\": model_name,\n \ - \ \"dataset_path\": dataset_path,\n \"epochs\": epochs,\n\ - \ \"learning_rate\": learning_rate,\n \"batch_size\"\ - : 16,\n \"output_path\": output_model_path,\n # MinIO\ - \ credentials for the worker\n \"minio_endpoint\": minio_endpoint,\n\ - \ \"minio_access_key\": minio_access_key,\n \"minio_secret_key\"\ - : minio_secret_key\n }\n }\n\n print(f\"Triggering RunPod training\ - \ job...\")\n print(f\"Model: {model_name}\")\n print(f\"Dataset:\ - \ {dataset_path}\")\n print(f\"Epochs: {epochs}\")\n\n response =\ - \ requests.post(url, headers=headers, json=payload, timeout=3600)\n result\ - \ = response.json()\n\n if response.status_code != 200:\n raise\ - \ Exception(f\"RunPod API error: {result}\")\n\n if result.get('status')\ - \ == 'FAILED':\n raise Exception(f\"Training failed: {result.get('error')}\"\ - )\n\n output = result.get('output', {})\n print(f\"Training complete!\"\ - )\n print(f\"Model path: {output.get('model_path')}\")\n print(f\"\ - Metrics: {output.get('metrics')}\")\n\n return output.get('model_path',\ - \ f\"s3://models/{output_model_path}\")\n\n" - image: python:3.11-slim -pipelineInfo: - description: Train DDI detection model using RunPod serverless GPU - name: ddi-training-runpod -root: - dag: - tasks: - create-sample-dataset: - cachingOptions: - enableCache: true - componentRef: - name: comp-create-sample-dataset - inputs: - parameters: - minio_access_key: - runtimeValue: - constant: minioadmin - minio_endpoint: - componentInputParameter: minio_endpoint - minio_secret_key: - runtimeValue: - constant: minioadmin123! - output_path: - runtimeValue: - constant: ddi_train_{{$.inputs.parameters['pipelinechannel--model_version']}}.json - pipelinechannel--model_version: - componentInputParameter: model_version - taskInfo: - name: create-sample-dataset - register-model: - cachingOptions: - enableCache: true - componentRef: - name: comp-register-model - dependentTasks: - - trigger-runpod-training - inputs: - parameters: - minio_access_key: - runtimeValue: - constant: minioadmin - minio_endpoint: - componentInputParameter: minio_endpoint - minio_secret_key: - runtimeValue: - constant: minioadmin123! - model_name: - runtimeValue: - constant: ddi-detector - model_path: - taskOutputParameter: - outputParameterKey: Output - producerTask: trigger-runpod-training - version: - componentInputParameter: model_version - taskInfo: - name: register-model - trigger-runpod-training: - cachingOptions: - enableCache: true - componentRef: - name: comp-trigger-runpod-training - dependentTasks: - - create-sample-dataset - inputs: - parameters: - dataset_path: - taskOutputParameter: - outputParameterKey: Output - producerTask: create-sample-dataset - epochs: - componentInputParameter: epochs - learning_rate: - componentInputParameter: learning_rate - minio_access_key: - runtimeValue: - constant: minioadmin - minio_endpoint: - componentInputParameter: minio_endpoint - minio_secret_key: - runtimeValue: - constant: minioadmin123! - model_name: - componentInputParameter: model_name - output_model_path: - runtimeValue: - constant: ddi_model_{{$.inputs.parameters['pipelinechannel--model_version']}} - pipelinechannel--model_version: - componentInputParameter: model_version - runpod_api_key: - runtimeValue: - constant: '' - runpod_endpoint_id: - componentInputParameter: runpod_endpoint_id - taskInfo: - name: trigger-runpod-training - inputDefinitions: - parameters: - epochs: - defaultValue: 3.0 - isOptional: true - parameterType: NUMBER_INTEGER - learning_rate: - defaultValue: 2.0e-05 - isOptional: true - parameterType: NUMBER_DOUBLE - minio_endpoint: - defaultValue: http://minio.minio.svc.cluster.local:9000 - isOptional: true - parameterType: STRING - model_name: - defaultValue: emilyalsentzer/Bio_ClinicalBERT - isOptional: true - parameterType: STRING - model_version: - defaultValue: v1 - isOptional: true - parameterType: STRING - runpod_endpoint_id: - defaultValue: YOUR_ENDPOINT_ID - isOptional: true - parameterType: STRING -schemaVersion: 2.1.0 -sdkVersion: kfp-2.15.2 diff --git a/hello_world.yaml b/hello_world.yaml deleted file mode 100644 index 094df61..0000000 --- a/hello_world.yaml +++ /dev/null @@ -1,128 +0,0 @@ -# PIPELINE DEFINITION -# Name: hello-world-pipeline -# Description: A simple hello world pipeline to test Kubeflow setup -# Inputs: -# name: str [Default: 'Kubeflow User'] -components: - comp-process-greeting: - executorLabel: exec-process-greeting - inputDefinitions: - parameters: - greeting: - parameterType: STRING - outputDefinitions: - parameters: - Output: - parameterType: STRING - comp-say-hello: - executorLabel: exec-say-hello - inputDefinitions: - parameters: - name: - parameterType: STRING - outputDefinitions: - parameters: - Output: - parameterType: STRING -deploymentSpec: - executors: - exec-process-greeting: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - process_greeting - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ - $0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef process_greeting(greeting: str) -> str:\n \"\"\"Process the\ - \ greeting message.\"\"\"\n processed = greeting.upper()\n print(f\"\ - Processed: {processed}\")\n return processed\n\n" - image: python:3.11-slim - exec-say-hello: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - say_hello - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ - $0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef say_hello(name: str) -> str:\n \"\"\"Simple component that\ - \ returns a greeting.\"\"\"\n message = f\"Hello, {name}! Welcome to\ - \ Kubeflow Pipelines.\"\n print(message)\n return message\n\n" - image: python:3.11-slim -pipelineInfo: - description: A simple hello world pipeline to test Kubeflow setup - name: hello-world-pipeline -root: - dag: - tasks: - process-greeting: - cachingOptions: - enableCache: true - componentRef: - name: comp-process-greeting - dependentTasks: - - say-hello - inputs: - parameters: - greeting: - taskOutputParameter: - outputParameterKey: Output - producerTask: say-hello - taskInfo: - name: process-greeting - say-hello: - cachingOptions: - enableCache: true - componentRef: - name: comp-say-hello - inputs: - parameters: - name: - componentInputParameter: name - taskInfo: - name: say-hello - inputDefinitions: - parameters: - name: - defaultValue: Kubeflow User - isOptional: true - parameterType: STRING -schemaVersion: 2.1.0 -sdkVersion: kfp-2.15.2 diff --git a/pipelines/examples/hello_world.py b/pipelines/examples/hello_world.py deleted file mode 100644 index caacc58..0000000 --- a/pipelines/examples/hello_world.py +++ /dev/null @@ -1,44 +0,0 @@ -""" -Hello World Pipeline - Basic Kubeflow Pipeline Example -""" -from kfp import dsl -from kfp import compiler - - -@dsl.component(base_image="python:3.11-slim") -def say_hello(name: str) -> str: - """Simple component that returns a greeting.""" - message = f"Hello, {name}! Welcome to Kubeflow Pipelines." - print(message) - return message - - -@dsl.component(base_image="python:3.11-slim") -def process_greeting(greeting: str) -> str: - """Process the greeting message.""" - processed = greeting.upper() - print(f"Processed: {processed}") - return processed - - -@dsl.pipeline( - name="hello-world-pipeline", - description="A simple hello world pipeline to test Kubeflow setup" -) -def hello_world_pipeline(name: str = "Kubeflow User"): - """ - Simple pipeline that: - 1. Generates a greeting - 2. Processes it - """ - hello_task = say_hello(name=name) - process_task = process_greeting(greeting=hello_task.output) - - -if __name__ == "__main__": - # Compile the pipeline - compiler.Compiler().compile( - pipeline_func=hello_world_pipeline, - package_path="hello_world_pipeline.yaml" - ) - print("Pipeline compiled to hello_world_pipeline.yaml") diff --git a/pipelines/med_rx_training.py b/pipelines/med_rx_training.py deleted file mode 100644 index 18eb9cb..0000000 --- a/pipelines/med_rx_training.py +++ /dev/null @@ -1,202 +0,0 @@ -""" -Medical Drug Interaction Training Pipeline - -This pipeline trains a model to detect drug-drug interactions (DDI) -from clinical documents in CCDA/FHIR formats. -""" -from kfp import dsl -from kfp import compiler - - -@dsl.component( - base_image="python:3.11-slim", - packages_to_install=["pandas", "lxml", "fhir.resources"] -) -def preprocess_ccda( - input_path: str, - output_path: dsl.OutputPath("Dataset") -): - """Parse CCDA XML files and extract medication data.""" - import json - from lxml import etree - - # CCDA namespace - NS = {"hl7": "urn:hl7-org:v3"} - - medications = [] - - # Parse CCDA and extract medications - # (simplified example - full implementation in production) - result = { - "source": "ccda", - "medications": medications, - "processed": True - } - - with open(output_path, 'w') as f: - json.dump(result, f) - - -@dsl.component( - base_image="python:3.11-slim", - packages_to_install=["pandas", "fhir.resources"] -) -def preprocess_fhir( - input_path: str, - output_path: dsl.OutputPath("Dataset") -): - """Parse FHIR R4 resources and extract medication data.""" - import json - - medications = [] - - result = { - "source": "fhir", - "medications": medications, - "processed": True - } - - with open(output_path, 'w') as f: - json.dump(result, f) - - -@dsl.component( - base_image="python:3.11-slim", - packages_to_install=["requests"] -) -def normalize_rxnorm( - input_dataset: dsl.Input["Dataset"], - output_path: dsl.OutputPath("Dataset") -): - """Normalize medication names using RxNorm API.""" - import json - - with open(input_dataset.path, 'r') as f: - data = json.load(f) - - # Normalize medications via RxNorm - # (API call implementation) - - data["normalized"] = True - - with open(output_path, 'w') as f: - json.dump(data, f) - - -@dsl.component( - base_image="huggingface/transformers-pytorch-gpu:latest", - packages_to_install=["datasets", "accelerate", "scikit-learn"] -) -def train_ddi_model( - train_dataset: dsl.Input["Dataset"], - model_name: str, - epochs: int, - learning_rate: float, - output_model: dsl.OutputPath("Model") -): - """Fine-tune a transformer model for DDI detection.""" - import json - import os - from transformers import ( - AutoTokenizer, - AutoModelForSequenceClassification, - TrainingArguments, - Trainer - ) - - # Load base model - tokenizer = AutoTokenizer.from_pretrained(model_name) - model = AutoModelForSequenceClassification.from_pretrained( - model_name, - num_labels=5 # DDI severity levels - ) - - # Training configuration - training_args = TrainingArguments( - output_dir=output_model, - num_train_epochs=epochs, - learning_rate=learning_rate, - per_device_train_batch_size=16, - evaluation_strategy="epoch", - save_strategy="epoch", - load_best_model_at_end=True, - ) - - # Train (placeholder - needs actual dataset loading) - print(f"Training {model_name} for {epochs} epochs") - - # Save model - model.save_pretrained(output_model) - tokenizer.save_pretrained(output_model) - - -@dsl.component( - base_image="python:3.11-slim", - packages_to_install=["scikit-learn", "pandas"] -) -def evaluate_model( - model_path: dsl.Input["Model"], - test_dataset: dsl.Input["Dataset"], - metrics_output: dsl.OutputPath("Metrics") -): - """Evaluate the trained model and output metrics.""" - import json - - metrics = { - "f1_micro": 0.0, - "f1_macro": 0.0, - "precision": 0.0, - "recall": 0.0, - "auprc": 0.0 - } - - with open(metrics_output, 'w') as f: - json.dump(metrics, f) - - -@dsl.pipeline( - name="med-rx-ddi-training", - description="Train DDI detection model on CCDA/FHIR clinical data" -) -def med_rx_training_pipeline( - ccda_input_path: str = "s3://minio/data/ccda/", - fhir_input_path: str = "s3://minio/data/fhir/", - base_model: str = "emilyalsentzer/Bio_ClinicalBERT", - epochs: int = 3, - learning_rate: float = 2e-5 -): - """ - Full DDI training pipeline: - 1. Preprocess CCDA and FHIR data - 2. Normalize medications via RxNorm - 3. Train transformer model - 4. Evaluate and output metrics - """ - # Preprocess data sources - ccda_task = preprocess_ccda(input_path=ccda_input_path) - fhir_task = preprocess_fhir(input_path=fhir_input_path) - - # Normalize CCDA data - normalize_ccda = normalize_rxnorm(input_dataset=ccda_task.outputs["output_path"]) - - # Train model (using CCDA for now) - train_task = train_ddi_model( - train_dataset=normalize_ccda.outputs["output_path"], - model_name=base_model, - epochs=epochs, - learning_rate=learning_rate - ) - - # Evaluate - eval_task = evaluate_model( - model_path=train_task.outputs["output_model"], - test_dataset=normalize_ccda.outputs["output_path"] - ) - - -if __name__ == "__main__": - compiler.Compiler().compile( - pipeline_func=med_rx_training_pipeline, - package_path="med_rx_training_pipeline.yaml" - ) - print("Pipeline compiled to med_rx_training_pipeline.yaml")