Skip to content

Commit db13a14

Browse files
committed
Rewrite client connection management
Motivation: The client connection management code is pretty bad. It's hard to follow and test reliably. There were some thread safety issues and it's also possible to give a call a multiplexer from a future channel which will fail but may succeed after retrying the connection. It also lacked the abilitiy to close an idle channel (i.e. one with no open HTTP streams). Modifications: - Rewrite the client connection management logic: this is handled by `ConnectionManager` and `ClientConnectivityHandler`. - Channels will idle after 5 minutes of having no open streams (configurability will come in a later PR) - Connectivity state changes are async'd onto a dispatch queue (configurable queue will come later) Result: - Connection management code is much easier to follow and can be tested with an EmbeddedChannel - Fewer bugs - We no longer hold a lock when calling out to the connectivity delegate
1 parent db6ae11 commit db13a14

16 files changed

+1533
-542
lines changed

Sources/GRPC/ClientConnection.swift

Lines changed: 33 additions & 282 deletions
Large diffs are not rendered by default.
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2020, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
import NIO
17+
import NIOHTTP2
18+
19+
internal class ClientConnectivityHandler: ChannelInboundHandler {
20+
typealias InboundIn = HTTP2Frame
21+
22+
private var connectionManager: ConnectionManager
23+
private let idleTimeout: TimeAmount
24+
25+
private var activeStreams = 0
26+
private var scheduledIdle: Scheduled<Void>? = nil
27+
private var isReady = false
28+
29+
init(connectionManager: ConnectionManager, idleTimeout: TimeAmount = .minutes(5)) {
30+
self.connectionManager = connectionManager
31+
self.idleTimeout = idleTimeout
32+
}
33+
34+
func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
35+
if event is NIOHTTP2StreamCreatedEvent {
36+
// We have a stream: don't go idle
37+
self.scheduledIdle?.cancel()
38+
self.activeStreams += 1
39+
} else if event is StreamClosedEvent {
40+
self.activeStreams -= 1
41+
// No active streams: go idle soon.
42+
if self.activeStreams == 0 {
43+
self.scheduledIdle = context.eventLoop.scheduleTask(in: self.idleTimeout) {
44+
self.idle(context: context)
45+
}
46+
}
47+
}
48+
49+
context.fireUserInboundEventTriggered(event)
50+
}
51+
52+
func channelActive(context: ChannelHandlerContext) {
53+
self.connectionManager.channelActive(channel: context.channel)
54+
context.fireChannelActive()
55+
}
56+
57+
func channelInactive(context: ChannelHandlerContext) {
58+
self.scheduledIdle?.cancel()
59+
self.connectionManager.channelInactive()
60+
context.fireChannelInactive()
61+
}
62+
63+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
64+
let frame = self.unwrapInboundIn(data)
65+
66+
if frame.streamID == .rootStream {
67+
switch frame.payload {
68+
case .settings where !self.isReady:
69+
self.isReady = true
70+
71+
let remoteAddressDescription = context.channel.remoteAddress.map { "\($0)" } ?? "n/a"
72+
self.connectionManager.logger.info("gRPC connection ready", metadata: [
73+
"remote_address": "\(remoteAddressDescription)",
74+
"event_loop": "\(context.eventLoop)"
75+
])
76+
77+
// Start the idle timeout.
78+
self.scheduledIdle = context.eventLoop.scheduleTask(in: self.idleTimeout) {
79+
self.idle(context: context)
80+
}
81+
82+
self.connectionManager.ready()
83+
84+
case .goAway where self.activeStreams == 0:
85+
self.idle(context: context)
86+
87+
default:
88+
()
89+
}
90+
}
91+
92+
context.fireChannelRead(data)
93+
}
94+
95+
private func idle(context: ChannelHandlerContext) {
96+
assert(self.activeStreams == 0)
97+
self.connectionManager.idle()
98+
context.close(mode: .all, promise: nil)
99+
}
100+
}

0 commit comments

Comments
 (0)