
Commit

make node_idle_duration_s a required field and modify testing code based on that

Signed-off-by: Mimi Liao <[email protected]>
mimiliaogo committed Dec 1, 2024
1 parent 898dc51 commit 914f5b5
Showing 4 changed files with 152 additions and 114 deletions.
8 changes: 3 additions & 5 deletions python/ray/autoscaler/_private/load_metrics.py
@@ -2,7 +2,7 @@
import time
from collections import Counter
from functools import reduce
from typing import Dict, List, Optional
from typing import Dict, List

from ray._private.gcs_utils import PlacementGroupTableData
from ray.autoscaler._private.constants import (
@@ -93,11 +93,11 @@ def update(
raylet_id: bytes,
static_resources: Dict[str, Dict],
dynamic_resources: Dict[str, Dict],
node_idle_duration_s: float,
waiting_bundles: List[Dict[str, float]] = None,
infeasible_bundles: List[Dict[str, float]] = None,
pending_placement_groups: List[PlacementGroupTableData] = None,
cluster_full_of_actors_detected: bool = False,
node_last_used_time_s: Optional[float] = None,
):
self.static_resources_by_ip[ip] = static_resources
self.raylet_id_by_ip[ip] = raylet_id
@@ -121,9 +121,7 @@
self.dynamic_resources_by_ip[ip] = dynamic_resources_update

now = time.time()
self.ray_nodes_last_used_time_by_ip[ip] = (
node_last_used_time_s if node_last_used_time_s else now
)
self.ray_nodes_last_used_time_by_ip[ip] = now - node_idle_duration_s
self.last_heartbeat_time_by_ip[ip] = now
self.waiting_bundles = waiting_bundles
self.infeasible_bundles = infeasible_bundles
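Note on this hunk: LoadMetrics.update() previously took an optional node_last_used_time_s and fell back to time.time() when it was missing; it now requires node_idle_duration_s and derives the last-used timestamp as now - node_idle_duration_s. A minimal sketch of that bookkeeping, using a hypothetical stripped-down class rather than the real LoadMetrics:

import time


class IdleBookkeepingSketch:
    """Hypothetical stand-in illustrating the new update() contract."""

    def __init__(self):
        self.ray_nodes_last_used_time_by_ip = {}
        self.last_heartbeat_time_by_ip = {}

    def update(self, ip: str, node_idle_duration_s: float) -> None:
        # The caller reports how long the node has been idle; the last-used
        # timestamp is derived from it instead of being passed in directly.
        now = time.time()
        self.ray_nodes_last_used_time_by_ip[ip] = now - node_idle_duration_s
        self.last_heartbeat_time_by_ip[ip] = now


lm = IdleBookkeepingSketch()
lm.update("1.1.1.1", node_idle_duration_s=3.0)  # node has been idle for ~3 s

Pushing the subtraction into LoadMetrics means callers only report a duration, so the timestamp arithmetic lives in one place instead of being repeated (and potentially mismatched) at every call site.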
8 changes: 3 additions & 5 deletions python/ray/autoscaler/_private/monitor.py
@@ -320,9 +320,9 @@ def update_load_metrics(self):
else:
ip = resource_message.node_manager_address

idle_duration_ms = 0.0
idle_duration_s = 0.0
if node_id in ray_nodes_idle_duration_ms_by_id:
idle_duration_ms = ray_nodes_idle_duration_ms_by_id[node_id]
idle_duration_s = ray_nodes_idle_duration_ms_by_id[node_id] / 1000
else:
logger.warning(
f"node_id {node_id} not found in ray_nodes_idle_duration_ms_by_id"
@@ -333,13 +333,11 @@
node_id,
total_resources,
available_resources,
idle_duration_s,
waiting_bundles,
infeasible_bundles,
pending_placement_groups,
cluster_full,
time.time()
- idle_duration_ms
/ 1000, # node_last_used_time_s = now - idle_duration
)
if self.readonly_config:
self.readonly_config["available_node_types"].update(mirror_node_types)
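The monitor side now converts the GCS-reported idle duration from milliseconds to seconds up front and passes the duration itself, rather than a precomputed node_last_used_time_s timestamp. A rough standalone sketch of that conversion, with a stand-in dictionary and logger for the real ones:

import logging

logger = logging.getLogger(__name__)


def idle_duration_s_for(node_id, ray_nodes_idle_duration_ms_by_id):
    # Convert the reported idle time from ms to seconds; default to 0.0
    # (i.e. "just used") when the node is missing from the report.
    if node_id in ray_nodes_idle_duration_ms_by_id:
        return ray_nodes_idle_duration_ms_by_id[node_id] / 1000
    logger.warning(
        f"node_id {node_id} not found in ray_nodes_idle_duration_ms_by_id"
    )
    return 0.0


# Example: a node that reported 2500 ms of idle time is passed on to
# LoadMetrics.update() as 2.5 seconds.
print(idle_duration_s_for(b"node-a", {b"node-a": 2500}))  # 2.5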
217 changes: 118 additions & 99 deletions python/ray/tests/test_autoscaler.py
@@ -320,6 +320,8 @@ def update_nodes(self):
SMALL_CLUSTER, **{"available_node_types": TYPES_A, "head_node_type": "empty_node"}
)

DUMMY_IDLE_DURATION_S = 3

exc_info = None
try:
raise Exception("Test exception.")
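Because node_idle_duration_s is now a required positional argument, the tests pass it explicitly: 0 for nodes that should look freshly used and the new DUMMY_IDLE_DURATION_S (3 seconds) for nodes that should look idle. A minimal sketch of the updated call shape, assuming the post-commit signature shown above (the raylet IDs and resource dicts here are placeholders):

from ray.autoscaler._private.load_metrics import LoadMetrics

DUMMY_IDLE_DURATION_S = 3

lm = LoadMetrics()
# Active node: idle for 0 s, so its last-used time is "now".
lm.update("1.1.1.1", b"raylet-1", {"CPU": 2}, {"CPU": 1}, 0)
# Idle node: reported idle for 3 s, so the autoscaler sees
# last-used time = now - DUMMY_IDLE_DURATION_S.
lm.update("2.2.2.2", b"raylet-2", {"CPU": 1}, {"CPU": 1}, DUMMY_IDLE_DURATION_S)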
@@ -331,17 +333,17 @@
class LoadMetricsTest(unittest.TestCase):
def testHeartbeat(self):
lm = LoadMetrics()
lm.update("1.1.1.1", mock_raylet_id(), {"CPU": 2}, {"CPU": 1})
lm.update("1.1.1.1", mock_raylet_id(), {"CPU": 2}, {"CPU": 1}, 0)
lm.mark_active("2.2.2.2")
assert "1.1.1.1" in lm.last_heartbeat_time_by_ip
assert "2.2.2.2" in lm.last_heartbeat_time_by_ip
assert "3.3.3.3" not in lm.last_heartbeat_time_by_ip

def testDebugString(self):
lm = LoadMetrics()
lm.update("1.1.1.1", mock_raylet_id(), {"CPU": 2}, {"CPU": 0})
lm.update("1.1.1.1", mock_raylet_id(), {"CPU": 2}, {"CPU": 0}, 0)
lm.update(
"2.2.2.2", mock_raylet_id(), {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}
"2.2.2.2", mock_raylet_id(), {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, 0
)
lm.update(
"3.3.3.3",
@@ -354,6 +356,7 @@ def testDebugString(self):
"memory": 0,
"object_store_memory": 1.05 * 1024 * 1024 * 1024,
},
0,
)
debug = lm.info_string()
assert (
@@ -1556,7 +1559,7 @@ def _helperDynamicScaling(
},
1,
)
lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0})
lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, 0)
autoscaler = MockAutoscaler(
config_path,
lm,
@@ -1619,7 +1622,9 @@ def _helperDynamicScaling(
worker_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER},
)[0]
lm.update(worker_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 1})
lm.update(
worker_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 1}, DUMMY_IDLE_DURATION_S
)

autoscaler.update()
# TODO(rickyx): This is a hack to avoid running into race conditions
@@ -1647,90 +1652,91 @@ def _helperDynamicScaling(
# def testAggressiveAutoscalingWithForegroundLauncher(self):
# self._aggressiveAutoscalingHelper(foreground_node_launcher=True)

def _aggressiveAutoscalingHelper(self, foreground_node_launcher: bool = False):
config = copy.deepcopy(SMALL_CLUSTER)
config["available_node_types"]["worker"]["min_workers"] = 0
config["available_node_types"]["worker"]["max_workers"] = 10
config["max_workers"] = 10
config["idle_timeout_minutes"] = 0
config["upscaling_speed"] = config["available_node_types"]["worker"][
"max_workers"
]
if foreground_node_launcher:
config["provider"][FOREGROUND_NODE_LAUNCH_KEY] = True
config_path = self.write_config(config)

self.provider = MockProvider()
self.provider.create_node(
{},
{
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "head",
},
1,
)
head_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_HEAD},
)[0]
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(11)])
lm = LoadMetrics()

autoscaler = MockAutoscaler(
config_path,
lm,
MockGcsClient(),
max_launch_batch=5,
max_concurrent_launches=5,
max_failures=0,
process_runner=runner,
update_interval_s=0,
)

self.waitForNodes(1)
lm.update(
head_ip,
mock_raylet_id(),
{"CPU": 1},
{"CPU": 0},
waiting_bundles=[{"CPU": 1}] * 7,
infeasible_bundles=[{"CPU": 1}] * 3,
)
autoscaler.update()

if foreground_node_launcher:
# No wait if node launch is blocking and happens in the foreground.
assert self.num_nodes() == 11
else:
self.waitForNodes(11)
self.worker_node_thread_check(foreground_node_launcher)

worker_ips = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER},
)
for ip in worker_ips:
# Mark workers inactive.
lm.last_used_time_by_ip[ip] = 0
# Clear the resource demands.
# Otherwise in "foreground launcher" mode, workers would be deleted
# for being idle and instantly re-created due to resource demand!
lm.update(
head_ip,
mock_raylet_id(),
{},
{},
waiting_bundles=[],
infeasible_bundles=[],
)
autoscaler.update()
self.waitForNodes(1) # only the head node
# Make sure they don't get overwritten.
assert autoscaler.resource_demand_scheduler.node_types["head"]["resources"] == {
"CPU": 1
}
assert autoscaler.resource_demand_scheduler.node_types["worker"][
"resources"
] == {"CPU": 1}
# def _aggressiveAutoscalingHelper(self, foreground_node_launcher: bool = False):
# config = copy.deepcopy(SMALL_CLUSTER)
# config["available_node_types"]["worker"]["min_workers"] = 0
# config["available_node_types"]["worker"]["max_workers"] = 10
# config["max_workers"] = 10
# config["idle_timeout_minutes"] = 0
# config["upscaling_speed"] = config["available_node_types"]["worker"][
# "max_workers"
# ]
# if foreground_node_launcher:
# config["provider"][FOREGROUND_NODE_LAUNCH_KEY] = True
# config_path = self.write_config(config)

# self.provider = MockProvider()
# self.provider.create_node(
# {},
# {
# TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
# TAG_RAY_USER_NODE_TYPE: "head",
# },
# 1,
# )
# head_ip = self.provider.non_terminated_node_ips(
# tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_HEAD},
# )[0]
# runner = MockProcessRunner()
# runner.respond_to_call("json .Config.Env", ["[]" for i in range(11)])
# lm = LoadMetrics()

# autoscaler = MockAutoscaler(
# config_path,
# lm,
# MockGcsClient(),
# max_launch_batch=5,
# max_concurrent_launches=5,
# max_failures=0,
# process_runner=runner,
# update_interval_s=0,
# )

# self.waitForNodes(1)
# lm.update(
# head_ip,
# mock_raylet_id(),
# {"CPU": 1},
# {"CPU": 0},
# waiting_bundles=[{"CPU": 1}] * 7,
# infeasible_bundles=[{"CPU": 1}] * 3,
# )
# autoscaler.update()

# if foreground_node_launcher:
# # No wait if node launch is blocking and happens in the foreground.
# assert self.num_nodes() == 11
# else:
# self.waitForNodes(11)
# self.worker_node_thread_check(foreground_node_launcher)

# worker_ips = self.provider.non_terminated_node_ips(
# tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER},
# )
# for ip in worker_ips:
# # Mark workers inactive.
# lm.last_used_time_by_ip[ip] = 0
# # Clear the resource demands.
# # Otherwise in "foreground launcher" mode, workers would be deleted
# # for being idle and instantly re-created due to resource demand!
# lm.update(
# head_ip,
# mock_raylet_id(),
# {},
# {},
# waiting_bundles=[],
# infeasible_bundles=[],
# )
# autoscaler.update()
# self.waitForNodes(1) # only the head node
# # Make sure they don't get overwritten.
# assert autoscaler.resource_demand_scheduler.node_types["head"]["resources"]
# == {
# "CPU": 1
# }
# assert autoscaler.resource_demand_scheduler.node_types["worker"][
# "resources"
# ] == {"CPU": 1}

def testUnmanagedNodes(self):
config = copy.deepcopy(SMALL_CLUSTER)
@@ -1779,8 +1785,14 @@ def testUnmanagedNodes(self):
autoscaler.update()
self.waitForNodes(2)
# This node has num_cpus=0
lm.update(head_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 0})
lm.update(unmanaged_ip, mock_raylet_id(), {"CPU": 0}, {"CPU": 0})
lm.update(head_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, 0)
lm.update(
unmanaged_ip,
mock_raylet_id(),
{"CPU": 0},
{"CPU": 0},
DUMMY_IDLE_DURATION_S,
)
autoscaler.update()
self.waitForNodes(2)
# 1 CPU task cannot be scheduled.
@@ -1789,6 +1801,7 @@ def testUnmanagedNodes2(self):
mock_raylet_id(),
{"CPU": 0},
{"CPU": 0},
DUMMY_IDLE_DURATION_S,
waiting_bundles=[{"CPU": 1}],
)
autoscaler.update()
@@ -1821,9 +1834,6 @@ def testUnmanagedNodes2(self):
unmanaged_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: "unmanaged"},
)[0]
unmanaged_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: "unmanaged"},
)[0]

runner = MockProcessRunner()

@@ -1841,8 +1851,14 @@ def testUnmanagedNodes2(self):
update_interval_s=0,
)

lm.update(head_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 0})
lm.update(unmanaged_ip, mock_raylet_id(), {"CPU": 0}, {"CPU": 0})
lm.update(head_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, 0)
lm.update(
unmanaged_ip,
mock_raylet_id(),
{"CPU": 0},
{"CPU": 0},
DUMMY_IDLE_DURATION_S,
)

# Note that we shouldn't autoscale here because the resource demand
# vector is not set and target utilization fraction = 1.
@@ -1896,6 +1912,7 @@ def testDelayedLaunch(self):
mock_raylet_id(),
{"CPU": 1},
{"CPU": 0},
0,
waiting_bundles=[{"CPU": 1}] * 2,
)
autoscaler.update()
@@ -2096,7 +2113,7 @@ def testIgnoresCorruptedConfig(self):
1,
)
lm = LoadMetrics()
lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0})
lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, 0)
mock_metrics = Mock(spec=AutoscalerPrometheusMetrics())
autoscaler = MockAutoscaler(
config_path,
@@ -2141,7 +2158,9 @@ def testIgnoresCorruptedConfig(self):
)[0]
# Because one worker already started, the scheduler waits for its
# resources to be updated before it launches the remaining min_workers.
lm.update(worker_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 1})
lm.update(
worker_ip, mock_raylet_id(), {"CPU": 1}, {"CPU": 1}, DUMMY_IDLE_DURATION_S
)
autoscaler.update()
self.waitForNodes(10, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
assert mock_metrics.drain_node_exceptions.inc.call_count == 0
@@ -2432,7 +2451,7 @@ def testScaleDownMaxWorkers(self):
def testFalseyLoadMetrics(self):
lm = LoadMetrics()
assert not lm
lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0})
lm.update("172.0.0.0", mock_raylet_id(), {"CPU": 1}, {"CPU": 0}, 0)
assert lm

def testRecoverUnhealthyWorkers(self):
