
[core][autoscaler] Fix incorrectly terminating nodes misclassified as idle in autoscaler v1 #48519

Merged 11 commits on Dec 3, 2024
10 changes: 7 additions & 3 deletions python/ray/autoscaler/_private/autoscaler.py
@@ -490,8 +490,11 @@ def terminate_nodes_to_enforce_config_constraints(self, now: float):
      assert self.non_terminated_nodes
      assert self.provider

-     last_used = self.load_metrics.last_used_time_by_ip
-     horizon = now - (60 * self.config["idle_timeout_minutes"])
+     last_used = self.load_metrics.ray_nodes_last_used_time_by_ip
+
+     idle_timeout_s = 60 * self.config["idle_timeout_minutes"]
+
+     last_used_cutoff = now - idle_timeout_s

      # Sort based on last used to make sure to keep min_workers that
      # were most recently used. Otherwise, _keep_min_workers_of_node_type
@@ -539,7 +542,8 @@ def keep_node(node_id: NodeID) -> None:
          continue

      node_ip = self.provider.internal_ip(node_id)
-     if node_ip in last_used and last_used[node_ip] < horizon:
+
+     if node_ip in last_used and last_used[node_ip] < last_used_cutoff:
          self.schedule_node_termination(node_id, "idle", logger.info)
          # Get the local time of the node's last use as a string.
          formatted_last_used_time = time.asctime(
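In effect, the rewritten check compares each node's last-used timestamp against a cutoff computed once per pass. A minimal sketch of that decision, using hypothetical standalone names rather than the autoscaler's own (the diff's semantics, not its code):

import time

def should_terminate_idle(last_used_s: float, idle_timeout_minutes: float) -> bool:
    """Return True when a node's last use falls before the idle cutoff."""
    now = time.time()
    idle_timeout_s = 60 * idle_timeout_minutes
    last_used_cutoff = now - idle_timeout_s
    # Mirrors the diff: a node is idle-terminable only if it was last used
    # before (now - idle timeout).
    return last_used_s < last_used_cutoff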
13 changes: 5 additions & 8 deletions python/ray/autoscaler/_private/load_metrics.py
@@ -70,7 +70,6 @@ class LoadMetrics:
      """

      def __init__(self):
-         self.last_used_time_by_ip = {}
          self.last_heartbeat_time_by_ip = {}
          self.static_resources_by_ip = {}
          self.dynamic_resources_by_ip = {}
@@ -80,6 +79,7 @@ def __init__(self):
          self.pending_placement_groups = []
          self.resource_requests = []
          self.cluster_full_of_actors_detected = False
+         self.ray_nodes_last_used_time_by_ip = {}

      def __bool__(self):
          """A load metrics instance is Falsey iff the autoscaler process
@@ -93,6 +93,7 @@ def update(
          raylet_id: bytes,
          static_resources: Dict[str, Dict],
          dynamic_resources: Dict[str, Dict],
+         node_idle_duration_s: float,
          waiting_bundles: List[Dict[str, float]] = None,
          infeasible_bundles: List[Dict[str, float]] = None,
          pending_placement_groups: List[PlacementGroupTableData] = None,
@@ -120,11 +121,7 @@
          self.dynamic_resources_by_ip[ip] = dynamic_resources_update

          now = time.time()
-         if (
-             ip not in self.last_used_time_by_ip
-             or self.static_resources_by_ip[ip] != self.dynamic_resources_by_ip[ip]
-         ):
-             self.last_used_time_by_ip[ip] = now
+         self.ray_nodes_last_used_time_by_ip[ip] = now - node_idle_duration_s
          self.last_heartbeat_time_by_ip[ip] = now
          self.waiting_bundles = waiting_bundles
          self.infeasible_bundles = infeasible_bundles
@@ -167,7 +164,7 @@ def prune(mapping, should_log):
          )
          assert not (unwanted_ips & set(mapping))

-         prune(self.last_used_time_by_ip, should_log=True)
+         prune(self.ray_nodes_last_used_time_by_ip, should_log=True)
          prune(self.static_resources_by_ip, should_log=False)
          prune(self.raylet_id_by_ip, should_log=False)
          prune(self.dynamic_resources_by_ip, should_log=False)
@@ -337,7 +334,7 @@ def _info(self):
          resources_used, resources_total = self._get_resource_usage()

          now = time.time()
-         idle_times = [now - t for t in self.last_used_time_by_ip.values()]
+         idle_times = [now - t for t in self.ray_nodes_last_used_time_by_ip.values()]
          heartbeat_times = [now - t for t in self.last_heartbeat_time_by_ip.values()]
          most_delayed_heartbeats = sorted(
              self.last_heartbeat_time_by_ip.items(), key=lambda pair: pair[1]
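The substantive change in update(): a node's last-used time is no longer inferred from whether its static and dynamic resource totals differ; it is back-dated from the idle duration the node itself reports. A minimal sketch of the new derivation, as a hypothetical standalone function over the value shown in the diff:

import time

def last_used_from_idle_duration(node_idle_duration_s: float) -> float:
    """Back-date a node's last-used timestamp from its reported idle duration.

    The old heuristic refreshed last-used only when static resources differed
    from dynamic resources, so a node whose usage was invisible to that
    comparison could be misclassified as idle. Trusting the node-reported
    idle duration sidesteps the inference.
    """
    return time.time() - node_idle_duration_s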
20 changes: 20 additions & 0 deletions python/ray/autoscaler/_private/monitor.py
@@ -30,6 +30,7 @@
  from ray.autoscaler._private.load_metrics import LoadMetrics
  from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
  from ray.autoscaler._private.util import format_readonly_node_type
+ from ray.autoscaler.v2.sdk import get_cluster_resource_state
  from ray.core.generated import gcs_pb2
  from ray.core.generated.event_pb2 import Event as RayEvent
  from ray.experimental.internal_kv import (
@@ -245,6 +246,15 @@ def update_load_metrics(self):
      resources_batch_data = response.resource_usage_data
      log_resource_batch_data_if_desired(resources_batch_data)

+     # This is a workaround to get correct idle_duration_ms
+     # from "get_cluster_resource_state"
+     # ref: https://github.com/ray-project/ray/pull/48519#issuecomment-2481659346
+     cluster_resource_state = get_cluster_resource_state(self.gcs_client)
+     ray_node_states = cluster_resource_state.node_states
+     ray_nodes_idle_duration_ms_by_id = {
+         node.node_id: node.idle_duration_ms for node in ray_node_states
+     }
+
      # Tell the readonly node provider what nodes to report.
      if self.readonly_config:
          new_nodes = []
@@ -309,11 +319,21 @@ def update_load_metrics(self):
          ip = node_id.hex()
      else:
          ip = resource_message.node_manager_address
+
+     idle_duration_s = 0.0
+     if node_id in ray_nodes_idle_duration_ms_by_id:
+         idle_duration_s = ray_nodes_idle_duration_ms_by_id[node_id] / 1000
+     else:
+         logger.warning(
+             f"node_id {node_id} not found in ray_nodes_idle_duration_ms_by_id"
+         )
+
      self.load_metrics.update(
          ip,
          node_id,
          total_resources,
          available_resources,
+         idle_duration_s,
          waiting_bundles,
          infeasible_bundles,
          pending_placement_groups,

Review thread on the logger.warning fallback:

Member: In which cases will this condition occur?

@mimiliaogo (Contributor, Author), Nov 26, 2024: In theory, get_all_resource_usage and get_cluster_resource_state should return the same set of nodes. However, since they are implemented in autoscaler v1 and v2 respectively, I'm uncertain whether there may be discrepancies between them in some special cases.

Member: We may need to understand how severe the inconsistency is to determine whether this warrants a warning or a panic. @rickyyx, would you mind providing some insights?

Contributor: Both seem to be getting their data from GCS, which should be mostly consistent, I believe; but the codepaths that generate the data are different, so I think graceful handling with warnings is fine (the two views should eventually be consistent).

By the way, is there any reason we don't mainly use v2's info from get_cluster_resource_state here entirely? I think the only v1 bit of info that's missing in v2 is "cluster_full", which we could probably also add to v2.

I don't have a strong opinion between the following, given that the source of info is GCS in both cases and the likelihood of inconsistency is low IMO:

  1. Use mostly v1's info, and patch it with v2's idle info.
  2. Use mostly v2's info, and patch it with v1's cluster_full or the other pieces missing in v2.

I think merging this PR to fix the idle issue is fine, and we could follow up to bring v2's RPC to parity with v1 so we can use v2 entirely here. Or, if we are pushing v2 really hard, we might just deprecate v1 in the future.

Member: How about we go with option 1, so that we can reduce the dependency between v1 and v2?
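For context, the merged approach is the thread's option 1: keep autoscaler v1's resource-usage data and patch in per-node idle durations from v2's get_cluster_resource_state. A condensed sketch of the lookup-with-fallback, with a hypothetical helper name and a plain dict standing in for the protobuf node states:

from typing import Dict

def idle_duration_s_for(node_id: bytes,
                        idle_duration_ms_by_id: Dict[bytes, float]) -> float:
    """Convert a node's reported idle duration from ms to s, defaulting to 0.0."""
    if node_id in idle_duration_ms_by_id:
        return idle_duration_ms_by_id[node_id] / 1000
    # v1's usage data and v2's node states travel different GCS codepaths, so
    # a node may transiently be missing here; treat it as "not idle".
    return 0.0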