Skip to content

Commit

Permalink
v3: batch trigger (#908)
Browse files Browse the repository at this point in the history
* Support for batch triggers

* Various trace fixes and improvements

* Moved trace into the logger, fixed span cardinality issues with waits and triggers

* Add support for attaching a debugger in dev with —debugger flag
  • Loading branch information
ericallam authored Feb 26, 2024
1 parent 07efef4 commit 90a302f
Show file tree
Hide file tree
Showing 40 changed files with 1,262 additions and 180 deletions.
8 changes: 8 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@
"command": "pnpm exec trigger.dev dev",
"cwd": "${workspaceFolder}/references/v3-catalog",
"sourceMaps": true
},
{
"type": "node",
"request": "attach",
"name": "Attach to Trigger.dev CLI (v3)",
"port": 9229,
"restart": true,
"skipFiles": ["<node_internals>/**"]
}
]
}
1 change: 1 addition & 0 deletions apps/webapp/app/consts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export const MAX_RUN_YIELDED_EXECUTIONS = 100;
export const RUN_CHUNK_EXECUTION_BUFFER = 350;
export const MAX_RUN_CHUNK_EXECUTION_LIMIT = 120000; // 2 minutes
export const VERCEL_RESPONSE_TIMEOUT_STATUS_CODES = [408, 504];
export const MAX_BATCH_TRIGGER_ITEMS = 100;
21 changes: 19 additions & 2 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Attributes } from "@opentelemetry/api";
import { TaskEventStyle } from "@trigger.dev/core/v3";
import { SemanticInternalAttributes, TaskEventStyle } from "@trigger.dev/core/v3";
import { unflattenAttributes } from "@trigger.dev/core/v3";
import { z } from "zod";
import { PrismaClient, prisma, Prisma } from "~/db.server";
Expand Down Expand Up @@ -83,14 +83,19 @@ export class SpanPresenter {
properties: unflattenAttributes(e.properties as Attributes),
}))
: undefined;
console.log("eventsUnflattened", eventsUnflattened);

const events = OtelSpanEvents.parse(eventsUnflattened);

const payload = unflattenAttributes(
filteredAttributes(event.properties as Attributes, SemanticInternalAttributes.PAYLOAD)
)[SemanticInternalAttributes.PAYLOAD];

return {
event: {
...event,
events,
output: isEmptyJson(event.output) ? null : JSON.stringify(event.output, null, 2),
payload: payload ? JSON.stringify(payload, null, 2) : undefined,
properties: sanitizedAttributesStringified(event.properties),
style,
duration: Number(event.duration),
Expand All @@ -99,6 +104,18 @@ export class SpanPresenter {
}
}

function filteredAttributes(attributes: Attributes, prefix: string): Attributes {
const result: Attributes = {};

for (const [key, value] of Object.entries(attributes)) {
if (key.startsWith(prefix)) {
result[key] = value;
}
}

return result;
}

function sanitizedAttributesStringified(json: Prisma.JsonValue): string | undefined {
const sanitizedAttributesValue = sanitizedAttributes(json);
if (!sanitizedAttributesValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ export default function Page() {

{event.events !== undefined && <SpanEvents spanEvents={event.events} />}

{event.payload && (
<div>
<Header2 spacing>Payload</Header2>
<CodeBlock code={event.payload} maxLines={20} />
</div>
)}

{event.output !== null && (
<div>
<Header2 spacing>Output</Header2>
Expand Down
109 changes: 109 additions & 0 deletions apps/webapp/app/routes/api.v1.tasks.$taskId.batch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import {
parseBatchTriggerTaskRequestBody,
parseTriggerTaskRequestBody,
} from "@trigger.dev/core/v3";
import { z } from "zod";
import { MAX_BATCH_TRIGGER_ITEMS } from "~/consts";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { BatchTriggerTaskService } from "~/v3/services/batchTriggerTask.server";
import { TriggerTaskService } from "~/v3/services/triggerTask.server";

const ParamsSchema = z.object({
taskId: z.string(),
});

const HeadersSchema = z.object({
"idempotency-key": z.string().optional().nullable(),
"trigger-version": z.string().optional().nullable(),
traceparent: z.string().optional(),
tracestate: z.string().optional(),
});

export async function action({ request, params }: ActionFunctionArgs) {
// Ensure this is a POST request
if (request.method.toUpperCase() !== "POST") {
return { status: 405, body: "Method Not Allowed" };
}

// Next authenticate the request
const authenticationResult = await authenticateApiRequest(request);

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const rawHeaders = Object.fromEntries(request.headers);

const headers = HeadersSchema.safeParse(rawHeaders);

if (!headers.success) {
return json({ error: "Invalid headers" }, { status: 400 });
}

const {
"idempotency-key": idempotencyKey,
"trigger-version": triggerVersion,
traceparent,
tracestate,
} = headers.data;

const { taskId } = ParamsSchema.parse(params);

// Now parse the request body
const anyBody = await request.json();

const body = parseBatchTriggerTaskRequestBody(anyBody);

if (!body.success) {
return json({ error: "Invalid request body" }, { status: 400 });
}

logger.debug("Triggering batch", {
taskId,
idempotencyKey,
triggerVersion,
body: body.data,
});

if (!body.data.items.length) {
return json({ error: "No items to trigger" }, { status: 400 });
}

// Check the there are fewer than 100 items
if (body.data.items.length > MAX_BATCH_TRIGGER_ITEMS) {
return json(
{
error: `Too many items. Maximum allowed batch size is ${MAX_BATCH_TRIGGER_ITEMS}.`,
},
{ status: 400 }
);
}

const service = new BatchTriggerTaskService();

try {
const result = await service.call(taskId, authenticationResult.environment, body.data, {
idempotencyKey: idempotencyKey ?? undefined,
triggerVersion: triggerVersion ?? undefined,
traceContext: traceparent ? { traceparent, tracestate } : undefined,
});

if (!result) {
return json({ error: "Task not found" }, { status: 404 });
}

return json({
batchId: result.batch.friendlyId,
runs: result.runs,
});
} catch (error) {
if (error instanceof Error) {
return json({ error: error.message }, { status: 400 });
}

return json({ error: "Something went wrong" }, { status: 500 });
}
}
40 changes: 40 additions & 0 deletions apps/webapp/app/routes/projects.v3.$projectRef.runs.$runParam.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { LoaderFunctionArgs, redirect } from "@remix-run/server-runtime";
import { z } from "zod";
import { prisma } from "~/db.server";
import { requireUserId } from "~/services/session.server";

const ParamsSchema = z.object({
projectRef: z.string(),
runParam: z.string(),
});

export async function loader({ params, request }: LoaderFunctionArgs) {
const userId = await requireUserId(request);

const validatedParams = ParamsSchema.parse(params);

const project = await prisma.project.findFirst({
where: {
externalRef: validatedParams.projectRef,
organization: {
members: {
some: {
userId,
},
},
},
},
include: {
organization: true,
},
});

if (!project) {
return new Response("Not found", { status: 404 });
}

// Redirect to the project's runs page
return redirect(
`/orgs/${project.organization.slug}/projects/v3/${project.slug}/runs/${validatedParams.runParam}`
);
}
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/eventRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export type CreatableEvent = Omit<
properties: Attributes;
metadata: Attributes | undefined;
style: Attributes | undefined;
output: Attributes | undefined;
output: Attributes | string | boolean | number | undefined;
};

export type CreatableEventKind = TaskEventKind;
Expand Down
73 changes: 58 additions & 15 deletions apps/webapp/app/v3/otlpExporter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import {
type CreatableEvent,
CreatableEventEnvironmentType,
} from "./eventRepository.server";
import { logger } from "~/services/logger.server";
import { env } from "~/env.server";

export type OTLPExporterConfig = {
batchSize: number;
Expand All @@ -41,6 +43,8 @@ class OTLPExporter {
return convertSpansToCreateableEvents(resourceSpan);
});

this.#logEventsVerbose(events);

this._eventRepository.insertMany(events);

return ExportTraceServiceResponse.create();
Expand All @@ -51,11 +55,21 @@ class OTLPExporter {
return convertLogsToCreateableEvents(resourceLog);
});

this.#logEventsVerbose(events);

this._eventRepository.insertMany(events);

return ExportLogsServiceResponse.create();
}

#logEventsVerbose(events: CreatableEvent[]) {
if (!this._verbose) return;

events.forEach((event) => {
logger.debug("Exporting event", { event });
});
}

#filterResourceSpans(
resourceSpans: ExportTraceServiceRequest["resourceSpans"]
): ExportTraceServiceRequest["resourceSpans"] {
Expand Down Expand Up @@ -120,17 +134,26 @@ function convertLogsToCreateableEvents(resourceLog: ResourceLogs): Array<Creatab
pickAttributes(log.attributes ?? [], SemanticInternalAttributes.STYLE),
[]
),
output: convertKeyValueItemsToMap(
pickAttributes(log.attributes ?? [], SemanticInternalAttributes.OUTPUT),
[]
output: detectPrimitiveValue(
convertKeyValueItemsToMap(
pickAttributes(log.attributes ?? [], SemanticInternalAttributes.OUTPUT),
[]
),
SemanticInternalAttributes.OUTPUT
),
...resourceProperties,
attemptId:
extractStringAttribute(log.attributes ?? [], SemanticInternalAttributes.ATTEMPT_ID) ??
resourceProperties.attemptId,
extractStringAttribute(
log.attributes ?? [],
[SemanticInternalAttributes.METADATA, SemanticInternalAttributes.ATTEMPT_ID].join(".")
) ?? resourceProperties.attemptId,
attemptNumber:
extractNumberAttribute(log.attributes ?? [], SemanticInternalAttributes.ATTEMPT_NUMBER) ??
resourceProperties.attemptNumber,
extractNumberAttribute(
log.attributes ?? [],
[SemanticInternalAttributes.METADATA, SemanticInternalAttributes.ATTEMPT_NUMBER].join(
"."
)
) ?? resourceProperties.attemptNumber,
};
});
});
Expand Down Expand Up @@ -180,18 +203,25 @@ function convertSpansToCreateableEvents(resourceSpan: ResourceSpans): Array<Crea
pickAttributes(span.attributes ?? [], SemanticInternalAttributes.STYLE),
[]
),
output: convertKeyValueItemsToMap(
pickAttributes(span.attributes ?? [], SemanticInternalAttributes.OUTPUT),
[]
output: detectPrimitiveValue(
convertKeyValueItemsToMap(
pickAttributes(span.attributes ?? [], SemanticInternalAttributes.OUTPUT),
[]
),
SemanticInternalAttributes.OUTPUT
),
...resourceProperties,
attemptId:
extractStringAttribute(span.attributes ?? [], SemanticInternalAttributes.ATTEMPT_ID) ??
resourceProperties.attemptId,
extractStringAttribute(
span.attributes ?? [],
[SemanticInternalAttributes.METADATA, SemanticInternalAttributes.ATTEMPT_ID].join(".")
) ?? resourceProperties.attemptId,
attemptNumber:
extractNumberAttribute(
span.attributes ?? [],
SemanticInternalAttributes.ATTEMPT_NUMBER
[SemanticInternalAttributes.METADATA, SemanticInternalAttributes.ATTEMPT_NUMBER].join(
"."
)
) ?? resourceProperties.attemptNumber,
};
});
Expand Down Expand Up @@ -261,7 +291,7 @@ function convertKeyValueItemsToMap(
filteredKeys: string[] = [],
prefix?: string
): Record<string, string | number | boolean | undefined> {
return attributes.reduce(
const result = attributes.reduce(
(map: Record<string, string | number | boolean | undefined>, attribute) => {
if (filteredKeys.includes(attribute.key)) return map;

Expand All @@ -281,6 +311,19 @@ function convertKeyValueItemsToMap(
},
{}
);

return result;
}

function detectPrimitiveValue(
attributes: Record<string, string | number | boolean | undefined>,
sentinel: string
): Record<string, string | number | boolean | undefined> | string | number | boolean | undefined {
if (typeof attributes[sentinel] !== "undefined") {
return attributes[sentinel];
}

return attributes;
}

function spanLinksToEventLinks(links: Span_Link[]): CreatableEvent["links"] {
Expand Down Expand Up @@ -523,4 +566,4 @@ function binaryToHex(buffer: Buffer | undefined): string | undefined {
return Buffer.from(Array.from(buffer)).toString("hex");
}

export const otlpExporter = new OTLPExporter(eventRepository, true);
export const otlpExporter = new OTLPExporter(eventRepository, false);
Loading

0 comments on commit 90a302f

Please sign in to comment.