Skip to content

allow completely single-threaded NIO programs #1499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 49 additions & 13 deletions Sources/NIO/EventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,31 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
private let shutdownLock: Lock = Lock()
private var runState: RunState = .running

private static func runTheLoop(thread: NIOThread,
canEventLoopBeShutdownIndividually: Bool,
selectorFactory: @escaping () throws -> NIO.Selector<NIORegistration>,
initializer: @escaping ThreadInitializer,
_ callback: @escaping (SelectableEventLoop) -> Void) {
assert(NIOThread.current == thread)
initializer(thread)

do {
let loop = SelectableEventLoop(thread: thread,
selector: try selectorFactory(),
canBeShutdownIndividually: canEventLoopBeShutdownIndividually)
threadSpecificEventLoop.currentValue = loop
defer {
threadSpecificEventLoop.currentValue = nil
}
callback(loop)
try loop.run()
} catch {
// We fatalError here because the only reasons this can be hit is if the underlying kqueue/epoll give us
// errors that we cannot handle which is an unrecoverable error for us.
fatalError("Unexpected error while running SelectableEventLoop: \(error).")
}
}

private static func setupThreadAndEventLoop(name: String,
selectorFactory: @escaping () throws -> NIO.Selector<NIORegistration>,
initializer: @escaping ThreadInitializer) -> SelectableEventLoop {
Expand All @@ -760,22 +785,14 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {

loopUpAndRunningGroup.enter()
NIOThread.spawnAndRun(name: name, detachThread: false) { t in
initializer(t)

do {
/* we try! this as this must work (just setting up kqueue/epoll) or else there's not much we can do here */
let l = SelectableEventLoop(thread: t, selector: try! selectorFactory())
threadSpecificEventLoop.currentValue = l
defer {
threadSpecificEventLoop.currentValue = nil
}
MultiThreadedEventLoopGroup.runTheLoop(thread: t,
canEventLoopBeShutdownIndividually: false, // part of MTELG
selectorFactory: selectorFactory,
initializer: initializer) { l in
lock.withLock {
_loop = l
}
loopUpAndRunningGroup.leave()
try l.run()
} catch let err {
fatalError("unexpected error while executing EventLoop \(err)")
}
}
loopUpAndRunningGroup.wait()
Expand Down Expand Up @@ -907,7 +924,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {

g.notify(queue: q) {
for loop in self.eventLoops {
loop.syncFinaliseClose()
loop.syncFinaliseClose(joinThread: true)
}
var overallError: Error?
var queueCallbackPairs: [(DispatchQueue, (Error?) -> Void)]? = nil
Expand Down Expand Up @@ -937,6 +954,25 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
}
}
}

/// Convert the calling thread into an `EventLoop`.
///
/// This function will not return until the `EventLoop` has stopped. You can initiate stopping the `EventLoop` by
/// calling `eventLoop.shutdownGracefully` which will eventually make this function return.
///
/// - parameters:
/// - callback: Called _on_ the `EventLoop` that the calling thread was converted to, providing you the
/// `EventLoop` reference. Just like usually on the `EventLoop`, do not block in `callback`.
public static func withCurrentThreadAsEventLoop(_ callback: @escaping (EventLoop) -> Void) {
let callingThread = NIOThread.current
MultiThreadedEventLoopGroup.runTheLoop(thread: callingThread,
canEventLoopBeShutdownIndividually: true,
selectorFactory: NIO.Selector<NIORegistration>.init,
initializer: { _ in }) { loop in
loop.assertInEventLoop()
callback(loop)
}
}
}

extension MultiThreadedEventLoopGroup: CustomStringConvertible {
Expand Down
54 changes: 41 additions & 13 deletions Sources/NIO/SelectableEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ internal final class SelectableEventLoop: EventLoop {
internal var _scheduledTasks = PriorityQueue<ScheduledTask>(ascending: true)
private var tasksCopy = ContiguousArray<() -> Void>()

private let canBeShutdownIndividually: Bool
@usableFromInline
internal let _tasksLock = Lock()
private let _externalStateLock = Lock()
Expand Down Expand Up @@ -131,7 +132,7 @@ internal final class SelectableEventLoop: EventLoop {
}
}

internal init(thread: NIOThread, selector: NIO.Selector<NIORegistration>) {
internal init(thread: NIOThread, selector: NIO.Selector<NIORegistration>, canBeShutdownIndividually: Bool) {
self._selector = selector
self.thread = thread
self._iovecs = UnsafeMutablePointer.allocate(capacity: Socket.writevLimitIOVectors)
Expand All @@ -144,6 +145,7 @@ internal final class SelectableEventLoop: EventLoop {
self.addresses = UnsafeMutableBufferPointer(start: _addresses, count: Socket.writevLimitIOVectors)
// We will process 4096 tasks per while loop.
self.tasksCopy.reserveCapacity(4096)
self.canBeShutdownIndividually = canBeShutdownIndividually
}

deinit {
Expand Down Expand Up @@ -369,11 +371,21 @@ internal final class SelectableEventLoop: EventLoop {
}
}
var nextReadyTask: ScheduledTask? = nil
self._tasksLock.withLock {
if let firstTask = self._scheduledTasks.peek() {
// The reason this is necessary is a very interesting race:
// In theory (and with `makeEventLoopFromCallingThread` even in practise), we could publish an
// `EventLoop` reference _before_ the EL thread has entered the `run` function.
// If that is the case, we need to schedule the first wakeup at the ready time for this task that was
// enqueued really early on, so let's do that :).
nextReadyTask = firstTask
}
}
while self.internalState != .noLongerRunning && self.internalState != .exitingThread {
// Block until there are events to handle or the selector was woken up
/* for macOS: in case any calls we make to Foundation put objects into an autoreleasepool */
try withAutoReleasePool {
try _selector.whenReady(strategy: currentSelectorStrategy(nextReadyTask: nextReadyTask)) { ev in
try self._selector.whenReady(strategy: currentSelectorStrategy(nextReadyTask: nextReadyTask)) { ev in
switch ev.registration {
case .serverSocketChannel(let chan, _):
self.handleEvent(ev.io, channel: chan)
Expand All @@ -397,7 +409,7 @@ internal final class SelectableEventLoop: EventLoop {
// We need to ensure we process all tasks, even if a task added another task again
while true {
// TODO: Better locking
_tasksLock.withLockVoid {
self._tasksLock.withLockVoid {
if !self._scheduledTasks.isEmpty {
// We only fetch the time one time as this may be expensive and is generally good enough as if we miss anything we will just do a non-blocking select again anyway.
let now: NIODeadline = .now()
Expand All @@ -406,7 +418,7 @@ internal final class SelectableEventLoop: EventLoop {
while tasksCopy.count < tasksCopy.capacity, let task = self._scheduledTasks.peek() {
if task.readyIn(now) <= .nanoseconds(0) {
self._scheduledTasks.pop()
tasksCopy.append(task.task)
self.tasksCopy.append(task.task)
} else {
nextReadyTask = task
break
Expand All @@ -419,19 +431,19 @@ internal final class SelectableEventLoop: EventLoop {
}

// all pending tasks are set to occur in the future, so we can stop looping.
if tasksCopy.isEmpty {
if self.tasksCopy.isEmpty {
break
}

// Execute all the tasks that were summited
for task in tasksCopy {
for task in self.tasksCopy {
/* for macOS: in case any calls we make to Foundation put objects into an autoreleasepool */
withAutoReleasePool {
task()
}
}
// Drop everything (but keep the capacity) so we can fill it again on the next iteration.
tasksCopy.removeAll(keepingCapacity: true)
self.tasksCopy.removeAll(keepingCapacity: true)
}
}

Expand Down Expand Up @@ -490,7 +502,9 @@ internal final class SelectableEventLoop: EventLoop {
}
}

internal func syncFinaliseClose() {
internal func syncFinaliseClose(joinThread: Bool) {
// This may not be true in the future but today we need to join all ELs that can't be shut down individually.
assert(joinThread != self.canBeShutdownIndividually)
let goAhead = self.externalStateLock.withLock { () -> Bool in
switch self.externalState {
case .closed:
Expand All @@ -505,7 +519,9 @@ internal final class SelectableEventLoop: EventLoop {
guard goAhead else {
return
}
self.thread.join()
if joinThread {
self.thread.join()
}
self.externalStateLock.withLock {
precondition(self.externalState == .reclaimingResources)
self.externalState = .resourcesReclaimed
Expand All @@ -514,10 +530,22 @@ internal final class SelectableEventLoop: EventLoop {

@usableFromInline
func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
// This function is never called legally because the only possibly owner of an `SelectableEventLoop` is
// `MultiThreadedEventLoopGroup` which calls `closeGently`.
queue.async {
callback(EventLoopError.unsupportedOperation)
if self.canBeShutdownIndividually {
self.initiateClose(queue: queue) { result in
self.syncFinaliseClose(joinThread: false) // This thread was taken over by somebody else
switch result {
case .success:
callback(nil)
case .failure(let error):
callback(error)
}
}
} else {
// This function is never called legally because the only possibly owner of an `SelectableEventLoop` is
// `MultiThreadedEventLoopGroup` which calls `initiateClose` followed by `syncFinaliseClose`.
queue.async {
callback(EventLoopError.unsupportedOperation)
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions Tests/NIOTests/EventLoopTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ extension EventLoopTest {
("testMultiThreadedEventLoopGroupDescription", testMultiThreadedEventLoopGroupDescription),
("testSafeToExecuteTrue", testSafeToExecuteTrue),
("testSafeToExecuteFalse", testSafeToExecuteFalse),
("testTakeOverThreadAndAlsoTakeItBack", testTakeOverThreadAndAlsoTakeItBack),
("testWeCanDoTrulySingleThreadedNetworking", testWeCanDoTrulySingleThreadedNetworking),
]
}
}
Expand Down
85 changes: 85 additions & 0 deletions Tests/NIOTests/EventLoopTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1085,4 +1085,89 @@ public final class EventLoopTest : XCTestCase {
XCTAssertFalse(loop.testsOnly_validExternalStateToScheduleTasks)
XCTAssertFalse(loop.testsOnly_validExternalStateToScheduleTasks)
}

func testTakeOverThreadAndAlsoTakeItBack() {
let currentNIOThread = NIOThread.current
let currentNSThread = Thread.current
let lock = Lock()
var hasBeenShutdown = false
let allDoneGroup = DispatchGroup()
allDoneGroup.enter()
MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { loop in
XCTAssertEqual(currentNIOThread, NIOThread.current)
XCTAssertEqual(currentNSThread, Thread.current)
XCTAssert(loop === MultiThreadedEventLoopGroup.currentEventLoop)
loop.shutdownGracefully(queue: DispatchQueue.global()) { error in
XCTAssertNil(error)
lock.withLock {
hasBeenShutdown = error == nil
}
allDoneGroup.leave()
}
}
allDoneGroup.wait()
XCTAssertTrue(lock.withLock { hasBeenShutdown })
}

func testWeCanDoTrulySingleThreadedNetworking() {
final class SaveReceivedByte: ChannelInboundHandler {
typealias InboundIn = ByteBuffer

// For once, we don't need thread-safety as we're taking the calling thread :)
var received: UInt8? = nil
var readCalls: Int = 0
var allDonePromise: EventLoopPromise<Void>? = nil

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
self.readCalls += 1
XCTAssertEqual(1, self.readCalls)

var data = self.unwrapInboundIn(data)
XCTAssertEqual(1, data.readableBytes)

XCTAssertNil(self.received)
self.received = data.readInteger()

self.allDonePromise?.succeed(())

context.close(promise: nil)
}
}

let receiveHandler = SaveReceivedByte() // There'll be just one connection, we can share.
MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { loop in
ServerBootstrap(group: loop)
.serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1)
.childChannelInitializer { accepted in
accepted.pipeline.addHandler(receiveHandler)
}
.bind(host: "127.0.0.1", port: 0)
.flatMap { serverChannel in
ClientBootstrap(group: loop).connect(to: serverChannel.localAddress!).flatMap { clientChannel in
var buffer = clientChannel.allocator.buffer(capacity: 1)
buffer.writeString("J")
return clientChannel.writeAndFlush(buffer)
}.flatMap {
XCTAssertNil(receiveHandler.allDonePromise)
receiveHandler.allDonePromise = loop.makePromise()
return receiveHandler.allDonePromise!.futureResult
}.flatMap {
serverChannel.close()
}
}.whenComplete { (result: Result<Void, Error>) -> Void in
func workaroundSR9815withAUselessFunction() {
XCTAssertNoThrow(try result.get())
}
workaroundSR9815withAUselessFunction()

// All done, let's return back into the calling thread.
loop.shutdownGracefully { error in
XCTAssertNil(error)
}
}
}

// All done, the EventLoop is terminated so we should be able to check the results.
XCTAssertEqual(UInt8(ascii: "J"), receiveHandler.received)
}
}