Files
kubeflow-pipelines/ddi_data_prep_ts.yaml

266 lines
12 KiB
YAML

# PIPELINE DEFINITION
# Name: ddi-data-preparation
# Description: Prepare DDI training data and configuration
# Inputs:
# epochs: int [Default: 3.0]
# learning_rate: float [Default: 2e-05]
# minio_endpoint: str [Default: 'http://minio.minio.svc.cluster.local:9000']
# model_name: str [Default: 'emilyalsentzer/Bio_ClinicalBERT']
components:
comp-create-ddi-dataset:
executorLabel: exec-create-ddi-dataset
inputDefinitions:
parameters:
minio_access_key:
parameterType: STRING
minio_endpoint:
parameterType: STRING
minio_secret_key:
parameterType: STRING
output_path:
defaultValue: ddi_train.json
isOptional: true
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
comp-create-training-config:
executorLabel: exec-create-training-config
inputDefinitions:
parameters:
batch_size:
defaultValue: 16.0
isOptional: true
parameterType: NUMBER_INTEGER
dataset_path:
parameterType: STRING
epochs:
defaultValue: 3.0
isOptional: true
parameterType: NUMBER_INTEGER
learning_rate:
defaultValue: 2.0e-05
isOptional: true
parameterType: NUMBER_DOUBLE
minio_access_key:
parameterType: STRING
minio_endpoint:
parameterType: STRING
minio_secret_key:
parameterType: STRING
model_name:
defaultValue: emilyalsentzer/Bio_ClinicalBERT
isOptional: true
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
deploymentSpec:
executors:
exec-create-ddi-dataset:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- create_ddi_dataset
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'boto3' 'requests'\
\ && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef create_ddi_dataset(\n minio_endpoint: str,\n minio_access_key:\
\ str,\n minio_secret_key: str,\n output_path: str = \"ddi_train.json\"\
\n) -> str:\n \"\"\"Create DDI training dataset and upload to MinIO.\"\
\"\"\n import json\n import boto3\n\n # DDI training data (drug\
\ pairs with interaction severity)\n # Labels: 0=none, 1=minor, 2=moderate,\
\ 3=major, 4=contraindicated\n training_data = [\n # Major interactions\n\
\ {\"text\": \"Patient taking warfarin and aspirin together\", \"\
label\": 3},\n {\"text\": \"Concurrent use of simvastatin and amiodarone\"\
, \"label\": 3},\n {\"text\": \"Methotrexate and NSAIDs used together\"\
, \"label\": 3},\n {\"text\": \"Ciprofloxacin and theophylline interaction\"\
, \"label\": 3},\n {\"text\": \"Digoxin and amiodarone combination\
\ therapy\", \"label\": 3},\n {\"text\": \"Lithium and ACE inhibitors\
\ together\", \"label\": 3},\n\n # Contraindicated\n {\"text\"\
: \"Fluoxetine and tramadol co-administration\", \"label\": 4},\n \
\ {\"text\": \"SSRIs with MAO inhibitors\", \"label\": 4},\n {\"\
text\": \"Benzodiazepines with opioids\", \"label\": 4},\n {\"text\"\
: \"Metronidazole and alcohol consumption\", \"label\": 4},\n {\"\
text\": \"Linezolid with serotonergic drugs\", \"label\": 4},\n\n \
\ # Moderate\n {\"text\": \"Patient prescribed omeprazole with clopidogrel\"\
, \"label\": 2},\n {\"text\": \"Atorvastatin given with diltiazem\"\
, \"label\": 2},\n {\"text\": \"ACE inhibitor with potassium supplement\"\
, \"label\": 2},\n {\"text\": \"Metformin with contrast dye procedures\"\
, \"label\": 2},\n\n # Minor\n {\"text\": \"Levothyroxine\
\ taken with calcium supplements\", \"label\": 1},\n {\"text\": \"\
Antacids with oral antibiotics timing\", \"label\": 1},\n {\"text\"\
: \"Iron supplements with dairy products\", \"label\": 1},\n\n #\
\ No interaction\n {\"text\": \"Metformin administered with lisinopril\"\
, \"label\": 0},\n {\"text\": \"Amlodipine with metoprolol combination\"\
, \"label\": 0},\n {\"text\": \"Omeprazole and acetaminophen together\"\
, \"label\": 0},\n {\"text\": \"Vitamin D with calcium supplements\"\
, \"label\": 0},\n ]\n\n # Upload to MinIO\n s3 = boto3.client(\n\
\ 's3',\n endpoint_url=minio_endpoint,\n aws_access_key_id=minio_access_key,\n\
\ aws_secret_access_key=minio_secret_key,\n region_name='us-east-1'\n\
\ )\n\n data_json = json.dumps(training_data, indent=2)\n s3.put_object(\n\
\ Bucket='datasets',\n Key=output_path,\n Body=data_json.encode('utf-8'),\n\
\ ContentType='application/json'\n )\n\n print(f\"\u2705 Uploaded\
\ {len(training_data)} samples to datasets/{output_path}\")\n print(f\"\
\ - Contraindicated: {sum(1 for d in training_data if d['label'] == 4)}\"\
)\n print(f\" - Major: {sum(1 for d in training_data if d['label']\
\ == 3)}\")\n print(f\" - Moderate: {sum(1 for d in training_data if\
\ d['label'] == 2)}\")\n print(f\" - Minor: {sum(1 for d in training_data\
\ if d['label'] == 1)}\")\n print(f\" - None: {sum(1 for d in training_data\
\ if d['label'] == 0)}\")\n\n return f\"s3://datasets/{output_path}\"\
\n\n"
image: python:3.11-slim
exec-create-training-config:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- create_training_config
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'boto3' && \
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.15.2'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef create_training_config(\n minio_endpoint: str,\n minio_access_key:\
\ str,\n minio_secret_key: str,\n dataset_path: str,\n model_name:\
\ str = \"emilyalsentzer/Bio_ClinicalBERT\",\n epochs: int = 3,\n \
\ learning_rate: float = 2e-5,\n batch_size: int = 16\n) -> str:\n \
\ \"\"\"Create training configuration file.\"\"\"\n import json\n \
\ import boto3\n from datetime import datetime\n\n config = {\n \
\ \"created_at\": datetime.utcnow().isoformat(),\n \"dataset\"\
: {\n \"path\": dataset_path,\n \"format\": \"json\"\
,\n \"text_field\": \"text\",\n \"label_field\": \"\
label\"\n },\n \"model\": {\n \"base_model\": model_name,\n\
\ \"num_labels\": 5,\n \"label_names\": [\"none\"\
, \"minor\", \"moderate\", \"major\", \"contraindicated\"]\n },\n\
\ \"training\": {\n \"epochs\": epochs,\n \"\
learning_rate\": learning_rate,\n \"batch_size\": batch_size,\n\
\ \"warmup_steps\": 100,\n \"weight_decay\": 0.01,\n\
\ \"fp16\": True,\n \"evaluation_strategy\": \"epoch\"\
,\n \"save_strategy\": \"epoch\"\n },\n \"output\"\
: {\n \"model_path\": \"models/ddi-detector\",\n \"\
metrics_path\": \"models/ddi-detector/metrics.json\"\n }\n }\n\
\n s3 = boto3.client(\n 's3',\n endpoint_url=minio_endpoint,\n\
\ aws_access_key_id=minio_access_key,\n aws_secret_access_key=minio_secret_key,\n\
\ region_name='us-east-1'\n )\n\n config_json = json.dumps(config,\
\ indent=2)\n config_path = \"configs/ddi_training_config.json\"\n\n\
\ s3.put_object(\n Bucket='training-data',\n Key=config_path,\n\
\ Body=config_json.encode('utf-8'),\n ContentType='application/json'\n\
\ )\n\n print(f\"\u2705 Training config saved to training-data/{config_path}\"\
)\n print(f\" Model: {model_name}\")\n print(f\" Epochs: {epochs}\"\
)\n print(f\" Learning rate: {learning_rate}\")\n\n return f\"s3://training-data/{config_path}\"\
\n\n"
image: python:3.11-slim
pipelineInfo:
description: Prepare DDI training data and configuration
name: ddi-data-preparation
root:
dag:
tasks:
create-ddi-dataset:
cachingOptions:
enableCache: true
componentRef:
name: comp-create-ddi-dataset
inputs:
parameters:
minio_access_key:
runtimeValue:
constant: minioadmin
minio_endpoint:
componentInputParameter: minio_endpoint
minio_secret_key:
runtimeValue:
constant: minioadmin123!
output_path:
runtimeValue:
constant: ddi_train.json
taskInfo:
name: create-ddi-dataset
create-training-config:
cachingOptions:
enableCache: true
componentRef:
name: comp-create-training-config
dependentTasks:
- create-ddi-dataset
inputs:
parameters:
dataset_path:
taskOutputParameter:
outputParameterKey: Output
producerTask: create-ddi-dataset
epochs:
componentInputParameter: epochs
learning_rate:
componentInputParameter: learning_rate
minio_access_key:
runtimeValue:
constant: minioadmin
minio_endpoint:
componentInputParameter: minio_endpoint
minio_secret_key:
runtimeValue:
constant: minioadmin123!
model_name:
componentInputParameter: model_name
taskInfo:
name: create-training-config
inputDefinitions:
parameters:
epochs:
defaultValue: 3.0
isOptional: true
parameterType: NUMBER_INTEGER
learning_rate:
defaultValue: 2.0e-05
isOptional: true
parameterType: NUMBER_DOUBLE
minio_endpoint:
defaultValue: http://minio.minio.svc.cluster.local:9000
isOptional: true
parameterType: STRING
model_name:
defaultValue: emilyalsentzer/Bio_ClinicalBERT
isOptional: true
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.15.2