From 8bbd0c443307dee6060573ddba5e475e3a8fb1ec Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Thu, 30 Jan 2025 15:28:57 +0530 Subject: [PATCH] Update KFTO multinode pytorch training test for disconnected --- tests/kfto/kfto_mnist_training_test.go | 53 +++++++++++++++++--------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/tests/kfto/kfto_mnist_training_test.go b/tests/kfto/kfto_mnist_training_test.go index c12d5c5e..186a288e 100644 --- a/tests/kfto/kfto_mnist_training_test.go +++ b/tests/kfto/kfto_mnist_training_test.go @@ -167,7 +167,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config Command: []string{ "/bin/bash", "-c", fmt.Sprintf(`mkdir -p /tmp/lib /tmp/datasets/mnist && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \ - pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \ + pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib --verbose && \ echo "Downloading MNIST dataset..." && \ python3 /mnt/files/download_mnist_datasets.py --dataset_path "/tmp/datasets/mnist" && \ echo -e "\n\n Dataset downloaded to /tmp/datasets/mnist" && ls -R /tmp/datasets/mnist && \ @@ -251,7 +251,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config Command: []string{ "/bin/bash", "-c", fmt.Sprintf(`mkdir -p /tmp/lib /tmp/datasets/mnist && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \ - pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \ + pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib --verbose && \ echo "Downloading MNIST dataset..." && \ python3 /mnt/files/download_mnist_datasets.py --dataset_path "/tmp/datasets/mnist" && \ echo -e "\n\n Dataset downloaded to /tmp/datasets/mnist" && ls -R /tmp/datasets/mnist && \ @@ -306,14 +306,36 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config }, } + // Add PIP Index to download python packages, use provided custom PYPI mirror index url in case of disconnected environemnt + tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Env = []corev1.EnvVar{ + { + Name: "PIP_INDEX_URL", + Value: GetPipIndexURL(), + }, + { + Name: "PIP_TRUSTED_HOST", + Value: GetPipTrustedHost(), + }, + } + tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Env = []corev1.EnvVar{ + { + Name: "PIP_INDEX_URL", + Value: GetPipIndexURL(), + }, + { + Name: "PIP_TRUSTED_HOST", + Value: GetPipTrustedHost(), + }, + } + if accelerator.isGpu() { // Update resource lists for GPU (NVIDIA/ROCm) usecase - tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode)) - tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode)) - tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode)) - tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode)) + tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode)) + tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode)) + tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode)) + tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode)) - tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Env = []corev1.EnvVar{ + torch_distributed_debug_env_vars := []corev1.EnvVar{ { Name: "NCCL_DEBUG", Value: "INFO", @@ -323,25 +345,19 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config Value: "DETAIL", }, } - tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Env = []corev1.EnvVar{ - { - Name: "NCCL_DEBUG", - Value: "INFO", - }, - { - Name: "TORCH_DISTRIBUTED_DEBUG", - Value: "DETAIL", - }, + for _, envVar := range torch_distributed_debug_env_vars { + tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Env = upsert(tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Env, envVar, withEnvVarName(envVar.Name)) + tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Env = upsert(tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Env, envVar, withEnvVarName(envVar.Name)) } // Update tolerations - tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Tolerations = []corev1.Toleration{ + tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Tolerations = []corev1.Toleration{ { Key: accelerator.ResourceLabel, Operator: corev1.TolerationOpExists, }, } - tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Tolerations = []corev1.Toleration{ + tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Tolerations = []corev1.Toleration{ { Key: accelerator.ResourceLabel, Operator: corev1.TolerationOpExists, @@ -377,6 +393,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config // Append the list of environment variables for the worker container for _, envVar := range storage_bucket_env_vars { tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Env = upsert(tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Env, envVar, withEnvVarName(envVar.Name)) + tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Env = upsert(tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Env, envVar, withEnvVarName(envVar.Name)) } } else {