Skip to content

Commit 13927b0

Browse files
wlahtisykesm
authored andcommitted
Instrument deliver service
This CR adds metrics for streams opened/closed, requests received/completed, and blocks sent. FAB-9570 #done Change-Id: I8ffc8c1107bbe31dd0b1e7f8447e8424363ca14d Signed-off-by: Will Lahti <wtlahti@us.ibm.com> Signed-off-by: Matthew Sykes <sykesmat@us.ibm.com>
1 parent 73d1917 commit 13927b0

18 files changed

+1079
-399
lines changed

common/deliver/deliver.go

Lines changed: 61 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"context"
1111
"io"
1212
"math"
13+
"strconv"
1314
"time"
1415

1516
"github.com/golang/protobuf/proto"
@@ -89,6 +90,7 @@ type Handler struct {
8990
ChainManager ChainManager
9091
TimeWindow time.Duration
9192
BindingInspector Inspector
93+
Metrics *Metrics
9294
}
9395

9496
//go:generate counterfeiter -o mock/receiver.go -fake-name Receiver . Receiver
@@ -107,6 +109,12 @@ type ResponseSender interface {
107109
SendBlockResponse(block *cb.Block) error
108110
}
109111

112+
// Filtered is a marker interface that indicates a response sender
113+
// is configured to send filtered blocks
114+
type Filtered interface {
115+
IsFiltered() bool
116+
}
117+
110118
// Server is a polymorphic structure to support generalization of this handler
111119
// to be able to deliver different type of responses.
112120
type Server struct {
@@ -125,101 +133,129 @@ func ExtractChannelHeaderCertHash(msg proto.Message) []byte {
125133
}
126134

127135
// NewHandler creates an implementation of the Handler interface.
128-
func NewHandler(cm ChainManager, timeWindow time.Duration, mutualTLS bool) *Handler {
136+
func NewHandler(cm ChainManager, timeWindow time.Duration, mutualTLS bool, metrics *Metrics) *Handler {
129137
return &Handler{
130138
ChainManager: cm,
131139
TimeWindow: timeWindow,
132140
BindingInspector: InspectorFunc(comm.NewBindingInspector(mutualTLS, ExtractChannelHeaderCertHash)),
141+
Metrics: metrics,
133142
}
134143
}
135144

136145
// Handle receives incoming deliver requests.
137146
func (h *Handler) Handle(ctx context.Context, srv *Server) error {
138147
addr := util.ExtractRemoteAddress(ctx)
139148
logger.Debugf("Starting new deliver loop for %s", addr)
149+
h.Metrics.StreamsOpened.Add(1)
150+
defer h.Metrics.StreamsClosed.Add(1)
140151
for {
141152
logger.Debugf("Attempting to read seek info message from %s", addr)
142153
envelope, err := srv.Recv()
143154
if err == io.EOF {
144155
logger.Debugf("Received EOF from %s, hangup", addr)
145156
return nil
146157
}
147-
148158
if err != nil {
149159
logger.Warningf("Error reading from %s: %s", addr, err)
150160
return err
151161
}
152162

153-
if err := h.deliverBlocks(ctx, srv, envelope); err != nil {
163+
status, err := h.deliverBlocks(ctx, srv, envelope)
164+
if err != nil {
165+
return err
166+
}
167+
168+
err = srv.SendStatusResponse(status)
169+
if status != cb.Status_SUCCESS {
170+
return err
171+
}
172+
if err != nil {
173+
logger.Warningf("Error sending to %s: %s", addr, err)
154174
return err
155175
}
156176

157177
logger.Debugf("Waiting for new SeekInfo from %s", addr)
158178
}
159179
}
160180

161-
func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.Envelope) error {
181+
func isFiltered(srv *Server) bool {
182+
if filtered, ok := srv.ResponseSender.(Filtered); ok {
183+
return filtered.IsFiltered()
184+
}
185+
return false
186+
}
187+
188+
func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.Envelope) (status cb.Status, err error) {
162189
addr := util.ExtractRemoteAddress(ctx)
163190
payload, err := utils.UnmarshalPayload(envelope.Payload)
164191
if err != nil {
165192
logger.Warningf("Received an envelope from %s with no payload: %s", addr, err)
166-
return srv.SendStatusResponse(cb.Status_BAD_REQUEST)
193+
return cb.Status_BAD_REQUEST, nil
167194
}
168195

169196
if payload.Header == nil {
170197
logger.Warningf("Malformed envelope received from %s with bad header", addr)
171-
return srv.SendStatusResponse(cb.Status_BAD_REQUEST)
198+
return cb.Status_BAD_REQUEST, nil
172199
}
173200

174201
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
175202
if err != nil {
176203
logger.Warningf("Failed to unmarshal channel header from %s: %s", addr, err)
177-
return srv.SendStatusResponse(cb.Status_BAD_REQUEST)
204+
return cb.Status_BAD_REQUEST, nil
178205
}
179206

180207
err = h.validateChannelHeader(ctx, chdr)
181208
if err != nil {
182209
logger.Warningf("Rejecting deliver for %s due to envelope validation error: %s", addr, err)
183-
return srv.SendStatusResponse(cb.Status_BAD_REQUEST)
210+
return cb.Status_BAD_REQUEST, nil
184211
}
185212

186213
chain := h.ChainManager.GetChain(chdr.ChannelId)
187214
if chain == nil {
188215
// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
189216
// So we would expect our log to be somewhat flooded with these
190217
logger.Debugf("Rejecting deliver for %s because channel %s not found", addr, chdr.ChannelId)
191-
return srv.SendStatusResponse(cb.Status_NOT_FOUND)
218+
return cb.Status_NOT_FOUND, nil
219+
}
220+
221+
labels := []string{
222+
"channel", chdr.ChannelId,
223+
"filtered", strconv.FormatBool(isFiltered(srv)),
192224
}
225+
h.Metrics.RequestsReceived.With(labels...).Add(1)
226+
defer func() {
227+
labels := append(labels, "success", strconv.FormatBool(status == cb.Status_SUCCESS))
228+
h.Metrics.RequestsCompleted.With(labels...).Add(1)
229+
}()
193230

194231
erroredChan := chain.Errored()
195232
select {
196233
case <-erroredChan:
197234
logger.Warningf("[channel: %s] Rejecting deliver request for %s because of consenter error", chdr.ChannelId, addr)
198-
return srv.SendStatusResponse(cb.Status_SERVICE_UNAVAILABLE)
235+
return cb.Status_SERVICE_UNAVAILABLE, nil
199236
default:
200-
201237
}
202238

203239
accessControl, err := NewSessionAC(chain, envelope, srv.PolicyChecker, chdr.ChannelId, crypto.ExpiresAt)
204240
if err != nil {
205241
logger.Warningf("[channel: %s] failed to create access control object due to %s", chdr.ChannelId, err)
206-
return srv.SendStatusResponse(cb.Status_BAD_REQUEST)
242+
return cb.Status_BAD_REQUEST, nil
207243
}
208244

209245
if err := accessControl.Evaluate(); err != nil {
210246
logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
211-
return srv.SendStatusResponse(cb.Status_FORBIDDEN)
247+
return cb.Status_FORBIDDEN, nil
212248
}
213249

214250
seekInfo := &ab.SeekInfo{}
215251
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
216252
logger.Warningf("[channel: %s] Received a signed deliver request from %s with malformed seekInfo payload: %s", chdr.ChannelId, addr, err)
217-
return srv.SendStatusResponse(cb.Status_BAD_REQUEST)
253+
return cb.Status_BAD_REQUEST, nil
218254
}
219255

220256
if seekInfo.Start == nil || seekInfo.Stop == nil {
221257
logger.Warningf("[channel: %s] Received seekInfo message from %s with missing start or stop %v, %v", chdr.ChannelId, addr, seekInfo.Start, seekInfo.Stop)
222-
return srv.SendStatusResponse(cb.Status_BAD_REQUEST)
258+
return cb.Status_BAD_REQUEST, nil
223259
}
224260

225261
logger.Debugf("[channel: %s] Received seekInfo (%p) %v from %s", chdr.ChannelId, seekInfo, seekInfo, addr)
@@ -236,14 +272,14 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E
236272
stopNum = stop.Specified.Number
237273
if stopNum < number {
238274
logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
239-
return srv.SendStatusResponse(cb.Status_BAD_REQUEST)
275+
return cb.Status_BAD_REQUEST, nil
240276
}
241277
}
242278

243279
for {
244280
if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
245281
if number > chain.Reader().Height()-1 {
246-
return srv.SendStatusResponse(cb.Status_NOT_FOUND)
282+
return cb.Status_NOT_FOUND, nil
247283
}
248284
}
249285

@@ -259,47 +295,44 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E
259295
select {
260296
case <-ctx.Done():
261297
logger.Debugf("Context canceled, aborting wait for next block")
262-
return errors.Wrapf(ctx.Err(), "context finished before block retrieved")
298+
return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved")
263299
case <-erroredChan:
264300
logger.Warningf("Aborting deliver for request because of background error")
265-
return srv.SendStatusResponse(cb.Status_SERVICE_UNAVAILABLE)
301+
return cb.Status_SERVICE_UNAVAILABLE, nil
266302
case <-iterCh:
267303
// Iterator has set the block and status vars
268304
}
269305

270306
if status != cb.Status_SUCCESS {
271307
logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
272-
return srv.SendStatusResponse(status)
308+
return status, nil
273309
}
274310

275311
// increment block number to support FAIL_IF_NOT_READY deliver behavior
276312
number++
277313

278314
if err := accessControl.Evaluate(); err != nil {
279315
logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
280-
return srv.SendStatusResponse(cb.Status_FORBIDDEN)
316+
return cb.Status_FORBIDDEN, nil
281317
}
282318

283319
logger.Debugf("[channel: %s] Delivering block for (%p) for %s", chdr.ChannelId, seekInfo, addr)
284320

285321
if err := srv.SendBlockResponse(block); err != nil {
286322
logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
287-
return err
323+
return cb.Status_INTERNAL_SERVER_ERROR, err
288324
}
289325

326+
h.Metrics.BlocksSent.With(labels...).Add(1)
327+
290328
if stopNum == block.Header.Number {
291329
break
292330
}
293331
}
294332

295-
if err := srv.SendStatusResponse(cb.Status_SUCCESS); err != nil {
296-
logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
297-
return err
298-
}
299-
300333
logger.Debugf("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo)
301334

302-
return nil
335+
return cb.Status_SUCCESS, nil
303336
}
304337

305338
func (h *Handler) validateChannelHeader(ctx context.Context, chdr *cb.ChannelHeader) error {

common/deliver/deliver_suite_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,17 @@ package deliver_test
99
import (
1010
"testing"
1111

12+
"github.com/hyperledger/fabric/common/deliver"
1213
. "github.com/onsi/ginkgo"
1314
. "github.com/onsi/gomega"
1415
)
1516

17+
//go:generate counterfeiter -o mock/filtered_response_sender.go -fake-name FilteredResponseSender . filteredResponseSender
18+
type filteredResponseSender interface {
19+
deliver.ResponseSender
20+
deliver.Filtered
21+
}
22+
1623
func TestDeliver(t *testing.T) {
1724
RegisterFailHandler(Fail)
1825
RunSpecs(t, "Deliver Suite")

0 commit comments

Comments
 (0)