16
16
import NIO
17
17
import NIOHTTP2
18
18
19
- internal class ClientConnectivityHandler : ChannelInboundHandler {
19
+ internal class GRPCIdleHandler : ChannelInboundHandler {
20
20
typealias InboundIn = HTTP2Frame
21
21
22
- private var connectionManager : ConnectionManager
22
+ /// The amount of time to wait before closing the channel when there are no active streams.
23
23
private let idleTimeout : TimeAmount
24
24
25
+ /// The number of active streams.
25
26
private var activeStreams = 0
27
+
28
+ /// The scheduled task which will close the channel.
26
29
private var scheduledIdle : Scheduled < Void > ? = nil
30
+
31
+ /// Client and server have slightly different behaviours; track which we are following.
32
+ private var mode : Mode
33
+
34
+ /// The mode of operation: the client tracks additional connection state in the connection
35
+ /// manager.
36
+ internal enum Mode {
37
+ case client( ConnectionManager )
38
+ case server
39
+ }
40
+
41
+ /// The current connection state.
27
42
private var state : State = . notReady
28
43
29
44
private enum State {
@@ -37,8 +52,8 @@ internal class ClientConnectivityHandler: ChannelInboundHandler {
37
52
case closed
38
53
}
39
54
40
- init ( connectionManager : ConnectionManager , idleTimeout: TimeAmount = . minutes( 5 ) ) {
41
- self . connectionManager = connectionManager
55
+ init ( mode : Mode , idleTimeout: TimeAmount = . minutes( 5 ) ) {
56
+ self . mode = mode
42
57
self . idleTimeout = idleTimeout
43
58
}
44
59
@@ -67,58 +82,74 @@ internal class ClientConnectivityHandler: ChannelInboundHandler {
67
82
}
68
83
69
84
func channelActive( context: ChannelHandlerContext ) {
70
- switch self . state {
71
- case . notReady:
72
- self . connectionManager. channelActive ( channel: context. channel)
73
- case . ready, . closed:
85
+ switch ( self . mode, self . state) {
86
+ case ( . client( let manager) , . notReady) :
87
+ manager. channelActive ( channel: context. channel)
88
+
89
+ case ( . server, . notReady) ,
90
+ ( _, . ready) ,
91
+ ( _, . closed) :
74
92
( )
75
93
}
76
94
95
+ self . scheduleIdleTimeout ( context: context)
77
96
context. fireChannelActive ( )
78
97
}
79
98
80
99
func channelInactive( context: ChannelHandlerContext ) {
81
100
self . scheduledIdle? . cancel ( )
82
101
self . scheduledIdle = nil
83
102
84
- switch self . state {
85
- case . notReady, . ready:
86
- self . connectionManager. channelInactive ( )
87
- case . closed:
103
+ switch ( self . mode, self . state) {
104
+ case ( . client( let manager) , . notReady) ,
105
+ ( . client( let manager) , . ready) :
106
+ manager. channelInactive ( )
107
+
108
+ case ( . server, . notReady) ,
109
+ ( . server, . ready) ,
110
+ ( _, . closed) :
88
111
( )
112
+
89
113
}
90
114
91
115
context. fireChannelInactive ( )
92
116
}
93
117
94
118
func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
95
- let frame = self . unwrapInboundIn ( data)
96
-
97
- if frame. streamID == . rootStream {
98
- switch ( self . state, frame. payload) {
99
- // We only care about SETTINGS as long as we are in state `.notReady`.
100
- case ( . notReady, . settings) :
101
- self . state = . ready
102
-
103
- let remoteAddressDescription = context. channel. remoteAddress. map { " \( $0) " } ?? " n/a "
104
- self . connectionManager. logger. info ( " gRPC connection ready " , metadata: [
105
- " remote_address " : " \( remoteAddressDescription) " ,
106
- " event_loop " : " \( context. eventLoop) "
107
- ] )
108
-
109
- // Start the idle timeout.
110
- self . scheduleIdleTimeout ( context: context)
119
+ switch self . mode {
120
+ // The client has some connection state transitions we need to deal with here.
121
+ case . client( let manager) :
122
+ let frame = self . unwrapInboundIn ( data)
123
+
124
+ if frame. streamID == . rootStream {
125
+ switch ( self . state, frame. payload) {
126
+ // We only care about SETTINGS as long as we are in state `.notReady`.
127
+ case ( . notReady, . settings) :
128
+ self . state = . ready
129
+
130
+ let remoteAddressDescription = context. channel. remoteAddress. map { " \( $0) " } ?? " n/a "
131
+ manager. logger. info ( " gRPC connection ready " , metadata: [
132
+ " remote_address " : " \( remoteAddressDescription) " ,
133
+ " event_loop " : " \( context. eventLoop) "
134
+ ] )
135
+
136
+ // Start the idle timeout.
137
+ self . scheduleIdleTimeout ( context: context)
111
138
112
- // Let the manager know we're ready.
113
- self . connectionManager . ready ( )
139
+ // Let the manager know we're ready.
140
+ manager . ready ( )
114
141
115
- case ( . notReady, . goAway) ,
116
- ( . ready, . goAway) :
117
- self . idle ( context: context)
142
+ case ( . notReady, . goAway) ,
143
+ ( . ready, . goAway) :
144
+ self . idle ( context: context)
118
145
119
- default :
120
- ( )
146
+ default :
147
+ ( )
148
+ }
121
149
}
150
+
151
+ case . server:
152
+ ( )
122
153
}
123
154
124
155
context. fireChannelRead ( data)
@@ -140,7 +171,13 @@ internal class ClientConnectivityHandler: ChannelInboundHandler {
140
171
}
141
172
142
173
self . state = . closed
143
- self . connectionManager. idle ( )
174
+ switch self . mode {
175
+ case . client( let manager) :
176
+ manager. idle ( )
177
+ case . server:
178
+ ( )
179
+ }
180
+
144
181
context. close ( mode: . all, promise: nil )
145
182
}
146
183
}
0 commit comments