Skip to content

Commit 389d245

Browse files
fix: Further changes to fix queueing
Signed-off-by: Fabrizio Demaria <fabrizio.f.demaria@gmail.com>
1 parent a9cb0b3 commit 389d245

File tree

4 files changed

+86
-94
lines changed

4 files changed

+86
-94
lines changed

Sources/OpenFeature/AsyncCoalescingSerialQueue.swift

Lines changed: 0 additions & 52 deletions
This file was deleted.

Sources/OpenFeature/AsyncSerialQueue.swift

Lines changed: 0 additions & 35 deletions
This file was deleted.
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import Foundation
2+
3+
/// Unified serial async task queue with operation-type-aware coalescing.
4+
/// - Non-coalescable operations always execute in order
5+
/// - Consecutive coalescable operations are coalesced (only the last one executes)
6+
/// - Order is always preserved
7+
internal actor AsyncUnifiedQueue {
8+
private var currentTask: Task<Void, Never>?
9+
10+
private struct QueuedOperation {
11+
let operation: () async -> Void
12+
let continuation: CheckedContinuation<Void, Never>
13+
let canCoalesce: Bool
14+
}
15+
16+
private var queue: [QueuedOperation] = []
17+
18+
/// Runs the given operation serially.
19+
/// - If canCoalesce is false: operation always executes
20+
/// - If canCoalesce is true: may be skipped if superseded by a later coalescable operation
21+
func run(canCoalesce: Bool, operation: @Sendable @escaping () async -> Void) async {
22+
await withCheckedContinuation { continuation in
23+
queue.append(QueuedOperation(operation: operation, continuation: continuation, canCoalesce: canCoalesce))
24+
25+
if currentTask == nil {
26+
processNext()
27+
}
28+
}
29+
}
30+
31+
private func processNext() {
32+
guard !queue.isEmpty else {
33+
currentTask = nil
34+
return
35+
}
36+
37+
// Find the next batch to execute
38+
// A batch is either:
39+
// 1. A single non-coalescable operation, OR
40+
// 2. Consecutive coalescable operations (we execute only the last one)
41+
42+
let firstOp = queue[0]
43+
44+
if !firstOp.canCoalesce {
45+
// Non-coalescable operation: execute it immediately
46+
let op = queue.removeFirst()
47+
currentTask = Task { [weak self] in
48+
await op.operation()
49+
op.continuation.resume()
50+
await self?.processNext()
51+
}
52+
} else {
53+
// Coalescable operation: find all consecutive coalescable ops
54+
var coalescableCount = 0
55+
for op in queue {
56+
if op.canCoalesce {
57+
coalescableCount += 1
58+
} else {
59+
break
60+
}
61+
}
62+
63+
// Execute only the last one in the coalescable batch
64+
let toSkip = Array(queue.prefix(coalescableCount - 1))
65+
let toExecute = queue[coalescableCount - 1]
66+
queue.removeFirst(coalescableCount)
67+
68+
currentTask = Task { [weak self] in
69+
await toExecute.operation()
70+
71+
// Resume all continuations (both skipped and executed)
72+
for op in toSkip {
73+
op.continuation.resume()
74+
}
75+
toExecute.continuation.resume()
76+
77+
await self?.processNext()
78+
}
79+
}
80+
}
81+
}

Sources/OpenFeature/OpenFeatureAPI.swift

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@ import Foundation
66
public class OpenFeatureAPI {
77
private let eventHandler = EventHandler()
88
private let stateQueue = DispatchQueue(label: "com.openfeature.state.queue") // Sync queue to change state atomically
9-
private let providerQueue: AsyncSerialQueue // Non-coalescing queue for provider operations
10-
private let contextQueue: AsyncCoalescingSerialQueue // Coalescing queue for context updates
9+
private let unifiedQueue: AsyncUnifiedQueue // Unified queue for all operations with smart coalescing
1110

1211
private(set) var providerSubject = CurrentValueSubject<FeatureProvider?, Never>(nil)
1312
private(set) var evaluationContext: EvaluationContext?
@@ -18,8 +17,7 @@ public class OpenFeatureAPI {
1817
static public let shared = OpenFeatureAPI()
1918

2019
public init() {
21-
providerQueue = AsyncSerialQueue()
22-
contextQueue = AsyncCoalescingSerialQueue()
20+
unifiedQueue = AsyncUnifiedQueue()
2321
}
2422

2523
/**
@@ -64,7 +62,7 @@ public class OpenFeatureAPI {
6462

6563
public func clearProvider() {
6664
Task {
67-
await providerQueue.run { [self] in
65+
await unifiedQueue.run(canCoalesce: false) { [self] in
6866
stateQueue.sync {
6967
self.providerSubject.send(nil)
7068
self.providerStatus = .notReady
@@ -158,7 +156,7 @@ public class OpenFeatureAPI {
158156
}
159157

160158
private func setProviderInternal(provider: FeatureProvider, initialContext: EvaluationContext? = nil) async {
161-
await providerQueue.run { [self] in
159+
await unifiedQueue.run(canCoalesce: false) { [self] in
162160
// Set initial state atomically
163161
stateQueue.sync {
164162
self.providerStatus = .notReady
@@ -195,7 +193,7 @@ public class OpenFeatureAPI {
195193
}
196194

197195
private func updateContext(evaluationContext: EvaluationContext) async {
198-
await contextQueue.run { [self] in
196+
await unifiedQueue.run(canCoalesce: true) { [self] in
199197
// Get old context and set new context atomically
200198
let (oldContext, provider) = stateQueue.sync { () -> (EvaluationContext?, FeatureProvider?) in
201199
let oldContext = self.evaluationContext

0 commit comments

Comments
 (0)