Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Edward Oakes <[email protected]>
  • Loading branch information
edoakes committed Jan 21, 2025
1 parent e9089a2 commit f12b88b
Showing 1 changed file with 16 additions and 14 deletions.
30 changes: 16 additions & 14 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ def __init__(
SERVE_CONTROLLER_NAME, namespace=SERVE_NAMESPACE
)
self._num_ongoing_requests = 0

# If the interval is set to 0, eagerly sets all metrics.
self._cached_metrics_enabled = RAY_SERVE_METRICS_EXPORT_INTERVAL_MS != 0
self._cached_metrics_interval_s = RAY_SERVE_METRICS_EXPORT_INTERVAL_MS / 1000

# Request counter (only set on replica startup).
Expand All @@ -154,7 +157,7 @@ def __init__(
),
tag_keys=("route",),
)
if self._cached_metrics_interval_s:
if self._cached_metrics_enabled:
self._cached_request_counter = defaultdict(int)

self._error_counter = metrics.Counter(
Expand All @@ -164,7 +167,7 @@ def __init__(
),
tag_keys=("route",),
)
if self._cached_metrics_interval_s:
if self._cached_metrics_enabled:
self._cached_error_counter = defaultdict(int)

self._processing_latency_tracker = metrics.Histogram(
Expand All @@ -173,7 +176,7 @@ def __init__(
boundaries=DEFAULT_LATENCY_BUCKET_MS,
tag_keys=("route",),
)
if self._cached_metrics_interval_s:
if self._cached_metrics_enabled:
self._cached_latencies = defaultdict(deque)

self._num_ongoing_requests_gauge = metrics.Gauge(
Expand All @@ -183,7 +186,7 @@ def __init__(

self.set_autoscaling_config(autoscaling_config)

if self._cached_metrics_interval_s:
if self._cached_metrics_enabled:
event_loop.create_task(self._report_cached_metrics_forever())

def _report_cached_metrics(self):
Expand Down Expand Up @@ -218,7 +221,6 @@ async def _report_cached_metrics_forever(self):

# Exponential backoff starting at 1s and capping at 10s.
backoff_time_s = min(10, 2**consecutive_errors)
print(backoff_time_s)
consecutive_errors += 1
await asyncio.sleep(backoff_time_s)

Expand Down Expand Up @@ -257,13 +259,13 @@ def set_autoscaling_config(self, autoscaling_config: Optional[AutoscalingConfig]
def inc_num_ongoing_requests(self) -> int:
"""Increment the current total queue length of requests for this replica."""
self._num_ongoing_requests += 1
if self._cached_metrics_interval_s == 0:
if not self._cached_metrics_enabled:
self._num_ongoing_requests_gauge.set(self._num_ongoing_requests)

def dec_num_ongoing_requests(self) -> int:
"""Decrement the current total queue length of requests for this replica."""
self._num_ongoing_requests -= 1
if self._cached_metrics_interval_s == 0:
if not self._cached_metrics_enabled:
self._num_ongoing_requests_gauge.set(self._num_ongoing_requests)

def get_num_ongoing_requests(self) -> int:
Expand All @@ -272,18 +274,18 @@ def get_num_ongoing_requests(self) -> int:

def record_request_metrics(self, *, route: str, latency_ms: float, was_error: bool):
"""Records per-request metrics."""
if self._cached_metrics_interval_s == 0:
self._processing_latency_tracker.observe(latency_ms, tags={"route": route})
if was_error:
self._error_counter.inc(tags={"route": route})
else:
self._request_counter.inc(tags={"route": route})
else:
if self._cached_metrics_enabled:
self._cached_latencies[route].append(latency_ms)
if was_error:
self._cached_error_counter[route] += 1
else:
self._cached_request_counter[route] += 1
else:
self._processing_latency_tracker.observe(latency_ms, tags={"route": route})
if was_error:
self._error_counter.inc(tags={"route": route})
else:
self._request_counter.inc(tags={"route": route})

def _push_autoscaling_metrics(self) -> Dict[str, Any]:
look_back_period = self._autoscaling_config.look_back_period_s
Expand Down

0 comments on commit f12b88b

Please sign in to comment.