Skip to content

Commit

Permalink
Auto-resolve payload/output presigned urls when retrieving a run with…
Browse files Browse the repository at this point in the history
… runs.retrieve (#1317)
  • Loading branch information
ericallam authored Sep 18, 2024
1 parent b5cdb0c commit 4adc773
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 7 deletions.
6 changes: 6 additions & 0 deletions .changeset/polite-tables-exercise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Auto-resolve payload/output presigned urls when retrieving a run with runs.retrieve
25 changes: 25 additions & 0 deletions packages/core/src/v3/utils/ioSerialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,31 @@ export async function conditionallyImportPacket(
}
}

export async function resolvePresignedPacketUrl(
url: string,
tracer?: TriggerTracer
): Promise<any | undefined> {
try {
const response = await fetch(url);

if (!response.ok) {
return;
}

const data = await response.text();
const dataType = response.headers.get("content-type") ?? "application/json";

const packet = {
data,
dataType,
};

return await parsePacket(packet);
} catch (error) {
return;
}
}

async function importPacket(packet: IOPacket, span?: Span): Promise<IOPacket> {
if (!packet.data) {
return packet;
Expand Down
32 changes: 26 additions & 6 deletions packages/trigger-sdk/src/v3/runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type {
ListProjectRunsQueryParams,
ListRunsQueryParams,
RescheduleRunRequestBody,
TriggerTracer,
} from "@trigger.dev/core/v3";
import {
ApiPromise,
Expand All @@ -19,6 +20,7 @@ import {
} from "@trigger.dev/core/v3";
import { AnyTask, Prettify, RunHandle, Task, apiClientMissingError } from "./shared.js";
import { tracer } from "./tracer.js";
import { resolvePresignedPacketUrl } from "@trigger.dev/core/v3/utils/ioSerialization";

export type RetrieveRunResult<TRunId> = Prettify<
TRunId extends RunHandle<infer TOutput>
Expand Down Expand Up @@ -183,13 +185,31 @@ function retrieveRun<TRunId extends RunHandle<any> | AnyTask | string>(
requestOptions
);

if (typeof runId === "string") {
return apiClient.retrieveRun(runId, $requestOptions) as ApiPromise<RetrieveRunResult<TRunId>>;
} else {
return apiClient.retrieveRun(runId.id, $requestOptions) as ApiPromise<
RetrieveRunResult<TRunId>
>;
const $runId = typeof runId === "string" ? runId : runId.id;

return apiClient.retrieveRun($runId, $requestOptions).then((retrievedRun) => {
return resolvePayloadAndOutputUrls(retrievedRun);
}) as ApiPromise<RetrieveRunResult<TRunId>>;
}

async function resolvePayloadAndOutputUrls(run: RetrieveRunResult<any>) {
const resolvedRun = { ...run };

if (run.payloadPresignedUrl && run.outputPresignedUrl) {
const [payload, output] = await Promise.all([
resolvePresignedPacketUrl(run.payloadPresignedUrl, tracer),
resolvePresignedPacketUrl(run.outputPresignedUrl, tracer),
]);

resolvedRun.payload = payload;
resolvedRun.output = output;
} else if (run.payloadPresignedUrl) {
resolvedRun.payload = await resolvePresignedPacketUrl(run.payloadPresignedUrl, tracer);
} else if (run.outputPresignedUrl) {
resolvedRun.output = await resolvePresignedPacketUrl(run.outputPresignedUrl, tracer);
}

return resolvedRun;
}

function replayRun(
Expand Down
43 changes: 42 additions & 1 deletion references/v3-catalog/src/trigger/sdkUsage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,51 @@ export const sdkUsage = task({

export const sdkChild = task({
id: "sdk-child",
run: async (payload: any) => {},
run: async (payload: any) => {
return payload;
},
});

export const sdkSchedule = schedules.task({
id: "sdk-schedule",
run: async (payload: any) => {},
});

export const autoResolvePayloadAndOutput = task({
id: "auto-resolve-payload-and-output",
run: async (payload: any, { ctx }) => {
// Generate a large JSON payload (bigger than 128KB)
const childPayload = Array.from({ length: 10000 }, () => ({
key: "value",
date: new Date(),
}));

const handle = await tasks.trigger<typeof sdkChild>("sdk-child", childPayload);

const childRun = await runs.retrieve(handle.id);

if (childRun.payload) {
console.log("Child run payload exists", {
payloadPresignedUrl: childRun.payloadPresignedUrl,
});
} else {
console.log("Child run payload does not exist", {
payloadPresignedUrl: childRun.payloadPresignedUrl,
});
}

await runs.poll(handle.id);

const finishedRun = await runs.retrieve(handle.id);

if (finishedRun.output) {
console.log("Finished run output exists", {
outputPresignedUrl: finishedRun.outputPresignedUrl,
});
} else {
console.log("Finished run payload does not exist", {
outputPresignedUrl: finishedRun.outputPresignedUrl,
});
}
},
});

0 comments on commit 4adc773

Please sign in to comment.