diff --git a/README.md b/README.md index f30d683..8ce26e9 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ This repository contains ML pipeline definitions managed via ArgoCD. 1. **Add a pipeline**: Create a Python file in `pipelines/` 2. **Push to main**: ArgoCD auto-deploys -3. **Monitor**: Check Kubeflow UI at https://kubeflow.walleye-frog.ts.net +3. **Monitor**: Check Kubeflow UI at ## Quick Start @@ -36,6 +36,6 @@ def hello_pipeline(): ## Environment -- **Kubeflow**: https://kubeflow.walleye-frog.ts.net -- **MinIO**: https://minio.walleye-frog.ts.net -- **ArgoCD**: https://argocd.walleye-frog.ts.net +- **Kubeflow**: +- **MinIO**: +- **ArgoCD**: diff --git a/ddi_data_prep.yaml b/ddi_data_prep.yaml index 5a0bd5f..ff42cb5 100644 --- a/ddi_data_prep.yaml +++ b/ddi_data_prep.yaml @@ -71,10 +71,10 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'boto3' 'requests'\ - \ && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ - $0\" \"$@\"\n" + \ python3 -m pip install --quiet --no-warn-script-location 'boto3' 'botocore'\ + \ 'requests' && python3 -m pip install --quiet --no-warn-script-location\ + \ 'kfp==2.15.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ + 3.9\"' && \"$0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) @@ -116,9 +116,13 @@ deploymentSpec: , \"label\": 0},\n {\"text\": \"Amlodipine with metoprolol combination\"\ , \"label\": 0},\n {\"text\": \"Omeprazole and acetaminophen together\"\ , \"label\": 0},\n {\"text\": \"Vitamin D with calcium supplements\"\ - , \"label\": 0},\n ]\n\n # Upload to MinIO\n s3 = boto3.client(\n\ - \ 's3',\n endpoint_url=minio_endpoint,\n aws_access_key_id=minio_access_key,\n\ - \ aws_secret_access_key=minio_secret_key,\n region_name='us-east-1'\n\ + , \"label\": 0},\n ]\n\n # Upload to MinIO with proper config for\ + \ Tailscale endpoints\n from botocore.config import Config\n\n s3_config\ + \ = Config(\n connect_timeout=30,\n read_timeout=60,\n \ + \ retries={'max_attempts': 3},\n s3={'addressing_style': 'path'}\n\ + \ )\n\n s3 = boto3.client(\n 's3',\n endpoint_url=minio_endpoint,\n\ + \ aws_access_key_id=minio_access_key,\n aws_secret_access_key=minio_secret_key,\n\ + \ region_name='us-east-1',\n config=s3_config,\n verify=True\n\ \ )\n\n data_json = json.dumps(training_data, indent=2)\n s3.put_object(\n\ \ Bucket='datasets',\n Key=output_path,\n Body=data_json.encode('utf-8'),\n\ \ ContentType='application/json'\n )\n\n print(f\"\u2705 Uploaded\ diff --git a/ddi_data_prep_ts.yaml b/ddi_data_prep_ts.yaml new file mode 100644 index 0000000..5a0bd5f --- /dev/null +++ b/ddi_data_prep_ts.yaml @@ -0,0 +1,265 @@ +# PIPELINE DEFINITION +# Name: ddi-data-preparation +# Description: Prepare DDI training data and configuration +# Inputs: +# epochs: int [Default: 3.0] +# learning_rate: float [Default: 2e-05] +# minio_endpoint: str [Default: 'http://minio.minio.svc.cluster.local:9000'] +# model_name: str [Default: 'emilyalsentzer/Bio_ClinicalBERT'] +components: + comp-create-ddi-dataset: + executorLabel: exec-create-ddi-dataset + inputDefinitions: + parameters: + minio_access_key: + parameterType: STRING + minio_endpoint: + parameterType: STRING + minio_secret_key: + parameterType: STRING + output_path: + defaultValue: ddi_train.json + isOptional: true + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-create-training-config: + executorLabel: exec-create-training-config + inputDefinitions: + parameters: + batch_size: + defaultValue: 16.0 + isOptional: true + parameterType: NUMBER_INTEGER + dataset_path: + parameterType: STRING + epochs: + defaultValue: 3.0 + isOptional: true + parameterType: NUMBER_INTEGER + learning_rate: + defaultValue: 2.0e-05 + isOptional: true + parameterType: NUMBER_DOUBLE + minio_access_key: + parameterType: STRING + minio_endpoint: + parameterType: STRING + minio_secret_key: + parameterType: STRING + model_name: + defaultValue: emilyalsentzer/Bio_ClinicalBERT + isOptional: true + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING +deploymentSpec: + executors: + exec-create-ddi-dataset: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - create_ddi_dataset + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'boto3' 'requests'\ + \ && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef create_ddi_dataset(\n minio_endpoint: str,\n minio_access_key:\ + \ str,\n minio_secret_key: str,\n output_path: str = \"ddi_train.json\"\ + \n) -> str:\n \"\"\"Create DDI training dataset and upload to MinIO.\"\ + \"\"\n import json\n import boto3\n\n # DDI training data (drug\ + \ pairs with interaction severity)\n # Labels: 0=none, 1=minor, 2=moderate,\ + \ 3=major, 4=contraindicated\n training_data = [\n # Major interactions\n\ + \ {\"text\": \"Patient taking warfarin and aspirin together\", \"\ + label\": 3},\n {\"text\": \"Concurrent use of simvastatin and amiodarone\"\ + , \"label\": 3},\n {\"text\": \"Methotrexate and NSAIDs used together\"\ + , \"label\": 3},\n {\"text\": \"Ciprofloxacin and theophylline interaction\"\ + , \"label\": 3},\n {\"text\": \"Digoxin and amiodarone combination\ + \ therapy\", \"label\": 3},\n {\"text\": \"Lithium and ACE inhibitors\ + \ together\", \"label\": 3},\n\n # Contraindicated\n {\"text\"\ + : \"Fluoxetine and tramadol co-administration\", \"label\": 4},\n \ + \ {\"text\": \"SSRIs with MAO inhibitors\", \"label\": 4},\n {\"\ + text\": \"Benzodiazepines with opioids\", \"label\": 4},\n {\"text\"\ + : \"Metronidazole and alcohol consumption\", \"label\": 4},\n {\"\ + text\": \"Linezolid with serotonergic drugs\", \"label\": 4},\n\n \ + \ # Moderate\n {\"text\": \"Patient prescribed omeprazole with clopidogrel\"\ + , \"label\": 2},\n {\"text\": \"Atorvastatin given with diltiazem\"\ + , \"label\": 2},\n {\"text\": \"ACE inhibitor with potassium supplement\"\ + , \"label\": 2},\n {\"text\": \"Metformin with contrast dye procedures\"\ + , \"label\": 2},\n\n # Minor\n {\"text\": \"Levothyroxine\ + \ taken with calcium supplements\", \"label\": 1},\n {\"text\": \"\ + Antacids with oral antibiotics timing\", \"label\": 1},\n {\"text\"\ + : \"Iron supplements with dairy products\", \"label\": 1},\n\n #\ + \ No interaction\n {\"text\": \"Metformin administered with lisinopril\"\ + , \"label\": 0},\n {\"text\": \"Amlodipine with metoprolol combination\"\ + , \"label\": 0},\n {\"text\": \"Omeprazole and acetaminophen together\"\ + , \"label\": 0},\n {\"text\": \"Vitamin D with calcium supplements\"\ + , \"label\": 0},\n ]\n\n # Upload to MinIO\n s3 = boto3.client(\n\ + \ 's3',\n endpoint_url=minio_endpoint,\n aws_access_key_id=minio_access_key,\n\ + \ aws_secret_access_key=minio_secret_key,\n region_name='us-east-1'\n\ + \ )\n\n data_json = json.dumps(training_data, indent=2)\n s3.put_object(\n\ + \ Bucket='datasets',\n Key=output_path,\n Body=data_json.encode('utf-8'),\n\ + \ ContentType='application/json'\n )\n\n print(f\"\u2705 Uploaded\ + \ {len(training_data)} samples to datasets/{output_path}\")\n print(f\"\ + \ - Contraindicated: {sum(1 for d in training_data if d['label'] == 4)}\"\ + )\n print(f\" - Major: {sum(1 for d in training_data if d['label']\ + \ == 3)}\")\n print(f\" - Moderate: {sum(1 for d in training_data if\ + \ d['label'] == 2)}\")\n print(f\" - Minor: {sum(1 for d in training_data\ + \ if d['label'] == 1)}\")\n print(f\" - None: {sum(1 for d in training_data\ + \ if d['label'] == 0)}\")\n\n return f\"s3://datasets/{output_path}\"\ + \n\n" + image: python:3.11-slim + exec-create-training-config: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - create_training_config + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'boto3' && \ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef create_training_config(\n minio_endpoint: str,\n minio_access_key:\ + \ str,\n minio_secret_key: str,\n dataset_path: str,\n model_name:\ + \ str = \"emilyalsentzer/Bio_ClinicalBERT\",\n epochs: int = 3,\n \ + \ learning_rate: float = 2e-5,\n batch_size: int = 16\n) -> str:\n \ + \ \"\"\"Create training configuration file.\"\"\"\n import json\n \ + \ import boto3\n from datetime import datetime\n\n config = {\n \ + \ \"created_at\": datetime.utcnow().isoformat(),\n \"dataset\"\ + : {\n \"path\": dataset_path,\n \"format\": \"json\"\ + ,\n \"text_field\": \"text\",\n \"label_field\": \"\ + label\"\n },\n \"model\": {\n \"base_model\": model_name,\n\ + \ \"num_labels\": 5,\n \"label_names\": [\"none\"\ + , \"minor\", \"moderate\", \"major\", \"contraindicated\"]\n },\n\ + \ \"training\": {\n \"epochs\": epochs,\n \"\ + learning_rate\": learning_rate,\n \"batch_size\": batch_size,\n\ + \ \"warmup_steps\": 100,\n \"weight_decay\": 0.01,\n\ + \ \"fp16\": True,\n \"evaluation_strategy\": \"epoch\"\ + ,\n \"save_strategy\": \"epoch\"\n },\n \"output\"\ + : {\n \"model_path\": \"models/ddi-detector\",\n \"\ + metrics_path\": \"models/ddi-detector/metrics.json\"\n }\n }\n\ + \n s3 = boto3.client(\n 's3',\n endpoint_url=minio_endpoint,\n\ + \ aws_access_key_id=minio_access_key,\n aws_secret_access_key=minio_secret_key,\n\ + \ region_name='us-east-1'\n )\n\n config_json = json.dumps(config,\ + \ indent=2)\n config_path = \"configs/ddi_training_config.json\"\n\n\ + \ s3.put_object(\n Bucket='training-data',\n Key=config_path,\n\ + \ Body=config_json.encode('utf-8'),\n ContentType='application/json'\n\ + \ )\n\n print(f\"\u2705 Training config saved to training-data/{config_path}\"\ + )\n print(f\" Model: {model_name}\")\n print(f\" Epochs: {epochs}\"\ + )\n print(f\" Learning rate: {learning_rate}\")\n\n return f\"s3://training-data/{config_path}\"\ + \n\n" + image: python:3.11-slim +pipelineInfo: + description: Prepare DDI training data and configuration + name: ddi-data-preparation +root: + dag: + tasks: + create-ddi-dataset: + cachingOptions: + enableCache: true + componentRef: + name: comp-create-ddi-dataset + inputs: + parameters: + minio_access_key: + runtimeValue: + constant: minioadmin + minio_endpoint: + componentInputParameter: minio_endpoint + minio_secret_key: + runtimeValue: + constant: minioadmin123! + output_path: + runtimeValue: + constant: ddi_train.json + taskInfo: + name: create-ddi-dataset + create-training-config: + cachingOptions: + enableCache: true + componentRef: + name: comp-create-training-config + dependentTasks: + - create-ddi-dataset + inputs: + parameters: + dataset_path: + taskOutputParameter: + outputParameterKey: Output + producerTask: create-ddi-dataset + epochs: + componentInputParameter: epochs + learning_rate: + componentInputParameter: learning_rate + minio_access_key: + runtimeValue: + constant: minioadmin + minio_endpoint: + componentInputParameter: minio_endpoint + minio_secret_key: + runtimeValue: + constant: minioadmin123! + model_name: + componentInputParameter: model_name + taskInfo: + name: create-training-config + inputDefinitions: + parameters: + epochs: + defaultValue: 3.0 + isOptional: true + parameterType: NUMBER_INTEGER + learning_rate: + defaultValue: 2.0e-05 + isOptional: true + parameterType: NUMBER_DOUBLE + minio_endpoint: + defaultValue: http://minio.minio.svc.cluster.local:9000 + isOptional: true + parameterType: STRING + model_name: + defaultValue: emilyalsentzer/Bio_ClinicalBERT + isOptional: true + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.15.2 diff --git a/pipelines/ddi_data_prep.py b/pipelines/ddi_data_prep.py index 01fa5c8..e67a313 100644 --- a/pipelines/ddi_data_prep.py +++ b/pipelines/ddi_data_prep.py @@ -10,7 +10,7 @@ from kfp import compiler @dsl.component( base_image="python:3.11-slim", - packages_to_install=["boto3", "requests"] + packages_to_install=["boto3", "botocore", "requests"] ) def create_ddi_dataset( minio_endpoint: str, @@ -58,13 +58,24 @@ def create_ddi_dataset( {"text": "Vitamin D with calcium supplements", "label": 0}, ] - # Upload to MinIO + # Upload to MinIO with proper config for Tailscale endpoints + from botocore.config import Config + + s3_config = Config( + connect_timeout=30, + read_timeout=60, + retries={'max_attempts': 3}, + s3={'addressing_style': 'path'} + ) + s3 = boto3.client( 's3', endpoint_url=minio_endpoint, aws_access_key_id=minio_access_key, aws_secret_access_key=minio_secret_key, - region_name='us-east-1' + region_name='us-east-1', + config=s3_config, + verify=True ) data_json = json.dumps(training_data, indent=2)