Skip to content

Commit

Permalink
v3: cli cancel improvements and add back in deprecating background wo…
Browse files Browse the repository at this point in the history
…rkers (#918)

* Show runs as cancelled when killing the dev CLI

* Fixed error when waiting for a task

* Deprecated workers will no longer be able to run unlocked task runs

* Fixed selecting the latest background worker version in test list

* Don’t propogate trace context when triggering an async task

Also add batch id to the context and the task events
  • Loading branch information
ericallam authored Feb 29, 2024
1 parent 933cf0f commit ca47e6b
Show file tree
Hide file tree
Showing 28 changed files with 431 additions and 110 deletions.
2 changes: 1 addition & 1 deletion apps/webapp/app/presenters/v3/TestPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class TestPresenter {
WITH workers AS (
SELECT
bw.*,
ROW_NUMBER() OVER(PARTITION BY bw."runtimeEnvironmentId" ORDER BY bw.version DESC) AS rn
ROW_NUMBER() OVER(PARTITION BY bw."runtimeEnvironmentId" ORDER BY string_to_array(bw.version, '.')::int[] DESC) AS rn
FROM
"BackgroundWorker" bw
WHERE "projectId" = ${project.id}
Expand Down
16 changes: 4 additions & 12 deletions apps/webapp/app/routes/api.v1.tasks.$taskId.batch.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,17 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import {
parseBatchTriggerTaskRequestBody,
parseTriggerTaskRequestBody,
} from "@trigger.dev/core/v3";
import { parseBatchTriggerTaskRequestBody } from "@trigger.dev/core/v3";
import { z } from "zod";
import { MAX_BATCH_TRIGGER_ITEMS } from "~/consts";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { BatchTriggerTaskService } from "~/v3/services/batchTriggerTask.server";
import { TriggerTaskService } from "~/v3/services/triggerTask.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";

const ParamsSchema = z.object({
taskId: z.string(),
});

const HeadersSchema = z.object({
"idempotency-key": z.string().optional().nullable(),
"trigger-version": z.string().optional().nullable(),
traceparent: z.string().optional(),
tracestate: z.string().optional(),
});

export async function action({ request, params }: ActionFunctionArgs) {
// Ensure this is a POST request
if (request.method.toUpperCase() !== "POST") {
Expand All @@ -46,6 +36,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
const {
"idempotency-key": idempotencyKey,
"trigger-version": triggerVersion,
"x-trigger-span-parent-as-link": spanParentAsLink,
traceparent,
tracestate,
} = headers.data;
Expand Down Expand Up @@ -89,6 +80,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
idempotencyKey: idempotencyKey ?? undefined,
triggerVersion: triggerVersion ?? undefined,
traceContext: traceparent ? { traceparent, tracestate } : undefined,
spanParentAsLink: spanParentAsLink === 1,
});

if (!result) {
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ const ParamsSchema = z.object({
taskId: z.string(),
});

const HeadersSchema = z.object({
export const HeadersSchema = z.object({
"idempotency-key": z.string().optional().nullable(),
"trigger-version": z.string().optional().nullable(),
"x-trigger-span-parent-as-link": z.coerce.number().optional().nullable(),
traceparent: z.string().optional(),
tracestate: z.string().optional(),
});
Expand Down Expand Up @@ -41,6 +42,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
const {
"idempotency-key": idempotencyKey,
"trigger-version": triggerVersion,
"x-trigger-span-parent-as-link": spanParentAsLink,
traceparent,
tracestate,
} = headers.data;
Expand Down Expand Up @@ -70,6 +72,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
idempotencyKey: idempotencyKey ?? undefined,
triggerVersion: triggerVersion ?? undefined,
traceContext: traceparent ? { traceparent, tracestate } : undefined,
spanParentAsLink: spanParentAsLink === 1,
});

if (!run) {
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/authenticatedSocketConnection.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ export class AuthenticatedSocketConnection {
READY_FOR_TASKS: async (payload) => {
await this._consumer.registerBackgroundWorker(payload.backgroundWorkerId);
},

BACKGROUND_WORKER_DEPRECATED: async (payload) => {
await this._consumer.deprecateBackgroundWorker(payload.backgroundWorkerId);
},
BACKGROUND_WORKER_MESSAGE: async (payload) => {
switch (payload.data.type) {
case "TASK_RUN_COMPLETED": {
Expand Down
30 changes: 26 additions & 4 deletions apps/webapp/app/v3/eventRepository.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Attributes } from "@opentelemetry/api";
import { Attributes, Link, TraceFlags } from "@opentelemetry/api";
import { RandomIdGenerator } from "@opentelemetry/sdk-trace-base";
import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions";
import {
Expand Down Expand Up @@ -49,6 +49,7 @@ export type TraceAttributes = Partial<
| "style"
| "queueId"
| "queueName"
| "batchId"
>
>;

Expand All @@ -57,6 +58,7 @@ export type SetAttribute<T extends TraceAttributes> = (key: keyof T, value: T[ke
export type TraceEventOptions = {
kind?: CreatableEventKind;
context?: Record<string, string | undefined>;
spanParentAsLink?: boolean;
spanIdSeed?: string;
attributes: TraceAttributes;
environment: AuthenticatedEnvironment;
Expand Down Expand Up @@ -378,6 +380,7 @@ export class EventRepository {
[SemanticInternalAttributes.PROJECT_REF]: options.environment.project.externalRef,
[SemanticInternalAttributes.RUN_ID]: options.attributes.runId,
[SemanticInternalAttributes.RUN_IS_TEST]: options.attributes.runIsTest ?? false,
[SemanticInternalAttributes.BATCH_ID]: options.attributes.batchId ?? undefined,
[SemanticInternalAttributes.TASK_SLUG]: options.taskSlug,
[SemanticResourceAttributes.SERVICE_NAME]: "api server",
[SemanticResourceAttributes.SERVICE_NAMESPACE]: "trigger.dev",
Expand Down Expand Up @@ -416,6 +419,7 @@ export class EventRepository {
taskSlug: options.taskSlug,
queueId: options.attributes.queueId,
queueName: options.attributes.queueName,
batchId: options.attributes.batchId ?? undefined,
properties: {
...style,
...(flattenAttributes(metadata, SemanticInternalAttributes.METADATA) as Record<
Expand Down Expand Up @@ -451,9 +455,11 @@ export class EventRepository {
const start = process.hrtime.bigint();
const startTime = new Date();

const traceId = propagatedContext?.traceparent?.traceId ?? this.generateTraceId();
const parentId = propagatedContext?.traceparent?.spanId;
const tracestate = propagatedContext?.tracestate;
const traceId = options.spanParentAsLink
? this.generateTraceId()
: propagatedContext?.traceparent?.traceId ?? this.generateTraceId();
const parentId = options.spanParentAsLink ? undefined : propagatedContext?.traceparent?.spanId;
const tracestate = options.spanParentAsLink ? undefined : propagatedContext?.tracestate;
const spanId = options.spanIdSeed
? this.#generateDeterministicSpanId(traceId, options.spanIdSeed)
: this.generateSpanId();
Expand All @@ -462,6 +468,19 @@ export class EventRepository {
traceparent: `00-${traceId}-${spanId}-01`,
};

const links: Link[] =
options.spanParentAsLink && propagatedContext?.traceparent
? [
{
context: {
traceId: propagatedContext.traceparent.traceId,
spanId: propagatedContext.traceparent.spanId,
traceFlags: TraceFlags.SAMPLED,
},
},
]
: [];

const eventBuilder = {
traceId,
spanId,
Expand Down Expand Up @@ -493,6 +512,7 @@ export class EventRepository {
[SemanticInternalAttributes.PROJECT_REF]: options.environment.project.externalRef,
[SemanticInternalAttributes.RUN_ID]: options.attributes.runId,
[SemanticInternalAttributes.RUN_IS_TEST]: options.attributes.runIsTest ?? false,
[SemanticInternalAttributes.BATCH_ID]: options.attributes.batchId ?? undefined,
[SemanticInternalAttributes.TASK_SLUG]: options.taskSlug,
[SemanticResourceAttributes.SERVICE_NAME]: "api server",
[SemanticResourceAttributes.SERVICE_NAMESPACE]: "trigger.dev",
Expand Down Expand Up @@ -532,6 +552,7 @@ export class EventRepository {
taskSlug: options.taskSlug,
queueId: options.attributes.queueId,
queueName: options.attributes.queueName,
batchId: options.attributes.batchId ?? undefined,
properties: {
...style,
...(flattenAttributes(metadata, SemanticInternalAttributes.METADATA) as Record<
Expand All @@ -543,6 +564,7 @@ export class EventRepository {
metadata: metadata,
style: stripAttributePrefix(style, SemanticInternalAttributes.STYLE),
output: undefined,
links: links as unknown as Prisma.InputJsonValue,
};

if (options.immediate) {
Expand Down
63 changes: 40 additions & 23 deletions apps/webapp/app/v3/marqs/devQueueConsumer.server.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
import { Context, ROOT_CONTEXT, Span, SpanKind, context, trace } from "@opentelemetry/api";
import {
RetryOptions,
TaskRunContext,
TaskRunExecution,
TaskRunExecutionPayload,
TaskRunExecutionResult,
ZodMessageSender,
defaultRetryOptions,
flattenAttributes,
serverWebsocketMessages,
} from "@trigger.dev/core/v3";
import { BackgroundWorker, BackgroundWorkerTask } from "@trigger.dev/database";
import { z } from "zod";
import { prisma } from "~/db.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { EnvironmentVariablesRepository } from "../environmentVariables/environmentVariablesRepository.server";
import { generateFriendlyId } from "../friendlyIdentifiers";
import { marqs } from "../marqs.server";
import { attributesFromAuthenticatedEnv } from "../tracer.server";
import { eventRepository } from "../eventRepository.server";
import { EnvironmentVariablesRepository } from "../environmentVariables/environmentVariablesRepository.server";
import { CancelAttemptService } from "../services/cancelAttempt.server";
import { CompleteAttemptService } from "../services/completeAttempt.server";
import { attributesFromAuthenticatedEnv } from "../tracer.server";

const tracer = trace.getTracer("devQueueConsumer");

Expand All @@ -41,6 +36,7 @@ export type DevQueueConsumerOptions = {

export class DevQueueConsumer {
private _backgroundWorkers: Map<string, BackgroundWorkerWithTasks> = new Map();
private _deprecatedWorkers: Map<string, BackgroundWorkerWithTasks> = new Map();
private _enabled = false;
private _options: Required<DevQueueConsumerOptions>;
private _perTraceCountdown: number | undefined;
Expand All @@ -63,6 +59,18 @@ export class DevQueueConsumer {
};
}

// This method is called when a background worker is deprecated and will no longer be used unless a run is locked to it
public async deprecateBackgroundWorker(id: string) {
const backgroundWorker = this._backgroundWorkers.get(id);

if (!backgroundWorker) {
return;
}

this._deprecatedWorkers.set(id, backgroundWorker);
this._backgroundWorkers.delete(id);
}

public async registerBackgroundWorker(id: string) {
const backgroundWorker = await prisma.backgroundWorker.findUnique({
where: { friendlyId: id, runtimeEnvironmentId: this.env.id },
Expand Down Expand Up @@ -102,17 +110,6 @@ export class DevQueueConsumer {
await service.call(completion, execution, this.env);
}

#generateMetadataAttributesForNextAttempt(execution: TaskRunExecution) {
const context = TaskRunContext.parse(execution);

// @ts-ignore
context.attempt = {
number: context.attempt.number + 1,
};

return flattenAttributes(context, "ctx");
}

public async taskHeartbeat(workerId: string, id: string, seconds: number = 60) {
const taskRunAttempt = await prisma.taskRunAttempt.findUnique({
where: { friendlyId: id },
Expand Down Expand Up @@ -277,7 +274,8 @@ export class DevQueueConsumer {
}

const backgroundWorker = existingTaskRun.lockedToVersionId
? this._backgroundWorkers.get(existingTaskRun.lockedToVersionId)
? this._deprecatedWorkers.get(existingTaskRun.lockedToVersionId) ??
this._backgroundWorkers.get(existingTaskRun.lockedToVersionId)
: this.#getLatestBackgroundWorker();

if (!backgroundWorker) {
Expand Down Expand Up @@ -318,6 +316,11 @@ export class DevQueueConsumer {
orderBy: { number: "desc" },
},
tags: true,
batchItem: {
include: {
batchTaskRun: true,
},
},
},
});

Expand Down Expand Up @@ -365,7 +368,7 @@ export class DevQueueConsumer {
},
});

const execution = {
const execution: TaskRunExecution = {
task: {
id: backgroundTask.slug,
filePath: backgroundTask.filePath,
Expand Down Expand Up @@ -408,6 +411,9 @@ export class DevQueueConsumer {
slug: this.env.project.slug,
name: this.env.project.name,
},
batch: lockedTaskRun.batchItem?.batchTaskRun
? { id: lockedTaskRun.batchItem.batchTaskRun.friendlyId }
: undefined,
};

const environmentRepository = new EnvironmentVariablesRepository();
Expand Down Expand Up @@ -471,7 +477,7 @@ export class DevQueueConsumer {
}

// Get the latest background worker based on the version.
// Versions are in the format of 20240101.1 and 20240101.2
// Versions are in the format of 20240101.1 and 20240101.2, or even 20240101.10, 20240101.11, etc.
#getLatestBackgroundWorker() {
const workers = Array.from(this._backgroundWorkers.values());

Expand All @@ -480,11 +486,22 @@ export class DevQueueConsumer {
}

return workers.reduce((acc, curr) => {
if (acc.version > curr.version) {
const accParts = acc.version.split(".").map(Number);
const currParts = curr.version.split(".").map(Number);

// Compare the major part
if (accParts[0] < currParts[0]) {
return curr;
} else if (accParts[0] > currParts[0]) {
return acc;
}

return curr;
// Compare the minor part (assuming all versions have two parts)
if (accParts[1] < currParts[1]) {
return curr;
} else {
return acc;
}
});
}
}
Loading

0 comments on commit ca47e6b

Please sign in to comment.