Skip to content

Better handle writability changes in triggerWriteOperations #1624

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Sources/NIO/PendingDatagramWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ final class PendingDatagramWritesManager: PendingWritesManager {

internal var waterMark: ChannelOptions.Types.WriteBufferWaterMark = ChannelOptions.Types.WriteBufferWaterMark(low: 32 * 1024, high: 64 * 1024)
internal let channelWritabilityFlag: NIOAtomic<Bool> = .makeAtomic(value: true)
internal var publishedWritability = true
internal var writeSpinCount: UInt = 16
private(set) var isOpen = true

Expand Down Expand Up @@ -426,7 +427,8 @@ final class PendingDatagramWritesManager: PendingWritesManager {
metadata: envelope.metadata))

if self.state.bytes > waterMark.high && channelWritabilityFlag.compareAndExchange(expected: true, desired: false) {
// Returns false to signal the Channel became non-writable and we need to notify the user
// Returns false to signal the Channel became non-writable and we need to notify the user.
self.publishedWritability = false
return false
}
return true
Expand Down
18 changes: 13 additions & 5 deletions Sources/NIO/PendingWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ final class PendingStreamWritesManager: PendingWritesManager {

internal var waterMark: ChannelOptions.Types.WriteBufferWaterMark = ChannelOptions.Types.WriteBufferWaterMark(low: 32 * 1024, high: 64 * 1024)
internal let channelWritabilityFlag: NIOAtomic<Bool> = .makeAtomic(value: true)
internal var publishedWritability = true

internal var writeSpinCount: UInt = 16

Expand Down Expand Up @@ -314,7 +315,8 @@ final class PendingStreamWritesManager: PendingWritesManager {
self.state.append(.init(data: data, promise: promise))

if self.state.bytes > waterMark.high && channelWritabilityFlag.compareAndExchange(expected: true, desired: false) {
// Returns false to signal the Channel became non-writable and we need to notify the user
// Returns false to signal the Channel became non-writable and we need to notify the user.
self.publishedWritability = false
return false
}
return true
Expand Down Expand Up @@ -451,12 +453,17 @@ internal enum WriteMechanism {
case nothingToBeWritten
}

internal protocol PendingWritesManager {
internal protocol PendingWritesManager: class {
var isOpen: Bool { get }
var isFlushPending: Bool { get }
var writeSpinCount: UInt { get }
var currentBestWriteMechanism: WriteMechanism { get }
var channelWritabilityFlag: NIOAtomic<Bool> { get }

/// Represents the writability state the last time we published a writability change to the `Channel`.
/// This is used in `triggerWriteOperations` to determine whether we need to trigger a writability
/// change.
var publishedWritability: Bool { get set }
}

extension PendingWritesManager {
Expand All @@ -466,7 +473,6 @@ extension PendingWritesManager {
}

internal func triggerWriteOperations(triggerOneWriteOperation: (WriteMechanism) throws -> OneWriteOperationResult) throws -> OverallWriteResult {
let wasWritable = self.isWritable
var result = OverallWriteResult(writeResult: .couldNotWriteEverything, writabilityChange: false)

writeSpinLoop: for _ in 0...self.writeSpinCount {
Expand Down Expand Up @@ -494,9 +500,11 @@ extension PendingWritesManager {
// Also, it is very important to not do any outcalls to user code outside of the loop until the `flushNow`
// re-entrancy protection is off again.

if !wasWritable {
// Was not writable before so signal back to the caller the possible state change
if !self.publishedWritability {
// When we last published a writability change the `Channel` wasn't writable, signal back to the caller
// whether we should emit a writability change.
result.writabilityChange = self.isWritable
self.publishedWritability = result.writabilityChange
}
return result
}
Expand Down
1 change: 1 addition & 0 deletions Tests/NIOTests/ChannelTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ extension ChannelTests {
("testDescriptionCanBeCalledFromNonEventLoopThreads", testDescriptionCanBeCalledFromNonEventLoopThreads),
("testFixedSizeRecvByteBufferAllocatorSizeIsConstant", testFixedSizeRecvByteBufferAllocatorSizeIsConstant),
("testCloseInConnectPromise", testCloseInConnectPromise),
("testWritabilityChangeDuringReentrantFlushNow", testWritabilityChangeDuringReentrantFlushNow),
]
}
}
Expand Down
91 changes: 91 additions & 0 deletions Tests/NIOTests/ChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2780,6 +2780,40 @@ public final class ChannelTests: XCTestCase {
}.wait())
}
}

func testWritabilityChangeDuringReentrantFlushNow() throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

let loop = group.next()
let handler = ReentrantWritabilityChangingHandler(becameUnwritable: loop.makePromise(),
becameWritable: loop.makePromise())

let serverFuture = ServerBootstrap(group: group)
.childChannelOption(ChannelOptions.writeBufferWaterMark, value: handler.watermark)
.childChannelInitializer { channel in
return channel.pipeline.addHandler(handler)
}
.bind(host: "localhost", port: 0)

let server: Channel = try assertNoThrowWithValue(try serverFuture.wait())
defer {
XCTAssertNoThrow(try server.close().wait())
}

let clientFuture = ClientBootstrap(group: group)
.connect(host: "localhost", port: server.localAddress!.port!)

let client: Channel = try assertNoThrowWithValue(try clientFuture.wait())
defer {
XCTAssertNoThrow(try client.close().wait())
}

XCTAssertNoThrow(try handler.becameUnwritable.futureResult.wait())
XCTAssertNoThrow(try handler.becameWritable.futureResult.wait())
}
}

fileprivate final class FailRegistrationAndDelayCloseHandler: ChannelOutboundHandler {
Expand Down Expand Up @@ -2835,3 +2869,60 @@ fileprivate class VerifyConnectionFailureHandler: ChannelInboundHandler {
context.fireChannelUnregistered()
}
}

final class ReentrantWritabilityChangingHandler: ChannelInboundHandler {
typealias InboundIn = ByteBuffer
typealias OutboundOut = ByteBuffer

let watermark = ChannelOptions.Types.WriteBufferWaterMark(low: 100, high: 200)

let becameWritable: EventLoopPromise<Void>
let becameUnwritable: EventLoopPromise<Void>

var isWritableCount = 0
var isNotWritableCount = 0

init(becameUnwritable: EventLoopPromise<Void>, becameWritable: EventLoopPromise<Void>) {
self.becameUnwritable = becameUnwritable
self.becameWritable = becameWritable
}

func channelActive(context: ChannelHandlerContext) {
// We want to enqueue at least two pending writes before flushing. Neither of which
// should cause writability to change. However, we'll hang a callback off the first
// write which will make the channel unwritable and a writability change to be
// emitted. The flush for that write should result in the writability flipping back
// again.
let b1 = context.channel.allocator.buffer(repeating: 0, count: 50)
context.write(self.wrapOutboundOut(b1)).whenSuccess { _ in
// We should still be writable.
XCTAssertTrue(context.channel.isWritable)
XCTAssertEqual(self.isNotWritableCount, 0)
XCTAssertEqual(self.isWritableCount, 0)

// Write again. But now breach high water mark. This should cause us to become
// unwritable.
let b2 = context.channel.allocator.buffer(repeating: 0, count: 250)
context.write(self.wrapOutboundOut(b2), promise: nil)
XCTAssertFalse(context.channel.isWritable)
XCTAssertEqual(self.isNotWritableCount, 1)
XCTAssertEqual(self.isWritableCount, 0)

// Now flush. This should lead to us becoming writable again.
context.flush()
}

// Queue another write and flush.
context.writeAndFlush(self.wrapOutboundOut(b1), promise: nil)
}

func channelWritabilityChanged(context: ChannelHandlerContext) {
if context.channel.isWritable {
self.isWritableCount += 1
self.becameWritable.succeed(())
} else {
self.isNotWritableCount += 1
self.becameUnwritable.succeed(())
}
}
}
1 change: 1 addition & 0 deletions Tests/NIOTests/DatagramChannelTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ extension DatagramChannelTests {
("testEcnSendReceiveIPV6VectorRead", testEcnSendReceiveIPV6VectorRead),
("testEcnSendReceiveIPV4VectorReadVectorWrite", testEcnSendReceiveIPV4VectorReadVectorWrite),
("testEcnSendReceiveIPV6VectorReadVectorWrite", testEcnSendReceiveIPV6VectorReadVectorWrite),
("testWritabilityChangeDuringReentrantFlushNow", testWritabilityChangeDuringReentrantFlushNow),
]
}
}
Expand Down
38 changes: 38 additions & 0 deletions Tests/NIOTests/DatagramChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -753,4 +753,42 @@ final class DatagramChannelTests: XCTestCase {
}
testEcnReceive(address: "::1", vectorRead: true, vectorSend: true)
}

func testWritabilityChangeDuringReentrantFlushNow() throws {
class EnvelopingHandler: ChannelOutboundHandler {
typealias OutboundIn = ByteBuffer
typealias OutboundOut = AddressedEnvelope<ByteBuffer>

func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let buffer = self.unwrapOutboundIn(data)
context.write(self.wrapOutboundOut(AddressedEnvelope(remoteAddress: context.channel.localAddress!, data: buffer)), promise: promise)
}
}

let loop = self.group.next()
let handler = ReentrantWritabilityChangingHandler(becameUnwritable: loop.makePromise(),
becameWritable: loop.makePromise())

let channel1Future = DatagramBootstrap(group: self.group)
.bind(host: "localhost", port: 0)
let channel1 = try assertNoThrowWithValue(try channel1Future.wait())
defer {
XCTAssertNoThrow(try channel1.close().wait())
}

let channel2Future = DatagramBootstrap(group: self.group)
.channelOption(ChannelOptions.writeBufferWaterMark, value: handler.watermark)
.channelInitializer { channel in
channel.pipeline.addHandlers([EnvelopingHandler(), handler])
}
.bind(host: "localhost", port: 0)
let channel2 = try assertNoThrowWithValue(try channel2Future.wait())
defer {
XCTAssertNoThrow(try channel2.close().wait())
}

// Now wait.
XCTAssertNoThrow(try handler.becameUnwritable.futureResult.wait())
XCTAssertNoThrow(try handler.becameWritable.futureResult.wait())
}
}