@@ -10,6 +10,7 @@ import (
10
10
"context"
11
11
"io"
12
12
"math"
13
+ "strconv"
13
14
"time"
14
15
15
16
"github.com/golang/protobuf/proto"
@@ -89,6 +90,7 @@ type Handler struct {
89
90
ChainManager ChainManager
90
91
TimeWindow time.Duration
91
92
BindingInspector Inspector
93
+ Metrics * Metrics
92
94
}
93
95
94
96
//go:generate counterfeiter -o mock/receiver.go -fake-name Receiver . Receiver
@@ -107,6 +109,12 @@ type ResponseSender interface {
107
109
SendBlockResponse (block * cb.Block ) error
108
110
}
109
111
112
+ // Filtered is a marker interface that indicates a response sender
113
+ // is configured to send filtered blocks
114
+ type Filtered interface {
115
+ IsFiltered () bool
116
+ }
117
+
110
118
// Server is a polymorphic structure to support generalization of this handler
111
119
// to be able to deliver different type of responses.
112
120
type Server struct {
@@ -125,101 +133,129 @@ func ExtractChannelHeaderCertHash(msg proto.Message) []byte {
125
133
}
126
134
127
135
// NewHandler creates an implementation of the Handler interface.
128
- func NewHandler (cm ChainManager , timeWindow time.Duration , mutualTLS bool ) * Handler {
136
+ func NewHandler (cm ChainManager , timeWindow time.Duration , mutualTLS bool , metrics * Metrics ) * Handler {
129
137
return & Handler {
130
138
ChainManager : cm ,
131
139
TimeWindow : timeWindow ,
132
140
BindingInspector : InspectorFunc (comm .NewBindingInspector (mutualTLS , ExtractChannelHeaderCertHash )),
141
+ Metrics : metrics ,
133
142
}
134
143
}
135
144
136
145
// Handle receives incoming deliver requests.
137
146
func (h * Handler ) Handle (ctx context.Context , srv * Server ) error {
138
147
addr := util .ExtractRemoteAddress (ctx )
139
148
logger .Debugf ("Starting new deliver loop for %s" , addr )
149
+ h .Metrics .StreamsOpened .Add (1 )
150
+ defer h .Metrics .StreamsClosed .Add (1 )
140
151
for {
141
152
logger .Debugf ("Attempting to read seek info message from %s" , addr )
142
153
envelope , err := srv .Recv ()
143
154
if err == io .EOF {
144
155
logger .Debugf ("Received EOF from %s, hangup" , addr )
145
156
return nil
146
157
}
147
-
148
158
if err != nil {
149
159
logger .Warningf ("Error reading from %s: %s" , addr , err )
150
160
return err
151
161
}
152
162
153
- if err := h .deliverBlocks (ctx , srv , envelope ); err != nil {
163
+ status , err := h .deliverBlocks (ctx , srv , envelope )
164
+ if err != nil {
165
+ return err
166
+ }
167
+
168
+ err = srv .SendStatusResponse (status )
169
+ if status != cb .Status_SUCCESS {
170
+ return err
171
+ }
172
+ if err != nil {
173
+ logger .Warningf ("Error sending to %s: %s" , addr , err )
154
174
return err
155
175
}
156
176
157
177
logger .Debugf ("Waiting for new SeekInfo from %s" , addr )
158
178
}
159
179
}
160
180
161
- func (h * Handler ) deliverBlocks (ctx context.Context , srv * Server , envelope * cb.Envelope ) error {
181
+ func isFiltered (srv * Server ) bool {
182
+ if filtered , ok := srv .ResponseSender .(Filtered ); ok {
183
+ return filtered .IsFiltered ()
184
+ }
185
+ return false
186
+ }
187
+
188
+ func (h * Handler ) deliverBlocks (ctx context.Context , srv * Server , envelope * cb.Envelope ) (status cb.Status , err error ) {
162
189
addr := util .ExtractRemoteAddress (ctx )
163
190
payload , err := utils .UnmarshalPayload (envelope .Payload )
164
191
if err != nil {
165
192
logger .Warningf ("Received an envelope from %s with no payload: %s" , addr , err )
166
- return srv . SendStatusResponse ( cb .Status_BAD_REQUEST )
193
+ return cb .Status_BAD_REQUEST , nil
167
194
}
168
195
169
196
if payload .Header == nil {
170
197
logger .Warningf ("Malformed envelope received from %s with bad header" , addr )
171
- return srv . SendStatusResponse ( cb .Status_BAD_REQUEST )
198
+ return cb .Status_BAD_REQUEST , nil
172
199
}
173
200
174
201
chdr , err := utils .UnmarshalChannelHeader (payload .Header .ChannelHeader )
175
202
if err != nil {
176
203
logger .Warningf ("Failed to unmarshal channel header from %s: %s" , addr , err )
177
- return srv . SendStatusResponse ( cb .Status_BAD_REQUEST )
204
+ return cb .Status_BAD_REQUEST , nil
178
205
}
179
206
180
207
err = h .validateChannelHeader (ctx , chdr )
181
208
if err != nil {
182
209
logger .Warningf ("Rejecting deliver for %s due to envelope validation error: %s" , addr , err )
183
- return srv . SendStatusResponse ( cb .Status_BAD_REQUEST )
210
+ return cb .Status_BAD_REQUEST , nil
184
211
}
185
212
186
213
chain := h .ChainManager .GetChain (chdr .ChannelId )
187
214
if chain == nil {
188
215
// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
189
216
// So we would expect our log to be somewhat flooded with these
190
217
logger .Debugf ("Rejecting deliver for %s because channel %s not found" , addr , chdr .ChannelId )
191
- return srv .SendStatusResponse (cb .Status_NOT_FOUND )
218
+ return cb .Status_NOT_FOUND , nil
219
+ }
220
+
221
+ labels := []string {
222
+ "channel" , chdr .ChannelId ,
223
+ "filtered" , strconv .FormatBool (isFiltered (srv )),
192
224
}
225
+ h .Metrics .RequestsReceived .With (labels ... ).Add (1 )
226
+ defer func () {
227
+ labels := append (labels , "success" , strconv .FormatBool (status == cb .Status_SUCCESS ))
228
+ h .Metrics .RequestsCompleted .With (labels ... ).Add (1 )
229
+ }()
193
230
194
231
erroredChan := chain .Errored ()
195
232
select {
196
233
case <- erroredChan :
197
234
logger .Warningf ("[channel: %s] Rejecting deliver request for %s because of consenter error" , chdr .ChannelId , addr )
198
- return srv . SendStatusResponse ( cb .Status_SERVICE_UNAVAILABLE )
235
+ return cb .Status_SERVICE_UNAVAILABLE , nil
199
236
default :
200
-
201
237
}
202
238
203
239
accessControl , err := NewSessionAC (chain , envelope , srv .PolicyChecker , chdr .ChannelId , crypto .ExpiresAt )
204
240
if err != nil {
205
241
logger .Warningf ("[channel: %s] failed to create access control object due to %s" , chdr .ChannelId , err )
206
- return srv . SendStatusResponse ( cb .Status_BAD_REQUEST )
242
+ return cb .Status_BAD_REQUEST , nil
207
243
}
208
244
209
245
if err := accessControl .Evaluate (); err != nil {
210
246
logger .Warningf ("[channel: %s] Client authorization revoked for deliver request from %s: %s" , chdr .ChannelId , addr , err )
211
- return srv . SendStatusResponse ( cb .Status_FORBIDDEN )
247
+ return cb .Status_FORBIDDEN , nil
212
248
}
213
249
214
250
seekInfo := & ab.SeekInfo {}
215
251
if err = proto .Unmarshal (payload .Data , seekInfo ); err != nil {
216
252
logger .Warningf ("[channel: %s] Received a signed deliver request from %s with malformed seekInfo payload: %s" , chdr .ChannelId , addr , err )
217
- return srv . SendStatusResponse ( cb .Status_BAD_REQUEST )
253
+ return cb .Status_BAD_REQUEST , nil
218
254
}
219
255
220
256
if seekInfo .Start == nil || seekInfo .Stop == nil {
221
257
logger .Warningf ("[channel: %s] Received seekInfo message from %s with missing start or stop %v, %v" , chdr .ChannelId , addr , seekInfo .Start , seekInfo .Stop )
222
- return srv . SendStatusResponse ( cb .Status_BAD_REQUEST )
258
+ return cb .Status_BAD_REQUEST , nil
223
259
}
224
260
225
261
logger .Debugf ("[channel: %s] Received seekInfo (%p) %v from %s" , chdr .ChannelId , seekInfo , seekInfo , addr )
@@ -236,14 +272,14 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E
236
272
stopNum = stop .Specified .Number
237
273
if stopNum < number {
238
274
logger .Warningf ("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d" , chdr .ChannelId , addr , number , stopNum )
239
- return srv . SendStatusResponse ( cb .Status_BAD_REQUEST )
275
+ return cb .Status_BAD_REQUEST , nil
240
276
}
241
277
}
242
278
243
279
for {
244
280
if seekInfo .Behavior == ab .SeekInfo_FAIL_IF_NOT_READY {
245
281
if number > chain .Reader ().Height ()- 1 {
246
- return srv . SendStatusResponse ( cb .Status_NOT_FOUND )
282
+ return cb .Status_NOT_FOUND , nil
247
283
}
248
284
}
249
285
@@ -259,47 +295,44 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E
259
295
select {
260
296
case <- ctx .Done ():
261
297
logger .Debugf ("Context canceled, aborting wait for next block" )
262
- return errors .Wrapf (ctx .Err (), "context finished before block retrieved" )
298
+ return cb . Status_INTERNAL_SERVER_ERROR , errors .Wrapf (ctx .Err (), "context finished before block retrieved" )
263
299
case <- erroredChan :
264
300
logger .Warningf ("Aborting deliver for request because of background error" )
265
- return srv . SendStatusResponse ( cb .Status_SERVICE_UNAVAILABLE )
301
+ return cb .Status_SERVICE_UNAVAILABLE , nil
266
302
case <- iterCh :
267
303
// Iterator has set the block and status vars
268
304
}
269
305
270
306
if status != cb .Status_SUCCESS {
271
307
logger .Errorf ("[channel: %s] Error reading from channel, cause was: %v" , chdr .ChannelId , status )
272
- return srv . SendStatusResponse ( status )
308
+ return status , nil
273
309
}
274
310
275
311
// increment block number to support FAIL_IF_NOT_READY deliver behavior
276
312
number ++
277
313
278
314
if err := accessControl .Evaluate (); err != nil {
279
315
logger .Warningf ("[channel: %s] Client authorization revoked for deliver request from %s: %s" , chdr .ChannelId , addr , err )
280
- return srv . SendStatusResponse ( cb .Status_FORBIDDEN )
316
+ return cb .Status_FORBIDDEN , nil
281
317
}
282
318
283
319
logger .Debugf ("[channel: %s] Delivering block for (%p) for %s" , chdr .ChannelId , seekInfo , addr )
284
320
285
321
if err := srv .SendBlockResponse (block ); err != nil {
286
322
logger .Warningf ("[channel: %s] Error sending to %s: %s" , chdr .ChannelId , addr , err )
287
- return err
323
+ return cb . Status_INTERNAL_SERVER_ERROR , err
288
324
}
289
325
326
+ h .Metrics .BlocksSent .With (labels ... ).Add (1 )
327
+
290
328
if stopNum == block .Header .Number {
291
329
break
292
330
}
293
331
}
294
332
295
- if err := srv .SendStatusResponse (cb .Status_SUCCESS ); err != nil {
296
- logger .Warningf ("[channel: %s] Error sending to %s: %s" , chdr .ChannelId , addr , err )
297
- return err
298
- }
299
-
300
333
logger .Debugf ("[channel: %s] Done delivering to %s for (%p)" , chdr .ChannelId , addr , seekInfo )
301
334
302
- return nil
335
+ return cb . Status_SUCCESS , nil
303
336
}
304
337
305
338
func (h * Handler ) validateChannelHeader (ctx context.Context , chdr * cb.ChannelHeader ) error {
0 commit comments