@@ -33,6 +33,8 @@ public actor ServiceGroup: Sendable {
33
33
private let logger : Logger
34
34
/// The logging configuration.
35
35
private let loggingConfiguration : ServiceGroupConfiguration . LoggingConfiguration
36
+ /// The escalation configuration.
37
+ private let escalationConfiguration : ServiceGroupConfiguration . EscalationBehaviour
36
38
/// The signals that lead to graceful shutdown.
37
39
private let gracefulShutdownSignals : [ UnixSignal ]
38
40
/// The signals that lead to cancellation.
@@ -57,6 +59,7 @@ public actor ServiceGroup: Sendable {
57
59
self . cancellationSignals = configuration. cancellationSignals
58
60
self . logger = configuration. logger
59
61
self . loggingConfiguration = configuration. logging
62
+ self . escalationConfiguration = configuration. escalation
60
63
}
61
64
62
65
/// Initializes a new ``ServiceGroup``.
@@ -94,6 +97,7 @@ public actor ServiceGroup: Sendable {
94
97
self . cancellationSignals = configuration. cancellationSignals
95
98
self . logger = logger
96
99
self . loggingConfiguration = configuration. logging
100
+ self . escalationConfiguration = configuration. escalation
97
101
}
98
102
99
103
/// Runs all the services by spinning up a child task per service.
@@ -176,6 +180,8 @@ public actor ServiceGroup: Sendable {
176
180
case signalSequenceFinished
177
181
case gracefulShutdownCaught
178
182
case gracefulShutdownFinished
183
+ case gracefulShutdownTimedOut
184
+ case cancellationCaught
179
185
}
180
186
181
187
private func _run(
@@ -191,6 +197,10 @@ public actor ServiceGroup: Sendable {
191
197
]
192
198
)
193
199
200
+ // A task that is spawned when we got cancelled or
201
+ // we cancel the task group to keep track of a timeout.
202
+ var cancellationTimeoutTask : Task < Void , Never > ?
203
+
194
204
// Using a result here since we want a task group that has non-throwing child tasks
195
205
// but the body itself is throwing
196
206
let result = try await withThrowingTaskGroup ( of: ChildTaskResult . self, returning: Result< Void, Error> . self ) { group in
@@ -267,6 +277,13 @@ public actor ServiceGroup: Sendable {
267
277
}
268
278
}
269
279
280
+ group. addTask {
281
+ // This child task is waiting forever until the group gets cancelled.
282
+ let ( stream, _) = AsyncStream . makeStream ( of: Void . self)
283
+ await stream. first { _ in true }
284
+ return . cancellationCaught
285
+ }
286
+
270
287
// We are storing the services in an optional array now. When a slot in the array is
271
288
// empty it indicates that the service has been shutdown.
272
289
var services = services. map { Optional ( $0) }
@@ -293,7 +310,7 @@ public actor ServiceGroup: Sendable {
293
310
self . loggingConfiguration. keys. serviceKey: " \( service. service) " ,
294
311
]
295
312
)
296
- group . cancelAll ( )
313
+ cancellationTimeoutTask = self . cancelGroupAndSpawnTimeoutIfNeeded ( group : & group )
297
314
return . failure( ServiceGroupError . serviceFinishedUnexpectedly ( ) )
298
315
299
316
case . gracefullyShutdownGroup:
@@ -307,6 +324,7 @@ public actor ServiceGroup: Sendable {
307
324
do {
308
325
try await self . shutdownGracefully (
309
326
services: services,
327
+ cancellationTimeoutTask: & cancellationTimeoutTask,
310
328
group: & group,
311
329
gracefulShutdownManagers: gracefulShutdownManagers
312
330
)
@@ -327,7 +345,7 @@ public actor ServiceGroup: Sendable {
327
345
self . logger. debug (
328
346
" All services finished. "
329
347
)
330
- group . cancelAll ( )
348
+ cancellationTimeoutTask = self . cancelGroupAndSpawnTimeoutIfNeeded ( group : & group )
331
349
return . success( ( ) )
332
350
}
333
351
}
@@ -342,7 +360,7 @@ public actor ServiceGroup: Sendable {
342
360
self . loggingConfiguration. keys. errorKey: " \( serviceError) " ,
343
361
]
344
362
)
345
- group . cancelAll ( )
363
+ cancellationTimeoutTask = self . cancelGroupAndSpawnTimeoutIfNeeded ( group : & group )
346
364
return . failure( serviceError)
347
365
348
366
case . gracefullyShutdownGroup:
@@ -358,6 +376,7 @@ public actor ServiceGroup: Sendable {
358
376
do {
359
377
try await self . shutdownGracefully (
360
378
services: services,
379
+ cancellationTimeoutTask: & cancellationTimeoutTask,
361
380
group: & group,
362
381
gracefulShutdownManagers: gracefulShutdownManagers
363
382
)
@@ -381,7 +400,7 @@ public actor ServiceGroup: Sendable {
381
400
" All services finished. "
382
401
)
383
402
384
- group . cancelAll ( )
403
+ cancellationTimeoutTask = self . cancelGroupAndSpawnTimeoutIfNeeded ( group : & group )
385
404
return . success( ( ) )
386
405
}
387
406
}
@@ -398,6 +417,7 @@ public actor ServiceGroup: Sendable {
398
417
do {
399
418
try await self . shutdownGracefully (
400
419
services: services,
420
+ cancellationTimeoutTask: & cancellationTimeoutTask,
401
421
group: & group,
402
422
gracefulShutdownManagers: gracefulShutdownManagers
403
423
)
@@ -413,7 +433,7 @@ public actor ServiceGroup: Sendable {
413
433
]
414
434
)
415
435
416
- group . cancelAll ( )
436
+ cancellationTimeoutTask = self . cancelGroupAndSpawnTimeoutIfNeeded ( group : & group )
417
437
}
418
438
419
439
case . gracefulShutdownCaught:
@@ -423,19 +443,29 @@ public actor ServiceGroup: Sendable {
423
443
do {
424
444
try await self . shutdownGracefully (
425
445
services: services,
446
+ cancellationTimeoutTask: & cancellationTimeoutTask,
426
447
group: & group,
427
448
gracefulShutdownManagers: gracefulShutdownManagers
428
449
)
429
450
} catch {
430
451
return . failure( error)
431
452
}
432
453
454
+ case . cancellationCaught:
455
+ // We caught cancellation in our child task so we have to spawn
456
+ // our cancellation timeout task if needed
457
+ self . logger. debug ( " Caught cancellation. " )
458
+ cancellationTimeoutTask = self . cancelGroupAndSpawnTimeoutIfNeeded ( group: & group)
459
+
433
460
case . signalSequenceFinished, . gracefulShutdownFinished:
434
461
// This can happen when we are either cancelling everything or
435
462
// when the user did not specify any shutdown signals. We just have to tolerate
436
463
// this.
437
464
continue
438
465
466
+ case . gracefulShutdownTimedOut:
467
+ fatalError ( " Received gracefulShutdownTimedOut but never triggered a graceful shutdown " )
468
+
439
469
case nil :
440
470
fatalError ( " Invalid result from group.next(). We checked if the group is empty before and still got nil " )
441
471
}
@@ -447,18 +477,28 @@ public actor ServiceGroup: Sendable {
447
477
self . logger. debug (
448
478
" Service lifecycle ended "
449
479
)
480
+ cancellationTimeoutTask? . cancel ( )
450
481
try result. get ( )
451
482
}
452
483
453
484
private func shutdownGracefully(
454
485
services: [ ServiceGroupConfiguration . ServiceConfiguration ? ] ,
486
+ cancellationTimeoutTask: inout Task < Void , Never > ? ,
455
487
group: inout ThrowingTaskGroup < ChildTaskResult , Error > ,
456
488
gracefulShutdownManagers: [ GracefulShutdownManager ]
457
489
) async throws {
458
490
guard case . running = self . state else {
459
491
fatalError ( " Unexpected state " )
460
492
}
461
493
494
+ if #available( macOS 13 . 0 , * ) , let maximumGracefulShutdownDuration = self . escalationConfiguration. maximumGracefulShutdownDuration {
495
+ group. addTask {
496
+ try await Task . sleep ( for: maximumGracefulShutdownDuration)
497
+ return . gracefulShutdownTimedOut
498
+ }
499
+ }
500
+
501
+
462
502
// We are storing the first error of a service that threw here.
463
503
var error : Error ?
464
504
@@ -509,7 +549,7 @@ public actor ServiceGroup: Sendable {
509
549
]
510
550
)
511
551
512
- group . cancelAll ( )
552
+ cancellationTimeoutTask = self . cancelGroupAndSpawnTimeoutIfNeeded ( group : & group )
513
553
throw ServiceGroupError . serviceFinishedUnexpectedly ( )
514
554
}
515
555
@@ -561,9 +601,26 @@ public actor ServiceGroup: Sendable {
561
601
]
562
602
)
563
603
564
- group . cancelAll ( )
604
+ cancellationTimeoutTask = self . cancelGroupAndSpawnTimeoutIfNeeded ( group : & group )
565
605
}
566
606
607
+ case . gracefulShutdownTimedOut:
608
+ // Gracefully shutting down took longer than the user configured
609
+ // so we have to escalate it now.
610
+ self . logger. debug (
611
+ " Graceful shutdown took longer than allowed by the configuration. Cancelling the group now. " ,
612
+ metadata: [
613
+ self . loggingConfiguration. keys. serviceKey: " \( service. service) " ,
614
+ ]
615
+ )
616
+ cancellationTimeoutTask = self . cancelGroupAndSpawnTimeoutIfNeeded ( group: & group)
617
+
618
+ case . cancellationCaught:
619
+ // We caught cancellation in our child task so we have to spawn
620
+ // our cancellation timeout task if needed
621
+ self . logger. debug ( " Caught cancellation. " )
622
+ cancellationTimeoutTask = self . cancelGroupAndSpawnTimeoutIfNeeded ( group: & group)
623
+
567
624
case . signalSequenceFinished, . gracefulShutdownCaught, . gracefulShutdownFinished:
568
625
// We just have to tolerate this since signals and parent graceful shutdowns downs can race.
569
626
continue
@@ -575,7 +632,9 @@ public actor ServiceGroup: Sendable {
575
632
576
633
// If we hit this then all services are shutdown. The only thing remaining
577
634
// are the tasks that listen to the various graceful shutdown signals. We
578
- // just have to cancel those
635
+ // just have to cancel those.
636
+ // In this case we don't have to spawn our cancellation timeout task since
637
+ // we are sure all other child tasks are handling cancellation appropriately.
579
638
group. cancelAll ( )
580
639
581
640
// If we saw an error during graceful shutdown from a service that triggers graceful
@@ -584,6 +643,33 @@ public actor ServiceGroup: Sendable {
584
643
throw error
585
644
}
586
645
}
646
+
647
+ private func cancelGroupAndSpawnTimeoutIfNeeded(
648
+ group: inout ThrowingTaskGroup < ChildTaskResult , Error >
649
+ ) -> Task < Void , Never > ? {
650
+ group. cancelAll ( )
651
+ if #available( macOS 13 . 0 , iOS 16 . 0 , watchOS 9 . 0 , tvOS 16 . 0 , * ) , let maximumCancellationDuration = self . escalationConfiguration. maximumCancellationDuration {
652
+ // We have to spawn an unstructured task here because the call to our `run`
653
+ // method might have already been cancelled and we need to protect the sleep
654
+ // from being cancelled.
655
+ return Task {
656
+ do {
657
+ self . logger. debug (
658
+ " Task cancellation timeout task started. "
659
+ )
660
+ try await Task . sleep ( for: maximumCancellationDuration)
661
+ self . logger. debug (
662
+ " Cancellation took longer than allowed by the configuration. "
663
+ )
664
+ fatalError ( " Cancellation took longer than allowed by the configuration. " )
665
+ } catch {
666
+ // We got cancelled so our services must have finished up.
667
+ }
668
+ }
669
+ } else {
670
+ return nil
671
+ }
672
+ }
587
673
}
588
674
589
675
// This should be removed once we support Swift 5.9+
0 commit comments