Skip to content

Commit

Permalink
[FAB-8075] Peer Deliver client doesn't failover to OSN
Browse files Browse the repository at this point in the history
If the peer is connected to a specific ordering node and no blocks
are received in a timely manner, then when that ordering node loses
its connection, the peer gives up on connecting to a different
ordering node.

This is because the total elapsed time includes the time spent
waiting on Recv(), whereas it should only include the time spent
sleeping due to the exponential backoff.

Change-Id: I4a75dad1f1fa2facf70498a3d9692598b89e8a44
Signed-off-by: yacovm <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Feb 6, 2018
1 parent ec639a6 commit 8701fad
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 6 deletions.
9 changes: 5 additions & 4 deletions core/deliverservice/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,20 @@ func (bc *broadcastClient) Send(msg *common.Envelope) error {

func (bc *broadcastClient) try(action func() (interface{}, error)) (interface{}, error) {
attempt := 0
start := time.Now()
var totalRetryTime time.Duration
var backoffDuration time.Duration
retry := true
for retry && !bc.shouldStop() {
attempt++
resp, err := bc.doAction(action)
if err != nil {
backoffDuration, retry = bc.shouldRetry(attempt, time.Since(start))
backoffDuration, retry = bc.shouldRetry(attempt, totalRetryTime)
if !retry {
logger.Warning("Got error:", err, "at", attempt, "attempt. Ceasing to retry")
break
}
logger.Warning("Got error:", err, ",at", attempt, "attempt. Retrying in", backoffDuration)
logger.Warning("Got error:", err, ", at", attempt, "attempt. Retrying in", backoffDuration)
totalRetryTime += backoffDuration
bc.sleep(backoffDuration)
continue
}
Expand All @@ -101,7 +102,7 @@ func (bc *broadcastClient) try(action func() (interface{}, error)) (interface{},
if bc.shouldStop() {
return nil, errors.New("Client is closing")
}
return nil, fmt.Errorf("Attempts (%d) or elapsed time (%v) exhausted", attempt, time.Since(start))
return nil, fmt.Errorf("Attempts (%d) or elapsed time (%v) exhausted", attempt, totalRetryTime)
}

func (bc *broadcastClient) doAction(action func() (interface{}, error)) (interface{}, error) {
Expand Down
4 changes: 2 additions & 2 deletions core/deliverservice/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,13 +428,13 @@ func testLimitedTotalConnTime(t *testing.T, bdc blocksDelivererConsumer) {
return nil
}
backoffStrategy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) {
return 0, elapsedTime.Nanoseconds() < time.Second.Nanoseconds()
return time.Millisecond * 500, elapsedTime.Nanoseconds() < time.Second.Nanoseconds()
}
bc := NewBroadcastClient(cp, clFactory, setup, backoffStrategy)
defer bc.Close()
err := bdc(bc)
assert.Error(t, err)
assert.Equal(t, 1, cp.connAttempts)
assert.Equal(t, 3, cp.connAttempts)
assert.Equal(t, 0, setupInvoked)
}

Expand Down
51 changes: 51 additions & 0 deletions core/deliverservice/deliveryclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/msp/mgmt/testtools"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -476,6 +477,56 @@ func TestDeliverServiceShutdown(t *testing.T) {
time.Sleep(time.Second)
}

// TestDeliverServiceShutdownRespawn is a regression test for FAB-8075: the
// deliver client must fail over to another ordering node even after a long
// idle period with no blocks received, because only time spent sleeping in
// backoff — not time blocked waiting for blocks — should count against the
// reconnect total-time threshold (per the commit description).
func TestDeliverServiceShutdownRespawn(t *testing.T) {
// Scenario: Launch an ordering service node and let the client pull some blocks.
// Then, wait a few seconds, and don't send any blocks.
// Afterwards - start a new instance and shut down the old instance.
// Cap the cumulative retry time at 1s. The 2s idle wait below exceeds this,
// so before the fix the client would have stopped retrying and never
// reconnected to the second node.
viper.Set("peer.deliveryclient.reconnectTotalTimeThreshold", time.Second)
defer func() {
viper.Reset()
}()
// Fail the test if any goroutine spawned here outlives it.
defer ensureNoGoroutineLeak(t)()

// First ordering node; the second endpoint (5615) is not up yet.
osn1 := mocks.NewOrderer(5614, t)

// Give the mock orderer time to start listening — NOTE(review): assumes
// mocks.NewOrderer does not itself block until ready; confirm.
time.Sleep(time.Second)
gossipServiceAdapter := &mocks.MockGossipServiceAdapter{GossipBlockDisseminations: make(chan uint64)}

// Configure the client with both endpoints so it can fail over from
// osn1 (5614) to osn2 (5615).
service, err := NewDeliverService(&Config{
Endpoints: []string{"localhost:5614", "localhost:5615"},
Gossip: gossipServiceAdapter,
CryptoSvc: &mockMCS{},
ABCFactory: DefaultABCFactory,
ConnFactory: DefaultConnectionFactory,
})
assert.NoError(t, err)

// The mocked ledger reports height 100, and osn1 is told to expect a
// seek for block 100 accordingly.
li := &mocks.MockLedgerInfo{Height: uint64(100)}
osn1.SetNextExpectedSeek(uint64(100))
err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
assert.NoError(t, err, "can't start delivery")

// Check that delivery service requests blocks in order
go osn1.SendBlock(uint64(100))
assertBlockDissemination(100, gossipServiceAdapter.GossipBlockDisseminations, t)
go osn1.SendBlock(uint64(101))
assertBlockDissemination(101, gossipServiceAdapter.GossipBlockDisseminations, t)
// Advance the mocked ledger height so a re-established connection
// seeks from block 102.
atomic.StoreUint64(&li.Height, uint64(102))
// Now wait for a few seconds
// (longer than the 1s threshold set above, exercising the idle period).
time.Sleep(time.Second * 2)
// Now start the new instance
osn2 := mocks.NewOrderer(5615, t)
// Now stop the old instance
osn1.Shutdown()
// Send a block from osn2
osn2.SetNextExpectedSeek(uint64(102))
go osn2.SendBlock(uint64(102))
// Ensure it is received — this proves the client failed over to osn2
// instead of giving up after the idle period.
assertBlockDissemination(102, gossipServiceAdapter.GossipBlockDisseminations, t)
service.Stop()
osn2.Shutdown()
}

func TestDeliverServiceBadConfig(t *testing.T) {
// Empty endpoints
service, err := NewDeliverService(&Config{
Expand Down

0 comments on commit 8701fad

Please sign in to comment.