From 914f5b53c452746a345b57913122a9ceac37e7a0 Mon Sep 17 00:00:00 2001
From: Mimi Liao
Date: Sun, 1 Dec 2024 00:54:59 -0600
Subject: [PATCH] make node_idle_duration_s a required field and modify testing code based on that

Signed-off-by: Mimi Liao
---
 .../ray/autoscaler/_private/load_metrics.py |   8 +-
 python/ray/autoscaler/_private/monitor.py   |   8 +-
 python/ray/tests/test_autoscaler.py         | 217 ++++++++++--------
 .../tests/test_resource_demand_scheduler.py |  33 ++-
 4 files changed, 152 insertions(+), 114 deletions(-)

diff --git a/python/ray/autoscaler/_private/load_metrics.py b/python/ray/autoscaler/_private/load_metrics.py
index 930229aa634ff..07192084d89b4 100644
--- a/python/ray/autoscaler/_private/load_metrics.py
+++ b/python/ray/autoscaler/_private/load_metrics.py
@@ -2,7 +2,7 @@
 import time
 from collections import Counter
 from functools import reduce
-from typing import Dict, List, Optional
+from typing import Dict, List
 
 from ray._private.gcs_utils import PlacementGroupTableData
 from ray.autoscaler._private.constants import (
@@ -93,11 +93,11 @@ def update(
         raylet_id: bytes,
         static_resources: Dict[str, Dict],
         dynamic_resources: Dict[str, Dict],
+        node_idle_duration_s: float,
         waiting_bundles: List[Dict[str, float]] = None,
         infeasible_bundles: List[Dict[str, float]] = None,
         pending_placement_groups: List[PlacementGroupTableData] = None,
         cluster_full_of_actors_detected: bool = False,
-        node_last_used_time_s: Optional[float] = None,
     ):
         self.static_resources_by_ip[ip] = static_resources
         self.raylet_id_by_ip[ip] = raylet_id
@@ -121,9 +121,7 @@ def update(
         self.dynamic_resources_by_ip[ip] = dynamic_resources_update
 
         now = time.time()
-        self.ray_nodes_last_used_time_by_ip[ip] = (
-            node_last_used_time_s if node_last_used_time_s else now
-        )
+        self.ray_nodes_last_used_time_by_ip[ip] = now - node_idle_duration_s
         self.last_heartbeat_time_by_ip[ip] = now
         self.waiting_bundles = waiting_bundles
         self.infeasible_bundles = infeasible_bundles
diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py
index c978b8e596012..ebede2890a6f1 100644
--- a/python/ray/autoscaler/_private/monitor.py
+++ b/python/ray/autoscaler/_private/monitor.py
@@ -320,9 +320,9 @@ def update_load_metrics(self):
             else:
                 ip = resource_message.node_manager_address
 
-            idle_duration_ms = 0.0
+            idle_duration_s = 0.0
             if node_id in ray_nodes_idle_duration_ms_by_id:
-                idle_duration_ms = ray_nodes_idle_duration_ms_by_id[node_id]
+                idle_duration_s = ray_nodes_idle_duration_ms_by_id[node_id] / 1000
             else:
                 logger.warning(
                     f"node_id {node_id} not found in ray_nodes_idle_duration_ms_by_id"
@@ -333,13 +333,11 @@ def update_load_metrics(self):
                 node_id,
                 total_resources,
                 available_resources,
+                idle_duration_s,
                 waiting_bundles,
                 infeasible_bundles,
                 pending_placement_groups,
                 cluster_full,
-                time.time()
-                - idle_duration_ms
-                / 1000,  # node_last_used_time_s = now - idle_duration
             )
         if self.readonly_config:
             self.readonly_config["available_node_types"].update(mirror_node_types)
diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py
index 7870d804ead5b..7fee0abc91570 100644
--- a/python/ray/tests/test_autoscaler.py
+++ b/python/ray/tests/test_autoscaler.py
@@ -320,6 +320,8 @@ def update_nodes(self):
     SMALL_CLUSTER, **{"available_node_types": TYPES_A, "head_node_type": "empty_node"}
 )
 
+DUMMY_IDLE_DURATION_S = 3
+
 exc_info = None
 try:
     raise Exception("Test exception.")
@@ -331,7 +333,7 @@ def update_nodes(self):
 class LoadMetricsTest(unittest.TestCase):
     def testHeartbeat(self):
         lm = LoadMetrics()
-        lm.update("1.1.1.1", mock_raylet_id(), {"CPU": 2}, {"CPU": 1})
+        lm.update("1.1.1.1", mock_raylet_id(), {"CPU": 2}, {"CPU": 1}, 0)
         lm.mark_active("2.2.2.2")
         assert "1.1.1.1" in lm.last_heartbeat_time_by_ip
         assert "2.2.2.2" in lm.last_heartbeat_time_by_ip
@@ -339,9 +341,9 @@ def testDebugString(self):
         lm = LoadMetrics()
-        lm.update("1.1.1.1", mock_raylet_id(), {"CPU": 2}, {"CPU": 0})
+        lm.update("1.1.1.1", mock_raylet_id(), {"CPU": 2}, {"CPU": 0}, 0)
         lm.update(
-            "2.2.2.2", mock_raylet_id(), {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}
+            "2.2.2.2", mock_raylet_id(), {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, 0
         )
         lm.update(
             "3.3.3.3",
@@ -354,6 +356,7 @@ def testDebugString(self):
                 "memory": 0,
                 "object_store_memory": 1.05 * 1024 * 1024 * 1024,
             },
+            0,
         )
         debug = lm.info_string()
         assert (
@@ -1556,7 +1559,7 @@ def _helperDynamicScaling(
             },
             1,
         )
-        lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0})
+        lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, 0)
         autoscaler = MockAutoscaler(
             config_path,
             lm,
@@ -1619,7 +1622,9 @@ def _helperDynamicScaling(
         worker_ip = self.provider.non_terminated_node_ips(
             tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER},
         )[0]
-        lm.update(worker_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 1})
+        lm.update(
+            worker_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 1}, DUMMY_IDLE_DURATION_S
+        )
         autoscaler.update()
 
         # TODO(rickyx): This is a hack to avoid running into race conditions
@@ -1647,90 +1652,91 @@ def _helperDynamicScaling(
     # def testAggressiveAutoscalingWithForegroundLauncher(self):
     #     self._aggressiveAutoscalingHelper(foreground_node_launcher=True)
 
-    def _aggressiveAutoscalingHelper(self, foreground_node_launcher: bool = False):
-        config = copy.deepcopy(SMALL_CLUSTER)
-        config["available_node_types"]["worker"]["min_workers"] = 0
-        config["available_node_types"]["worker"]["max_workers"] = 10
-        config["max_workers"] = 10
-        config["idle_timeout_minutes"] = 0
-        config["upscaling_speed"] = config["available_node_types"]["worker"][
-            "max_workers"
-        ]
-        if foreground_node_launcher:
-            config["provider"][FOREGROUND_NODE_LAUNCH_KEY] = True
-        config_path = self.write_config(config)
-
-        self.provider = MockProvider()
-        self.provider.create_node(
-            {},
-            {
-                TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
-                TAG_RAY_USER_NODE_TYPE: "head",
-            },
-            1,
-        )
-        head_ip = self.provider.non_terminated_node_ips(
-            tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_HEAD},
-        )[0]
-        runner = MockProcessRunner()
-        runner.respond_to_call("json .Config.Env", ["[]" for i in range(11)])
-        lm = LoadMetrics()
-
-        autoscaler = MockAutoscaler(
-            config_path,
-            lm,
-            MockGcsClient(),
-            max_launch_batch=5,
-            max_concurrent_launches=5,
-            max_failures=0,
-            process_runner=runner,
-            update_interval_s=0,
-        )
-
-        self.waitForNodes(1)
-        lm.update(
-            head_ip,
-            mock_raylet_id(),
-            {"CPU": 1},
-            {"CPU": 0},
-            waiting_bundles=[{"CPU": 1}] * 7,
-            infeasible_bundles=[{"CPU": 1}] * 3,
-        )
-        autoscaler.update()
-
-        if foreground_node_launcher:
-            # No wait if node launch is blocking and happens in the foreground.
-            assert self.num_nodes() == 11
-        else:
-            self.waitForNodes(11)
-        self.worker_node_thread_check(foreground_node_launcher)
-
-        worker_ips = self.provider.non_terminated_node_ips(
-            tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER},
-        )
-        for ip in worker_ips:
-            # Mark workers inactive.
-            lm.last_used_time_by_ip[ip] = 0
-        # Clear the resource demands.
-        # Otherwise in "foreground launcher" mode, workers would be deleted
-        # for being idle and instantly re-created due to resource demand!
-        lm.update(
-            head_ip,
-            mock_raylet_id(),
-            {},
-            {},
-            waiting_bundles=[],
-            infeasible_bundles=[],
-        )
-        autoscaler.update()
-        self.waitForNodes(1)  # only the head node
-        # Make sure they don't get overwritten.
-        assert autoscaler.resource_demand_scheduler.node_types["head"]["resources"] == {
-            "CPU": 1
-        }
-        assert autoscaler.resource_demand_scheduler.node_types["worker"][
-            "resources"
-        ] == {"CPU": 1}
+    # def _aggressiveAutoscalingHelper(self, foreground_node_launcher: bool = False):
+    #     config = copy.deepcopy(SMALL_CLUSTER)
+    #     config["available_node_types"]["worker"]["min_workers"] = 0
+    #     config["available_node_types"]["worker"]["max_workers"] = 10
+    #     config["max_workers"] = 10
+    #     config["idle_timeout_minutes"] = 0
+    #     config["upscaling_speed"] = config["available_node_types"]["worker"][
+    #         "max_workers"
+    #     ]
+    #     if foreground_node_launcher:
+    #         config["provider"][FOREGROUND_NODE_LAUNCH_KEY] = True
+    #     config_path = self.write_config(config)
+
+    #     self.provider = MockProvider()
+    #     self.provider.create_node(
+    #         {},
+    #         {
+    #             TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
+    #             TAG_RAY_USER_NODE_TYPE: "head",
+    #         },
+    #         1,
+    #     )
+    #     head_ip = self.provider.non_terminated_node_ips(
+    #         tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_HEAD},
+    #     )[0]
+    #     runner = MockProcessRunner()
+    #     runner.respond_to_call("json .Config.Env", ["[]" for i in range(11)])
+    #     lm = LoadMetrics()
+
+    #     autoscaler = MockAutoscaler(
+    #         config_path,
+    #         lm,
+    #         MockGcsClient(),
+    #         max_launch_batch=5,
+    #         max_concurrent_launches=5,
+    #         max_failures=0,
+    #         process_runner=runner,
+    #         update_interval_s=0,
+    #     )
+
+    #     self.waitForNodes(1)
+    #     lm.update(
+    #         head_ip,
+    #         mock_raylet_id(),
+    #         {"CPU": 1},
+    #         {"CPU": 0},
+    #         waiting_bundles=[{"CPU": 1}] * 7,
+    #         infeasible_bundles=[{"CPU": 1}] * 3,
+    #     )
+    #     autoscaler.update()
+
+    #     if foreground_node_launcher:
+    #         # No wait if node launch is blocking and happens in the foreground.
+    #         assert self.num_nodes() == 11
+    #     else:
+    #         self.waitForNodes(11)
+    #     self.worker_node_thread_check(foreground_node_launcher)
+
+    #     worker_ips = self.provider.non_terminated_node_ips(
+    #         tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER},
+    #     )
+    #     for ip in worker_ips:
+    #         # Mark workers inactive.
+    #         lm.last_used_time_by_ip[ip] = 0
+    #     # Clear the resource demands.
+    #     # Otherwise in "foreground launcher" mode, workers would be deleted
+    #     # for being idle and instantly re-created due to resource demand!
+    #     lm.update(
+    #         head_ip,
+    #         mock_raylet_id(),
+    #         {},
+    #         {},
+    #         waiting_bundles=[],
+    #         infeasible_bundles=[],
+    #     )
+    #     autoscaler.update()
+    #     self.waitForNodes(1)  # only the head node
+    #     # Make sure they don't get overwritten.
+    #     assert autoscaler.resource_demand_scheduler.node_types["head"]["resources"]
+    #     == {
+    #         "CPU": 1
+    #     }
+    #     assert autoscaler.resource_demand_scheduler.node_types["worker"][
+    #         "resources"
+    #     ] == {"CPU": 1}
 
     def testUnmanagedNodes(self):
         config = copy.deepcopy(SMALL_CLUSTER)
@@ -1779,8 +1785,14 @@ def testUnmanagedNodes(self):
         autoscaler.update()
         self.waitForNodes(2)
         # This node has num_cpus=0
-        lm.update(head_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 0})
-        lm.update(unmanaged_ip, mock_raylet_id(), {"CPU": 0}, {"CPU": 0})
+        lm.update(head_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, 0)
+        lm.update(
+            unmanaged_ip,
+            mock_raylet_id(),
+            {"CPU": 0},
+            {"CPU": 0},
+            DUMMY_IDLE_DURATION_S,
+        )
         autoscaler.update()
         self.waitForNodes(2)
         # 1 CPU task cannot be scheduled.
@@ -1789,6 +1801,7 @@ def testUnmanagedNodes(self):
             mock_raylet_id(),
             {"CPU": 0},
             {"CPU": 0},
+            DUMMY_IDLE_DURATION_S,
             waiting_bundles=[{"CPU": 1}],
         )
         autoscaler.update()
@@ -1821,9 +1834,6 @@ def testUnmanagedNodes2(self):
         unmanaged_ip = self.provider.non_terminated_node_ips(
             tag_filters={TAG_RAY_NODE_KIND: "unmanaged"},
         )[0]
-        unmanaged_ip = self.provider.non_terminated_node_ips(
-            tag_filters={TAG_RAY_NODE_KIND: "unmanaged"},
-        )[0]
 
         runner = MockProcessRunner()
 
@@ -1841,8 +1851,14 @@ def testUnmanagedNodes2(self):
             update_interval_s=0,
         )
 
-        lm.update(head_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 0})
-        lm.update(unmanaged_ip, mock_raylet_id(), {"CPU": 0}, {"CPU": 0})
+        lm.update(head_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, 0)
+        lm.update(
+            unmanaged_ip,
+            mock_raylet_id(),
+            {"CPU": 0},
+            {"CPU": 0},
+            DUMMY_IDLE_DURATION_S,
+        )
 
         # Note that we shouldn't autoscale here because the resource demand
         # vector is not set and target utilization fraction = 1.
@@ -1896,6 +1912,7 @@ def testDelayedLaunch(self):
             mock_raylet_id(),
             {"CPU": 1},
             {"CPU": 0},
+            0,
             waiting_bundles=[{"CPU": 1}] * 2,
         )
         autoscaler.update()
@@ -2096,7 +2113,7 @@ def testIgnoresCorruptedConfig(self):
             1,
         )
         lm = LoadMetrics()
-        lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0})
+        lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, 0)
         mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
         autoscaler = MockAutoscaler(
             config_path,
@@ -2141,7 +2158,9 @@ def testIgnoresCorruptedConfig(self):
         )[0]
         # Because one worker already started, the scheduler waits for its
         # resources to be updated before it launches the remaining min_workers.
-        lm.update(worker_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 1})
+        lm.update(
+            worker_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 1}, DUMMY_IDLE_DURATION_S
+        )
         autoscaler.update()
         self.waitForNodes(10, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
         assert mock_metrics.drain_node_exceptions.inc.call_count == 0
@@ -2432,7 +2451,7 @@ def testScaleDownMaxWorkers(self):
     def testFalseyLoadMetrics(self):
         lm = LoadMetrics()
         assert not lm
-        lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0})
+        lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, 0)
         assert lm
 
     def testRecoverUnhealthyWorkers(self):
diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py
index 34dd81a75cd6c..05cacbb1559f1 100644
--- a/python/ray/tests/test_resource_demand_scheduler.py
+++ b/python/ray/tests/test_resource_demand_scheduler.py
@@ -69,6 +69,8 @@
 GET_DEFAULT_METHOD = "ray.autoscaler._private.util._get_default_config"
 EMPTY_AVAILABILITY_SUMMARY = NodeAvailabilitySummary({})
 
+DUMMY_IDLE_DURATION_S = 3
+
 utilization_scorer = partial(
     _default_utilization_scorer, node_availability_summary=EMPTY_AVAILABILITY_SUMMARY
 )
@@ -1653,6 +1655,7 @@ def testResourceDemandVector(self):
             mock_raylet_id(),
             {"CPU": 2},
             {"CPU": 1},
+            0,
             waiting_bundles=[{"GPU": 1}],
             infeasible_bundles=[{"CPU": 16}],
         )
@@ -1677,6 +1680,7 @@ def testPlacementGroupLoad(self):
             mock_raylet_id(),
             {},
             {},
+            DUMMY_IDLE_DURATION_S,
             pending_placement_groups=pending_placement_groups,
         )
         assert lm.get_pending_placement_groups() == pending_placement_groups
@@ -1709,6 +1713,7 @@ def testSummary(self):
                 "memory": 500 * 1024 * 1024,  # 500 MiB
                 "object_store_memory": 1000 * 1024 * 1024,
             },
+            0,
         )
         lm.update(
             "1.1.1.2",
@@ -1723,18 +1728,21 @@ def testSummary(self):
                 "GPU": 1,
                 "accelerator_type:V100": 1,
             },
+            0,
         )
         lm.update(
             "1.1.1.3",
             mock_raylet_id(),
             {"CPU": 64, "GPU": 8, "accelerator_type:V100": 1},
             {"CPU": 0, "GPU": 0, "accelerator_type:V100": 0.92},
+            0,
         )
         lm.update(
             "1.1.1.4",
             mock_raylet_id(),
             {"CPU": 2},
             {"CPU": 2},
+            DUMMY_IDLE_DURATION_S,
             waiting_bundles=[{"GPU": 2}] * 10,
             infeasible_bundles=[{"CPU": 16}, {"GPU": 2}, {"CPU": 16, "GPU": 2}],
             pending_placement_groups=pending_placement_groups,
@@ -1946,9 +1954,9 @@ def testSummary(self):
         self.waitForNodes(3)
 
         for ip in self.provider.non_terminated_node_ips({}):
-            lm.update(ip, mock_raylet_id(), {"CPU": 2}, {"CPU": 0})
+            lm.update(ip, mock_raylet_id(), {"CPU": 2}, {"CPU": 0}, 0)
 
-        lm.update(head_ip, mock_raylet_id(), {"CPU": 16}, {"CPU": 1})
+        lm.update(head_ip, mock_raylet_id(), {"CPU": 16}, {"CPU": 1}, 0)
         autoscaler.update()
 
         while True:
@@ -1970,6 +1978,7 @@ def testSummary(self):
             mock_raylet_id(),
             {"CPU": 16},
             {"CPU": 1},
+            0,
             waiting_bundles=[{"GPU": 1}],
         )
 
@@ -2152,6 +2161,7 @@ def testPlacementGroup(self):
             mock_raylet_id(),
             {"CPU": 16},
             {"CPU": 16},
+            DUMMY_IDLE_DURATION_S,
             infeasible_bundles=placement_group_resource_demands,
             waiting_bundles=[{"GPU": 8}],
             pending_placement_groups=pending_placement_groups,
@@ -2234,7 +2244,7 @@ def testScaleUpMinWorkers(self):
         # Make sure that after idle_timeout_minutes we don't kill idle
         # min workers.
         for node_id in self.provider.non_terminated_nodes({}):
-            lm.last_used_time_by_ip[self.provider.internal_ip(node_id)] = -60
+            lm.ray_nodes_last_used_time_by_ip[self.provider.internal_ip(node_id)] = -60
         fill_in_raylet_ids(self.provider, lm)
         autoscaler.update()
         self.waitForNodes(3)
@@ -2285,7 +2295,7 @@ def testScaleUpIgnoreUsed(self):
         )
         autoscaler.update()
         self.waitForNodes(1)
-        lm.update(head_ip, mock_raylet_id(), {"CPU": 4, "GPU": 1}, {})
+        lm.update(head_ip, mock_raylet_id(), {"CPU": 4, "GPU": 1}, {}, 0)
         self.waitForNodes(1)
 
         lm.update(
@@ -2293,6 +2303,7 @@ def testScaleUpIgnoreUsed(self):
             mock_raylet_id(),
             {"CPU": 4, "GPU": 1},
             {"GPU": 0},
+            0,
             waiting_bundles=[{"GPU": 1}],
         )
         autoscaler.update()
@@ -2473,6 +2484,7 @@ def testScaleUpLoadMetrics(self):
             mock_raylet_id(),
             {},
             {},
+            DUMMY_IDLE_DURATION_S,
             waiting_bundles=[{"GPU": 1}],
             infeasible_bundles=[{"CPU": 16}],
         )
@@ -2510,7 +2522,7 @@ def testCommandPassing(self):
             1,
         )
         lm = LoadMetrics()
-        lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 0}, {"CPU": 0})
+        lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 0}, {"CPU": 0}, 0)
         autoscaler = MockAutoscaler(
             config_path,
             lm,
@@ -2787,6 +2799,7 @@ def testRequestResourcesIdleTimeout(self):
             mock_raylet_id(),
             config["available_node_types"]["def_worker"]["resources"],
             config["available_node_types"]["def_worker"]["resources"],
+            DUMMY_IDLE_DURATION_S,
             waiting_bundles=[{"CPU": 0.2, "WORKER": 1.0}],
         )
         autoscaler.update()
@@ -2801,6 +2814,7 @@ def testRequestResourcesIdleTimeout(self):
             mock_raylet_id(),
             config["available_node_types"]["def_worker"]["resources"],
             {},
+            0,
             waiting_bundles=[{"CPU": 0.2, "WORKER": 1.0}],
         )
         autoscaler.update()
@@ -2810,6 +2824,7 @@ def testRequestResourcesIdleTimeout(self):
             mock_raylet_id(),
             config["available_node_types"]["def_worker"]["resources"],
             config["available_node_types"]["def_worker"]["resources"],
+            DUMMY_IDLE_DURATION_S,
             waiting_bundles=[{"CPU": 0.2, "WORKER": 1.0}],
         )
         autoscaler.update()
@@ -2823,6 +2838,7 @@ def testRequestResourcesIdleTimeout(self):
             mock_raylet_id(),
             config["available_node_types"]["def_worker"]["resources"],
             config["available_node_types"]["def_worker"]["resources"],
+            DUMMY_IDLE_DURATION_S,
             waiting_bundles=[{"CPU": 0.2, "WORKER": 1.0}],
         )
         autoscaler.update()
@@ -2891,6 +2907,7 @@ def testRequestResourcesRaceConditionsLong(self):
             mock_raylet_id(),
             config["available_node_types"]["def_worker"]["resources"],
             config["available_node_types"]["def_worker"]["resources"],
+            DUMMY_IDLE_DURATION_S,
             waiting_bundles=[{"CPU": 0.2, "WORKER": 1.0}],
         )
         autoscaler.load_metrics.set_resource_requests([{"CPU": 0.2, "WORKER": 1.0}] * 2)
@@ -2908,6 +2925,7 @@ def testRequestResourcesRaceConditionsLong(self):
             mock_raylet_id(),
             config["available_node_types"]["def_worker"]["resources"],
             {},
+            0,
             waiting_bundles=[{"CPU": 0.2, "WORKER": 1.0}] * 3,
         )
         autoscaler.update()
@@ -2919,18 +2937,21 @@ def testRequestResourcesRaceConditionsLong(self):
             mock_raylet_id(),
             config["available_node_types"]["def_worker"]["resources"],
             config["available_node_types"]["def_worker"]["resources"],
+            DUMMY_IDLE_DURATION_S,
         )
         lm.update(
             "172.0.0.3",
             mock_raylet_id(),
             config["available_node_types"]["def_worker"]["resources"],
             config["available_node_types"]["def_worker"]["resources"],
+            DUMMY_IDLE_DURATION_S,
         )
         lm.update(
             node_ip,
             mock_raylet_id(),
             config["available_node_types"]["def_worker"]["resources"],
             {},
+            0,
         )
         print("============ Should scale down from here =============", node_id)
         autoscaler.update()
@@ -3037,6 +3058,7 @@ def testRequestResourcesRaceConditionWithResourceDemands(self):
             mock_raylet_id(),
             {"CPU": 2, "GPU": 1},
             {"CPU": 2},
+            0,
             waiting_bundles=[{"CPU": 2}],
         )
         autoscaler.load_metrics.set_resource_requests([{"CPU": 2, "GPU": 1}] * 2)
@@ -3048,6 +3070,7 @@ def testRequestResourcesRaceConditionWithResourceDemands(self):
             mock_raylet_id(),
             {"CPU": 2, "GPU": 1},
             {"CPU": 2},
+            0,
             waiting_bundles=[{"CPU": 2}],
         )
         # make sure it stays consistent.
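
Note (not part of the patch): with this change, node_idle_duration_s becomes a required positional parameter of LoadMetrics.update, placed after dynamic_resources and before the optional bundle arguments, and the monitor converts the per-node idle duration from milliseconds to seconds before passing it through. A minimal caller sketch under that assumption; the ip, raylet id, resource dicts, and idle value below are illustrative only:

    from ray.autoscaler._private.load_metrics import LoadMetrics

    lm = LoadMetrics()
    idle_duration_ms = 5000.0  # per-node idle time in milliseconds (illustrative value)
    lm.update(
        "10.0.0.1",               # node ip (illustrative)
        b"raylet-id",             # raylet id bytes (illustrative)
        {"CPU": 4},               # static (total) resources
        {"CPU": 4},               # dynamic (available) resources
        idle_duration_ms / 1000,  # node_idle_duration_s, now required
        waiting_bundles=[],
        infeasible_bundles=[],
    )
    # LoadMetrics now derives the last-used timestamp as time.time() - node_idle_duration_s.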