diff --git a/python/ray/autoscaler/kuberay/ray-cluster.complete.yaml b/python/ray/autoscaler/kuberay/ray-cluster.complete.yaml
index f51ac9d4c2427..d57f5d6f23b98 100644
--- a/python/ray/autoscaler/kuberay/ray-cluster.complete.yaml
+++ b/python/ray/autoscaler/kuberay/ray-cluster.complete.yaml
@@ -108,7 +108,7 @@ spec:
   workerGroupSpecs:
   # the pod replicas in this group typed worker
   - replicas: 1
-    minReplicas: 1
+    minReplicas: 0
     maxReplicas: 300
     # logical group name, for this called small-group, also can be functional
     groupName: small-group
diff --git a/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py b/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py
index 19e236cb4d19d..c1b8ddc2a31b9 100644
--- a/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py
+++ b/python/ray/autoscaler/v2/instance_manager/cloud_providers/kuberay/cloud_provider.py
@@ -209,21 +209,25 @@ def _initialize_scale_request(
         cur_instances = self.instances

         # Get the worker groups that have pending deletes and the worker groups that
-        # have finished deletes.
+        # have finished deletes, and the set of workers included in the workersToDelete
+        # field of any worker group.
         (
             worker_groups_with_pending_deletes,
             worker_groups_without_pending_deletes,
-        ) = self._get_workers_groups_with_deletes(
-            ray_cluster, set(cur_instances.keys())
-        )
+            worker_to_delete_set,
+        ) = self._get_workers_delete_info(ray_cluster, set(cur_instances.keys()))

         # Calculate the desired number of workers by type.
         num_workers_dict = defaultdict(int)
-        for _, cur_instance in cur_instances.items():
-            if cur_instance.node_kind == NodeKind.HEAD:
-                # Only track workers.
-                continue
-            num_workers_dict[cur_instance.node_type] += 1
+        worker_groups = ray_cluster["spec"].get("workerGroupSpecs", [])
+        for worker_group in worker_groups:
+            node_type = worker_group["groupName"]
+            # Handle the case where users manually increase `minReplicas`
+            # to scale up the number of worker Pods. In this scenario,
+            # `replicas` will be smaller than `minReplicas`.
+            num_workers_dict[node_type] = max(
+                worker_group["replicas"], worker_group["minReplicas"]
+            )

         # Add to launch nodes.
         for node_type, count in to_launch.items():
@@ -242,6 +246,11 @@ def _initialize_scale_request(
                 # Not possible to delete head node.
                 continue

+            if to_delete_instance.cloud_instance_id in worker_to_delete_set:
+                # If the instance is already in the workersToDelete field of
+                # any worker group, skip it.
+                continue
+
             num_workers_dict[to_delete_instance.node_type] -= 1
             assert num_workers_dict[to_delete_instance.node_type] >= 0
             to_delete_instances_by_type[to_delete_instance.node_type].append(
@@ -321,6 +330,7 @@ def _submit_scale_request(
             # No patch required.
             return

+        logger.info(f"Submitting a scale request: {scale_request}")
         self._patch(f"rayclusters/{self._cluster_name}", patch_payload)

     def _add_launch_errors(
@@ -392,9 +402,9 @@ def instances(self) -> Dict[CloudInstanceId, CloudInstance]:
         return copy.deepcopy(self._cached_instances)

     @staticmethod
-    def _get_workers_groups_with_deletes(
+    def _get_workers_delete_info(
         ray_cluster_spec: Dict[str, Any], node_set: Set[CloudInstanceId]
-    ) -> Tuple[Set[NodeType], Set[NodeType]]:
+    ) -> Tuple[Set[NodeType], Set[NodeType], Set[CloudInstanceId]]:
         """
         Gets the worker groups that have pending deletes and the worker groups that
         have finished deletes.
@@ -404,10 +414,13 @@ def _get_workers_groups_with_deletes(
                 deletes.
             worker_groups_with_finished_deletes: The worker groups that have finished
                 deletes.
+            worker_to_delete_set: A set of Pods that are included in the workersToDelete
+                field of any worker group.
         """
         worker_groups_with_pending_deletes = set()
         worker_groups_with_deletes = set()
+        worker_to_delete_set = set()

         worker_groups = ray_cluster_spec["spec"].get("workerGroupSpecs", [])
         for worker_group in worker_groups:
@@ -422,6 +435,7 @@ def _get_workers_groups_with_deletes(
                     worker_groups_with_deletes.add(node_type)

             for worker in workersToDelete:
+                worker_to_delete_set.add(worker)
                 if worker in node_set:
                     worker_groups_with_pending_deletes.add(node_type)
                     break
@@ -429,7 +443,11 @@ def _get_workers_groups_with_deletes(
         worker_groups_with_finished_deletes = (
             worker_groups_with_deletes - worker_groups_with_pending_deletes
         )
-        return worker_groups_with_pending_deletes, worker_groups_with_finished_deletes
+        return (
+            worker_groups_with_pending_deletes,
+            worker_groups_with_finished_deletes,
+            worker_to_delete_set,
+        )

     def _fetch_instances(self) -> Dict[CloudInstanceId, CloudInstance]:
         """
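Taken together, the provider changes compute the desired worker count per group from the RayCluster spec (so a manual `minReplicas` bump is honored even before `replicas` catches up), then subtract only those terminations KubeRay has not already been told about. Below is a minimal, runnable sketch of that bookkeeping; `compute_desired_workers` and the `WorkerToDelete` dataclass are illustrative stand-ins for the provider's internal types, not the actual API:

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List, Set


@dataclass
class WorkerToDelete:
    cloud_instance_id: str  # Pod name
    node_type: str          # worker group name


def compute_desired_workers(
    ray_cluster: Dict[str, Any],
    to_launch: Dict[str, int],
    to_delete: List[WorkerToDelete],
    worker_to_delete_set: Set[str],
) -> Dict[str, int]:
    # Desired counts come from the CR spec, not the live Pods, so a manual
    # bump of `minReplicas` above `replicas` is still honored.
    num_workers = defaultdict(int)
    for group in ray_cluster["spec"].get("workerGroupSpecs", []):
        num_workers[group["groupName"]] = max(
            group["replicas"], group["minReplicas"]
        )
    for node_type, count in to_launch.items():
        num_workers[node_type] += count
    for inst in to_delete:
        # Pods already named in a workersToDelete field were accounted for
        # by an earlier scale request; decrementing again would double-count.
        if inst.cloud_instance_id in worker_to_delete_set:
            continue
        num_workers[inst.node_type] -= 1
        assert num_workers[inst.node_type] >= 0
    return dict(num_workers)


cr = {"spec": {"workerGroupSpecs": [
    {"groupName": "small-group", "replicas": 0, "minReplicas": 2}
]}}
# One fresh termination, one already queued in workersToDelete.
pending = [WorkerToDelete("pod-a", "small-group"),
           WorkerToDelete("pod-b", "small-group")]
assert compute_desired_workers(cr, {}, pending, {"pod-b"}) == {"small-group": 1}
```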
diff --git a/python/ray/autoscaler/v2/tests/test_node_provider.py b/python/ray/autoscaler/v2/tests/test_node_provider.py
index 02d84e376b8df..47483d3f61faa 100644
--- a/python/ray/autoscaler/v2/tests/test_node_provider.py
+++ b/python/ray/autoscaler/v2/tests/test_node_provider.py
@@ -492,6 +492,124 @@ def test_pending_deletes(self):
             },
         ]

+    def test_increase_min_replicas_to_scale_up(self):
+        # Simulate the case where users manually increase the `minReplicas` field
+        # from 0 to $num_pods. KubeRay will create $num_pods worker Pods to meet the new
+        # `minReplicas`, even though the `replicas` field is still 0.
+        small_group = "small-group"
+        num_pods = 0
+        assert (
+            self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["groupName"]
+            == small_group
+        )
+        for pod in self.mock_client._pod_list["items"]:
+            if pod["metadata"]["labels"]["ray.io/group"] == small_group:
+                num_pods += 1
+        assert num_pods > 0
+        self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["replicas"] = 0
+        self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0][
+            "minReplicas"
+        ] = num_pods
+
+        # Launch a new node; `replicas` should become
+        # `max(replicas, minReplicas) + 1`.
+        self.provider.launch(shape={small_group: 1}, request_id="launch-1")
+        patches = self.mock_client.get_patches(
+            f"rayclusters/{self.provider._cluster_name}"
+        )
+        assert len(patches) == 1
+        assert patches[0] == {
+            "op": "replace",
+            "path": "/spec/workerGroupSpecs/0/replicas",
+            "value": num_pods + 1,
+        }
+
+    def test_inconsistent_pods_raycr_scale_up(self):
+        """
+        Test the case where the cluster state has not yet reached the desired state.
+        Specifically, the replicas field in the RayCluster CR does not match the actual
+        number of Pods.
+        """
+        # Check the assumptions of the test
+        small_group = "small-group"
+        num_pods = 0
+        for pod in self.mock_client._pod_list["items"]:
+            if pod["metadata"]["labels"]["ray.io/group"] == small_group:
+                num_pods += 1
+
+        assert (
+            self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["groupName"]
+            == small_group
+        )
+        desired_replicas = num_pods + 1
+        self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0][
+            "replicas"
+        ] = desired_replicas
+
+        # Launch a new node. The replicas field should be incremented by 1, even though
+        # the cluster state has not yet reached the goal state.
+        launch_request = {"small-group": 1}
+        self.provider.launch(shape=launch_request, request_id="launch-1")
+
+        patches = self.mock_client.get_patches(
+            f"rayclusters/{self.provider._cluster_name}"
+        )
+        assert len(patches) == 1
+        assert patches[0] == {
+            "op": "replace",
+            "path": "/spec/workerGroupSpecs/0/replicas",
+            "value": desired_replicas + 1,
+        }
+
+    def test_inconsistent_pods_raycr_scale_down(self):
+        """
+        Test the case where the cluster state has not yet reached the desired state.
+        Specifically, the replicas field in the RayCluster CR does not match the actual
+        number of Pods.
+        """
+        # Check the assumptions of the test
+        small_group = "small-group"
+        num_pods = 0
+        pod_to_delete = None
+        for pod in self.mock_client._pod_list["items"]:
+            if pod["metadata"]["labels"]["ray.io/group"] == small_group:
+                num_pods += 1
+                pod_to_delete = pod["metadata"]["name"]
+        assert pod_to_delete is not None
+
+        assert (
+            self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0]["groupName"]
+            == small_group
+        )
+        desired_replicas = num_pods + 1
+        self.mock_client._ray_cluster["spec"]["workerGroupSpecs"][0][
+            "replicas"
+        ] = desired_replicas
+
+        # Terminate a node. The replicas field should be decremented by 1, even though
+        # the cluster state has not yet reached the goal state.
+        self.provider.terminate(ids=[pod_to_delete], request_id="term-1")
+        patches = self.mock_client.get_patches(
+            f"rayclusters/{self.provider._cluster_name}"
+        )
+        assert len(patches) == 2
+        assert patches == [
+            {
+                "op": "replace",
+                "path": "/spec/workerGroupSpecs/0/replicas",
+                "value": desired_replicas - 1,
+            },
+            {
+                "op": "replace",
+                "path": "/spec/workerGroupSpecs/0/scaleStrategy",
+                "value": {
+                    "workersToDelete": [
+                        pod_to_delete,
+                    ]
+                },
+            },
+        ]
+

 if __name__ == "__main__":
     if os.environ.get("PARALLEL_CI"):
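The scale-down assertion above encodes the provider's two-step contract with KubeRay: first shrink `replicas`, then name the evicted Pods in `scaleStrategy.workersToDelete`. To see the patches' effect outside the mock client, they can be applied to a RayCluster-shaped dict with the third-party `jsonpatch` package; the CR snippet and Pod name below are made up for illustration, and `scaleStrategy` must pre-exist because JSON Patch `replace` (RFC 6902) fails on a missing path:

```python
import jsonpatch  # third-party: pip install jsonpatch

# Minimal RayCluster-shaped doc with only the fields the patches touch.
cr = {
    "spec": {
        "workerGroupSpecs": [
            {
                "groupName": "small-group",
                "replicas": 3,
                "scaleStrategy": {"workersToDelete": []},
            }
        ]
    }
}

# The same patch shapes the test asserts, with a hypothetical Pod name.
patches = [
    {"op": "replace", "path": "/spec/workerGroupSpecs/0/replicas", "value": 2},
    {
        "op": "replace",
        "path": "/spec/workerGroupSpecs/0/scaleStrategy",
        "value": {"workersToDelete": ["small-group-worker-abc12"]},
    },
]

patched = jsonpatch.apply_patch(cr, patches)
assert patched["spec"]["workerGroupSpecs"][0]["replicas"] == 2
assert patched["spec"]["workerGroupSpecs"][0]["scaleStrategy"] == {
    "workersToDelete": ["small-group-worker-abc12"]
}
```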
diff --git a/python/ray/tests/kuberay/test_autoscaling_config.py b/python/ray/tests/kuberay/test_autoscaling_config.py
index 6ae9ab7fc4fe4..12b5c239f4a6a 100644
--- a/python/ray/tests/kuberay/test_autoscaling_config.py
+++ b/python/ray/tests/kuberay/test_autoscaling_config.py
@@ -82,7 +82,7 @@ def _get_basic_autoscaling_config() -> dict:
         },
         "small-group": {
             "max_workers": 300,
-            "min_workers": 1,
+            "min_workers": 0,
             "node_config": {},
             "resources": {
                 "CPU": 1,
@@ -95,7 +95,7 @@ def _get_basic_autoscaling_config() -> dict:
         # and modified max_workers.
         "gpu-group": {
             "max_workers": 200,
-            "min_workers": 1,
+            "min_workers": 0,
             "node_config": {},
             "resources": {
                 "CPU": 1,
@@ -109,7 +109,7 @@ def _get_basic_autoscaling_config() -> dict:
         # and modified max_workers and node_config.
         "tpu-group": {
             "max_workers": 4,
-            "min_workers": 1,
+            "min_workers": 0,
             "node_config": {},
             "resources": {
                 "CPU": 1,
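The config-fixture changes simply track the yaml edit at the top of this diff: with `minReplicas: 0` in the sample CR, the generated autoscaler config now reports `min_workers: 0` for each worker group. A toy sketch of just that projection, assuming the real converter exercised by these tests also fills in resources, node configs, and the head group:

```python
from typing import Any, Dict


def group_limits(ray_cluster: Dict[str, Any]) -> Dict[str, Dict[str, int]]:
    """Toy projection of workerGroupSpecs onto autoscaler worker limits."""
    return {
        group["groupName"]: {
            "min_workers": group.get("minReplicas", 0),
            "max_workers": group["maxReplicas"],
        }
        for group in ray_cluster["spec"].get("workerGroupSpecs", [])
    }


cr = {
    "spec": {
        "workerGroupSpecs": [
            {"groupName": "small-group", "minReplicas": 0, "maxReplicas": 300},
        ]
    }
}
assert group_limits(cr) == {"small-group": {"min_workers": 0, "max_workers": 300}}
```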