Skip to content

Commit

Permalink
v3 queue system (#903)
Browse files Browse the repository at this point in the history
* Introducing Modular Asynchronous Reliable Queueing System (MarQS). Works in dev

* Convert MarQS to using lua and dealing with concurrency

* Simplified the timeout queue and current concurrency is now a set instead of a flat value (to support idempotency)

* Implement task heartbeating and reconnect the background workers CLI when the websocket connection reconnects

* Start adding internal telemetry support for the server

* Get env vars to work in dev and implement prisma tracing in webapp

* Cleanup telemetry and implement it in the consumer

* Implement dequeuing a message from a parent shared queue

* Implement a custom logger exporter instead of using console log exporter

* Use node instead of shell for generating protocol buffer code

* Propagate trace context into debug logs, and allow turning off logger exporter through env vars

* Switch to using baselime for internal otel data

* Make orgMember optional to fix type issues

* Provide the CLI dev env vars through the CLI, don’t build dotenv into facade

* Removed the logger import

* Address Matt’s comments

* Addressing more of Matt’s comments

* Handle sending an execution after a websocket connection closes

* Remove auth from the env attributes to prevent obfuscation
  • Loading branch information
ericallam authored Feb 20, 2024
1 parent d04c8e8 commit 07f7c65
Show file tree
Hide file tree
Showing 57 changed files with 2,748 additions and 746 deletions.
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ NODE_ENV=development
# FROM_EMAIL=
# REPLY_TO_EMAIL=

# Remove the following line to enable logging telemetry traces to the console
LOG_TELEMETRY="false"

# CLOUD VARIABLES
POSTHOG_PROJECT_KEY=
PLAIN_API_KEY=
Expand Down
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"request": "launch",
"name": "Debug WebApp",
"command": "pnpm run dev --filter webapp",
"envFile": "${workspaceFolder}/apps/webapp/.env",
"envFile": "${workspaceFolder}/.env",
"cwd": "${workspaceFolder}",
"sourceMaps": true
},
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ const EnvironmentSchema = z.object({

//v3
V3_ENABLED: z.string().default("false"),
OTLP_EXPORTER_TRACES_URL: z.string().optional(),
LOG_TELEMETRY: z.string().default("true"),
});

export type Environment = z.infer<typeof EnvironmentSchema>;
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/models/runtimeEnvironment.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export async function findEnvironmentByApiKey(apiKey: string) {
include: {
project: true,
organization: true,
orgMember: true,
},
});

Expand All @@ -30,6 +31,7 @@ export async function findEnvironmentByPublicApiKey(apiKey: string) {
include: {
project: true,
organization: true,
orgMember: true,
},
});

Expand Down
95 changes: 79 additions & 16 deletions apps/webapp/app/platform/zodWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ import type {
TaskSpec,
} from "graphile-worker";
import { run as graphileRun, parseCronItems } from "graphile-worker";
import { SpanKind, trace } from "@opentelemetry/api";

import omit from "lodash.omit";
import { z } from "zod";
import { PrismaClient, PrismaClientOrTransaction } from "~/db.server";
import { PgListenService } from "~/services/db/pgListen.server";
import { workerLogger as logger, trace } from "~/services/logger.server";
import { workerLogger as logger } from "~/services/logger.server";
import { flattenAttributes } from "@trigger.dev/core/v3";

const tracer = trace.getTracer("zodWorker", "3.0.0.dp.1");

export interface MessageCatalogSchema {
[key: string]: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any>;
Expand Down Expand Up @@ -518,13 +522,43 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
throw new Error(`No task for message type: ${String(typeName)}`);
}

await trace(
await tracer.startActiveSpan(
`Run ${typeName as string}`,
{
worker_job: job,
worker_name: this.#name,
kind: SpanKind.CONSUMER,
attributes: {
"job.task_identifier": job.task_identifier,
"job.id": job.id,
...(job.queue_name ? { "job.queue_name": job.queue_name } : {}),
...flattenAttributes(job.payload as Record<string, unknown>, "job.payload"),
"job.priority": job.priority,
"job.run_at": job.run_at.toISOString(),
"job.attempts": job.attempts,
"job.max_attempts": job.max_attempts,
"job.created_at": job.created_at.toISOString(),
"job.updated_at": job.updated_at.toISOString(),
...(job.key ? { "job.key": job.key } : {}),
"job.revision": job.revision,
...(job.locked_at ? { "job.locked_at": job.locked_at.toISOString() } : {}),
...(job.locked_by ? { "job.locked_by": job.locked_by } : {}),
...(job.flags ? flattenAttributes(job.flags, "job.flags") : {}),
"worker.name": this.#name,
},
},
async () => {
await task.handler(payload, job);
async (span) => {
try {
await task.handler(payload, job);
} catch (error) {
if (error instanceof Error) {
span.recordException(error);
} else {
span.recordException(new Error(String(error)));
}

throw error;
} finally {
span.end();
}
}
);
}
Expand Down Expand Up @@ -558,16 +592,45 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {

const payload = parsedPayload.data;

try {
await recurringTask.handler(payload._cron, job);
} catch (error) {
logger.error("Failed to handle recurring task", {
error,
payload,
});

throw error;
}
await tracer.startActiveSpan(
`Run ${typeName as string} recurring`,
{
kind: SpanKind.CONSUMER,
attributes: {
"job.task_identifier": job.task_identifier,
"job.id": job.id,
...(job.queue_name ? { "job.queue_name": job.queue_name } : {}),
...flattenAttributes(job.payload as Record<string, unknown>, "job.payload"),
"job.priority": job.priority,
"job.run_at": job.run_at.toISOString(),
"job.attempts": job.attempts,
"job.max_attempts": job.max_attempts,
"job.created_at": job.created_at.toISOString(),
"job.updated_at": job.updated_at.toISOString(),
...(job.key ? { "job.key": job.key } : {}),
"job.revision": job.revision,
...(job.locked_at ? { "job.locked_at": job.locked_at.toISOString() } : {}),
...(job.locked_by ? { "job.locked_by": job.locked_by } : {}),
...(job.flags ? flattenAttributes(job.flags, "job.flags") : {}),
"worker.name": this.#name,
},
},
async (span) => {
try {
await recurringTask.handler(payload._cron, job);
} catch (error) {
if (error instanceof Error) {
span.recordException(error);
} else {
span.recordException(new Error(String(error)));
}

throw error;
} finally {
span.end();
}
}
);
}

async #handleCleanup(rawPayload: unknown, helpers: JobHelpers): Promise<void> {
Expand Down
8 changes: 6 additions & 2 deletions apps/webapp/app/services/apiAuth.server.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import { Prettify } from "@trigger.dev/core";
import { z } from "zod";
import {
findEnvironmentByApiKey,
findEnvironmentByPublicApiKey,
} from "~/models/runtimeEnvironment.server";

type Optional<T, K extends keyof T> = Prettify<Omit<T, K> & Partial<Pick<T, K>>>;

const AuthorizationHeaderSchema = z.string().regex(/^Bearer .+$/);

export type AuthenticatedEnvironment = NonNullable<
Awaited<ReturnType<typeof findEnvironmentByApiKey>>
export type AuthenticatedEnvironment = Optional<
NonNullable<Awaited<ReturnType<typeof findEnvironmentByApiKey>>>,
"orgMember"
>;

type ApiAuthenticationResult = {
Expand Down
95 changes: 95 additions & 0 deletions apps/webapp/app/v3/authenticatedSocketConnection.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import {
ZodMessageHandler,
ZodMessageSender,
clientWebsocketMessages,
serverWebsocketMessages,
} from "@trigger.dev/core/v3";
import { Evt } from "evt";
import { randomUUID } from "node:crypto";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { EnvironmentQueueConsumer } from "./marqs/environmentQueueConsumer.server";
import type { WebSocket, MessageEvent, CloseEvent, ErrorEvent } from "ws";

/**
 * Server-side wrapper around one authenticated websocket connection.
 *
 * On construction it wires up:
 *  - a `ZodMessageSender` that JSON-serializes outgoing server messages onto the socket,
 *  - an `EnvironmentQueueConsumer` created for the authenticated environment and that sender,
 *  - a `ZodMessageHandler` that routes incoming client messages to the consumer.
 *
 * The socket's `message`, `close` and `error` events are handled by async listeners.
 * Nothing awaits the promise an event listener returns, so each listener catches and
 * logs its own failures instead of letting them escape as unhandled rejections.
 */
export class AuthenticatedSocketConnection {
  /** Random UUID identifying this connection; sent to the client in `SERVER_READY`. */
  public id: string;
  /** Posted once the socket has closed and the consumer has been asked to stop. */
  public onClose: Evt<CloseEvent> = new Evt();

  private readonly _sender: ZodMessageSender<typeof serverWebsocketMessages>;
  private readonly _environmentConsumer: EnvironmentQueueConsumer;
  private readonly _messageHandler: ZodMessageHandler<typeof clientWebsocketMessages>;

  /**
   * @param ws The websocket to send on and listen to.
   * @param authenticatedEnv The environment this connection was authenticated for.
   */
  constructor(public ws: WebSocket, public authenticatedEnv: AuthenticatedEnvironment) {
    this.id = randomUUID();

    this._sender = new ZodMessageSender({
      schema: serverWebsocketMessages,
      sender: async (message) => {
        // Wrap the callback-style `ws.send` so callers can await delivery or failure.
        return new Promise((resolve, reject) => {
          ws.send(JSON.stringify(message), {}, (err) => {
            if (err) {
              reject(err);
              return;
            }

            resolve();
          });
        });
      },
    });

    this._environmentConsumer = new EnvironmentQueueConsumer(authenticatedEnv, this._sender);

    ws.addEventListener("message", this.#handleMessage.bind(this));
    ws.addEventListener("close", this.#handleClose.bind(this));
    ws.addEventListener("error", this.#handleError.bind(this));

    this._messageHandler = new ZodMessageHandler({
      schema: clientWebsocketMessages,
      messages: {
        READY_FOR_TASKS: async (payload) => {
          await this._environmentConsumer.registerBackgroundWorker(payload.backgroundWorkerId);
        },

        BACKGROUND_WORKER_MESSAGE: async (payload) => {
          switch (payload.data.type) {
            case "TASK_RUN_COMPLETED": {
              await this._environmentConsumer.taskRunCompleted(
                payload.backgroundWorkerId,
                payload.data.completion
              );
              break;
            }
            case "TASK_HEARTBEAT": {
              await this._environmentConsumer.taskHeartbeat(
                payload.backgroundWorkerId,
                payload.data.id
              );
              break;
            }
          }
        },
      },
    });
  }

  /**
   * Sends `SERVER_READY` (carrying this connection's id) to the client.
   *
   * The send is awaited so the returned promise settles only after the message has
   * been handed to the socket. A failed send is logged rather than rethrown, which
   * keeps the previous contract that this method does not reject.
   */
  async initialize() {
    try {
      await this._sender.send("SERVER_READY", { id: this.id });
    } catch (error) {
      logger.error("Failed to send SERVER_READY", { error, connectionId: this.id });
    }
  }

  /**
   * Parses an incoming frame as JSON and dispatches it through the message handler.
   * Malformed JSON and handler failures are logged instead of being thrown.
   */
  async #handleMessage(ev: MessageEvent) {
    try {
      const data = JSON.parse(ev.data.toString());

      await this._messageHandler.handleMessage(data);
    } catch (error) {
      logger.error("Failed to handle websocket message", { error, connectionId: this.id });
    }
  }

  /**
   * Stops the environment consumer, then notifies `onClose` listeners.
   * `onClose` is posted even when stopping the consumer fails.
   */
  async #handleClose(ev: CloseEvent) {
    try {
      await this._environmentConsumer.stop();
    } catch (error) {
      logger.error("Failed to stop environment consumer on websocket close", {
        error,
        connectionId: this.id,
      });
    } finally {
      this.onClose.post(ev);
    }
  }

  /** Logs websocket error events. */
  async #handleError(ev: ErrorEvent) {
    logger.error("Websocket error", { ev });
  }
}
12 changes: 11 additions & 1 deletion apps/webapp/app/v3/eventRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,15 @@ export type CreatableEventEnvironmentType = CreatableEvent["environmentType"];
export type TraceAttributes = Partial<
Pick<
CreatableEvent,
"attemptId" | "isError" | "runId" | "output" | "metadata" | "properties" | "style"
| "attemptId"
| "isError"
| "runId"
| "output"
| "metadata"
| "properties"
| "style"
| "queueId"
| "queueName"
>
>;

Expand Down Expand Up @@ -167,6 +175,8 @@ export class EventRepository {
projectRef: options.environment.project.externalRef,
runId: options.attributes.runId,
taskSlug: options.taskSlug,
queueId: options.attributes.queueId,
queueName: options.attributes.queueName,
properties: {
...style,
...(flattenAttributes(metadata, SemanticInternalAttributes.METADATA) as Record<
Expand Down
Loading

0 comments on commit 07f7c65

Please sign in to comment.