Skip to content

Commit

Permalink
Batch queue runs that are waiting for deploy (#1693)
Browse files Browse the repository at this point in the history
  • Loading branch information
matt-aitken authored Feb 11, 2025
1 parent a50b93f commit 6017c52
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 42 deletions.
3 changes: 3 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,9 @@ const EnvironmentSchema = z.object({
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
LEGACY_RUN_ENGINE_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

LEGACY_RUN_ENGINE_WAITING_FOR_DEPLOY_BATCH_SIZE: z.coerce.number().int().default(100),
LEGACY_RUN_ENGINE_WAITING_FOR_DEPLOY_BATCH_STAGGER_MS: z.coerce.number().int().default(1_000),

COMMON_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
COMMON_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
Expand Down
77 changes: 35 additions & 42 deletions apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { workerQueue } from "~/services/worker.server";
import { marqs } from "~/v3/marqs/index.server";
import { BaseService } from "./baseService.server";
import { logger } from "~/services/logger.server";
import { env } from "~/env.server";

export class ExecuteTasksWaitingForDeployService extends BaseService {
public async call(backgroundWorkerId: string) {
Expand All @@ -17,7 +18,11 @@ export class ExecuteTasksWaitingForDeployService extends BaseService {
organization: true,
},
},
tasks: true,
tasks: {
select: {
slug: true,
},
},
},
});

Expand All @@ -26,6 +31,8 @@ export class ExecuteTasksWaitingForDeployService extends BaseService {
return;
}

const maxCount = env.LEGACY_RUN_ENGINE_WAITING_FOR_DEPLOY_BATCH_SIZE;

const runsWaitingForDeploy = await this._prisma.taskRun.findMany({
where: {
runtimeEnvironmentId: backgroundWorker.runtimeEnvironmentId,
Expand All @@ -36,8 +43,16 @@ export class ExecuteTasksWaitingForDeployService extends BaseService {
},
},
orderBy: {
number: "asc",
createdAt: "asc",
},
select: {
id: true,
status: true,
taskIdentifier: true,
concurrencyKey: true,
queue: true,
},
take: maxCount + 1,
});

if (!runsWaitingForDeploy.length) {
Expand All @@ -63,50 +78,28 @@ export class ExecuteTasksWaitingForDeployService extends BaseService {
});
}

if (!marqs) {
return;
}

const enqueues: Promise<any>[] = [];
let i = 0;

for (const run of runsWaitingForDeploy) {
enqueues.push(
marqs.enqueueMessage(
backgroundWorker.runtimeEnvironment,
run.queue,
run.id,
{
type: "EXECUTE",
taskIdentifier: run.taskIdentifier,
projectId: backgroundWorker.runtimeEnvironment.projectId,
environmentId: backgroundWorker.runtimeEnvironment.id,
environmentType: backgroundWorker.runtimeEnvironment.type,
},
run.concurrencyKey ?? undefined,
Date.now() + i * 5 // slight delay to help preserve order
)
await marqs?.enqueueMessage(
backgroundWorker.runtimeEnvironment,
run.queue,
run.id,
{
type: "EXECUTE",
taskIdentifier: run.taskIdentifier,
projectId: backgroundWorker.runtimeEnvironment.projectId,
environmentId: backgroundWorker.runtimeEnvironment.id,
environmentType: backgroundWorker.runtimeEnvironment.type,
},
run.concurrencyKey ?? undefined
);

i++;
}

const settled = await Promise.allSettled(enqueues);

if (settled.some((s) => s.status === "rejected")) {
const rejectedRuns: { id: string; reason: any }[] = [];

runsWaitingForDeploy.forEach((run, i) => {
if (settled[i].status === "rejected") {
const rejected = settled[i] as PromiseRejectedResult;

rejectedRuns.push({ id: run.id, reason: rejected.reason });
}
});

logger.error("Failed to requeue task runs for immediate execution", {
rejectedRuns,
});
if (runsWaitingForDeploy.length > maxCount) {
await ExecuteTasksWaitingForDeployService.enqueue(
backgroundWorkerId,
this._prisma,
new Date(Date.now() + env.LEGACY_RUN_ENGINE_WAITING_FOR_DEPLOY_BATCH_STAGGER_MS)
);
}
}

Expand Down

0 comments on commit 6017c52

Please sign in to comment.