Skip to content

Commit fb49490

Browse files
committed
Add task execution package
Add task execution package that includes ConcurrentSequenceExecutor that supports executing sequences of Task in a highly concurrent fashion.
1 parent 5cc8829 commit fb49490

File tree

7 files changed

+539
-1
lines changed

7 files changed

+539
-1
lines changed

Concurrency.xcodeproj/project.pbxproj

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121
/* End PBXAggregateTarget section */
2222

2323
/* Begin PBXBuildFile section */
24+
41B94843210A4744007E59C8 /* SerialSequenceExecutor.swift in Sources */ = {isa = PBXBuildFile; fileRef = 41B9483F210A4744007E59C8 /* SerialSequenceExecutor.swift */; };
25+
41B94844210A4744007E59C8 /* SequenceExecutor.swift in Sources */ = {isa = PBXBuildFile; fileRef = 41B94840210A4744007E59C8 /* SequenceExecutor.swift */; };
26+
41B94845210A4744007E59C8 /* ConcurrentSequenceExecutor.swift in Sources */ = {isa = PBXBuildFile; fileRef = 41B94841210A4744007E59C8 /* ConcurrentSequenceExecutor.swift */; };
27+
41B94846210A4744007E59C8 /* Task.swift in Sources */ = {isa = PBXBuildFile; fileRef = 41B94842210A4744007E59C8 /* Task.swift */; };
28+
41B94849210A4756007E59C8 /* ConcurrentSequenceExecutorTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 41B94848210A4756007E59C8 /* ConcurrentSequenceExecutorTests.swift */; };
2429
OBJ_27 /* AtomicBool.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_9 /* AtomicBool.swift */; };
2530
OBJ_28 /* AtomicInt.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_10 /* AtomicInt.swift */; };
2631
OBJ_29 /* AtomicReference.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_11 /* AtomicReference.swift */; };
@@ -51,6 +56,11 @@
5156
/* End PBXContainerItemProxy section */
5257

5358
/* Begin PBXFileReference section */
59+
41B9483F210A4744007E59C8 /* SerialSequenceExecutor.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SerialSequenceExecutor.swift; sourceTree = "<group>"; };
60+
41B94840210A4744007E59C8 /* SequenceExecutor.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SequenceExecutor.swift; sourceTree = "<group>"; };
61+
41B94841210A4744007E59C8 /* ConcurrentSequenceExecutor.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ConcurrentSequenceExecutor.swift; sourceTree = "<group>"; };
62+
41B94842210A4744007E59C8 /* Task.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Task.swift; sourceTree = "<group>"; };
63+
41B94848210A4756007E59C8 /* ConcurrentSequenceExecutorTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ConcurrentSequenceExecutorTests.swift; sourceTree = "<group>"; };
5464
"Concurrency::Concurrency::Product" /* Concurrency.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; path = Concurrency.framework; sourceTree = BUILT_PRODUCTS_DIR; };
5565
"Concurrency::ConcurrencyTests::Product" /* ConcurrencyTests.xctest */ = {isa = PBXFileReference; lastKnownFileType = file; path = ConcurrencyTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
5666
OBJ_10 /* AtomicInt.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AtomicInt.swift; sourceTree = "<group>"; };
@@ -83,6 +93,25 @@
8393
/* End PBXFrameworksBuildPhase section */
8494

8595
/* Begin PBXGroup section */
96+
41B9483E210A4744007E59C8 /* Executor */ = {
97+
isa = PBXGroup;
98+
children = (
99+
41B9483F210A4744007E59C8 /* SerialSequenceExecutor.swift */,
100+
41B94840210A4744007E59C8 /* SequenceExecutor.swift */,
101+
41B94841210A4744007E59C8 /* ConcurrentSequenceExecutor.swift */,
102+
41B94842210A4744007E59C8 /* Task.swift */,
103+
);
104+
path = Executor;
105+
sourceTree = "<group>";
106+
};
107+
41B94847210A4756007E59C8 /* Executor */ = {
108+
isa = PBXGroup;
109+
children = (
110+
41B94848210A4756007E59C8 /* ConcurrentSequenceExecutorTests.swift */,
111+
);
112+
path = Executor;
113+
sourceTree = "<group>";
114+
};
86115
OBJ_13 /* Tests */ = {
87116
isa = PBXGroup;
88117
children = (
@@ -94,6 +123,7 @@
94123
OBJ_14 /* ConcurrencyTests */ = {
95124
isa = PBXGroup;
96125
children = (
126+
41B94847210A4756007E59C8 /* Executor */,
97127
OBJ_15 /* AtomicBoolTests.swift */,
98128
OBJ_16 /* AtomicIntTests.swift */,
99129
OBJ_17 /* AtomicReferenceTests.swift */,
@@ -133,6 +163,7 @@
133163
OBJ_8 /* Concurrency */ = {
134164
isa = PBXGroup;
135165
children = (
166+
41B9483E210A4744007E59C8 /* Executor */,
136167
OBJ_9 /* AtomicBool.swift */,
137168
OBJ_10 /* AtomicInt.swift */,
138169
OBJ_11 /* AtomicReference.swift */,
@@ -225,9 +256,13 @@
225256
isa = PBXSourcesBuildPhase;
226257
buildActionMask = 0;
227258
files = (
259+
41B94846210A4744007E59C8 /* Task.swift in Sources */,
228260
OBJ_27 /* AtomicBool.swift in Sources */,
261+
41B94844210A4744007E59C8 /* SequenceExecutor.swift in Sources */,
262+
41B94845210A4744007E59C8 /* ConcurrentSequenceExecutor.swift in Sources */,
229263
OBJ_28 /* AtomicInt.swift in Sources */,
230264
OBJ_29 /* AtomicReference.swift in Sources */,
265+
41B94843210A4744007E59C8 /* SerialSequenceExecutor.swift in Sources */,
231266
OBJ_30 /* CountDownLatch.swift in Sources */,
232267
);
233268
runOnlyForDeploymentPostprocessing = 0;
@@ -246,6 +281,7 @@
246281
files = (
247282
OBJ_48 /* AtomicBoolTests.swift in Sources */,
248283
OBJ_49 /* AtomicIntTests.swift in Sources */,
284+
41B94849210A4756007E59C8 /* ConcurrentSequenceExecutorTests.swift in Sources */,
249285
OBJ_50 /* AtomicReferenceTests.swift in Sources */,
250286
OBJ_51 /* CountDownLatchTests.swift in Sources */,
251287
);

README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ Provides locking-free synchronization of a mutable object reference. It provides
3131
### `CountDownLatch`
3232
A utility class that allows coordination between threads. A count down latch starts with an initial count. Threads can then decrement the count until it reaches zero, at which point, the suspended waiting thread shall proceed. A `CountDownLatch` behaves differently from a `DispatchSemaphore` once the latch is open. Unlike a semaphore where subsequent waits would still block the caller thread, once a `CountDownLatch` is open, all subsequent waits can directly passthrough.
3333

34+
### `ConcurrentSequenceExecutor`
35+
An execution utility that executes sequences of tasks and returns the final result in a highly concurrent environment.
36+
37+
### `SerialSequenceExecutor`
38+
A debugging executor that executes sequences of tasks and returns the final result serially on the caller thread.
39+
3440
## Installation
3541

3642
### Carthage
@@ -88,4 +94,4 @@ Or you can follow the steps above to generate a Xcode project and run tests with
8894

8995

9096
## License
91-
[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fuber%2Fswift-concurrency.svg?type=large)](https://app.fossa.io/projects/git%2Bgithub.com%2Fuber%2Fswift-concurrency?ref=badge_large)
97+
[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fuber%2Fswift-concurrency.svg?type=large)](https://app.fossa.io/projects/git%2Bgithub.com%2Fuber%2Fswift-concurrency?ref=badge_large)
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
//
2+
// Copyright (c) 2018. Uber Technologies
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
17+
import Foundation
18+
19+
/// An executor that executes sequences of tasks concurrently.
20+
///
21+
/// - seeAlso: `SequenceExecutor`.
22+
/// - seeAlso: `Task`.
23+
public class ConcurrentSequenceExecutor: SequenceExecutor {
24+
25+
/// Initializer.
26+
///
27+
/// - parameter name: The name of the executor.
28+
/// - parameter qos: The quality of service of this executor. This
29+
/// defaults to `userInitiated`.
30+
public init(name: String, qos: DispatchQoS = .userInitiated) {
31+
taskQueue = DispatchQueue(label: "Executor.taskQueue-\(name)", qos: qos, attributes: .concurrent)
32+
}
33+
34+
/// Execute a sequence of tasks concurrently from the given initial task.
35+
///
36+
/// - parameter initialTask: The root task of the sequence of tasks
37+
/// to be executed.
38+
/// - parameter execution: The execution defining the sequence of tasks.
39+
/// When a task completes its execution, this closure is invoked with
40+
/// the task and its produced result. This closure is invoked from
41+
/// multiple threads concurrently, therefore it must be thread-safe.
42+
/// The tasks provided by this closure are executed concurrently.
43+
/// - returns: The execution handle that allows control and monitoring
44+
/// of the sequence of tasks being executed.
45+
public func executeSequence<SequenceResultType>(from initialTask: Task, with execution: @escaping (Task, Any) -> SequenceExecution<SequenceResultType>) -> SequenceExecutionHandle<SequenceResultType> {
46+
let handle: SynchronizedSequenceExecutionHandle<SequenceResultType> = SynchronizedSequenceExecutionHandle()
47+
execute(initialTask, with: handle, execution)
48+
return handle
49+
}
50+
51+
// MARK: - Private
52+
53+
private let taskQueue: DispatchQueue
54+
55+
private func execute<SequenceResultType>(_ task: Task, with sequenceHandle: SynchronizedSequenceExecutionHandle<SequenceResultType>, _ execution: @escaping (Task, Any) -> SequenceExecution<SequenceResultType>) {
56+
taskQueue.async {
57+
guard !sequenceHandle.isCancelled else {
58+
return
59+
}
60+
61+
let result = task.typeErasedExecute()
62+
let nextExecution = execution(task, result)
63+
switch nextExecution {
64+
case .continueSequence(let nextTask):
65+
self.execute(nextTask, with: sequenceHandle, execution)
66+
case .endOfSequence(let result):
67+
sequenceHandle.sequenceDidComplete(with: result)
68+
}
69+
}
70+
}
71+
}
72+
73+
private class SynchronizedSequenceExecutionHandle<SequenceResultType>: SequenceExecutionHandle<SequenceResultType> {
74+
75+
private let latch = CountDownLatch(count: 1)
76+
private let didCancel = AtomicBool(initialValue: false)
77+
78+
// Use a lock to ensure result is properly accessed, since the read
79+
// `await` method may be invoked on a different thread than the write
80+
// `sequenceDidComplete` method.
81+
private let resultLock = NSRecursiveLock()
82+
private var result: SequenceResultType?
83+
84+
fileprivate var isCancelled: Bool {
85+
return didCancel.value
86+
}
87+
88+
fileprivate override func await(withTimeout timeout: TimeInterval?) throws -> SequenceResultType {
89+
let didComplete = latch.await(timeout: timeout)
90+
if !didComplete {
91+
throw SequenceExecutionError.awaitTimeout
92+
}
93+
94+
resultLock.lock()
95+
defer {
96+
resultLock.unlock()
97+
}
98+
// If latch was counted down, the result must have been set. Therefore,
99+
// this forced unwrap is safe.
100+
return result!
101+
}
102+
103+
fileprivate func sequenceDidComplete(with result: SequenceResultType) {
104+
resultLock.lock()
105+
self.result = result
106+
resultLock.unlock()
107+
108+
latch.countDown()
109+
}
110+
111+
fileprivate override func cancel() {
112+
didCancel.compareAndSet(expect: false, newValue: true)
113+
}
114+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
//
2+
// Copyright (c) 2018. Uber Technologies
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
17+
import Foundation
18+
19+
/// Errors that can occur during a sequence execution.
20+
public enum SequenceExecutionError: Error {
21+
/// The waiting on sequence completion timed out.
22+
case awaitTimeout
23+
}
24+
25+
/// The handle of the execution of a sequence of tasks, that allows control
26+
/// and monitoring of the said sequence of tasks.
27+
// This cannot be a protocol, since `SequenceExecutor` references this as a
28+
// type. Protocols with associatedType cannot be directly used as types.
29+
public class SequenceExecutionHandle<SequenceResultType> {
30+
31+
/// Block the caller thread until the sequence of tasks all finished
32+
/// execution or the specified timeout period has elapsed.
33+
///
34+
/// - parameter timeout: The duration to wait before the timeout error
35+
/// is thrown. `nil` to wait indefinitely until the sequence execution
36+
/// completes.
37+
/// - throws: `SequenceExecutionError.awaitTimeout` if the given timeout
38+
/// period elapsed before the sequence execution completed.
39+
func await(withTimeout timeout: TimeInterval?) throws -> SequenceResultType {
40+
fatalError("await not yet implemented.")
41+
}
42+
43+
/// Cancel the sequence execution at the point this function is invoked.
44+
func cancel() {}
45+
}
46+
47+
/// The execution of a sequence.
48+
public enum SequenceExecution<ResultType> {
49+
/// The execution of the sequence should continue with the associated
50+
/// value task.
51+
case continueSequence(Task)
52+
/// The end of the entire task sequence with associated value result.
53+
case endOfSequence(ResultType)
54+
}
55+
56+
/// Executor of sequences of tasks.
57+
///
58+
/// - seeAlso: `Task`.
59+
public protocol SequenceExecutor {
60+
61+
/// Execute a sequence of tasks from the given initial task.
62+
///
63+
/// - parameter initialTask: The root task of the sequence of tasks
64+
/// to be executed.
65+
/// - parameter execution: The execution defining the sequence of tasks.
66+
/// When a task completes its execution, this closure is invoked with
67+
/// the task and its produced result.
68+
/// - returns: The execution handle that allows control and monitoring
69+
/// of the sequence of tasks being executed.
70+
func executeSequence<SequenceResultType>(from initialTask: Task, with execution: @escaping (Task, Any) -> SequenceExecution<SequenceResultType>) -> SequenceExecutionHandle<SequenceResultType>
71+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
//
2+
// Copyright (c) 2018. Uber Technologies
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
17+
import Foundation
18+
19+
/// An executor that executes sequences of tasks serially from the
20+
/// caller thread.
21+
///
22+
/// - note: Generally this implementation should only be used for debugging
23+
/// purposes, as debugging highly concurrent task executions can be very
24+
/// challenging. Production code should use `ConcurrentSequenceExecutor`.
25+
/// - seeAlso: `SequenceExecutor`.
26+
/// - seeAlso: `Task`.
27+
class SerialSequenceExecutor: SequenceExecutor {
28+
29+
/// Execute a sequence of tasks serially from the given initial task
30+
/// on the caller thread.
31+
///
32+
/// - parameter initialTask: The root task of the sequence of tasks
33+
/// to be executed.
34+
/// - parameter execution: The execution defining the sequence of tasks.
35+
/// When a task completes its execution, this closure is invoked with
36+
/// the task and its produced result. This closure is invoked from
37+
/// the caller thread serially as each task completes. The tasks provided
38+
/// by this closure are executed serially on the initial caller thread.
39+
/// - returns: The execution handle that allows control and monitoring
40+
/// of the sequence of tasks being executed.
41+
func executeSequence<SequenceResultType>(from initialTask: Task, with execution: @escaping (Task, Any) -> SequenceExecution<SequenceResultType>) -> SequenceExecutionHandle<SequenceResultType> {
42+
let handle: SequenceExecutionHandleImpl<SequenceResultType> = SequenceExecutionHandleImpl()
43+
execute(initialTask, with: handle, execution)
44+
return handle
45+
}
46+
47+
// MARK: - Private
48+
49+
private func execute<SequenceResultType>(_ task: Task, with sequenceHandle: SequenceExecutionHandleImpl<SequenceResultType>, _ execution: @escaping (Task, Any) -> SequenceExecution<SequenceResultType>) {
50+
guard !sequenceHandle.isCancelled else {
51+
return
52+
}
53+
54+
let result = task.typeErasedExecute()
55+
let nextExecution = execution(task, result)
56+
switch nextExecution {
57+
case .continueSequence(let nextTask):
58+
self.execute(nextTask, with: sequenceHandle, execution)
59+
case .endOfSequence(let result):
60+
sequenceHandle.sequenceDidComplete(with: result)
61+
}
62+
}
63+
}
64+
65+
private class SequenceExecutionHandleImpl<SequenceResultType>: SequenceExecutionHandle<SequenceResultType> {
66+
67+
private var didCancel = false
68+
private var result: SequenceResultType?
69+
70+
fileprivate var isCancelled: Bool {
71+
return didCancel
72+
}
73+
74+
fileprivate override func await(withTimeout timeout: TimeInterval?) throws -> SequenceResultType {
75+
return result!
76+
}
77+
78+
fileprivate func sequenceDidComplete(with result: SequenceResultType) {
79+
self.result = result
80+
}
81+
82+
fileprivate override func cancel() {
83+
didCancel = true
84+
}
85+
}

0 commit comments

Comments
 (0)