Skip to content

Commit

Permalink
Use redis worker for run heartbeats and alerts (#1669)
Browse files (browse the repository at this point in the history)
* Move the task run heartbeats to RedisWorker

* Move alerts to redis worker, improving redis worker

* Fix typecheck errors

* Use single threaded tests for redis worker

* Enable/disable the redis workers independently

* Remove preview release from PR checks
  • Loading branch information
ericallam authored Feb 6, 2025
1 parent b946b9f commit a2c2d92
Show file tree
Hide file tree
Showing 33 changed files with 796 additions and 502 deletions.
31 changes: 0 additions & 31 deletions .github/workflows/pr_checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,34 +28,3 @@ jobs:
with:
package: cli-v3
secrets: inherit

preview-release:
name: Preview Release
needs: [typecheck, units, e2e]
if: github.repository == 'triggerdotdev/trigger.dev'
runs-on: ubuntu-latest
steps:
- name: ⬇️ Checkout repo
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: ⎔ Setup pnpm
uses: pnpm/action-setup@v4
with:
version: 8.15.5

- name: ⎔ Setup node
uses: buildjet/setup-node@v4
with:
node-version: 20.11.1
cache: "pnpm"

- name: 📥 Download deps
run: pnpm install --frozen-lockfile

- name: 🏗️ Build
run: pnpm run build --filter "@trigger.dev/*" --filter "trigger.dev"

- name: ⚡ Publish preview release
run: npx pkg-pr-new publish --no-template $(ls -d ./packages/*)
76 changes: 76 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,82 @@ const EnvironmentSchema = z.object({
BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
BATCH_METADATA_OPERATIONS_FLUSH_ENABLED: z.string().default("1"),
BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED: z.string().default("1"),

// ---- Legacy run engine worker (redis worker) settings ----
// String flag; defaults to the value of WORKER_ENABLED when that is set, otherwise "true".
LEGACY_RUN_ENGINE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
// Concurrency and polling knobs; each is coerced to an integer and has a fixed default.
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),
LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),

// Redis connection settings. Each value is optional and, when not provided,
// falls back to the corresponding shared REDIS_* environment variable.
LEGACY_RUN_ENGINE_WORKER_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_HOST),
LEGACY_RUN_ENGINE_WORKER_REDIS_READER_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_READER_HOST),
// Falls back to REDIS_READER_PORT parsed with parseInt; undefined when neither is set.
LEGACY_RUN_ENGINE_WORKER_REDIS_READER_PORT: z.coerce
.number()
.optional()
.transform(
(v) =>
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
),
// Falls back to REDIS_PORT parsed with parseInt; undefined when neither is set.
LEGACY_RUN_ENGINE_WORKER_REDIS_PORT: z.coerce
.number()
.optional()
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
LEGACY_RUN_ENGINE_WORKER_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_USERNAME),
LEGACY_RUN_ENGINE_WORKER_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
// String flag; defaults to REDIS_TLS_DISABLED when that is set, otherwise "false".
LEGACY_RUN_ENGINE_WORKER_REDIS_TLS_DISABLED: z
.string()
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
// String flag defaulting to "0". NOTE(review): no consumer of this value is visible here — confirm where it is read.
LEGACY_RUN_ENGINE_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

// ---- Common worker (redis worker) settings ----
// String flag; defaults to the value of WORKER_ENABLED when that is set, otherwise "true".
COMMON_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
// Concurrency and polling knobs; each is coerced to an integer and has a fixed default.
COMMON_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
COMMON_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
COMMON_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),

// Redis connection settings. Each value is optional and, when not provided,
// falls back to the corresponding shared REDIS_* environment variable.
COMMON_WORKER_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_HOST),
COMMON_WORKER_REDIS_READER_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_READER_HOST),
// Falls back to REDIS_READER_PORT parsed with parseInt; undefined when neither is set.
COMMON_WORKER_REDIS_READER_PORT: z.coerce
.number()
.optional()
.transform(
(v) =>
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
),
// Falls back to REDIS_PORT parsed with parseInt; undefined when neither is set.
COMMON_WORKER_REDIS_PORT: z.coerce
.number()
.optional()
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
COMMON_WORKER_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_USERNAME),
COMMON_WORKER_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
// String flag; defaults to REDIS_TLS_DISABLED when that is set, otherwise "false".
COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
// String flag defaulting to "0". NOTE(review): no consumer of this value is visible here — confirm where it is read.
COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
});

export type Environment = z.infer<typeof EnvironmentSchema>;
Expand Down
20 changes: 1 addition & 19 deletions apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ import { $replica, prisma } from "~/db.server";
import { env } from "~/env.server";
import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server";
import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server";
import { RequeueTaskRunService } from "~/v3/requeueTaskRun.server";
import { DeliverAlertService } from "~/v3/services/alerts/deliverAlert.server";
import { PerformDeploymentAlertsService } from "~/v3/services/alerts/performDeploymentAlerts.server";
import { PerformTaskAttemptAlertsService } from "~/v3/services/alerts/performTaskAttemptAlerts.server";
import { PerformBulkActionService } from "~/v3/services/bulk/performBulkAction.server";
import { CancelTaskAttemptDependenciesService } from "~/v3/services/cancelTaskAttemptDependencies.server";
import { EnqueueDelayedRunService } from "~/v3/services/enqueueDelayedRun.server";
Expand Down Expand Up @@ -157,9 +155,6 @@ const workerCatalog = {
"v3.performTaskRunAlerts": z.object({
runId: z.string(),
}),
"v3.performTaskAttemptAlerts": z.object({
attemptId: z.string(),
}),
"v3.deliverAlert": z.object({
alertId: z.string(),
}),
Expand Down Expand Up @@ -610,15 +605,6 @@ function getWorkerQueue() {
return await service.call(payload.runId);
},
},
"v3.performTaskAttemptAlerts": {
priority: 0,
maxAttempts: 3,
handler: async (payload, job) => {
const service = new PerformTaskAttemptAlertsService();

return await service.call(payload.attemptId);
},
},
"v3.deliverAlert": {
priority: 0,
maxAttempts: 8,
Expand Down Expand Up @@ -658,11 +644,7 @@ function getWorkerQueue() {
"v3.requeueTaskRun": {
priority: 0,
maxAttempts: 3,
handler: async (payload, job) => {
const service = new RequeueTaskRunService();

await service.call(payload.runId);
},
handler: async (payload, job) => {}, // This is now handled by redisWorker
},
"v3.retryAttempt": {
priority: 0,
Expand Down
93 changes: 93 additions & 0 deletions apps/webapp/app/v3/commonWorker.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { Worker as RedisWorker } from "@internal/redis-worker";
import { Logger } from "@trigger.dev/core/logger";
import { z } from "zod";
import { env } from "~/env.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { DeliverAlertService } from "./services/alerts/deliverAlert.server";
import { PerformDeploymentAlertsService } from "./services/alerts/performDeploymentAlerts.server";
import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server";

/**
 * Builds the "common-worker" RedisWorker, which handles the alert jobs
 * `v3.performTaskRunAlerts`, `v3.performDeploymentAlerts` and `v3.deliverAlert`.
 *
 * The worker is always constructed; it is only started when
 * `COMMON_WORKER_ENABLED` is exactly "true".
 *
 * @returns The constructed worker, whether or not it was started.
 */
function initializeWorker() {
  // TLS is requested unless COMMON_WORKER_REDIS_TLS_DISABLED is exactly "true".
  const tlsOptions = env.COMMON_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} };

  const redisOptions = {
    keyPrefix: "common:worker:",
    host: env.COMMON_WORKER_REDIS_HOST,
    port: env.COMMON_WORKER_REDIS_PORT,
    username: env.COMMON_WORKER_REDIS_USERNAME,
    password: env.COMMON_WORKER_REDIS_PASSWORD,
    enableAutoPipelining: true,
    ...tlsOptions,
  };

  logger.debug(`👨‍🏭 Initializing common worker at host ${env.COMMON_WORKER_REDIS_HOST}`);

  // Payload schema, visibility timeout and retry policy for every job this worker accepts.
  const catalog = {
    "v3.performTaskRunAlerts": {
      schema: z.object({ runId: z.string() }),
      visibilityTimeoutMs: 60_000,
      retry: { maxAttempts: 3 },
    },
    "v3.performDeploymentAlerts": {
      schema: z.object({ deploymentId: z.string() }),
      visibilityTimeoutMs: 60_000,
      retry: { maxAttempts: 3 },
    },
    "v3.deliverAlert": {
      schema: z.object({ alertId: z.string() }),
      visibilityTimeoutMs: 60_000,
      // NOTE(review): the graphile-worker handler for this job used maxAttempts: 8 — confirm 3 is intended.
      retry: { maxAttempts: 3 },
    },
  };

  const worker = new RedisWorker({
    name: "common-worker",
    redisOptions,
    catalog,
    concurrency: {
      workers: env.COMMON_WORKER_CONCURRENCY_WORKERS,
      tasksPerWorker: env.COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER,
      limit: env.COMMON_WORKER_CONCURRENCY_LIMIT,
    },
    pollIntervalMs: env.COMMON_WORKER_POLL_INTERVAL,
    immediatePollIntervalMs: env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL,
    logger: new Logger("CommonWorker", "debug"),
    // Each handler creates a fresh service instance and hands it the id from the payload.
    jobs: {
      "v3.performTaskRunAlerts": async ({ payload }) => {
        const alerts = new PerformTaskRunAlertsService();
        return await alerts.call(payload.runId);
      },
      "v3.performDeploymentAlerts": async ({ payload }) => {
        const alerts = new PerformDeploymentAlertsService();
        return await alerts.call(payload.deploymentId);
      },
      "v3.deliverAlert": async ({ payload }) => {
        const delivery = new DeliverAlertService();
        return await delivery.call(payload.alertId);
      },
    },
  });

  // When disabled, hand back the un-started worker.
  if (env.COMMON_WORKER_ENABLED !== "true") {
    return worker;
  }

  logger.debug(
    `👨‍🏭 Starting common worker at host ${env.COMMON_WORKER_REDIS_HOST}, pollInterval = ${env.COMMON_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.COMMON_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.COMMON_WORKER_CONCURRENCY_LIMIT}`
  );
  worker.start();

  return worker;
}

/**
 * Shared common worker instance, obtained through `singleton` under the key "commonWorker".
 * NOTE(review): presumably `singleton` ensures `initializeWorker` runs only once per process —
 * confirm against `~/utils/singleton`.
 */
export const commonWorker = singleton("commonWorker", initializeWorker);
66 changes: 66 additions & 0 deletions apps/webapp/app/v3/legacyRunEngineWorker.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { Worker as RedisWorker } from "@internal/redis-worker";
import { Logger } from "@trigger.dev/core/logger";
import { z } from "zod";
import { env } from "~/env.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server";

/**
 * Builds the "legacy-run-engine-worker" RedisWorker, which handles the
 * `runHeartbeat` job by invoking `TaskRunHeartbeatFailedService` for the run.
 *
 * The worker is always constructed; it is only started when
 * `LEGACY_RUN_ENGINE_WORKER_ENABLED` is exactly "true".
 *
 * @returns The constructed worker, whether or not it was started.
 */
function initializeWorker() {
  // TLS is requested unless LEGACY_RUN_ENGINE_WORKER_REDIS_TLS_DISABLED is exactly "true".
  const tlsOptions =
    env.LEGACY_RUN_ENGINE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} };

  const redisOptions = {
    keyPrefix: "legacy-run-engine:worker:",
    host: env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST,
    port: env.LEGACY_RUN_ENGINE_WORKER_REDIS_PORT,
    username: env.LEGACY_RUN_ENGINE_WORKER_REDIS_USERNAME,
    password: env.LEGACY_RUN_ENGINE_WORKER_REDIS_PASSWORD,
    enableAutoPipelining: true,
    ...tlsOptions,
  };

  logger.debug(
    `👨‍🏭 Initializing legacy run engine worker at host ${env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST}`
  );

  // Payload schema, visibility timeout and retry policy for the single job this worker accepts.
  const catalog = {
    runHeartbeat: {
      schema: z.object({ runId: z.string() }),
      visibilityTimeoutMs: 60_000,
      retry: { maxAttempts: 3 },
    },
  };

  const worker = new RedisWorker({
    name: "legacy-run-engine-worker",
    redisOptions,
    catalog,
    concurrency: {
      workers: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS,
      tasksPerWorker: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER,
      limit: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT,
    },
    pollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL,
    immediatePollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL,
    logger: new Logger("LegacyRunEngineWorker", "debug"),
    jobs: {
      // The handler resolves with no value; only the service call's completion matters.
      runHeartbeat: async ({ payload }) => {
        const heartbeatFailed = new TaskRunHeartbeatFailedService();
        await heartbeatFailed.call(payload.runId);
      },
    },
  });

  // When disabled, hand back the un-started worker.
  if (env.LEGACY_RUN_ENGINE_WORKER_ENABLED !== "true") {
    return worker;
  }

  logger.debug(
    `👨‍🏭 Starting legacy run engine worker at host ${env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST}, pollInterval = ${env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT}`
  );
  worker.start();

  return worker;
}

/**
 * Shared legacy run engine worker instance, obtained through `singleton` under the key
 * "legacyRunEngineWorker".
 * NOTE(review): presumably `singleton` ensures `initializeWorker` runs only once per process —
 * confirm against `~/utils/singleton`.
 */
export const legacyRunEngineWorker = singleton("legacyRunEngineWorker", initializeWorker);
4 changes: 2 additions & 2 deletions apps/webapp/app/v3/marqs/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import {
MessageQueueSubscriber,
VisibilityTimeoutStrategy,
} from "./types";
import { V3VisibilityTimeout } from "./v3VisibilityTimeout.server";
import { V3LegacyRunEngineWorkerVisibilityTimeout } from "./v3VisibilityTimeout.server";

const KEY_PREFIX = "marqs:";

Expand Down Expand Up @@ -1611,7 +1611,7 @@ function getMarQSClient() {
name: "marqs",
tracer: trace.getTracer("marqs"),
keysProducer,
visibilityTimeoutStrategy: new V3VisibilityTimeout(),
visibilityTimeoutStrategy: new V3LegacyRunEngineWorkerVisibilityTimeout(),
queuePriorityStrategy: new FairDequeuingStrategy({
tracer: tracer,
redis,
Expand Down
24 changes: 20 additions & 4 deletions apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
import { RequeueTaskRunService } from "../requeueTaskRun.server";
import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server";
import { TaskRunHeartbeatFailedService } from "../taskRunHeartbeatFailed.server";
import { VisibilityTimeoutStrategy } from "./types";

export class V3VisibilityTimeout implements VisibilityTimeoutStrategy {
export class V3GraphileVisibilityTimeout implements VisibilityTimeoutStrategy {
async heartbeat(messageId: string, timeoutInMs: number): Promise<void> {
await RequeueTaskRunService.enqueue(messageId, new Date(Date.now() + timeoutInMs));
await TaskRunHeartbeatFailedService.enqueue(messageId, new Date(Date.now() + timeoutInMs));
}

async cancelHeartbeat(messageId: string): Promise<void> {
await RequeueTaskRunService.dequeue(messageId);
await TaskRunHeartbeatFailedService.dequeue(messageId);
}
}

/**
 * Visibility timeout strategy that schedules run heartbeats on the legacy run engine
 * redis worker. Each message gets a `runHeartbeat` job whose id is `heartbeat:<messageId>`.
 */
export class V3LegacyRunEngineWorkerVisibilityTimeout implements VisibilityTimeoutStrategy {
  /**
   * Enqueues a `runHeartbeat` job for the message that becomes available
   * `timeoutInMs` milliseconds from now.
   *
   * @param messageId - Message id; also passed as the job payload's `runId`.
   * @param timeoutInMs - Delay in milliseconds before the job becomes available.
   */
  async heartbeat(messageId: string, timeoutInMs: number): Promise<void> {
    const availableAt = new Date(Date.now() + timeoutInMs);
    const jobId = `heartbeat:${messageId}`;

    await legacyRunEngineWorker.enqueue({
      id: jobId,
      job: "runHeartbeat",
      payload: { runId: messageId },
      availableAt,
    });
  }

  /**
   * Acknowledges the heartbeat job for the given message on the worker.
   *
   * @param messageId - Message id whose heartbeat job should be acked.
   */
  async cancelHeartbeat(messageId: string): Promise<void> {
    const jobId = `heartbeat:${messageId}`;

    await legacyRunEngineWorker.ack(jobId);
  }
}
Loading

0 comments on commit a2c2d92

Please sign in to comment.