From 4dd42cd6d43389ebb471f3c72242c71875d9e077 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 10 Feb 2025 13:41:32 +0000 Subject: [PATCH] Retry heartbeat timeouts by putting back in the queue (#1689) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * If there’s a heartbeat error and no attempts we put it back in the queue to try again * When nacking, return whether it was put back in the queue or not * Try and nack, if it fails then fail the run * Consolidated switch statement * Fail executing/retrying runs --- apps/webapp/app/v3/marqs/index.server.ts | 9 ++- .../app/v3/taskRunHeartbeatFailed.server.ts | 58 ++++++++++--------- 2 files changed, 37 insertions(+), 30 deletions(-) diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index dfe5a6028d..341f003464 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -638,7 +638,8 @@ export class MarQS { } /** - * Negative acknowledge a message, which will requeue the message + * Negative acknowledge a message, which will requeue the message. + * Returns whether it went back into the queue or not. */ public async nackMessage( messageId: string, @@ -657,7 +658,7 @@ export class MarQS { updates, service: this.name, }); - return; + return false; } const nackCount = await this.#getNackCount(messageId); @@ -676,7 +677,7 @@ export class MarQS { // If we have reached the maximum nack count, we will ack the message await this.acknowledgeMessage(messageId, "maximum nack count reached"); - return; + return false; } span.setAttributes({ @@ -705,6 +706,8 @@ export class MarQS { }); await this.options.subscriber?.messageNacked(message); + + return true; }, { kind: SpanKind.CONSUMER, diff --git a/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts b/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts index d722a6f80b..fa564c06b8 100644 --- a/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts +++ b/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts @@ -30,27 +30,42 @@ export class TaskRunHeartbeatFailedService extends BaseService { supportsLazyAttempts: true, }, }, + _count: { + select: { + attempts: true, + }, + }, }, }); if (!taskRun) { - logger.error("[RequeueTaskRunService] Task run not found", { + logger.error("[TaskRunHeartbeatFailedService] Task run not found", { runId, }); return; } + const service = new FailedTaskRunService(); + switch (taskRun.status) { - case "PENDING": { - if (taskRun.lockedAt) { + case "PENDING": + case "WAITING_TO_RESUME": + case "PAUSED": { + const backInQueue = await marqs?.nackMessage(taskRun.id); + + if (backInQueue) { logger.debug( - "[RequeueTaskRunService] Failing task run because the heartbeat failed and it's PENDING but locked", + `[TaskRunHeartbeatFailedService] ${taskRun.status} run is back in the queue run`, + { + taskRun, + } + ); + } else { + logger.debug( + `[TaskRunHeartbeatFailedService] ${taskRun.status} run not back in the queue, failing`, { taskRun } ); - - const service = new FailedTaskRunService(); - await service.call(taskRun.friendlyId, { ok: false, id: taskRun.friendlyId, @@ -61,19 +76,13 @@ export class TaskRunHeartbeatFailedService extends BaseService { message: "Did not receive a heartbeat from the worker in time", }, }); - } else { - logger.debug("[RequeueTaskRunService] Nacking task run", { taskRun }); - - await marqs?.nackMessage(taskRun.id); } break; } case "EXECUTING": case "RETRYING_AFTER_FAILURE": { - logger.debug("[RequeueTaskRunService] Failing task run", { taskRun }); - - const service = new FailedTaskRunService(); + logger.debug(`[RequeueTaskRunService] ${taskRun.status} failing task run`, { taskRun }); await service.call(taskRun.friendlyId, { ok: false, @@ -90,23 +99,18 @@ export class TaskRunHeartbeatFailedService extends BaseService { } case "DELAYED": case "WAITING_FOR_DEPLOY": { - logger.debug("[RequeueTaskRunService] Removing task run from queue", { taskRun }); + logger.debug( + `[TaskRunHeartbeatFailedService] ${taskRun.status} Removing task run from queue`, + { taskRun } + ); await marqs?.acknowledgeMessage( taskRun.id, - "Run is either DELAYED or WAITING_FOR_DEPLOY so we cannot requeue it in RequeueTaskRunService" + "Run is either DELAYED or WAITING_FOR_DEPLOY so we cannot requeue it in TaskRunHeartbeatFailedService" ); break; } - case "WAITING_TO_RESUME": - case "PAUSED": { - logger.debug("[RequeueTaskRunService] Requeueing task run", { taskRun }); - - await marqs?.nackMessage(taskRun.id); - - break; - } case "SYSTEM_FAILURE": case "INTERRUPTED": case "CRASHED": @@ -115,11 +119,11 @@ export class TaskRunHeartbeatFailedService extends BaseService { case "EXPIRED": case "TIMED_OUT": case "CANCELED": { - logger.debug("[RequeueTaskRunService] Task run is completed", { taskRun }); + logger.debug("[TaskRunHeartbeatFailedService] Task run is completed", { taskRun }); await marqs?.acknowledgeMessage( taskRun.id, - "Task run is already completed in RequeueTaskRunService" + "Task run is already completed in TaskRunHeartbeatFailedService" ); try { @@ -135,7 +139,7 @@ export class TaskRunHeartbeatFailedService extends BaseService { delayInMs: taskRun.lockedToVersion?.supportsLazyAttempts ? 5_000 : undefined, }); } catch (error) { - logger.error("[RequeueTaskRunService] Error signaling run cancellation", { + logger.error("[TaskRunHeartbeatFailedService] Error signaling run cancellation", { runId: taskRun.id, error: error instanceof Error ? error.message : error, });