Skip to content

Commit

Permalink
use GetClusterResourceState to get correct idle time
Browse files Browse the repository at this point in the history
Signed-off-by: Mimi Liao <[email protected]>
  • Loading branch information
mimiliaogo committed Nov 18, 2024
1 parent cae84c6 commit f663683
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 48 deletions.
34 changes: 9 additions & 25 deletions python/ray/autoscaler/_private/autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,22 +490,13 @@ def terminate_nodes_to_enforce_config_constraints(self, now: float):
assert self.non_terminated_nodes
assert self.provider

last_used = self.load_metrics.last_used_time_by_ip

# local import to avoid circular dependencies
from ray.autoscaler.v2.sdk import get_cluster_resource_state

# Note: The `last_used` metric only considers resource occupation,
# which can misreport nodes as idle when:
# 1. Tasks without assigned resources run on the node.
# 2. All tasks are blocked on `get` or `wait` operations.
# Using idle_duration_ms reported by raylet instead
# ref: https://github.com/ray-project/ray/pull/39582
# Use get_cluster_resource_state from autoscaler v2 sdk
# to get idle_duration_ms from raylet
ray_state = get_cluster_resource_state(self.gcs_client)
ray_nodes_idle_duration_ms_by_id = {
node.node_id: node.idle_duration_ms for node in ray_state.node_states
ray_nodes_idle_duration_ms_by_ip = (
self.load_metrics.ray_nodes_idle_duration_ms_by_ip
)
now = time.time()
last_used = {
ip: now - duration
for ip, duration in ray_nodes_idle_duration_ms_by_ip.items()
}

idle_timeout_ms = 60 * 1000 * self.config["idle_timeout_minutes"]
Expand Down Expand Up @@ -557,16 +548,9 @@ def keep_node(node_id: NodeID) -> None:

node_ip = self.provider.internal_ip(node_id)

# Only attempt to drain connected nodes, i.e. nodes with ips in
# LoadMetrics.
internal_node_id = None
if node_ip in self.load_metrics.raylet_id_by_ip:
internal_node_id = self.load_metrics.raylet_id_by_ip[node_ip]

if (
internal_node_id
and internal_node_id in ray_nodes_idle_duration_ms_by_id
and ray_nodes_idle_duration_ms_by_id[internal_node_id] > idle_timeout_ms
node_ip in ray_nodes_idle_duration_ms_by_ip
and ray_nodes_idle_duration_ms_by_ip[node_ip] > idle_timeout_ms
):
self.schedule_node_termination(node_id, "idle", logger.info)
# Get the local time of the node's last use as a string.
Expand Down
13 changes: 5 additions & 8 deletions python/ray/autoscaler/_private/load_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ class LoadMetrics:
"""

def __init__(self):
self.last_used_time_by_ip = {}
self.last_heartbeat_time_by_ip = {}
self.static_resources_by_ip = {}
self.dynamic_resources_by_ip = {}
Expand All @@ -80,6 +79,7 @@ def __init__(self):
self.pending_placement_groups = []
self.resource_requests = []
self.cluster_full_of_actors_detected = False
self.ray_nodes_idle_duration_ms_by_ip = {}

def __bool__(self):
"""A load metrics instance is Falsey iff the autoscaler process
Expand All @@ -97,10 +97,12 @@ def update(
infeasible_bundles: List[Dict[str, float]] = None,
pending_placement_groups: List[PlacementGroupTableData] = None,
cluster_full_of_actors_detected: bool = False,
idle_duration_ms: int = 0,
):
self.static_resources_by_ip[ip] = static_resources
self.raylet_id_by_ip[ip] = raylet_id
self.cluster_full_of_actors_detected = cluster_full_of_actors_detected
self.ray_nodes_idle_duration_ms_by_ip[ip] = idle_duration_ms

if not waiting_bundles:
waiting_bundles = []
Expand All @@ -120,11 +122,6 @@ def update(
self.dynamic_resources_by_ip[ip] = dynamic_resources_update

now = time.time()
if (
ip not in self.last_used_time_by_ip
or self.static_resources_by_ip[ip] != self.dynamic_resources_by_ip[ip]
):
self.last_used_time_by_ip[ip] = now
self.last_heartbeat_time_by_ip[ip] = now
self.waiting_bundles = waiting_bundles
self.infeasible_bundles = infeasible_bundles
Expand Down Expand Up @@ -167,7 +164,7 @@ def prune(mapping, should_log):
)
assert not (unwanted_ips & set(mapping))

prune(self.last_used_time_by_ip, should_log=True)
prune(self.ray_nodes_idle_duration_ms_by_ip, should_log=True)
prune(self.static_resources_by_ip, should_log=False)
prune(self.raylet_id_by_ip, should_log=False)
prune(self.dynamic_resources_by_ip, should_log=False)
Expand Down Expand Up @@ -337,7 +334,7 @@ def _info(self):
resources_used, resources_total = self._get_resource_usage()

now = time.time()
idle_times = [now - t for t in self.last_used_time_by_ip.values()]
idle_times = self.ray_nodes_idle_duration_ms_by_ip.values()
heartbeat_times = [now - t for t in self.last_heartbeat_time_by_ip.values()]
most_delayed_heartbeats = sorted(
self.last_heartbeat_time_by_ip.items(), key=lambda pair: pair[1]
Expand Down
24 changes: 24 additions & 0 deletions python/ray/autoscaler/_private/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
from ray.autoscaler._private.util import format_readonly_node_type
from ray.core.generated import gcs_pb2
from ray.core.generated.autoscaler_pb2 import GetClusterResourceStateReply
from ray.core.generated.event_pb2 import Event as RayEvent
from ray.experimental.internal_kv import (
_initialize_internal_kv,
Expand Down Expand Up @@ -238,13 +239,32 @@ def get_latest_readonly_config():
prom_metrics=self.prom_metrics,
)

def get_cluster_resource_state(self):
"""
Get the cluster resource state from GCS.
"""
str_reply = self.gcs_client.get_cluster_resource_state()
reply = GetClusterResourceStateReply()
reply.ParseFromString(str_reply)
return reply.cluster_resource_state

def update_load_metrics(self):
"""Fetches resource usage data from GCS and updates load metrics."""

response = self.gcs_client.get_all_resource_usage(timeout=60)
resources_batch_data = response.resource_usage_data
log_resource_batch_data_if_desired(resources_batch_data)

# This is a workaround to get correct idle_duration_ms
# from "get_cluster_resource_state"
# reason: https://github.com/ray-project/ray/pull/48519#issuecomment-2481659346
# TODO(mimi): use idle_duration_ms from "get_all_resource_usage"
cluster_resource_state = self.get_cluster_resource_state()
ray_node_states = cluster_resource_state.node_states
ray_nodes_idle_duration_ms_by_id = {
node.node_id: node.idle_duration_ms for node in ray_node_states
}

# Tell the readonly node provider what nodes to report.
if self.readonly_config:
new_nodes = []
Expand Down Expand Up @@ -309,6 +329,9 @@ def update_load_metrics(self):
ip = node_id.hex()
else:
ip = resource_message.node_manager_address

idle_duration_ms = ray_nodes_idle_duration_ms_by_id[node_id]

self.load_metrics.update(
ip,
node_id,
Expand All @@ -318,6 +341,7 @@ def update_load_metrics(self):
infeasible_bundles,
pending_placement_groups,
cluster_full,
idle_duration_ms,
)
if self.readonly_config:
self.readonly_config["available_node_types"].update(mirror_node_types)
Expand Down
15 changes: 0 additions & 15 deletions python/ray/tests/test_autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@
)
from ray.exceptions import RpcError

from ray.core.generated import autoscaler_pb2

WORKER_FILTER = {TAG_RAY_NODE_KIND: NODE_KIND_WORKER}


Expand Down Expand Up @@ -111,8 +109,6 @@ def __init__(self, drain_node_outcome=DrainNodeOutcome.Succeeded):
# Tracks how many times DrainNode returned a successful RPC response.
self.drain_node_reply_success = 0

self.custom_cluster_state = None

def drain_nodes(self, raylet_ids_to_drain, timeout: int):
"""Simulate NodeInfo stub's DrainNode call.
Expand Down Expand Up @@ -146,17 +142,6 @@ def drain_nodes(self, raylet_ids_to_drain, timeout: int):
# Shouldn't land here.
assert False, "Possible drain node outcomes exhausted."

def get_cluster_resource_state(self):
"""Mock get_cluster_resource_state to return a ClusterResourceState.
Returns an empty state by default or custom state if provided.
"""
if self.custom_cluster_state is not None:
return self.custom_cluster_state.SerializeToString()

# Default empty ClusterResourceState
return autoscaler_pb2.ClusterResourceState().SerializeToString()


def mock_raylet_id() -> bytes:
"""Random raylet id to pass to load_metrics.update."""
Expand Down

0 comments on commit f663683

Please sign in to comment.