Skip to content

Commit 8701fad

Browse files
committed
[FAB-8075] Peer Deliver client doesn't failover to OSN
If the peer is connected to a specific ordering node, and no blocks are received within a timely manner, when that ordering node loses connection - the peer gives up connecting to a different ordering node. This is because the total elapsed time includes the time waiting on the Recv() and it instead should only include the time that is spent while sleeping because of the exponential backoff. Change-Id: I4a75dad1f1fa2facf70498a3d9692598b89e8a44 Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent ec639a6 commit 8701fad

File tree

3 files changed

+58
-6
lines changed

3 files changed

+58
-6
lines changed

core/deliverservice/client.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,19 +80,20 @@ func (bc *broadcastClient) Send(msg *common.Envelope) error {
8080

8181
func (bc *broadcastClient) try(action func() (interface{}, error)) (interface{}, error) {
8282
attempt := 0
83-
start := time.Now()
83+
var totalRetryTime time.Duration
8484
var backoffDuration time.Duration
8585
retry := true
8686
for retry && !bc.shouldStop() {
8787
attempt++
8888
resp, err := bc.doAction(action)
8989
if err != nil {
90-
backoffDuration, retry = bc.shouldRetry(attempt, time.Since(start))
90+
backoffDuration, retry = bc.shouldRetry(attempt, totalRetryTime)
9191
if !retry {
9292
logger.Warning("Got error:", err, "at", attempt, "attempt. Ceasing to retry")
9393
break
9494
}
95-
logger.Warning("Got error:", err, ",at", attempt, "attempt. Retrying in", backoffDuration)
95+
logger.Warning("Got error:", err, ", at", attempt, "attempt. Retrying in", backoffDuration)
96+
totalRetryTime += backoffDuration
9697
bc.sleep(backoffDuration)
9798
continue
9899
}
@@ -101,7 +102,7 @@ func (bc *broadcastClient) try(action func() (interface{}, error)) (interface{},
101102
if bc.shouldStop() {
102103
return nil, errors.New("Client is closing")
103104
}
104-
return nil, fmt.Errorf("Attempts (%d) or elapsed time (%v) exhausted", attempt, time.Since(start))
105+
return nil, fmt.Errorf("Attempts (%d) or elapsed time (%v) exhausted", attempt, totalRetryTime)
105106
}
106107

107108
func (bc *broadcastClient) doAction(action func() (interface{}, error)) (interface{}, error) {

core/deliverservice/client_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -428,13 +428,13 @@ func testLimitedTotalConnTime(t *testing.T, bdc blocksDelivererConsumer) {
428428
return nil
429429
}
430430
backoffStrategy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) {
431-
return 0, elapsedTime.Nanoseconds() < time.Second.Nanoseconds()
431+
return time.Millisecond * 500, elapsedTime.Nanoseconds() < time.Second.Nanoseconds()
432432
}
433433
bc := NewBroadcastClient(cp, clFactory, setup, backoffStrategy)
434434
defer bc.Close()
435435
err := bdc(bc)
436436
assert.Error(t, err)
437-
assert.Equal(t, 1, cp.connAttempts)
437+
assert.Equal(t, 3, cp.connAttempts)
438438
assert.Equal(t, 0, setupInvoked)
439439
}
440440

core/deliverservice/deliveryclient_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/hyperledger/fabric/gossip/common"
2424
"github.com/hyperledger/fabric/msp/mgmt/testtools"
2525
"github.com/hyperledger/fabric/protos/orderer"
26+
"github.com/spf13/viper"
2627
"github.com/stretchr/testify/assert"
2728
"google.golang.org/grpc"
2829
)
@@ -476,6 +477,56 @@ func TestDeliverServiceShutdown(t *testing.T) {
476477
time.Sleep(time.Second)
477478
}
478479

480+
func TestDeliverServiceShutdownRespawn(t *testing.T) {
481+
// Scenario: Launch an ordering service node and let the client pull some blocks.
482+
// Then, wait a few seconds, and don't send any blocks.
483+
// Afterwards - start a new instance and shut down the old instance.
484+
viper.Set("peer.deliveryclient.reconnectTotalTimeThreshold", time.Second)
485+
defer func() {
486+
viper.Reset()
487+
}()
488+
defer ensureNoGoroutineLeak(t)()
489+
490+
osn1 := mocks.NewOrderer(5614, t)
491+
492+
time.Sleep(time.Second)
493+
gossipServiceAdapter := &mocks.MockGossipServiceAdapter{GossipBlockDisseminations: make(chan uint64)}
494+
495+
service, err := NewDeliverService(&Config{
496+
Endpoints: []string{"localhost:5614", "localhost:5615"},
497+
Gossip: gossipServiceAdapter,
498+
CryptoSvc: &mockMCS{},
499+
ABCFactory: DefaultABCFactory,
500+
ConnFactory: DefaultConnectionFactory,
501+
})
502+
assert.NoError(t, err)
503+
504+
li := &mocks.MockLedgerInfo{Height: uint64(100)}
505+
osn1.SetNextExpectedSeek(uint64(100))
506+
err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
507+
assert.NoError(t, err, "can't start delivery")
508+
509+
// Check that delivery service requests blocks in order
510+
go osn1.SendBlock(uint64(100))
511+
assertBlockDissemination(100, gossipServiceAdapter.GossipBlockDisseminations, t)
512+
go osn1.SendBlock(uint64(101))
513+
assertBlockDissemination(101, gossipServiceAdapter.GossipBlockDisseminations, t)
514+
atomic.StoreUint64(&li.Height, uint64(102))
515+
// Now wait for a few seconds
516+
time.Sleep(time.Second * 2)
517+
// Now start the new instance
518+
osn2 := mocks.NewOrderer(5615, t)
519+
// Now stop the old instance
520+
osn1.Shutdown()
521+
// Send a block from osn2
522+
osn2.SetNextExpectedSeek(uint64(102))
523+
go osn2.SendBlock(uint64(102))
524+
// Ensure it is received
525+
assertBlockDissemination(102, gossipServiceAdapter.GossipBlockDisseminations, t)
526+
service.Stop()
527+
osn2.Shutdown()
528+
}
529+
479530
func TestDeliverServiceBadConfig(t *testing.T) {
480531
// Empty endpoints
481532
service, err := NewDeliverService(&Config{

0 commit comments

Comments
 (0)