From f12b88b055c782dc4c59086b3430a970a9a6acf3 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Tue, 21 Jan 2025 14:58:42 -0600 Subject: [PATCH] fix Signed-off-by: Edward Oakes --- python/ray/serve/_private/replica.py | 30 +++++++++++++++------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index cca5969d890c7..089d92fa03f2d 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -135,6 +135,9 @@ def __init__( SERVE_CONTROLLER_NAME, namespace=SERVE_NAMESPACE ) self._num_ongoing_requests = 0 + + # If the interval is set to 0, eagerly sets all metrics. + self._cached_metrics_enabled = RAY_SERVE_METRICS_EXPORT_INTERVAL_MS != 0 self._cached_metrics_interval_s = RAY_SERVE_METRICS_EXPORT_INTERVAL_MS / 1000 # Request counter (only set on replica startup). @@ -154,7 +157,7 @@ def __init__( ), tag_keys=("route",), ) - if self._cached_metrics_interval_s: + if self._cached_metrics_enabled: self._cached_request_counter = defaultdict(int) self._error_counter = metrics.Counter( @@ -164,7 +167,7 @@ def __init__( ), tag_keys=("route",), ) - if self._cached_metrics_interval_s: + if self._cached_metrics_enabled: self._cached_error_counter = defaultdict(int) self._processing_latency_tracker = metrics.Histogram( @@ -173,7 +176,7 @@ def __init__( boundaries=DEFAULT_LATENCY_BUCKET_MS, tag_keys=("route",), ) - if self._cached_metrics_interval_s: + if self._cached_metrics_enabled: self._cached_latencies = defaultdict(deque) self._num_ongoing_requests_gauge = metrics.Gauge( @@ -183,7 +186,7 @@ def __init__( self.set_autoscaling_config(autoscaling_config) - if self._cached_metrics_interval_s: + if self._cached_metrics_enabled: event_loop.create_task(self._report_cached_metrics_forever()) def _report_cached_metrics(self): @@ -218,7 +221,6 @@ async def _report_cached_metrics_forever(self): # Exponential backoff starting at 1s and capping at 10s. backoff_time_s = min(10, 2**consecutive_errors) - print(backoff_time_s) consecutive_errors += 1 await asyncio.sleep(backoff_time_s) @@ -257,13 +259,13 @@ def set_autoscaling_config(self, autoscaling_config: Optional[AutoscalingConfig] def inc_num_ongoing_requests(self) -> int: """Increment the current total queue length of requests for this replica.""" self._num_ongoing_requests += 1 - if self._cached_metrics_interval_s == 0: + if not self._cached_metrics_enabled: self._num_ongoing_requests_gauge.set(self._num_ongoing_requests) def dec_num_ongoing_requests(self) -> int: """Decrement the current total queue length of requests for this replica.""" self._num_ongoing_requests -= 1 - if self._cached_metrics_interval_s == 0: + if not self._cached_metrics_enabled: self._num_ongoing_requests_gauge.set(self._num_ongoing_requests) def get_num_ongoing_requests(self) -> int: @@ -272,18 +274,18 @@ def get_num_ongoing_requests(self) -> int: def record_request_metrics(self, *, route: str, latency_ms: float, was_error: bool): """Records per-request metrics.""" - if self._cached_metrics_interval_s == 0: - self._processing_latency_tracker.observe(latency_ms, tags={"route": route}) - if was_error: - self._error_counter.inc(tags={"route": route}) - else: - self._request_counter.inc(tags={"route": route}) - else: + if self._cached_metrics_enabled: self._cached_latencies[route].append(latency_ms) if was_error: self._cached_error_counter[route] += 1 else: self._cached_request_counter[route] += 1 + else: + self._processing_latency_tracker.observe(latency_ms, tags={"route": route}) + if was_error: + self._error_counter.inc(tags={"route": route}) + else: + self._request_counter.inc(tags={"route": route}) def _push_autoscaling_metrics(self) -> Dict[str, Any]: look_back_period = self._autoscaling_config.look_back_period_s