Skip to content

Realtime and task run performance improvements #2158

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Jun 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0ffd481
Add createdAt filter to realtime subscribing with tags
ericallam Jun 5, 2025
d39db91
Filter realtime colums and expose ability to skip some columns
ericallam Jun 5, 2025
f4a7369
Add sharding support for electric
ericallam Jun 5, 2025
089c935
Use unkey cache for the created at filter caching
ericallam Jun 5, 2025
234af54
Remove 2 unused indexes on TaskRun
ericallam Jun 5, 2025
59e6783
Run list now filters by a single runtime environment
ericallam Jun 6, 2025
211d116
Remove project ID indexes
ericallam Jun 9, 2025
d489709
Use clickhouse in task list aggregation queries instead of pg (keep p…
ericallam Jun 9, 2025
30813f1
WIP clickhouse powered runs list
ericallam May 23, 2025
be70f25
Improve the query to get the latest tasks for the task list presenter
ericallam Jun 9, 2025
f3e2643
Update the usage task list to use clickhouse
ericallam Jun 9, 2025
07967bd
Implement next runs list powered by clickhouse
ericallam Jun 9, 2025
a86417c
Add new index for TaskRun for the runs list, by environment ID
ericallam Jun 9, 2025
d4c6d06
Add runTags gin index
ericallam Jun 9, 2025
532067d
Handle possibly malicious inputs
ericallam Jun 10, 2025
a3c7ea3
Ignore claude settings
ericallam Jun 10, 2025
47d43c4
Better handling not finding an environment on the schedule page
ericallam Jun 10, 2025
7666478
Use ms since epoch in test, not seconds
ericallam Jun 10, 2025
0a98b65
Remove unused function
ericallam Jun 10, 2025
eb957a6
Fix test
ericallam Jun 10, 2025
59f99cf
Use an env var for the realtime maximum createdAt filter duration (de…
ericallam Jun 10, 2025
b080384
Fixed the query builder to correct the group by / order by order
ericallam Jun 10, 2025
25e6267
Make sure runs.list still works
ericallam Jun 10, 2025
39e97de
Create small-birds-arrive.md
ericallam Jun 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .changeset/small-birds-arrive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
"@trigger.dev/react-hooks": patch
---

Added the ability to specify a "createdAt" filter when subscribing to tags in our useRealtime hooks:

```tsx
// Only subscribe to runs created in the last 10 hours
useRealtimeRunWithTags("my-tag", { createdAt: "10h" })
```

You can also now choose to skip subscribing to specific columns by specifying the `skipColumns` option:

```tsx
useRealtimeRun(run.id, { skipColumns: ["usageDurationMs"] });
```
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,4 @@ apps/**/public/build
/packages/core/src/package.json
/packages/trigger-sdk/src/package.json
/packages/python/src/package.json
.claude
2 changes: 1 addition & 1 deletion apps/webapp/app/components/runs/v3/TaskRunsTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ function BlankState({ isLoading, filters }: Pick<RunsTableProps, "isLoading" | "
const environment = useEnvironment();
if (isLoading) return <TableBlankRow colSpan={15}></TableBlankRow>;

const { environments, tasks, from, to, ...otherFilters } = filters;
const { tasks, from, to, ...otherFilters } = filters;

if (
filters.tasks.length === 1 &&
Expand Down
16 changes: 16 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const EnvironmentSchema = z.object({
API_ORIGIN: z.string().optional(),
STREAM_ORIGIN: z.string().optional(),
ELECTRIC_ORIGIN: z.string().default("http://localhost:3060"),
// A comma separated list of electric origins to shard into different electric instances by environmentId
// example: "http://localhost:3060,http://localhost:3061,http://localhost:3062"
ELECTRIC_ORIGIN_SHARDS: z.string().optional(),
APP_ENV: z.string().default(process.env.NODE_ENV),
SERVICE_NAME: z.string().default("trigger.dev webapp"),
POSTHOG_PROJECT_KEY: z.string().default("phc_LFH7kJiGhdIlnO22hTAKgHpaKhpM8gkzWAFvHmf5vfS"),
Expand Down Expand Up @@ -161,6 +164,11 @@ const EnvironmentSchema = z.object({
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
REALTIME_STREAMS_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

REALTIME_MAXIMUM_CREATED_AT_FILTER_AGE_IN_MS: z.coerce
.number()
.int()
.default(24 * 60 * 60 * 1000), // 1 day in milliseconds

PUBSUB_REDIS_HOST: z
.string()
.optional()
Expand Down Expand Up @@ -738,6 +746,14 @@ const EnvironmentSchema = z.object({
RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
RUN_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),

// Clickhouse
CLICKHOUSE_URL: z.string().optional(),
CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
CLICKHOUSE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),

// Bootstrap
TRIGGER_BOOTSTRAP_ENABLED: z.string().default("0"),
TRIGGER_BOOTSTRAP_WORKER_GROUP_NAME: z.string().optional(),
Expand Down
33 changes: 33 additions & 0 deletions apps/webapp/app/models/runtimeEnvironment.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,36 @@ export function displayableEnvironment(
userName,
};
}

export async function findDisplayableEnvironment(
environmentId: string,
userId: string | undefined
) {
const environment = await prisma.runtimeEnvironment.findFirst({
where: {
id: environmentId,
},
select: {
id: true,
type: true,
slug: true,
orgMember: {
select: {
user: {
select: {
id: true,
name: true,
displayName: true,
},
},
},
},
},
});

if (!environment) {
return;
}

return displayableEnvironment(environment, userId);
}
4 changes: 2 additions & 2 deletions apps/webapp/app/models/task.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { PrismaClientOrTransaction, sqlDatabaseSchema } from "~/db.server";
* It has indexes for fast performance.
* It does NOT care about versions, so includes all tasks ever created.
*/
export function getAllTaskIdentifiers(prisma: PrismaClientOrTransaction, projectId: string) {
export function getAllTaskIdentifiers(prisma: PrismaClientOrTransaction, environmentId: string) {
return prisma.$queryRaw<
{
slug: string;
Expand All @@ -16,6 +16,6 @@ export function getAllTaskIdentifiers(prisma: PrismaClientOrTransaction, project
>`
SELECT DISTINCT(slug), "triggerSource"
FROM ${sqlDatabaseSchema}."BackgroundWorkerTask"
WHERE "projectId" = ${projectId}
WHERE "runtimeEnvironmentId" = ${environmentId}
ORDER BY slug ASC;`;
}
15 changes: 11 additions & 4 deletions apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { CoercedDate } from "~/utils/zod";
import { ApiRetrieveRunPresenter } from "./ApiRetrieveRunPresenter.server";
import { type RunListOptions, RunListPresenter } from "./RunListPresenter.server";
import { BasePresenter } from "./basePresenter.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";

export const ApiRunListSearchParams = z.object({
"page[size]": z.coerce.number().int().positive().min(1).max(100).optional(),
Expand Down Expand Up @@ -134,9 +135,11 @@ export class ApiRunListPresenter extends BasePresenter {
options.direction = "backward";
}

let environmentId: string | undefined;

// filters
if (environment) {
options.environments = [environment.id];
environmentId = environment.id;
} else {
if (searchParams["filter[env]"]) {
const environments = await this._prisma.runtimeEnvironment.findMany({
Expand All @@ -148,10 +151,14 @@ export class ApiRunListPresenter extends BasePresenter {
},
});

options.environments = environments.map((env) => env.id);
environmentId = environments.at(0)?.id;
}
}

if (!environmentId) {
throw new ServiceValidationError("No environment found");
}

if (searchParams["filter[status]"]) {
options.statuses = searchParams["filter[status]"].flatMap((status) =>
ApiRunListPresenter.apiStatusToRunStatuses(status)
Expand Down Expand Up @@ -202,9 +209,9 @@ export class ApiRunListPresenter extends BasePresenter {

logger.debug("Calling RunListPresenter", { options });

const results = await presenter.call(options);
const results = await presenter.call(environmentId, options);

logger.debug("RunListPresenter results", { results });
logger.debug("RunListPresenter results", { runs: results.runs.length });

const data: ListRunResponseItem[] = await Promise.all(
results.runs.map(async (run) => {
Expand Down
Loading