Skip to content

Commit

Permalink
Force upgrade v1 batches to v3 (#1676)
Browse files Browse the repository at this point in the history
* Force upgrade v1 batches to v3

* Actually lets process v1 batches async
  • Loading branch information
ericallam authored Feb 6, 2025
1 parent dcf1ab6 commit 0b555fa
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 30 deletions.
44 changes: 36 additions & 8 deletions apps/webapp/app/routes/api.v1.tasks.$taskId.batch.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { BatchTriggerTaskRequestBody } from "@trigger.dev/core/v3";
import { BatchTriggerTaskRequestBody, BatchTriggerTaskV2RequestBody } from "@trigger.dev/core/v3";
import { z } from "zod";
import { fromZodError } from "zod-validation-error";
import { MAX_BATCH_TRIGGER_ITEMS } from "~/consts";
import { env } from "~/env.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { BatchTriggerTaskService } from "~/v3/services/batchTriggerTask.server";
import { BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { env } from "~/env.server";
import { fromZodError } from "zod-validation-error";

const ParamsSchema = z.object({
taskId: z.string(),
Expand Down Expand Up @@ -85,15 +85,17 @@ export async function action({ request, params }: ActionFunctionArgs) {
);
}

const service = new BatchTriggerTaskService();
const service = new BatchTriggerV3Service();

const traceContext =
traceparent && isFromWorker // If the request is from a worker, we should pass the trace context
? { traceparent, tracestate }
: undefined;

const v3Body = convertV1BodyToV2Body(body.data, taskId);

try {
const result = await service.call(taskId, authenticationResult.environment, body.data, {
const result = await service.call(authenticationResult.environment, v3Body, {
idempotencyKey: idempotencyKey ?? undefined,
triggerVersion: triggerVersion ?? undefined,
traceContext,
Expand All @@ -106,8 +108,8 @@ export async function action({ request, params }: ActionFunctionArgs) {

return json(
{
batchId: result.batch.friendlyId,
runs: result.runs,
batchId: result.id,
runs: result.runs.map((run) => run.id),
},
{
headers: {
Expand All @@ -126,3 +128,29 @@ export async function action({ request, params }: ActionFunctionArgs) {
return json({ error: "Something went wrong" }, { status: 500 });
}
}

// Strip from options:
// - dependentBatch
// - dependentAttempt
// - parentBatch
function convertV1BodyToV2Body(
body: BatchTriggerTaskRequestBody,
taskIdentifier: string
): BatchTriggerTaskV2RequestBody {
return {
items: body.items.map((item) => ({
task: taskIdentifier,
payload: item.payload,
context: item.context,
options: item.options
? {
...item.options,
dependentBatch: undefined,
parentBatch: undefined,
dependentAttempt: undefined,
}
: undefined,
})),
dependentAttempt: body.dependentAttempt,
};
}
5 changes: 4 additions & 1 deletion apps/webapp/app/v3/services/batchTriggerV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,17 @@ type RunItemData = {
*/
export class BatchTriggerV3Service extends BaseService {
private _batchProcessingStrategy: BatchProcessingStrategy;
private _asyncBatchProcessSizeThreshold: number;

constructor(
batchProcessingStrategy?: BatchProcessingStrategy,
asyncBatchProcessSizeThreshold: number = ASYNC_BATCH_PROCESS_SIZE_THRESHOLD,
protected readonly _prisma: PrismaClientOrTransaction = prisma
) {
super(_prisma);

this._batchProcessingStrategy = batchProcessingStrategy ?? "parallel";
this._asyncBatchProcessSizeThreshold = asyncBatchProcessSizeThreshold;
}

public async call(
Expand Down Expand Up @@ -403,7 +406,7 @@ export class BatchTriggerV3Service extends BaseService {
options: BatchTriggerTaskServiceOptions = {},
dependentAttempt?: TaskRunAttempt
) {
if (runs.length <= ASYNC_BATCH_PROCESS_SIZE_THRESHOLD) {
if (runs.length <= this._asyncBatchProcessSizeThreshold) {
const batch = await this._prisma.batchTaskRun.create({
data: {
friendlyId: batchId,
Expand Down
42 changes: 21 additions & 21 deletions packages/core/src/v3/schemas/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,25 +74,25 @@ export const TriggerTaskRequestBody = z.object({
context: z.any(),
options: z
.object({
concurrencyKey: z.string().optional(),
delay: z.string().or(z.coerce.date()).optional(),
dependentAttempt: z.string().optional(),
parentAttempt: z.string().optional(),
dependentBatch: z.string().optional(),
parentBatch: z.string().optional(),
lockToVersion: z.string().optional(),
queue: QueueOptions.optional(),
concurrencyKey: z.string().optional(),
idempotencyKey: z.string().optional(),
idempotencyKeyTTL: z.string().optional(),
test: z.boolean().optional(),
payloadType: z.string().optional(),
delay: z.string().or(z.coerce.date()).optional(),
ttl: z.string().or(z.number().nonnegative().int()).optional(),
tags: RunTags.optional(),
lockToVersion: z.string().optional(),
machine: MachinePresetName.optional(),
maxAttempts: z.number().int().optional(),
maxDuration: z.number().optional(),
metadata: z.any(),
metadataType: z.string().optional(),
maxDuration: z.number().optional(),
machine: MachinePresetName.optional(),
parentAttempt: z.string().optional(),
parentBatch: z.string().optional(),
payloadType: z.string().optional(),
queue: QueueOptions.optional(),
tags: RunTags.optional(),
test: z.boolean().optional(),
ttl: z.string().or(z.number().nonnegative().int()).optional(),
})
.optional(),
});
Expand All @@ -118,22 +118,22 @@ export const BatchTriggerTaskItem = z.object({
context: z.any(),
options: z
.object({
lockToVersion: z.string().optional(),
queue: QueueOptions.optional(),
concurrencyKey: z.string().optional(),
delay: z.string().or(z.coerce.date()).optional(),
idempotencyKey: z.string().optional(),
idempotencyKeyTTL: z.string().optional(),
test: z.boolean().optional(),
payloadType: z.string().optional(),
delay: z.string().or(z.coerce.date()).optional(),
ttl: z.string().or(z.number().nonnegative().int()).optional(),
tags: RunTags.optional(),
lockToVersion: z.string().optional(),
machine: MachinePresetName.optional(),
maxAttempts: z.number().int().optional(),
maxDuration: z.number().optional(),
metadata: z.any(),
metadataType: z.string().optional(),
maxDuration: z.number().optional(),
parentAttempt: z.string().optional(),
machine: MachinePresetName.optional(),
payloadType: z.string().optional(),
queue: QueueOptions.optional(),
tags: RunTags.optional(),
test: z.boolean().optional(),
ttl: z.string().or(z.number().nonnegative().int()).optional(),
})
.optional(),
});
Expand Down

0 comments on commit 0b555fa

Please sign in to comment.