From d235f8e6963fd14cbd2f77ba28c1296331ea0dec Mon Sep 17 00:00:00 2001 From: kaihsun Date: Sun, 24 Nov 2024 10:18:46 +0000 Subject: [PATCH 1/6] update Signed-off-by: kaihsun --- .../cloud_providers/kuberay/cloud_provider.py | 29 +++++++++++---- .../autoscaler/v2/tests/test_node_provider.py | 37 +++++++++++++++++++ 2 files changed, 59 insertions(+), 7 deletions(-) diff --git a/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py b/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py index 19e236cb4d19..500bcd6a7188 100644 --- a/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py +++ b/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py @@ -209,21 +209,22 @@ def _initialize_scale_request( cur_instances = self.instances # Get the worker groups that have pending deletes and the worker groups that - # have finished deletes. + # have finished deletes, and the set of workers included in the workersToDelete + # field of any worker group. ( worker_groups_with_pending_deletes, worker_groups_without_pending_deletes, + worker_to_delete_set, ) = self._get_workers_groups_with_deletes( ray_cluster, set(cur_instances.keys()) ) # Calculate the desired number of workers by type. num_workers_dict = defaultdict(int) - for _, cur_instance in cur_instances.items(): - if cur_instance.node_kind == NodeKind.HEAD: - # Only track workers. - continue - num_workers_dict[cur_instance.node_type] += 1 + worker_groups = ray_cluster["spec"].get("workerGroupSpecs", []) + for worker_group in worker_groups: + node_type = worker_group["groupName"] + num_workers_dict[node_type] = worker_group["replicas"] # Add to launch nodes. for node_type, count in to_launch.items(): @@ -242,6 +243,11 @@ def _initialize_scale_request( # Not possible to delete head node. continue + if to_delete_instance.cloud_instance_id in worker_to_delete_set: + # If the instance is already in the workersToDelete field of + # any worker group, skip it. + continue + num_workers_dict[to_delete_instance.node_type] -= 1 assert num_workers_dict[to_delete_instance.node_type] >= 0 to_delete_instances_by_type[to_delete_instance.node_type].append( @@ -321,6 +327,7 @@ def _submit_scale_request( # No patch required. return + logger.info(f"Submitting a scale request: {scale_request}") self._patch(f"rayclusters/{self._cluster_name}", patch_payload) def _add_launch_errors( @@ -404,10 +411,13 @@ def _get_workers_groups_with_deletes( deletes. worker_groups_with_finished_deletes: The worker groups that have finished deletes. + worker_to_delete_set: A set of Pods that are included in the workersToDelete + field of any worker group. """ worker_groups_with_pending_deletes = set() worker_groups_with_deletes = set() + worker_to_delete_set = set() worker_groups = ray_cluster_spec["spec"].get("workerGroupSpecs", []) for worker_group in worker_groups: @@ -422,6 +432,7 @@ def _get_workers_groups_with_deletes( worker_groups_with_deletes.add(node_type) for worker in workersToDelete: + worker_to_delete_set.add(worker) if worker in node_set: worker_groups_with_pending_deletes.add(node_type) break @@ -429,7 +440,11 @@ def _get_workers_groups_with_deletes( worker_groups_with_finished_deletes = ( worker_groups_with_deletes - worker_groups_with_pending_deletes ) - return worker_groups_with_pending_deletes, worker_groups_with_finished_deletes + return ( + worker_groups_with_pending_deletes, + worker_groups_with_finished_deletes, + worker_to_delete_set, + ) def _fetch_instances(self) -> Dict[CloudInstanceId, CloudInstance]: """ diff --git a/python/ray/autoscaler/v2/tests/test_node_provider.py b/python/ray/autoscaler/v2/tests/test_node_provider.py index 02d84e376b8d..a8e010f58f65 100644 --- a/python/ray/autoscaler/v2/tests/test_node_provider.py +++ b/python/ray/autoscaler/v2/tests/test_node_provider.py @@ -492,6 +492,43 @@ def test_pending_deletes(self): }, ] + def test_inconsistent_pods_raycr(self): + """ + Test the case where the cluster state has not yet reached the desired state. + Specifically, the replicas field in the RayCluster CR does not match the actual + number of Pods. + """ + # Check the assumptions of the test + small_group = "small-group" + num_pods = 0 + for pod in self.mock_client._pod_list["items"]: + if pod["metadata"]["labels"]["ray.io/group"] == small_group: + num_pods += 1 + + assert ( + self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["groupName"] + == small_group + ) + desired_replicas = num_pods + 1 + self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0][ + "replicas" + ] = desired_replicas + + # Launch a new node. The replicas field should be incremented by 1, even though + # the cluster state has not yet reached the goal state. + launch_request = {"small-group": 1} + self.provider.launch(shape=launch_request, request_id="launch-1") + + patches = self.mock_client.get_patches( + f"rayclusters/{self.provider._cluster_name}" + ) + assert len(patches) == 1 + assert patches[0] == { + "op": "replace", + "path": "/spec/workerGroupSpecs/0/replicas", + "value": desired_replicas + 1, + } + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): From 24ba62a6b198f1e2d2ee9ceb029e2bbb612d3849 Mon Sep 17 00:00:00 2001 From: kaihsun Date: Mon, 25 Nov 2024 04:52:12 +0000 Subject: [PATCH 2/6] update Signed-off-by: kaihsun --- .../cloud_providers/kuberay/cloud_provider.py | 6 +-- .../autoscaler/v2/tests/test_node_provider.py | 51 ++++++++++++++++++- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py b/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py index 500bcd6a7188..342a1238e2ce 100644 --- a/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py +++ b/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py @@ -215,9 +215,7 @@ def _initialize_scale_request( worker_groups_with_pending_deletes, worker_groups_without_pending_deletes, worker_to_delete_set, - ) = self._get_workers_groups_with_deletes( - ray_cluster, set(cur_instances.keys()) - ) + ) = self._get_workers_delete_info(ray_cluster, set(cur_instances.keys())) # Calculate the desired number of workers by type. num_workers_dict = defaultdict(int) @@ -399,7 +397,7 @@ def instances(self) -> Dict[CloudInstanceId, CloudInstance]: return copy.deepcopy(self._cached_instances) @staticmethod - def _get_workers_groups_with_deletes( + def _get_workers_delete_info( ray_cluster_spec: Dict[str, Any], node_set: Set[CloudInstanceId] ) -> Tuple[Set[NodeType], Set[NodeType]]: """ diff --git a/python/ray/autoscaler/v2/tests/test_node_provider.py b/python/ray/autoscaler/v2/tests/test_node_provider.py index a8e010f58f65..e3ab30cf5787 100644 --- a/python/ray/autoscaler/v2/tests/test_node_provider.py +++ b/python/ray/autoscaler/v2/tests/test_node_provider.py @@ -492,7 +492,7 @@ def test_pending_deletes(self): }, ] - def test_inconsistent_pods_raycr(self): + def test_inconsistent_pods_raycr_scale_up(self): """ Test the case where the cluster state has not yet reached the desired state. Specifically, the replicas field in the RayCluster CR does not match the actual @@ -529,6 +529,55 @@ def test_inconsistent_pods_raycr(self): "value": desired_replicas + 1, } + def test_inconsistent_pods_raycr_scale_down(self): + """ + Test the case where the cluster state has not yet reached the desired state. + Specifically, the replicas field in the RayCluster CR does not match the actual + number of Pods. + """ + # Check the assumptions of the test + small_group = "small-group" + num_pods = 0 + pod_to_delete = None + for pod in self.mock_client._pod_list["items"]: + if pod["metadata"]["labels"]["ray.io/group"] == small_group: + num_pods += 1 + pod_to_delete = pod["metadata"]["name"] + assert pod_to_delete is not None + + assert ( + self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["groupName"] + == small_group + ) + desired_replicas = num_pods + 1 + self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0][ + "replicas" + ] = desired_replicas + + # Terminate a node. The replicas field should be decremented by 1, even though + # the cluster state has not yet reached the goal state. + self.provider.terminate(ids=[pod_to_delete], request_id="term-1") + patches = self.mock_client.get_patches( + f"rayclusters/{self.provider._cluster_name}" + ) + assert len(patches) == 2 + assert patches == [ + { + "op": "replace", + "path": "/spec/workerGroupSpecs/0/replicas", + "value": desired_replicas - 1, + }, + { + "op": "replace", + "path": "/spec/workerGroupSpecs/0/scaleStrategy", + "value": { + "workersToDelete": [ + pod_to_delete, + ] + }, + }, + ] + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): From 2c9c3d0afe84f5f1c727b435960206399d573eb7 Mon Sep 17 00:00:00 2001 From: kaihsun Date: Mon, 25 Nov 2024 19:43:53 +0000 Subject: [PATCH 3/6] update Signed-off-by: kaihsun --- .../instance_manager/cloud_providers/kuberay/cloud_provider.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py b/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py index 342a1238e2ce..ffc3e4721ccc 100644 --- a/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py +++ b/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py @@ -399,7 +399,7 @@ def instances(self) -> Dict[CloudInstanceId, CloudInstance]: @staticmethod def _get_workers_delete_info( ray_cluster_spec: Dict[str, Any], node_set: Set[CloudInstanceId] - ) -> Tuple[Set[NodeType], Set[NodeType]]: + ) -> Tuple[Set[NodeType], Set[NodeType], Set[CloudInstanceId]]: """ Gets the worker groups that have pending deletes and the worker groups that have finished deletes. From 743df8703d2dac21e480b3cbb684669c60a3fba0 Mon Sep 17 00:00:00 2001 From: kaihsun Date: Tue, 26 Nov 2024 05:17:48 +0000 Subject: [PATCH 4/6] update Signed-off-by: kaihsun --- .../cloud_providers/kuberay/cloud_provider.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py b/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py index ffc3e4721ccc..c1b8ddc2a31b 100644 --- a/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py +++ b/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py @@ -222,7 +222,12 @@ def _initialize_scale_request( worker_groups = ray_cluster["spec"].get("workerGroupSpecs", []) for worker_group in worker_groups: node_type = worker_group["groupName"] - num_workers_dict[node_type] = worker_group["replicas"] + # Handle the case where users manually increase `minReplicas` + # to scale up the number of worker Pods. In this scenario, + # `replicas` will be smaller than `minReplicas`. + num_workers_dict[node_type] = max( + worker_group["replicas"], worker_group["minReplicas"] + ) # Add to launch nodes. for node_type, count in to_launch.items(): From 348f8f2af7c0ccf4793b79258b317c1a696a6571 Mon Sep 17 00:00:00 2001 From: kaihsun Date: Tue, 26 Nov 2024 07:41:46 +0000 Subject: [PATCH 5/6] update Signed-off-by: kaihsun --- .../kuberay/ray-cluster.complete.yaml | 2 +- .../autoscaler/v2/tests/test_node_provider.py | 32 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/python/ray/autoscaler/kuberay/ray-cluster.complete.yaml b/python/ray/autoscaler/kuberay/ray-cluster.complete.yaml index f51ac9d4c242..d57f5d6f23b9 100644 --- a/python/ray/autoscaler/kuberay/ray-cluster.complete.yaml +++ b/python/ray/autoscaler/kuberay/ray-cluster.complete.yaml @@ -108,7 +108,7 @@ spec: workerGroupSpecs: # the pod replicas in this group typed worker - replicas: 1 - minReplicas: 1 + minReplicas: 0 maxReplicas: 300 # logical group name, for this called small-group, also can be functional groupName: small-group diff --git a/python/ray/autoscaler/v2/tests/test_node_provider.py b/python/ray/autoscaler/v2/tests/test_node_provider.py index e3ab30cf5787..47483d3f61fa 100644 --- a/python/ray/autoscaler/v2/tests/test_node_provider.py +++ b/python/ray/autoscaler/v2/tests/test_node_provider.py @@ -492,6 +492,38 @@ def test_pending_deletes(self): }, ] + def test_increase_min_replicas_to_scale_up(self): + # Simulate the case where users manually increase the `minReplicas` field + # from 0 to $num_pods. KubeRay will create $num_pods worker Pods to meet the new + # `minReplicas`, even though the `replicas` field is still 0. + small_group = "small-group" + num_pods = 0 + assert ( + self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["groupName"] + == small_group + ) + for pod in self.mock_client._pod_list["items"]: + if pod["metadata"]["labels"]["ray.io/group"] == small_group: + num_pods += 1 + assert num_pods > 0 + self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["replicas"] = 0 + self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0][ + "minReplicas" + ] = num_pods + + # Launching a new node and `replicas` should be + # `max(replicas, minReplicas) + 1`. + self.provider.launch(shape={small_group: 1}, request_id="launch-1") + patches = self.mock_client.get_patches( + f"rayclusters/{self.provider._cluster_name}" + ) + assert len(patches) == 1 + assert patches[0] == { + "op": "replace", + "path": "/spec/workerGroupSpecs/0/replicas", + "value": num_pods + 1, + } + def test_inconsistent_pods_raycr_scale_up(self): """ Test the case where the cluster state has not yet reached the desired state. From adfca345175b9692fc498fafd61ecd103ad13597 Mon Sep 17 00:00:00 2001 From: kaihsun Date: Tue, 26 Nov 2024 08:36:40 +0000 Subject: [PATCH 6/6] fix tests Signed-off-by: kaihsun --- python/ray/tests/kuberay/test_autoscaling_config.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/tests/kuberay/test_autoscaling_config.py b/python/ray/tests/kuberay/test_autoscaling_config.py index 6ae9ab7fc4fe..12b5c239f4a6 100644 --- a/python/ray/tests/kuberay/test_autoscaling_config.py +++ b/python/ray/tests/kuberay/test_autoscaling_config.py @@ -82,7 +82,7 @@ def _get_basic_autoscaling_config() -> dict: }, "small-group": { "max_workers": 300, - "min_workers": 1, + "min_workers": 0, "node_config": {}, "resources": { "CPU": 1, @@ -95,7 +95,7 @@ def _get_basic_autoscaling_config() -> dict: # and modified max_workers. "gpu-group": { "max_workers": 200, - "min_workers": 1, + "min_workers": 0, "node_config": {}, "resources": { "CPU": 1, @@ -109,7 +109,7 @@ def _get_basic_autoscaling_config() -> dict: # and modified max_workers and node_config. "tpu-group": { "max_workers": 4, - "min_workers": 1, + "min_workers": 0, "node_config": {}, "resources": { "CPU": 1,