@@ -9,6 +9,7 @@ import { type DequeuedMessage } from "@trigger.dev/core/v3";
 import {
   DockerResourceMonitor,
   KubernetesResourceMonitor,
+  NoopResourceMonitor,
   type ResourceMonitor,
 } from "./resourceMonitor.js";
 import { KubernetesWorkloadManager } from "./workloadManager/kubernetes.js";
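Note: the new `NoopResourceMonitor` presumably satisfies the same `ResourceMonitor` interface while reporting unbounded capacity, so the rest of the supervisor needs no enabled/disabled branching. A minimal sketch, with the interface shape inferred only from the `preDequeue` usage further down (the real definitions live in `resourceMonitor.js`):

```ts
// Sketch only: shapes inferred from how this diff uses them; the real
// definitions are in resourceMonitor.js.
interface NodeResources {
  cpuAvailable: number;    // assumed unit: CPU cores remaining on the node
  memoryAvailable: number; // assumed unit: bytes remaining on the node
}

interface ResourceMonitor {
  getNodeResources(): Promise<NodeResources>;
}

// Reports unbounded capacity so dequeueing is never throttled when
// RESOURCE_MONITOR_ENABLED is off.
class NoopResourceMonitor implements ResourceMonitor {
  async getNodeResources(): Promise<NodeResources> {
    return { cpuAvailable: Infinity, memoryAvailable: Infinity };
  }
}
```

Modeling "monitoring off" as an object rather than a nullable field keeps every call site unconditional.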
@@ -33,7 +34,7 @@ class ManagedSupervisor {
   private readonly metricsServer?: HttpServer;
   private readonly workloadServer: WorkloadServer;
   private readonly workloadManager: WorkloadManager;
-  private readonly logger = new SimpleStructuredLogger("managed-worker");
+  private readonly logger = new SimpleStructuredLogger("managed-supervisor");
   private readonly resourceMonitor: ResourceMonitor;
   private readonly checkpointClient?: CheckpointClient;

@@ -47,11 +48,11 @@ class ManagedSupervisor {
     const { TRIGGER_WORKER_TOKEN, MANAGED_WORKER_SECRET, ...envWithoutSecrets } = env;

     if (env.DEBUG) {
-      console.debug("[ManagedSupervisor] Starting up", { envWithoutSecrets });
+      this.logger.debug("Starting up", { envWithoutSecrets });
     }

     if (this.warmStartUrl) {
-      this.logger.log("[ManagedWorker] 🔥 Warm starts enabled", {
+      this.logger.log("🔥 Warm starts enabled", {
         warmStartUrl: this.warmStartUrl,
       });
     }
@@ -69,9 +70,19 @@ class ManagedSupervisor {
       dockerAutoremove: env.RUNNER_DOCKER_AUTOREMOVE,
     } satisfies WorkloadManagerOptions;

+    this.resourceMonitor = env.RESOURCE_MONITOR_ENABLED
+      ? this.isKubernetes
+        ? new KubernetesResourceMonitor(createK8sApi(), env.TRIGGER_WORKER_INSTANCE_NAME)
+        : new DockerResourceMonitor(new Docker())
+      : new NoopResourceMonitor();
+
+    this.workloadManager = this.isKubernetes
+      ? new KubernetesWorkloadManager(workloadManagerOptions)
+      : new DockerWorkloadManager(workloadManagerOptions);
+
     if (this.isKubernetes) {
       if (env.POD_CLEANER_ENABLED) {
-        this.logger.log("[ManagedWorker] 🧹 Pod cleaner enabled", {
+        this.logger.log("🧹 Pod cleaner enabled", {
           namespace: env.KUBERNETES_NAMESPACE,
           batchSize: env.POD_CLEANER_BATCH_SIZE,
           intervalMs: env.POD_CLEANER_INTERVAL_MS,
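Note: the nested ternary above is compact but will get harder to scan if a third backend ever lands. An equivalent, purely illustrative factory under the same assumptions (the helper name is hypothetical, and it presumes the same imports as `index.ts`: `Docker`, `createK8sApi`, and the monitor classes):

```ts
// Illustrative refactor only, not part of this diff.
function createResourceMonitor(
  env: { RESOURCE_MONITOR_ENABLED: boolean; TRIGGER_WORKER_INSTANCE_NAME: string },
  isKubernetes: boolean
): ResourceMonitor {
  if (!env.RESOURCE_MONITOR_ENABLED) {
    return new NoopResourceMonitor(); // monitoring off: never throttle dequeue
  }
  return isKubernetes
    ? new KubernetesResourceMonitor(createK8sApi(), env.TRIGGER_WORKER_INSTANCE_NAME)
    : new DockerResourceMonitor(new Docker());
}
```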
@@ -83,11 +94,11 @@ class ManagedSupervisor {
           intervalMs: env.POD_CLEANER_INTERVAL_MS,
         });
       } else {
-        this.logger.warn("[ManagedWorker] Pod cleaner disabled");
+        this.logger.warn("Pod cleaner disabled");
       }

       if (env.FAILED_POD_HANDLER_ENABLED) {
-        this.logger.log("[ManagedWorker] 🔁 Failed pod handler enabled", {
+        this.logger.log("🔁 Failed pod handler enabled", {
           namespace: env.KUBERNETES_NAMESPACE,
           reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS,
         });
@@ -97,17 +108,14 @@ class ManagedSupervisor {
           reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS,
         });
       } else {
-        this.logger.warn("[ManagedWorker] Failed pod handler disabled");
+        this.logger.warn("Failed pod handler disabled");
       }
+    }

-      this.resourceMonitor = new KubernetesResourceMonitor(
-        createK8sApi(),
-        env.TRIGGER_WORKER_INSTANCE_NAME
+    if (env.TRIGGER_DEQUEUE_INTERVAL_MS > env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS) {
+      this.logger.warn(
+        `⚠️ TRIGGER_DEQUEUE_INTERVAL_MS (${env.TRIGGER_DEQUEUE_INTERVAL_MS}) is greater than TRIGGER_DEQUEUE_IDLE_INTERVAL_MS (${env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS}) - did you mix them up?`
       );
-      this.workloadManager = new KubernetesWorkloadManager(workloadManagerOptions);
-    } else {
-      this.resourceMonitor = new DockerResourceMonitor(new Docker());
-      this.workloadManager = new DockerWorkloadManager(workloadManagerOptions);
     }

     this.workerSession = new SupervisorSession({
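Note: the interval guard is a useful sanity check, since the busy-poll interval should always be the shorter of the two. The same check as a standalone sketch, with `warn` standing in for `SimpleStructuredLogger.warn`:

```ts
// Standalone version of the guard above; illustrative helper, not part of
// the diff.
function checkDequeueIntervals(
  intervalMs: number,
  idleIntervalMs: number,
  warn: (message: string) => void
): void {
  if (intervalMs > idleIntervalMs) {
    warn(
      `TRIGGER_DEQUEUE_INTERVAL_MS (${intervalMs}) is greater than ` +
        `TRIGGER_DEQUEUE_IDLE_INTERVAL_MS (${idleIntervalMs}) - did you mix them up?`
    );
  }
}
```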
@@ -123,12 +131,17 @@ class ManagedSupervisor {
       runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED,
       heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS,
       preDequeue: async () => {
+        if (!env.RESOURCE_MONITOR_ENABLED) {
+          return {};
+        }
+
         if (this.isKubernetes) {
           // Not used in k8s for now
           return {};
         }

         const resources = await this.resourceMonitor.getNodeResources();
+
         return {
           maxResources: {
             cpu: resources.cpuAvailable,
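Note: the early `return {}` branches make sense once you see how the hook's result is plausibly consumed: an empty object means "no cap", while `maxResources` lets the session skip work the node cannot fit. A hypothetical consumer (the type and helper are illustrative; the actual gating lives inside `SupervisorSession`, which this diff does not show):

```ts
// Illustrative only, not the real SupervisorSession logic.
type PreDequeueResult = {
  maxResources?: { cpu: number; memory: number };
};

function fitsWithinNode(
  result: PreDequeueResult,
  run: { cpu: number; memory: number }
): boolean {
  if (!result.maxResources) return true; // `{}` means "no cap on dequeue"
  return run.cpu <= result.maxResources.cpu && run.memory <= result.maxResources.memory;
}
```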
@@ -144,7 +157,7 @@ class ManagedSupervisor {
     });

     if (env.TRIGGER_CHECKPOINT_URL) {
-      this.logger.log("[ManagedWorker] 🥶 Checkpoints enabled", {
+      this.logger.log("🥶 Checkpoints enabled", {
         checkpointUrl: env.TRIGGER_CHECKPOINT_URL,
       });

@@ -155,43 +168,34 @@ class ManagedSupervisor {
       });
     }

-    // setInterval(async () => {
-    //   const resources = await this.resourceMonitor.getNodeResources(true);
-    //   this.logger.debug("[ManagedWorker] Current resources", { resources });
-    // }, 1000);
-
     this.workerSession.on("runNotification", async ({ time, run }) => {
-      this.logger.log("[ManagedWorker] runNotification", { time, run });
+      this.logger.log("runNotification", { time, run });

       this.workloadServer.notifyRun({ run });
     });

     this.workerSession.on("runQueueMessage", async ({ time, message }) => {
-      this.logger.log(
-        `[ManagedWorker] Received message with timestamp ${time.toLocaleString()}`,
-        message
-      );
+      this.logger.log(`Received message with timestamp ${time.toLocaleString()}`, message);

       if (message.completedWaitpoints.length > 0) {
-        this.logger.debug("[ManagedWorker] Run has completed waitpoints", {
+        this.logger.debug("Run has completed waitpoints", {
           runId: message.run.id,
           completedWaitpoints: message.completedWaitpoints.length,
         });
-        // TODO: Do something with them or if we don't need the data here, maybe we shouldn't even send it
       }

       if (!message.image) {
-        this.logger.error("[ManagedWorker] Run has no image", { runId: message.run.id });
+        this.logger.error("Run has no image", { runId: message.run.id });
         return;
       }

       const { checkpoint, ...rest } = message;

       if (checkpoint) {
-        this.logger.log("[ManagedWorker] Restoring run", { runId: message.run.id });
+        this.logger.log("Restoring run", { runId: message.run.id });

         if (!this.checkpointClient) {
-          this.logger.error("[ManagedWorker] No checkpoint client", { runId: message.run.id });
+          this.logger.error("No checkpoint client", { runId: message.run.id });
           return;
         }

@@ -206,23 +210,23 @@ class ManagedSupervisor {
           });

           if (didRestore) {
-            this.logger.log("[ManagedWorker] Restore successful", { runId: message.run.id });
+            this.logger.log("Restore successful", { runId: message.run.id });
           } else {
-            this.logger.error("[ManagedWorker] Restore failed", { runId: message.run.id });
+            this.logger.error("Restore failed", { runId: message.run.id });
           }
         } catch (error) {
-          this.logger.error("[ManagedWorker] Failed to restore run", { error });
+          this.logger.error("Failed to restore run", { error });
         }

         return;
       }

-      this.logger.log("[ManagedWorker] Scheduling run", { runId: message.run.id });
+      this.logger.log("Scheduling run", { runId: message.run.id });

       const didWarmStart = await this.tryWarmStart(message);

       if (didWarmStart) {
-        this.logger.log("[ManagedWorker] Warm start successful", { runId: message.run.id });
+        this.logger.log("Warm start successful", { runId: message.run.id });
         return;
       }

@@ -249,7 +253,7 @@ class ManagedSupervisor {
         //   memory: message.run.machine.memory,
         // });
       } catch (error) {
-        this.logger.error("[ManagedWorker] Failed to create workload", { error });
+        this.logger.error("Failed to create workload", { error });
       }
     });

@@ -277,12 +281,12 @@ class ManagedSupervisor {
   }

   async onRunConnected({ run }: { run: { friendlyId: string } }) {
-    this.logger.debug("[ManagedWorker] Run connected", { run });
+    this.logger.debug("Run connected", { run });
     this.workerSession.subscribeToRunNotifications([run.friendlyId]);
   }

   async onRunDisconnected({ run }: { run: { friendlyId: string } }) {
-    this.logger.debug("[ManagedWorker] Run disconnected", { run });
+    this.logger.debug("Run disconnected", { run });
     this.workerSession.unsubscribeFromRunNotifications([run.friendlyId]);
   }

@@ -303,7 +307,7 @@ class ManagedSupervisor {
       });

       if (!res.ok) {
-        this.logger.error("[ManagedWorker] Warm start failed", {
+        this.logger.error("Warm start failed", {
           runId: dequeuedMessage.run.id,
         });
         return false;
@@ -313,7 +317,7 @@ class ManagedSupervisor {
       const parsedData = z.object({ didWarmStart: z.boolean() }).safeParse(data);

       if (!parsedData.success) {
-        this.logger.error("[ManagedWorker] Warm start response invalid", {
+        this.logger.error("Warm start response invalid", {
           runId: dequeuedMessage.run.id,
           data,
         });
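Note: worth calling out why `safeParse` fits here rather than `parse`: a malformed warm-start response degrades to a logged error and a cold start instead of a thrown exception. The same pattern in isolation (the schema constant and helper are illustrative, not part of the diff):

```ts
import { z } from "zod";

// safeParse never throws, so a bad body downgrades to "no warm start".
const WarmStartResponse = z.object({ didWarmStart: z.boolean() });

async function readWarmStartResult(res: Response): Promise<boolean> {
  const parsed = WarmStartResponse.safeParse(await res.json());
  return parsed.success ? parsed.data.didWarmStart : false;
}
```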
@@ -322,7 +326,7 @@ class ManagedSupervisor {

       return parsedData.data.didWarmStart;
     } catch (error) {
-      this.logger.error("[ManagedWorker] Warm start error", {
+      this.logger.error("Warm start error", {
         runId: dequeuedMessage.run.id,
         error,
       });
@@ -331,29 +335,29 @@ class ManagedSupervisor {
   }

   async start() {
-    this.logger.log("[ManagedWorker] Starting up");
+    this.logger.log("Starting up");

     // Optional services
     await this.podCleaner?.start();
     await this.failedPodHandler?.start();
     await this.metricsServer?.start();

     if (env.TRIGGER_WORKLOAD_API_ENABLED) {
-      this.logger.log("[ManagedWorker] Workload API enabled", {
+      this.logger.log("Workload API enabled", {
         protocol: env.TRIGGER_WORKLOAD_API_PROTOCOL,
         domain: env.TRIGGER_WORKLOAD_API_DOMAIN,
         port: env.TRIGGER_WORKLOAD_API_PORT_INTERNAL,
       });
       await this.workloadServer.start();
     } else {
-      this.logger.warn("[ManagedWorker] Workload API disabled");
+      this.logger.warn("Workload API disabled");
     }

     await this.workerSession.start();
   }

   async stop() {
-    this.logger.log("[ManagedWorker] Shutting down");
+    this.logger.log("Shutting down");
     await this.workerSession.stop();

     // Optional services
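Note: the `?.` calls in `start()`/`stop()` rely on the optional services only being constructed when their env flags are set. A reduced sketch of that pattern (`Service` is an assumed minimal shape, not the real `PodCleaner`/`FailedPodHandler` types):

```ts
// Illustrative sketch of the optional-service lifecycle.
interface Service {
  start(): Promise<void>;
  stop(): Promise<void>;
}

class SupervisorSketch {
  // Left undefined unless the corresponding env flag is set.
  private readonly podCleaner?: Service;

  async start() {
    await this.podCleaner?.start(); // resolves immediately when disabled
  }

  async stop() {
    await this.podCleaner?.stop();
  }
}
```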