Skip to content

Commit

Permalink
Set taskRunAttemptId on batch task run items on completion (#1658)
Browse files Browse the repository at this point in the history
  • Loading branch information
ericallam authored Feb 3, 2025
1 parent d285f79 commit 6f52b00
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 6 deletions.
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/services/batchTriggerV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,8 @@ export async function completeBatchTaskRunItemV3(
itemId: string,
batchTaskRunId: string,
tx: PrismaClientOrTransaction,
scheduleResumeOnComplete = false
scheduleResumeOnComplete = false,
taskRunAttemptId?: string
) {
await $transaction(tx, "completeBatchTaskRunItemV3", async (tx, span) => {
span?.setAttribute("batch_id", batchTaskRunId);
Expand All @@ -917,6 +918,7 @@ export async function completeBatchTaskRunItemV3(
},
data: {
status: "COMPLETED",
taskRunAttemptId,
},
});

Expand Down
3 changes: 0 additions & 3 deletions apps/webapp/app/v3/services/resumeBatchRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ const finishedBatchRunStatuses = ["COMPLETED", "FAILED", "CANCELED"];

type RetrieveBatchRunResult = NonNullable<Awaited<ReturnType<typeof retrieveBatchRun>>>;

// {"batchRunId":"cm6l2qfs400d0dyiczcwiuwrp","dependentTaskAttempt":{"status":"PAUSED","id":"cm6l2qcqf00cydyicryir6xlu","taskRun":{"id":"cm6l2qaw200cudyicktkfh4k9","queue":"task/batch-trigger-sequentially","taskIdentifier":"batch-trigger-sequentially","concurrencyKey":null}},"checkpointEventId":"cm6l2qg7400dgdyicy6qx9s8u","timestamp":"2025-01-31T18:04:52.869Z","name":"webapp","message":"ResumeBatchRunService: Attempt is paused and has a checkpoint event","level":"debug","skipForwarding":true}
// {"batchRunId":"cm6l2qfs400d0dyiczcwiuwrp","dependentTaskAttempt":{"status":"PAUSED","id":"cm6l2qcqf00cydyicryir6xlu","taskRun":{"id":"cm6l2qaw200cudyicktkfh4k9","queue":"task/batch-trigger-sequentially","taskIdentifier":"batch-trigger-sequentially","concurrencyKey":null}},"checkpointEventId":"cm6l2qg7400dgdyicy6qx9s8u","hasCheckpointEvent":true,"timestamp":"2025-01-31T18:04:52.871Z","name":"webapp","message":"ResumeBatchRunService: with checkpoint was already completed","level":"debug","skipForwarding":true}

export class ResumeBatchRunService extends BaseService {
public async call(batchRunId: string) {
const batchRun = await this._prisma.batchTaskRun.findFirst({
Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/v3/services/resumeDependentParents.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ export class ResumeDependentParentsService extends BaseService {
batchTaskRunItem.id,
batchTaskRunItem.batchTaskRunId,
this._prisma,
true
true,
lastAttempt.id
);
} else {
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,13 @@ export class ResumeTaskRunDependenciesService extends BaseService {
taskAttempt: TaskRunAttempt
) {
if (batchTaskRun.batchVersion === "v3") {
await completeBatchTaskRunItemV3(batchItem.id, batchTaskRun.id, this._prisma, true);
await completeBatchTaskRunItemV3(
batchItem.id,
batchTaskRun.id,
this._prisma,
true,
taskAttempt.id
);
} else {
await $transaction(this._prisma, async (tx) => {
await tx.batchTaskRunItem.update({
Expand Down

0 comments on commit 6f52b00

Please sign in to comment.