Update KFTO multinode pytorch training test for disconnected
abhijeet-dhumal authored and openshift-merge-bot[bot] committed Jan 30, 2025
1 parent 25f24d5 commit 8bbd0c4
Showing 1 changed file with 35 additions and 18 deletions.
53 changes: 35 additions & 18 deletions tests/kfto/kfto_mnist_training_test.go
@@ -167,7 +167,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
Command: []string{
"/bin/bash", "-c",
fmt.Sprintf(`mkdir -p /tmp/lib /tmp/datasets/mnist && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
- pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
+ pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib --verbose && \
echo "Downloading MNIST dataset..." && \
python3 /mnt/files/download_mnist_datasets.py --dataset_path "/tmp/datasets/mnist" && \
echo -e "\n\n Dataset downloaded to /tmp/datasets/mnist" && ls -R /tmp/datasets/mnist && \
@@ -251,7 +251,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
Command: []string{
"/bin/bash", "-c",
fmt.Sprintf(`mkdir -p /tmp/lib /tmp/datasets/mnist && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
- pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
+ pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib --verbose && \
echo "Downloading MNIST dataset..." && \
python3 /mnt/files/download_mnist_datasets.py --dataset_path "/tmp/datasets/mnist" && \
echo -e "\n\n Dataset downloaded to /tmp/datasets/mnist" && ls -R /tmp/datasets/mnist && \
@@ -306,14 +306,36 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
},
}

+ // Add PIP index to download Python packages; use the provided custom PyPI mirror index URL in case of a disconnected environment
+ tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Env = []corev1.EnvVar{
+ {
+ Name: "PIP_INDEX_URL",
+ Value: GetPipIndexURL(),
+ },
+ {
+ Name: "PIP_TRUSTED_HOST",
+ Value: GetPipTrustedHost(),
+ },
+ }
+ tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Env = []corev1.EnvVar{
+ {
+ Name: "PIP_INDEX_URL",
+ Value: GetPipIndexURL(),
+ },
+ {
+ Name: "PIP_TRUSTED_HOST",
+ Value: GetPipTrustedHost(),
+ },
+ }
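
For orientation, a minimal sketch of what the GetPipIndexURL and GetPipTrustedHost helpers could look like, assuming they read the mirror configuration from PIP_INDEX_URL / PIP_TRUSTED_HOST environment variables on the test runner and fall back to the public index; the actual helpers live in the shared test support package and may differ.

// Sketch only: assumed shape of the support-package helpers used above.
package support

import "os"

// GetPipIndexURL returns the PyPI index URL to inject into training containers.
// In a disconnected environment this would be the internal mirror; otherwise
// it falls back to the public index (assumption).
func GetPipIndexURL() string {
	if url, ok := os.LookupEnv("PIP_INDEX_URL"); ok && url != "" {
		return url
	}
	return "https://pypi.org/simple"
}

// GetPipTrustedHost returns the host pip should trust for the mirror; an empty
// value leaves pip's default TLS verification behaviour unchanged.
func GetPipTrustedHost() string {
	return os.Getenv("PIP_TRUSTED_HOST")
}

Inside the container, pip picks these values up automatically, since PIP_INDEX_URL and PIP_TRUSTED_HOST are the environment-variable equivalents of pip's --index-url and --trusted-host flags.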

if accelerator.isGpu() {
// Update resource lists for the GPU (NVIDIA/ROCm) use case
- tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
- tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
- tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
- tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
+ tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
+ tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
+ tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))
+ tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(accelerator.ResourceLabel)] = resource.MustParse(fmt.Sprint(numProcPerNode))

- tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Env = []corev1.EnvVar{
+ torch_distributed_debug_env_vars := []corev1.EnvVar{
{
Name: "NCCL_DEBUG",
Value: "INFO",
},
{
Name: "TORCH_DISTRIBUTED_DEBUG",
@@ -323,25 +345,19 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
Value: "DETAIL",
},
}
- tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Env = []corev1.EnvVar{
- {
- Name: "NCCL_DEBUG",
- Value: "INFO",
- },
- {
- Name: "TORCH_DISTRIBUTED_DEBUG",
- Value: "DETAIL",
- },
+ for _, envVar := range torch_distributed_debug_env_vars {
+ tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Env = upsert(tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Env, envVar, withEnvVarName(envVar.Name))
+ tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Env = upsert(tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Env, envVar, withEnvVarName(envVar.Name))
}
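
For reference, a minimal sketch of how the upsert and withEnvVarName helpers used in this loop could be implemented; the signatures are inferred from their call sites here, and the shared support package's real definitions (which may be generic) can differ.

// Sketch only: inferred shape of the env-var helpers used in the loops above.
package support

import corev1 "k8s.io/api/core/v1"

// withEnvVarName returns a predicate that matches an EnvVar by its name.
func withEnvVarName(name string) func(corev1.EnvVar) bool {
	return func(ev corev1.EnvVar) bool { return ev.Name == name }
}

// upsert replaces the first element matching the predicate, or appends the
// new value when no element matches, and returns the updated slice.
func upsert(envVars []corev1.EnvVar, envVar corev1.EnvVar, match func(corev1.EnvVar) bool) []corev1.EnvVar {
	for i, existing := range envVars {
		if match(existing) {
			envVars[i] = envVar
			return envVars
		}
	}
	return append(envVars, envVar)
}

Using upsert rather than a plain append keeps the debug variables idempotent: re-running the loop, or layering it on top of the PIP variables set earlier, never produces duplicate entries for the same name.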

// Update tolerations
- tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Tolerations = []corev1.Toleration{
+ tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Tolerations = []corev1.Toleration{
{
Key: accelerator.ResourceLabel,
Operator: corev1.TolerationOpExists,
},
}
- tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Tolerations = []corev1.Toleration{
+ tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Tolerations = []corev1.Toleration{
{
Key: accelerator.ResourceLabel,
Operator: corev1.TolerationOpExists,
@@ -377,6 +393,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
// Append the list of environment variables for the master and worker containers
for _, envVar := range storage_bucket_env_vars {
tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Env = upsert(tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Env, envVar, withEnvVarName(envVar.Name))
+ tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Env = upsert(tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Env, envVar, withEnvVarName(envVar.Name))
}

} else {
