diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index facf340211b..1d967a825a5 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -132,6 +132,14 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head podTemplate.Spec.Containers = append(podTemplate.Spec.Containers, autoscalerContainer) } + if gcsFtOptions := instance.Spec.GcsFaultToleranceOptions; gcsFtOptions != nil { + // If `GcsFaultToleranceOptions.RedisUsername` is set, it will be put into the `REDIS_USERNAME` environment variable later. + // Here, we use `$REDIS_USERNAME` in rayStartParams to refer to the environment variable. + if gcsFtOptions.RedisUsername != nil { + headSpec.RayStartParams["redis-username"] = "$REDIS_USERNAME" + } + } + // If the metrics port does not exist in the Ray container, add a default one for Prometheus. isMetricsPortExists := utils.FindContainerPort(&podTemplate.Spec.Containers[utils.RayContainerIndex], utils.MetricsPortName, -1) != -1 if !isMetricsPortExists { @@ -695,6 +703,13 @@ func setContainerEnvVars(pod *corev1.Pod, rayNodeType rayv1.RayNodeType, gcsOpti Name: utils.RAY_REDIS_ADDRESS, Value: gcsOptions.RedisAddress, }) + if gcsOptions.RedisUsername != nil { + container.Env = append(container.Env, corev1.EnvVar{ + Name: utils.REDIS_USERNAME, + Value: gcsOptions.RedisUsername.Value, + ValueFrom: gcsOptions.RedisUsername.ValueFrom, + }) + } if gcsOptions.RedisPassword != nil { container.Env = utils.UpsertEnvVar(container.Env, corev1.EnvVar{ Name: utils.REDIS_PASSWORD, diff --git a/ray-operator/controllers/ray/common/pod_test.go b/ray-operator/controllers/ray/common/pod_test.go index 50f1fc5c3b0..ff15565c7c7 100644 --- a/ray-operator/controllers/ray/common/pod_test.go +++ b/ray-operator/controllers/ray/common/pod_test.go @@ -672,6 +672,7 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) { cluster.Annotations = map[string]string{} cluster.Spec.GcsFaultToleranceOptions = &rayv1.GcsFaultToleranceOptions{ RedisAddress: "redis://127.0.0.1:6379", + RedisUsername: &rayv1.RedisCredential{Value: "myuser"}, RedisPassword: &rayv1.RedisCredential{Value: "mypassword"}, } @@ -679,6 +680,9 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) { pod = BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "") rayContainer = pod.Spec.Containers[utils.RayContainerIndex] + if !strings.Contains(rayContainer.Args[0], "--redis-username=$REDIS_USERNAME") { + t.Fatalf("redis username not found in the ray start command %s", rayContainer.Args[0]) + } if !strings.Contains(rayContainer.Args[0], "--redis-password=$REDIS_PASSWORD") { t.Fatalf("Ray container expected `--redis-password=$REDIS_PASSWORD` in `%v`", rayContainer.Args[0]) } @@ -689,7 +693,8 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) { cluster.UID = "mycluster" cluster.Annotations = map[string]string{} cluster.Spec.GcsFaultToleranceOptions = &rayv1.GcsFaultToleranceOptions{ - RedisAddress: "redis://127.0.0.1:6379", + RedisAddress: "redis://127.0.0.1:6379", + RedisUsername: &rayv1.RedisCredential{Value: "myuser"}, RedisPassword: &rayv1.RedisCredential{ ValueFrom: &corev1.EnvVarSource{ SecretKeyRef: &corev1.SecretKeySelector{ @@ -706,6 +711,9 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) { pod = BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "") rayContainer = pod.Spec.Containers[utils.RayContainerIndex] + if !strings.Contains(rayContainer.Args[0], "--redis-username=$REDIS_USERNAME") { + t.Fatalf("redis username not found in the ray start command %s", rayContainer.Args[0]) + } if !strings.Contains(rayContainer.Args[0], "--redis-password=$REDIS_PASSWORD") { t.Fatalf("Ray container expected `--redis-password=$REDIS_PASSWORD` in `%v`", rayContainer.Args[0]) } diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index afc2ce101af..74ede75a0e9 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -1212,8 +1212,12 @@ func (r *RayClusterReconciler) buildRedisCleanupJob(ctx context.Context, instanc "import sys; " + "redis_address = os.getenv('RAY_REDIS_ADDRESS', '').split(',')[0]; " + "redis_address = redis_address if '://' in redis_address else 'redis://' + redis_address; " + - "parsed = urlparse(redis_address); " + - "sys.exit(1) if not cleanup_redis_storage(host=parsed.hostname, port=parsed.port, password=os.getenv('REDIS_PASSWORD', parsed.password), use_ssl=parsed.scheme=='rediss', storage_namespace=os.getenv('RAY_external_storage_namespace')) else None\"", + "parsed = urlparse(redis_address); ", + } + if utils.EnvVarExists(utils.REDIS_USERNAME, pod.Spec.Containers[utils.RayContainerIndex].Env) { + pod.Spec.Containers[utils.RayContainerIndex].Args[0] += "sys.exit(1) if not cleanup_redis_storage(host=parsed.hostname, port=parsed.port, username=os.getenv('REDIS_USERNAME', parsed.username), password=os.getenv('REDIS_PASSWORD', parsed.password), use_ssl=parsed.scheme=='rediss', storage_namespace=os.getenv('RAY_external_storage_namespace')) else None\"" + } else { + pod.Spec.Containers[utils.RayContainerIndex].Args[0] += "sys.exit(1) if not cleanup_redis_storage(host=parsed.hostname, port=parsed.port, password=os.getenv('REDIS_PASSWORD', parsed.password), use_ssl=parsed.scheme=='rediss', storage_namespace=os.getenv('RAY_external_storage_namespace')) else None\"" } // Disable liveness and readiness probes because the Job will not launch processes like Raylet and GCS. diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go index e9c97a6e36c..a22d117892e 100644 --- a/ray-operator/controllers/ray/utils/constant.go +++ b/ray-operator/controllers/ray/utils/constant.go @@ -97,6 +97,7 @@ const ( RAY_ADDRESS = "RAY_ADDRESS" RAY_REDIS_ADDRESS = "RAY_REDIS_ADDRESS" REDIS_PASSWORD = "REDIS_PASSWORD" + REDIS_USERNAME = "REDIS_USERNAME" RAY_DASHBOARD_ENABLE_K8S_DISK_USAGE = "RAY_DASHBOARD_ENABLE_K8S_DISK_USAGE" RAY_EXTERNAL_STORAGE_NS = "RAY_external_storage_namespace" RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S = "RAY_gcs_rpc_server_reconnect_timeout_s"