Skip to content

Commit

Permalink
Move run ttl and delays from graphile to redis worker (#1672)
Browse files Browse the repository at this point in the history
  • Loading branch information
ericallam authored Feb 6, 2025
1 parent a2c2d92 commit 3ec6983
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 42 deletions.
36 changes: 33 additions & 3 deletions apps/webapp/app/v3/commonWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { singleton } from "~/utils/singleton";
import { DeliverAlertService } from "./services/alerts/deliverAlert.server";
import { PerformDeploymentAlertsService } from "./services/alerts/performDeploymentAlerts.server";
import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server";
import { ExpireEnqueuedRunService } from "./services/expireEnqueuedRun.server";
import { EnqueueDelayedRunService } from "./services/enqueueDelayedRun.server";

function initializeWorker() {
const redisOptions = {
Expand Down Expand Up @@ -52,6 +54,24 @@ function initializeWorker() {
maxAttempts: 3,
},
},
"v3.expireRun": {
schema: z.object({
runId: z.string(),
}),
visibilityTimeoutMs: 60_000,
retry: {
maxAttempts: 6,
},
},
"v3.enqueueDelayedRun": {
schema: z.object({
runId: z.string(),
}),
visibilityTimeoutMs: 60_000,
retry: {
maxAttempts: 6,
},
},
},
concurrency: {
workers: env.COMMON_WORKER_CONCURRENCY_WORKERS,
Expand All @@ -65,16 +85,26 @@ function initializeWorker() {
"v3.deliverAlert": async ({ payload }) => {
const service = new DeliverAlertService();

return await service.call(payload.alertId);
await service.call(payload.alertId);
},
"v3.performDeploymentAlerts": async ({ payload }) => {
const service = new PerformDeploymentAlertsService();

return await service.call(payload.deploymentId);
await service.call(payload.deploymentId);
},
"v3.performTaskRunAlerts": async ({ payload }) => {
const service = new PerformTaskRunAlertsService();
return await service.call(payload.runId);
await service.call(payload.runId);
},
"v3.expireRun": async ({ payload }) => {
const service = new ExpireEnqueuedRunService();

await service.call(payload.runId);
},
"v3.enqueueDelayedRun": async ({ payload }) => {
const service = new EnqueueDelayedRunService();

await service.call(payload.runId);
},
},
});
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ export class CreateTaskRunAttemptService extends BaseService {
});

if (taskRun.ttl) {
await ExpireEnqueuedRunService.dequeue(taskRun.id, tx);
await ExpireEnqueuedRunService.ack(taskRun.id, tx);
}
}

Expand Down
28 changes: 27 additions & 1 deletion apps/webapp/app/v3/services/enqueueDelayedRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,34 @@ import { logger } from "~/services/logger.server";
import { marqs } from "~/v3/marqs/index.server";
import { BaseService } from "./baseService.server";
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
import { commonWorker } from "../commonWorker.server";
import { workerQueue } from "~/services/worker.server";

export class EnqueueDelayedRunService extends BaseService {
public static async enqueue(runId: string, runAt?: Date) {
await commonWorker.enqueue({
job: "v3.enqueueDelayedRun",
payload: { runId },
availableAt: runAt,
id: `v3.enqueueDelayed:${runId}`,
});
}

public static async reschedule(runId: string, runAt?: Date) {
// We have to do this for now because it's possible that the workerQueue
// was used when the run was first delayed, and EnqueueDelayedRunService.reschedule
// is called from RescheduleTaskRunService, which allows the runAt to be changed
// so if we don't dequeue the old job, we might end up with multiple jobs
await workerQueue.dequeue(`v3.enqueueDelayedRun.${runId}`);

await commonWorker.enqueue({
job: "v3.enqueueDelayedRun",
payload: { runId },
availableAt: runAt,
id: `v3.enqueueDelayed:${runId}`,
});
}

public async call(runId: string) {
const run = await this._prisma.taskRun.findFirst({
where: {
Expand Down Expand Up @@ -52,7 +78,7 @@ export class EnqueueDelayedRunService extends BaseService {
const expireAt = parseNaturalLanguageDuration(run.ttl);

if (expireAt) {
await ExpireEnqueuedRunService.enqueue(run.id, expireAt, tx);
await ExpireEnqueuedRunService.enqueue(run.id, expireAt);
}
}
});
Expand Down
25 changes: 14 additions & 11 deletions apps/webapp/app/v3/services/expireEnqueuedRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
import { PrismaClientOrTransaction } from "~/db.server";
import { logger } from "~/services/logger.server";
import { BaseService } from "./baseService.server";
import { commonWorker } from "../commonWorker.server";
import { eventRepository } from "../eventRepository.server";
import { BaseService } from "./baseService.server";
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
import { workerQueue } from "~/services/worker.server";
import { PrismaClientOrTransaction } from "~/db.server";

export class ExpireEnqueuedRunService extends BaseService {
public static async dequeue(runId: string, tx?: PrismaClientOrTransaction) {
return await workerQueue.dequeue(`v3.expireRun:${runId}`, { tx });
public static async ack(runId: string, tx?: PrismaClientOrTransaction) {
// We don't "dequeue" from the workerQueue here because it would be redundant and if this service
// is called for a run that has already started, nothing happens
await commonWorker.ack(`v3.expireRun:${runId}`);
}

public static async enqueue(runId: string, runAt?: Date, tx?: PrismaClientOrTransaction) {
return await workerQueue.enqueue(
"v3.expireRun",
{ runId },
{ runAt, jobKey: `v3.expireRun:${runId}`, tx }
);
public static async enqueue(runId: string, runAt?: Date) {
return await commonWorker.enqueue({
job: "v3.expireRun",
payload: { runId },
availableAt: runAt,
id: `v3.expireRun:${runId}`,
});
}

public async call(runId: string) {
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/services/finalizeTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ export class FinalizeTaskRunService extends BaseService {
});

if (run.ttl) {
await ExpireEnqueuedRunService.dequeue(run.id);
await ExpireEnqueuedRunService.ack(run.id);
}

if (attemptStatus || error) {
Expand Down
31 changes: 12 additions & 19 deletions apps/webapp/app/v3/services/rescheduleTaskRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { RescheduleRunRequestBody } from "@trigger.dev/core/v3";
import { TaskRun } from "@trigger.dev/database";
import { BaseService, ServiceValidationError } from "./baseService.server";
import { RescheduleRunRequestBody } from "@trigger.dev/core/v3";
import { EnqueueDelayedRunService } from "./enqueueDelayedRun.server";
import { parseDelay } from "./triggerTask.server";
import { $transaction } from "~/db.server";
import { workerQueue } from "~/services/worker.server";

export class RescheduleTaskRunService extends BaseService {
public async call(taskRun: TaskRun, body: RescheduleRunRequestBody) {
Expand All @@ -17,23 +16,17 @@ export class RescheduleTaskRunService extends BaseService {
throw new ServiceValidationError(`Invalid delay: ${body.delay}`);
}

return await $transaction(this._prisma, "reschedule run", async (tx) => {
const updatedRun = await tx.taskRun.update({
where: {
id: taskRun.id,
},
data: {
delayUntil: delay,
},
});
const updatedRun = await this._prisma.taskRun.update({
where: {
id: taskRun.id,
},
data: {
delayUntil: delay,
},
});

await workerQueue.enqueue(
"v3.enqueueDelayedRun",
{ runId: taskRun.id },
{ tx, runAt: delay, jobKey: `v3.enqueueDelayedRun.${taskRun.id}` }
);
await EnqueueDelayedRunService.reschedule(taskRun.id, delay);

return updatedRun;
});
return updatedRun;
}
}
9 changes: 3 additions & 6 deletions apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { clampMaxDuration } from "../utils/maxDuration";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { Prisma, TaskRun } from "@trigger.dev/database";
import { sanitizeQueueName } from "~/models/taskQueue.server";
import { EnqueueDelayedRunService } from "./enqueueDelayedRun.server";

export type TriggerTaskServiceOptions = {
idempotencyKey?: string;
Expand Down Expand Up @@ -515,18 +516,14 @@ export class TriggerTaskService extends BaseService {
}

if (taskRun.delayUntil) {
await workerQueue.enqueue(
"v3.enqueueDelayedRun",
{ runId: taskRun.id },
{ tx, runAt: delayUntil, jobKey: `v3.enqueueDelayedRun.${taskRun.id}` }
);
await EnqueueDelayedRunService.enqueue(taskRun.id, taskRun.delayUntil);
}

if (!taskRun.delayUntil && taskRun.ttl) {
const expireAt = parseNaturalLanguageDuration(taskRun.ttl);

if (expireAt) {
await ExpireEnqueuedRunService.enqueue(taskRun.id, expireAt, tx);
await ExpireEnqueuedRunService.enqueue(taskRun.id, expireAt);
}
}

Expand Down

0 comments on commit 3ec6983

Please sign in to comment.