Skip to content

Commit 786370b

Browse files
committed
Remove unbounded strategy, rename copy -> makeAdditionalSource, rename asyncSequence() -> elements(), and move enqueue and cancel to the CallbackHandle previously called CallbackToken
1 parent f4c3a36 commit 786370b

File tree

4 files changed

+214
-279
lines changed

4 files changed

+214
-279
lines changed

Evolution/0016-mutli-producer-single-consumer-channel.md

Lines changed: 35 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,10 @@ do {
174174
case .produceMore:
175175
// Trigger more production in the underlying system
176176

177-
case .enqueueCallback(let callbackToken):
177+
case .enqueueCallback(let callbackHandle):
178178
// There are enough values in the channel already. We need to enqueue
179179
// a callback to get notified when we should produce more.
180-
source.enqueueCallback(token: callbackToken, onProduceMore: { result in
180+
callbackHandle.enqueueCallback(onProduceMore: { result in
181181
switch result {
182182
case .success:
183183
// Trigger more production in the underlying system
@@ -195,8 +195,8 @@ The above API offers the most control and highest performance when bridging a
195195
synchronous producer to a `MultiProducerSingleConsumerAsyncChannel`. First, you have
196196
to send values using the `send(contentsOf:)` which returns a `SendResult`. The
197197
result either indicates that more values should be produced or that a callback
198-
should be enqueued by calling the `enqueueCallback(callbackToken:
199-
onProduceMore:)` method. This callback is invoked once the backpressure strategy
198+
should be enqueued by calling the `enqueueCallback(onProduceMore:)` method.
199+
This callback is invoked once the backpressure strategy
200200
decided that more values should be produced. This API aims to offer the most
201201
flexibility with the greatest performance. The callback only has to be allocated
202202
in the case where the producer needs to pause production.
@@ -238,7 +238,7 @@ let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
238238
)
239239
var channel = consume channelAndSource.channel
240240
var source1 = consume channelAndSource.source
241-
var source2 = source1.copy()
241+
var source2 = source1.makeAdditionalSource()
242242

243243
group.addTask {
244244
try await source1.send(1)
@@ -373,7 +373,7 @@ let channelAndSource = MultiProducerSingleConsumerAsyncChannel.makeChannel(
373373
)
374374
var channel = consume channelAndSource.channel
375375
var source1 = consume channelAndSource.source
376-
var source2 = source1.copy()
376+
var source2 = source1.makeAdditionalSource()
377377
source1.setOnTerminationCallback { print("Terminated") }
378378

379379
_ = try await source1.send(1)
@@ -459,10 +459,10 @@ public struct MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError: Error
459459
/// case .produceMore:
460460
/// // Trigger more production in the underlying system
461461
///
462-
/// case .enqueueCallback(let callbackToken):
462+
/// case .enqueueCallback(let callbackHandle):
463463
/// // There are enough values in the channel already. We need to enqueue
464464
/// // a callback to get notified when we should produce more.
465-
/// source.enqueueCallback(token: callbackToken, onProduceMore: { result in
465+
/// callbackHandle.enqueueCallback(onProduceMore: { result in
466466
/// switch result {
467467
/// case .success:
468468
/// // Trigger more production in the underlying system
@@ -478,7 +478,7 @@ public struct MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError: Error
478478
///
479479
/// ### Multiple producers
480480
///
481-
/// To support multiple producers the source offers a ``Source/copy()`` method to produce a new source.
481+
/// To support multiple producers the source offers a ``Source/makeAdditionalSource()`` method to produce a new source.
482482
///
483483
/// ### Terminating the production of values
484484
///
@@ -582,20 +582,37 @@ extension MultiProducerSingleConsumerAsyncChannel {
582582

583583
/// A type that indicates the result of sending elements to the source.
584584
public enum SendResult: ~Copyable, Sendable {
585-
/// An opaque token that is returned when the channel's backpressure strategy indicated that production should
586-
/// be suspended. Use this token to enqueue a callback by calling the ``MultiProducerSingleConsumerAsyncChannel/Source/enqueueCallback(callbackToken:onProduceMore:)`` method.
585+
/// A handle that is returned when the channel's backpressure strategy indicated that production should
586+
/// be suspended. Use this handle to enqueue a callback by calling the ``CallbackHandle/enqueueCallback(onProduceMore:)`` method.
587587
///
588-
/// - Important: This token must only be passed once to ``MultiProducerSingleConsumerAsyncChannel/Source/enqueueCallback(callbackToken:onProduceMore:)``
589-
/// and ``MultiProducerSingleConsumerAsyncChannel/Source/cancelCallback(callbackToken:)``.
590-
public struct CallbackToken: Sendable, Hashable { }
588+
/// - Important: ``CallbackHandle/enqueueCallback(onProduceMore:)`` and ``CallbackHandle/cancelCallback()`` must
589+
/// only be called once.
590+
public struct CallbackHandle: Sendable, Hashable {
591+
/// Enqueues a callback that will be invoked once more elements should be produced.
592+
///
593+
/// - Important: Calling enqueue more than once is **not allowed**.
594+
///
595+
/// - Parameters:
596+
/// - onProduceMore: The callback which gets invoked once more elements should be produced.
597+
@inlinable
598+
public mutating func enqueueCallback(
599+
onProduceMore: sending @escaping (Result<Void, Error>) -> Void
600+
)
601+
602+
/// Cancel an enqueued callback.
603+
///
604+
/// - Note: This methods supports being called before ``enqueueCallback(onProduceMore:)`` is called.
605+
///
606+
/// - Important: Calling enqueue more than once is **not allowed**.
607+
@inlinable
608+
public mutating func cancelCallback()
609+
}
591610

592611
/// Indicates that more elements should be produced and send to the source.
593612
case produceMore
594613

595614
/// Indicates that a callback should be enqueued.
596-
///
597-
/// The associated token should be passed to the ````MultiProducerSingleConsumerAsyncChannel/Source/enqueueCallback(callbackToken:onProduceMore:)```` method.
598-
case enqueueCallback(CallbackToken)
615+
case enqueueCallback(CallbackHandle)
599616
}
600617

601618
/// A callback to invoke when the channel finished.
@@ -608,7 +625,7 @@ extension MultiProducerSingleConsumerAsyncChannel {
608625
/// The channel will only automatically be finished if all existing sources have been deinited.
609626
///
610627
/// - Returns: A new source for sending elements to the channel.
611-
public mutating func copy() -> Source
628+
public mutating func makeAdditionalSource() -> Source
612629

613630
/// Sends new elements to the channel.
614631
///
@@ -632,32 +649,6 @@ extension MultiProducerSingleConsumerAsyncChannel {
632649
/// - Returns: The result that indicates if more elements should be produced at this time.
633650
public mutating func send(_ element: sending consuming Element) throws -> SendResult
634651

635-
/// Enqueues a callback that will be invoked once more elements should be produced.
636-
///
637-
/// Call this method after ``send(contentsOf:)-5honm`` or ``send(_:)-3jxzb`` returned ``SendResult/enqueueCallback(_:)``.
638-
///
639-
/// - Important: Enqueueing the same token multiple times is **not allowed**.
640-
///
641-
/// - Parameters:
642-
/// - callbackToken: The callback token.
643-
/// - onProduceMore: The callback which gets invoked once more elements should be produced.
644-
public mutating func enqueueCallback(
645-
callbackToken: consuming SendResult.CallbackToken,
646-
onProduceMore: sending @escaping (Result<Void, Error>
647-
) -> Void)
648-
649-
/// Cancel an enqueued callback.
650-
///
651-
/// Call this method to cancel a callback enqueued by the ``enqueueCallback(callbackToken:onProduceMore:)`` method.
652-
///
653-
/// - Note: This methods supports being called before ``enqueueCallback(callbackToken:onProduceMore:)`` is called and
654-
/// will mark the passed `callbackToken` as cancelled.
655-
///
656-
/// - Parameter callbackToken: The callback token.
657-
public mutating func cancelCallback(
658-
callbackToken: consuming SendResult.CallbackToken
659-
)
660-
661652
/// Send new elements to the channel and provide a callback which will be invoked once more elements should be produced.
662653
///
663654
/// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the

Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel+Internal.swift

Lines changed: 2 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -78,38 +78,14 @@ extension MultiProducerSingleConsumerAsyncChannel {
7878
}
7979
}
8080

81-
@usableFromInline
82-
struct _Unbounded: Sendable, CustomStringConvertible {
83-
@usableFromInline
84-
var description: String {
85-
"unbounded"
86-
}
87-
88-
init() {}
89-
90-
@inlinable
91-
mutating func didSend(elements: Deque<SendableConsumeOnceBox<Element>>.SubSequence) -> Bool {
92-
true
93-
}
94-
95-
@inlinable
96-
mutating func didConsume(element: SendableConsumeOnceBox<Element>) -> Bool {
97-
true
98-
}
99-
}
100-
10181
/// A watermark based strategy.
10282
case watermark(_Watermark)
103-
/// An unbounded based strategy.
104-
case unbounded(_Unbounded)
10583

10684
@usableFromInline
10785
var description: String {
10886
switch consume self {
10987
case .watermark(let strategy):
11088
return strategy.description
111-
case .unbounded(let unbounded):
112-
return unbounded.description
11389
}
11490
}
11591

@@ -120,10 +96,6 @@ extension MultiProducerSingleConsumerAsyncChannel {
12096
let result = strategy.didSend(elements: elements)
12197
self = .watermark(strategy)
12298
return result
123-
case .unbounded(var strategy):
124-
let result = strategy.didSend(elements: elements)
125-
self = .unbounded(strategy)
126-
return result
12799
}
128100
}
129101

@@ -134,10 +106,6 @@ extension MultiProducerSingleConsumerAsyncChannel {
134106
let result = strategy.didConsume(element: element)
135107
self = .watermark(strategy)
136108
return result
137-
case .unbounded(var strategy):
138-
let result = strategy.didConsume(element: element)
139-
self = .unbounded(strategy)
140-
return result
141109
}
142110
}
143111
}
@@ -304,15 +272,15 @@ extension MultiProducerSingleConsumerAsyncChannel {
304272
return .produceMore
305273

306274
case .returnEnqueue(let callbackToken):
307-
return .enqueueCallback(.init(id: callbackToken))
275+
return .enqueueCallback(.init(id: callbackToken, storage: self))
308276

309277
case .resumeConsumerAndReturnProduceMore(let continuation, let element):
310278
continuation.resume(returning: element.take())
311279
return .produceMore
312280

313281
case .resumeConsumerAndReturnEnqueue(let continuation, let element, let callbackToken):
314282
continuation.resume(returning: element.take())
315-
return .enqueueCallback(.init(id: callbackToken))
283+
return .enqueueCallback(.init(id: callbackToken, storage: self))
316284

317285
case .throwFinishedError:
318286
throw MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError()
@@ -995,33 +963,6 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage {
995963
)
996964
/// Indicates that the producer has been finished.
997965
case throwFinishedError
998-
999-
// @inlinable
1000-
// init(
1001-
// callbackToken: UInt64?,
1002-
// continuation: (UnsafeContinuation<Element?, Error>, SendableConsumeOnceBox<Element>)? = nil
1003-
// ) {
1004-
// switch (callbackToken, continuationAndElement) {
1005-
// case (.none, .none):
1006-
// self = .returnProduceMore
1007-
//
1008-
// case (.some(let callbackToken), .none):
1009-
// self = .returnEnqueue(callbackToken: callbackToken)
1010-
//
1011-
// case (.none, .some((let continuation, let element))):
1012-
// self = .resumeConsumerAndReturnProduceMore(
1013-
// continuation: continuation,
1014-
// element: SendableConsumeOnceBox(wrapped: element)
1015-
// )
1016-
//
1017-
// case (.some(let callbackToken), .some((let continuation, let element))):
1018-
// self = .resumeConsumerAndReturnEnqueue(
1019-
// continuation: continuation,
1020-
// element: SendableConsumeOnceBox(wrapped: element),
1021-
// callbackToken: callbackToken
1022-
// )
1023-
// }
1024-
// }
1025966
}
1026967

1027968
@inlinable

0 commit comments

Comments
 (0)