diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index facf340211b..1d967a825a5 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -132,6 +132,14 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head podTemplate.Spec.Containers = append(podTemplate.Spec.Containers, autoscalerContainer) } + if gcsFtOptions := instance.Spec.GcsFaultToleranceOptions; gcsFtOptions != nil { + // If `GcsFaultToleranceOptions.RedisUsername` is set, it will be put into the `REDIS_USERNAME` environment variable later. + // Here, we use `$REDIS_USERNAME` in rayStartParams to refer to the environment variable. + if gcsFtOptions.RedisUsername != nil { + headSpec.RayStartParams["redis-username"] = "$REDIS_USERNAME" + } + } + // If the metrics port does not exist in the Ray container, add a default one for Prometheus. isMetricsPortExists := utils.FindContainerPort(&podTemplate.Spec.Containers[utils.RayContainerIndex], utils.MetricsPortName, -1) != -1 if !isMetricsPortExists { @@ -695,6 +703,13 @@ func setContainerEnvVars(pod *corev1.Pod, rayNodeType rayv1.RayNodeType, gcsOpti Name: utils.RAY_REDIS_ADDRESS, Value: gcsOptions.RedisAddress, }) + if gcsOptions.RedisUsername != nil { + container.Env = append(container.Env, corev1.EnvVar{ + Name: utils.REDIS_USERNAME, + Value: gcsOptions.RedisUsername.Value, + ValueFrom: gcsOptions.RedisUsername.ValueFrom, + }) + } if gcsOptions.RedisPassword != nil { container.Env = utils.UpsertEnvVar(container.Env, corev1.EnvVar{ Name: utils.REDIS_PASSWORD, diff --git a/ray-operator/controllers/ray/common/pod_test.go b/ray-operator/controllers/ray/common/pod_test.go index 50f1fc5c3b0..d4f6a229ccf 100644 --- a/ray-operator/controllers/ray/common/pod_test.go +++ b/ray-operator/controllers/ray/common/pod_test.go @@ -652,6 +652,8 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) { cluster.Spec.GcsFaultToleranceOptions = &rayv1.GcsFaultToleranceOptions{ ExternalStorageNamespace: "myns", RedisAddress: "redis://127.0.0.1:6379", + RedisUsername: &rayv1.RedisCredential{Value: "myuser"}, + RedisPassword: &rayv1.RedisCredential{Value: "mypass"}, } podTemplateSpec = DefaultHeadPodTemplate(ctx, *cluster, cluster.Spec.HeadGroupSpec, podName, "6379") pod = BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "") @@ -665,13 +667,23 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) { } checkContainerEnv(t, rayContainer, utils.RAY_EXTERNAL_STORAGE_NS, "myns") checkContainerEnv(t, rayContainer, utils.RAY_REDIS_ADDRESS, "redis://127.0.0.1:6379") + checkContainerEnv(t, rayContainer, utils.REDIS_USERNAME, "myuser") + checkContainerEnv(t, rayContainer, utils.REDIS_PASSWORD, "mypass") - // Test 7 with a plain text redis password in GcsFaultToleranceOptions + if !strings.Contains(rayContainer.Args[0], "--redis-username=$REDIS_USERNAME") { + t.Fatalf("redis username not found in the ray start command %s", rayContainer.Args[0]) + } + if !strings.Contains(rayContainer.Args[0], "--redis-password=$REDIS_PASSWORD") { + t.Fatalf("redis password not found in the ray start command %s", rayContainer.Args[0]) + } + + // Test 7 with a plain text redis password and username in GcsFaultToleranceOptions cluster = instance.DeepCopy() cluster.UID = "mycluster" cluster.Annotations = map[string]string{} cluster.Spec.GcsFaultToleranceOptions = &rayv1.GcsFaultToleranceOptions{ RedisAddress: "redis://127.0.0.1:6379", + RedisUsername: &rayv1.RedisCredential{Value: "myuser"}, RedisPassword: &rayv1.RedisCredential{Value: "mypassword"}, } @@ -679,17 +691,31 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) { pod = BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "") rayContainer = pod.Spec.Containers[utils.RayContainerIndex] + if !strings.Contains(rayContainer.Args[0], "--redis-username=$REDIS_USERNAME") { + t.Fatalf("redis username not found in the ray start command %s", rayContainer.Args[0]) + } if !strings.Contains(rayContainer.Args[0], "--redis-password=$REDIS_PASSWORD") { t.Fatalf("Ray container expected `--redis-password=$REDIS_PASSWORD` in `%v`", rayContainer.Args[0]) } + checkContainerEnv(t, rayContainer, utils.REDIS_USERNAME, "myuser") checkContainerEnv(t, rayContainer, utils.REDIS_PASSWORD, "mypassword") - // Test 8 with a redis password from secret in GcsFaultToleranceOptions + // Test 8 with a redis password and username from secret in GcsFaultToleranceOptions cluster = instance.DeepCopy() cluster.UID = "mycluster" cluster.Annotations = map[string]string{} cluster.Spec.GcsFaultToleranceOptions = &rayv1.GcsFaultToleranceOptions{ RedisAddress: "redis://127.0.0.1:6379", + RedisUsername: &rayv1.RedisCredential{ + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "redis-username-secret", + }, + Key: "username", + }, + }, + }, RedisPassword: &rayv1.RedisCredential{ ValueFrom: &corev1.EnvVarSource{ SecretKeyRef: &corev1.SecretKeySelector{ @@ -706,10 +732,21 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) { pod = BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "") rayContainer = pod.Spec.Containers[utils.RayContainerIndex] + if !strings.Contains(rayContainer.Args[0], "--redis-username=$REDIS_USERNAME") { + t.Fatalf("redis username not found in the ray start command %s", rayContainer.Args[0]) + } if !strings.Contains(rayContainer.Args[0], "--redis-password=$REDIS_PASSWORD") { t.Fatalf("Ray container expected `--redis-password=$REDIS_PASSWORD` in `%v`", rayContainer.Args[0]) } + usernameEnv := getEnvVar(rayContainer, utils.REDIS_USERNAME) + if usernameEnv == nil || usernameEnv.ValueFrom == nil || usernameEnv.ValueFrom.SecretKeyRef == nil { + t.Fatalf("Ray container expected env `%v` in `%v`", utils.REDIS_USERNAME, rayContainer.Env) + } + if usernameEnv.ValueFrom.SecretKeyRef.LocalObjectReference.Name != "redis-username-secret" || + usernameEnv.ValueFrom.SecretKeyRef.Key != "username" { + t.Fatalf("Ray container expected secret `redis-username-secret` with key `username` ") + } passwordEnv := getEnvVar(rayContainer, utils.REDIS_PASSWORD) if passwordEnv == nil || passwordEnv.ValueFrom == nil || passwordEnv.ValueFrom.SecretKeyRef == nil { t.Fatalf("Ray container expected env `%v` in `%v`", utils.REDIS_PASSWORD, rayContainer.Env) diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index afc2ce101af..74ede75a0e9 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -1212,8 +1212,12 @@ func (r *RayClusterReconciler) buildRedisCleanupJob(ctx context.Context, instanc "import sys; " + "redis_address = os.getenv('RAY_REDIS_ADDRESS', '').split(',')[0]; " + "redis_address = redis_address if '://' in redis_address else 'redis://' + redis_address; " + - "parsed = urlparse(redis_address); " + - "sys.exit(1) if not cleanup_redis_storage(host=parsed.hostname, port=parsed.port, password=os.getenv('REDIS_PASSWORD', parsed.password), use_ssl=parsed.scheme=='rediss', storage_namespace=os.getenv('RAY_external_storage_namespace')) else None\"", + "parsed = urlparse(redis_address); ", + } + if utils.EnvVarExists(utils.REDIS_USERNAME, pod.Spec.Containers[utils.RayContainerIndex].Env) { + pod.Spec.Containers[utils.RayContainerIndex].Args[0] += "sys.exit(1) if not cleanup_redis_storage(host=parsed.hostname, port=parsed.port, username=os.getenv('REDIS_USERNAME', parsed.username), password=os.getenv('REDIS_PASSWORD', parsed.password), use_ssl=parsed.scheme=='rediss', storage_namespace=os.getenv('RAY_external_storage_namespace')) else None\"" + } else { + pod.Spec.Containers[utils.RayContainerIndex].Args[0] += "sys.exit(1) if not cleanup_redis_storage(host=parsed.hostname, port=parsed.port, password=os.getenv('REDIS_PASSWORD', parsed.password), use_ssl=parsed.scheme=='rediss', storage_namespace=os.getenv('RAY_external_storage_namespace')) else None\"" } // Disable liveness and readiness probes because the Job will not launch processes like Raylet and GCS. diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go index e9c97a6e36c..a22d117892e 100644 --- a/ray-operator/controllers/ray/utils/constant.go +++ b/ray-operator/controllers/ray/utils/constant.go @@ -97,6 +97,7 @@ const ( RAY_ADDRESS = "RAY_ADDRESS" RAY_REDIS_ADDRESS = "RAY_REDIS_ADDRESS" REDIS_PASSWORD = "REDIS_PASSWORD" + REDIS_USERNAME = "REDIS_USERNAME" RAY_DASHBOARD_ENABLE_K8S_DISK_USAGE = "RAY_DASHBOARD_ENABLE_K8S_DISK_USAGE" RAY_EXTERNAL_STORAGE_NS = "RAY_external_storage_namespace" RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S = "RAY_gcs_rpc_server_reconnect_timeout_s"