Skip to content

Run Engine 2.0 trigger idempotency #1613

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 35 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
f1d5481
Return isCached from the trigger API endpoint
matt-aitken Jan 3, 2025
8642497
Fix for the wrong type when blocking a run
matt-aitken Jan 3, 2025
8d882d8
Render the idempotent run in the inspector
matt-aitken Jan 3, 2025
67c9085
Event repository for idempotency
matt-aitken Jan 3, 2025
0e958e7
Debug events off by default, added an admin toggle to show them
matt-aitken Jan 3, 2025
c175ed9
triggerAndWait idempotency span
matt-aitken Jan 3, 2025
cf83a53
Some improvements to the reference idempotency task
matt-aitken Jan 3, 2025
3064c2a
Merge remote-tracking branch 'origin/run-engine-2' into engine-2-idem…
matt-aitken Jan 3, 2025
d3043da
Removed the cached tracing from the SDK
matt-aitken Jan 6, 2025
1149c6e
Server-side creating cached span
matt-aitken Jan 6, 2025
4f9344a
Improved idempotency test task
matt-aitken Jan 6, 2025
59b30eb
Create cached task spans in a better way
matt-aitken Jan 6, 2025
93c3d23
Idempotency span support inc batch trigger
matt-aitken Jan 7, 2025
fdfd064
Simplified how the spans are done, using more of the existing code
matt-aitken Jan 7, 2025
8a8aaac
Improved the idempotency test task
matt-aitken Jan 9, 2025
2d4c67c
Added Waitpoint Batch type, add to TaskRunWaitpoint with order
matt-aitken Jan 9, 2025
4e9f8ca
Pass batch ids through to the run engine when triggering
matt-aitken Jan 9, 2025
bf6946a
Added batchIndex
matt-aitken Jan 9, 2025
7038f4c
Better batch support in the run engine
matt-aitken Jan 9, 2025
041cd87
Added settings to batch trigger service, before major overhaul
matt-aitken Jan 10, 2025
641edd2
Allow the longer run/batch ids in the filters
matt-aitken Jan 13, 2025
151a50a
Changed how batching works, includes breaking changes in CLI
matt-aitken Jan 13, 2025
efc83c9
Removed batch idempotency because it gets put on the runs instead
matt-aitken Jan 13, 2025
1230871
Added `runs` to the batch.retrieve call/API
matt-aitken Jan 13, 2025
5f0d45a
Set firstAttemptStartedAt when creating the first attempt
matt-aitken Jan 14, 2025
509438d
Do nothing when receiving a BATCH waitpoint
matt-aitken Jan 14, 2025
7ec2726
Some fixes in the new batch trigger service… mostly just passing miss…
matt-aitken Jan 14, 2025
5dfa932
Tweaked the idempotency test task for more situations
matt-aitken Jan 14, 2025
bd6ee2c
Only block with a batch if it’s a batchTriggerAndWait… 🤦‍♂️
matt-aitken Jan 14, 2025
b73ca8b
Added another case to the idempotency test task: multiple of the same…
matt-aitken Jan 14, 2025
6d0927e
Support for the same run multiple times in the same batch
matt-aitken Jan 14, 2025
5ac9673
Small tweaks
matt-aitken Jan 14, 2025
84ee4d3
Make sure to complete batches, even if they’re not andWait ones
matt-aitken Jan 14, 2025
67060db
Merge remote-tracking branch 'origin/run-engine-2' into engine-2-idem…
matt-aitken Jan 15, 2025
9bf52cc
Export RunDuplicateIdempotencyKeyError from the run engine
matt-aitken Jan 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Changed how batching works, includes breaking changes in CLI
  • Loading branch information
matt-aitken committed Jan 13, 2025
commit 151a50a19de9bec9f3ca710cf54c051d4972b6dc
12 changes: 1 addition & 11 deletions apps/webapp/app/routes/api.v1.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth
import { logger } from "~/services/logger.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { determineEngineVersion } from "~/v3/engineVersion.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import {
BatchProcessingStrategy,
BatchTriggerV2Service,
} from "~/v3/services/batchTriggerV2.server";
import { BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";

Expand Down Expand Up @@ -88,15 +86,7 @@ const { action, loader } = createActionApiRoute(
resolveIdempotencyKeyTTL(idempotencyKeyTTL) ??
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30);

const version = await determineEngineVersion({
environment: authentication.environment,
version: engineVersion ?? undefined,
});

const service =
version === "V1"
? new BatchTriggerV2Service(batchProcessingStrategy ?? undefined)
: new BatchTriggerV3Service(batchProcessingStrategy ?? undefined);
const service = new BatchTriggerV2Service(batchProcessingStrategy ?? undefined);

try {
const batch = await service.call(authentication.environment, body, {
Expand Down
168 changes: 168 additions & 0 deletions apps/webapp/app/routes/api.v2.tasks.batch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import { json } from "@remix-run/server-runtime";
import {
BatchTriggerTaskV2RequestBody,
BatchTriggerTaskV2Response,
BatchTriggerTaskV3RequestBody,
BatchTriggerTaskV3Response,
generateJWT,
} from "@trigger.dev/core/v3";
import { env } from "~/env.server";
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import {
BatchProcessingStrategy,
BatchTriggerV3Service,
} from "~/v3/services/batchTriggerV3.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";

const { action, loader } = createActionApiRoute(
{
headers: HeadersSchema.extend({
"batch-processing-strategy": BatchProcessingStrategy.nullish(),
}),
body: BatchTriggerTaskV3RequestBody,
allowJWT: true,
maxContentLength: env.BATCH_TASK_PAYLOAD_MAXIMUM_SIZE,
authorization: {
action: "batchTrigger",
resource: (_, __, ___, body) => ({
tasks: Array.from(new Set(body.items.map((i) => i.task))),
}),
superScopes: ["write:tasks", "admin"],
},
corsStrategy: "all",
},
async ({ body, headers, params, authentication }) => {
if (!body.items.length) {
return json({ error: "Batch cannot be triggered with no items" }, { status: 400 });
}

// Check the there are fewer than MAX_BATCH_V2_TRIGGER_ITEMS items
if (body.items.length > env.MAX_BATCH_V2_TRIGGER_ITEMS) {
return json(
{
error: `Batch size of ${body.items.length} is too large. Maximum allowed batch size is ${env.MAX_BATCH_V2_TRIGGER_ITEMS}.`,
},
{ status: 400 }
);
}

const {
"idempotency-key": idempotencyKey,
"idempotency-key-ttl": idempotencyKeyTTL,
"trigger-version": triggerVersion,
"x-trigger-span-parent-as-link": spanParentAsLink,
"x-trigger-worker": isFromWorker,
"x-trigger-client": triggerClient,
"x-trigger-engine-version": engineVersion,
"batch-processing-strategy": batchProcessingStrategy,
traceparent,
tracestate,
} = headers;

const oneTimeUseToken = await getOneTimeUseToken(authentication);

logger.debug("Batch trigger request", {
idempotencyKey,
idempotencyKeyTTL,
triggerVersion,
spanParentAsLink,
isFromWorker,
triggerClient,
traceparent,
tracestate,
batchProcessingStrategy,
});

const traceContext =
traceparent && isFromWorker // If the request is from a worker, we should pass the trace context
? { traceparent, tracestate }
: undefined;

// By default, the idempotency key expires in 30 days
const idempotencyKeyExpiresAt =
resolveIdempotencyKeyTTL(idempotencyKeyTTL) ??
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30);

const service = new BatchTriggerV3Service(batchProcessingStrategy ?? undefined);

try {
const batch = await service.call(authentication.environment, body, {
idempotencyKey: idempotencyKey ?? undefined,
idempotencyKeyExpiresAt,
triggerVersion: triggerVersion ?? undefined,
traceContext,
spanParentAsLink: spanParentAsLink === 1,
oneTimeUseToken,
});

const $responseHeaders = await responseHeaders(
batch,
authentication.environment,
triggerClient
);

return json(batch, { status: 202, headers: $responseHeaders });
} catch (error) {
logger.error("Batch trigger error", {
error: {
message: (error as Error).message,
stack: (error as Error).stack,
},
});

if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof OutOfEntitlementError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof Error) {
return json(
{ error: error.message },
{ status: 500, headers: { "x-should-retry": "false" } }
);
}

return json({ error: "Something went wrong" }, { status: 500 });
}
}
);

async function responseHeaders(
batch: BatchTriggerTaskV3Response,
environment: AuthenticatedEnvironment,
triggerClient?: string | null
): Promise<Record<string, string>> {
const claimsHeader = JSON.stringify({
sub: environment.id,
pub: true,
});

if (triggerClient === "browser") {
const claims = {
sub: environment.id,
pub: true,
scopes: [`read:batch:${batch.id}`],
};

const jwt = await generateJWT({
secretKey: environment.apiKey,
payload: claims,
expirationTime: "1h",
});

return {
"x-trigger-jwt-claims": claimsHeader,
"x-trigger-jwt": jwt,
};
}

return {
"x-trigger-jwt-claims": claimsHeader,
};
}

export { action, loader };
Loading