OOM retrying on larger machines #1691

Merged · 4 commits · Feb 10, 2025
Changes from 2 commits
5 changes: 5 additions & 0 deletions .changeset/forty-windows-shop.md
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Added the ability to retry runs that fail with an Out Of Memory (OOM) error on a larger machine.
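For reference, here is how the new option is used on a task — a minimal sketch following the `docs/machines.mdx` and `references/hello-world` changes later in this PR (the preset names are just examples):

```ts
import { task } from "@trigger.dev/sdk/v3";

export const heavyTask = task({
  id: "heavy-task",
  machine: "medium-1x",
  retry: {
    // If an attempt fails with an Out Of Memory error, retry it on this larger preset.
    outOfMemory: {
      machine: "large-1x",
    },
  },
  run: async (payload: any) => {
    //...
  },
});
```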
67 changes: 46 additions & 21 deletions apps/webapp/app/v3/failedTaskRun.server.ts
@@ -5,14 +5,14 @@ import {
TaskRunExecutionRetry,
TaskRunFailedExecutionResult,
} from "@trigger.dev/core/v3";
import type { Prisma, TaskRun } from "@trigger.dev/database";
import * as semver from "semver";
import { logger } from "~/services/logger.server";
import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server";
import { BaseService } from "./services/baseService.server";
import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus";
import type { Prisma, TaskRun } from "@trigger.dev/database";
import { CompleteAttemptService } from "./services/completeAttempt.server";
import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.server";
import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server";
import * as semver from "semver";
import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus";

const FailedTaskRunRetryGetPayload = {
select: {
@@ -180,13 +180,52 @@ export class FailedTaskRunRetryHelper extends BaseService {
}
}

static async getExecutionRetry({
static getExecutionRetry({
run,
execution,
}: {
run: TaskRunWithWorker;
execution: TaskRunExecution;
}): Promise<TaskRunExecutionRetry | undefined> {
}): TaskRunExecutionRetry | undefined {
try {
const retryConfig = FailedTaskRunRetryHelper.getRetryConfig({ run, execution });
if (!retryConfig) {
return;
}

const delay = calculateNextRetryDelay(retryConfig, execution.attempt.number);

if (!delay) {
logger.debug("[FailedTaskRunRetryHelper] No more retries", {
run,
execution,
});

return;
}

return {
timestamp: Date.now() + delay,
delay,
};
} catch (error) {
logger.error("[FailedTaskRunRetryHelper] Failed to get execution retry", {
run,
execution,
error,
});

return;
}
}

static getRetryConfig({
run,
execution,
}: {
run: TaskRunWithWorker;
execution: TaskRunExecution;
}): RetryOptions | undefined {
try {
const retryConfig = run.lockedBy?.retryConfig;

@@ -247,21 +286,7 @@
return;
}

const delay = calculateNextRetryDelay(parsedRetryConfig.data, execution.attempt.number);

if (!delay) {
logger.debug("[FailedTaskRunRetryHelper] No more retries", {
run,
execution,
});

return;
}

return {
timestamp: Date.now() + delay,
delay,
};
return parsedRetryConfig.data;
} catch (error) {
logger.error("[FailedTaskRunRetryHelper] Failed to get execution retry", {
run,
63 changes: 61 additions & 2 deletions apps/webapp/app/v3/services/completeAttempt.server.ts
@@ -1,13 +1,16 @@
import { Attributes } from "@opentelemetry/api";
import {
TaskRunContext,
TaskRunError,
TaskRunErrorCodes,
TaskRunExecution,
TaskRunExecutionResult,
TaskRunExecutionRetry,
TaskRunFailedExecutionResult,
TaskRunSuccessfulExecutionResult,
exceptionEventEnhancer,
flattenAttributes,
internalErrorFromUnexpectedExit,
sanitizeError,
shouldRetryError,
taskRunErrorEnhancer,
@@ -233,7 +236,7 @@ export class CompleteAttemptService extends BaseService {

if (!executionRetry && shouldInfer) {
executionRetryInferred = true;
executionRetry = await FailedTaskRunRetryHelper.getExecutionRetry({
executionRetry = FailedTaskRunRetryHelper.getExecutionRetry({
run: {
...taskRunAttempt.taskRun,
lockedBy: taskRunAttempt.backgroundWorkerTask,
@@ -243,7 +246,43 @@
});
}

const retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error));
let retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error));
let isOOMRetry = false;

//OOM errors should retry (if an OOM machine is specified)
if (isOOMError(completion.error)) {
const retryConfig = FailedTaskRunRetryHelper.getRetryConfig({
run: {
...taskRunAttempt.taskRun,
lockedBy: taskRunAttempt.backgroundWorkerTask,
lockedToVersion: taskRunAttempt.backgroundWorker,
},
execution,
});

if (retryConfig?.outOfMemory?.machine) {
isOOMRetry = true;
retriableError = true;
executionRetry = FailedTaskRunRetryHelper.getExecutionRetry({
run: {
...taskRunAttempt.taskRun,
lockedBy: taskRunAttempt.backgroundWorkerTask,
lockedToVersion: taskRunAttempt.backgroundWorker,
},
execution,
});

//update the machine on the run
await this._prisma.taskRun.update({
where: {
id: taskRunAttempt.taskRunId,
},
data: {
machinePreset: retryConfig.outOfMemory.machine,
},
});
}
}

if (
retriableError &&
@@ -257,6 +296,7 @@
taskRunAttempt,
environment,
checkpoint,
forceRequeue: isOOMRetry,
});
}

@@ -378,12 +418,14 @@
executionRetryInferred,
checkpointEventId,
supportsLazyAttempts,
forceRequeue = false,
}: {
run: TaskRun;
executionRetry: TaskRunExecutionRetry;
executionRetryInferred: boolean;
checkpointEventId?: string;
supportsLazyAttempts: boolean;
forceRequeue?: boolean;
}) {
const retryViaQueue = () => {
logger.debug("[CompleteAttemptService] Enqueuing retry attempt", { runId: run.id });
@@ -434,6 +476,11 @@
return;
}

if (forceRequeue) {
logger.debug("[CompleteAttemptService] Forcing retry via queue", { runId: run.id });
await retryViaQueue();
}

// Workers that never checkpoint between attempts will exit after completing their current attempt if the retry delay exceeds the threshold
if (
!this.opts.supportsRetryCheckpoints &&
@@ -466,13 +513,15 @@
taskRunAttempt,
environment,
checkpoint,
forceRequeue = false,
}: {
execution: TaskRunExecution;
executionRetry: TaskRunExecutionRetry;
executionRetryInferred: boolean;
taskRunAttempt: NonNullable<FoundAttempt>;
environment: AuthenticatedEnvironment;
checkpoint?: CheckpointData;
forceRequeue?: boolean;
}) {
const retryAt = new Date(executionRetry.timestamp);

@@ -533,6 +582,7 @@
executionRetry,
supportsLazyAttempts: taskRunAttempt.backgroundWorker.supportsLazyAttempts,
executionRetryInferred,
forceRequeue,
});

return "RETRIED";
@@ -634,3 +684,12 @@ async function findAttempt(prismaClient: PrismaClientOrTransaction, friendlyId:
},
});
}

function isOOMError(error: TaskRunError) {
if (error.type !== "INTERNAL_ERROR") return false;
if (error.code === "TASK_PROCESS_OOM_KILLED" || error.code === "TASK_PROCESS_MAYBE_OOM_KILLED") {
return true;
}

return false;
}
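A minimal sketch of the error shape this helper matches (the `TaskRunError` type comes from `@trigger.dev/core/v3`, as imported at the top of this file):

```ts
import type { TaskRunError } from "@trigger.dev/core/v3";

// Only internal errors with one of the two OOM codes are treated as OOM failures.
const oomError: TaskRunError = {
  type: "INTERNAL_ERROR",
  code: "TASK_PROCESS_OOM_KILLED",
};

// isOOMError(oomError) === true, so the retry path above can swap in
// retryConfig.outOfMemory.machine and force a requeue.
```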
35 changes: 32 additions & 3 deletions docs/machines.mdx
@@ -8,9 +8,7 @@ The `machine` configuration is optional. Using higher spec machines will increas
```ts /trigger/heavy-task.ts
export const heavyTask = task({
id: "heavy-task",
machine: {
preset: "large-1x",
},
machine: "large-1x",
run: async ({ payload, ctx }) => {
//...
},
@@ -28,6 +26,37 @@ export const config: TriggerConfig = {
};
```

## Out Of Memory errors

Sometimes you might see one of your runs fail with an "Out Of Memory" (OOM) error:

> TASK_PROCESS_OOM_KILLED. Your task ran out of memory. Try increasing the machine specs. If this doesn't fix it there might be a memory leak.

If this happens regularly, you need to either optimize your code's memory usage or run the task on a larger machine.

### Retrying with a larger machine

If you only see occasional OOM errors, you can configure your task to retry on a larger machine when an OOM error occurs:

```ts /trigger/heavy-task.ts
export const yourTask = task({
id: "your-task",
machine: "medium-1x",
retry: {
outOfMemory: {
machine: "large-1x",
},
},
run: async (payload: any, { ctx }) => {
//...
},
});
```

<Note>
This only retries the task when it fails with an OOM error. It won't permanently change the machine that new runs start on, so if you consistently see OOM errors you should increase the machine in the `machine` property instead.
</Note>

## Machine configurations

| Preset | vCPU | Memory | Disk space |
16 changes: 13 additions & 3 deletions packages/core/src/v3/schemas/schemas.ts
@@ -1,6 +1,6 @@
import { z } from "zod";
import { RequireKeys } from "../types/index.js";
import { MachineConfig, MachinePreset, TaskRunExecution } from "./common.js";
import { MachineConfig, MachinePreset, MachinePresetName, TaskRunExecution } from "./common.js";

/*
WARNING: Never import anything from ./messages here. If it's needed in both, put it here instead.
@@ -95,15 +95,25 @@ export const RetryOptions = z.object({
* This can be useful to prevent the thundering herd problem where all retries happen at the same time.
*/
randomize: z.boolean().optional(),

/** If a run fails with an Out Of Memory (OOM) error and you have this set, it will retry with the machine you specify.
* Note: it will not default to this [machine](https://trigger.dev/docs/machines) for new runs, only for failures caused by OOM errors.
* So if you frequently have attempts failing with OOM errors, you should set the [default machine](https://trigger.dev/docs/machines) to be higher.
*/
outOfMemory: z
.object({
machine: MachinePresetName.optional(),
})
.optional(),
});

export type RetryOptions = z.infer<typeof RetryOptions>;

export const QueueOptions = z.object({
/** You can define a shared queue and then pass the name in to your task.
*
*
* @example
*
*
* ```ts
* const myQueue = queue({
name: "my-queue",
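As a quick sketch of the schema change (assuming the `RetryOptions` schema is exported from `@trigger.dev/core/v3`, like the other schemas imported elsewhere in this PR):

```ts
import { RetryOptions } from "@trigger.dev/core/v3";

// The new outOfMemory field parses alongside the existing retry fields.
const parsed = RetryOptions.safeParse({
  maxAttempts: 3,
  outOfMemory: { machine: "large-1x" },
});

if (parsed.success) {
  console.log(parsed.data.outOfMemory?.machine); // "large-1x"
}
```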
7 changes: 2 additions & 5 deletions packages/core/src/v3/types/tasks.ts
@@ -202,17 +202,14 @@ type CommonTaskOptions<
* ```
*/
queue?: QueueOptions;
/** Configure the spec of the machine you want your task to run on.
/** Configure the spec of the [machine](https://trigger.dev/docs/machines) you want your task to run on.
*
* @example
*
* ```ts
* export const heavyTask = task({
id: "heavy-task",
machine: {
cpu: 2,
memory: 4,
},
machine: "medium-1x",
run: async ({ payload, ctx }) => {
//...
},
41 changes: 41 additions & 0 deletions references/hello-world/src/trigger/oom.ts
@@ -0,0 +1,41 @@
import { logger, task } from "@trigger.dev/sdk/v3";
import { setTimeout } from "timers/promises";

export const oomTask = task({
id: "oom-task",
machine: "micro",
retry: {
outOfMemory: {
machine: "small-1x",
},
},
run: async ({ succeedOnLargerMachine }: { succeedOnLargerMachine: boolean }, { ctx }) => {
logger.info("running out of memory below this line");

logger.info(`Running on ${ctx.machine?.name}`);

await setTimeout(2000);

if (ctx.machine?.name !== "micro" && succeedOnLargerMachine) {
logger.info("Going to succeed now");
return {
success: true,
};
}

let a = "a";

try {
while (true) {
a += a;
}
} catch (error) {
logger.error(error instanceof Error ? error.message : "Unknown error", { error });

let b = [];
while (true) {
b.push(a.replace(/a/g, "b"));
}
}
},
});