Skip to content

Commit 3689c5a

Browse files
committed
[FAB-7306] delivery-client OSN black-listing fine-tune
When the peer receives a bad status (i.e. UNAVAILABLE) from an ordering service node that it connects to, it disconnects from it and tries another ordering service node. FAB-5006 introduced an optimization for v1.1 that makes the peer temporarily black-list ordering service nodes that send a bad status, so that the peer prioritizes selection of other OSNs instead of connecting to all of them with the same probability. The black-listing works via calling Disconnect() with a 'true' parameter. However, this might be problematic in cases where the OSNs are behind a LB/NAT, where only 1 endpoint is actually known to the peer but in practice several endpoints exist behind it. This would result in the peer always immediately disabling all OSNs, because it only knows 1 endpoint. This change set overcomes this by avoiding black-listing an endpoint if it is the last one remaining. Change-Id: I59ead8af88eb9acff3d3c64227528b18b990fb00 Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent d79520f commit 3689c5a

File tree

3 files changed

+39
-48
lines changed

3 files changed

+39
-48
lines changed

core/comm/producer.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,13 @@ func (cp *connProducer) DisableEndpoint(endpoint string) {
108108
cp.Lock()
109109
defer cp.Unlock()
110110

111-
for i := range cp.endpoints {
112-
if cp.endpoints[i] == endpoint {
111+
if len(cp.endpoints)-len(cp.disabledEndpoints) == 1 {
112+
logger.Warning("Only 1 endpoint remained, will not black-list it")
113+
return
114+
}
115+
116+
for _, currEndpoint := range cp.endpoints {
117+
if currEndpoint == endpoint {
113118
cp.disabledEndpoints[endpoint] = time.Now()
114119
break
115120
}

core/comm/producer_test.go

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,6 @@ import (
1515
"google.golang.org/grpc"
1616
)
1717

18-
type connMock struct {
19-
*grpc.ClientConn
20-
endpoint string
21-
}
22-
2318
func TestEmptyEndpoints(t *testing.T) {
2419
noopFactory := func(endpoint string) (*grpc.ClientConn, error) {
2520
return nil, nil
@@ -121,20 +116,15 @@ func TestDisableEndpoint(t *testing.T) {
121116
// Now disable endpoint for 100 milliseconds
122117
producer.DisableEndpoint("a")
123118
_, _, err = producer.NewConnection()
124-
assert.Error(t, err, "Could not connect")
125-
// Wait until disable expire and try to connect again
126-
time.Sleep(time.Millisecond * 200)
127-
conn, a, err = producer.NewConnection()
119+
// Make sure if only 1 endpoint remains, we don't black-list it
128120
assert.NoError(t, err)
129-
assert.Equal(t, "a", conn2Endpoint[fmt.Sprintf("%p", conn)])
130-
assert.Equal(t, "a", a)
121+
// Update endpoints - add endpoint 'b'
122+
producer.UpdateEndpoints([]string{"a", "b"})
131123
// Disable again
132124
producer.DisableEndpoint("a")
133-
// Update endpoints
134-
producer.UpdateEndpoints([]string{"a", "b"})
135-
136125
conn, a, err = producer.NewConnection()
137126
assert.NoError(t, err)
127+
// Ensure that only b is returned because 'a' is disabled
138128
assert.Equal(t, "b", conn2Endpoint[fmt.Sprintf("%p", conn)])
139129
assert.Equal(t, "b", a)
140130

core/deliverservice/client_test.go

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -675,21 +675,21 @@ func TestDisconnect(t *testing.T) {
675675

676676
func TestDisconnectAndDisableEndpoint(t *testing.T) {
677677
// Scenario:
678-
// Start one ordering service and one client
679-
// Connect client to ordering service endpoint
680-
// Check connection to ordering service
681-
// Disconnect and disable endpoint
682-
// Check that ordering service don't have connection to the client
683-
// Try to reconnect to endpoint (orderer) and check that
684-
// ordering service still don't have connection to the client
685-
// Wait until endpoint disable expired and reconnect again
686-
// Check that we have connection between orderer and client
678+
// 1) Start two ordering service nodes and one client
679+
// 2) Have the client connect to some ordering service node
680+
// 3) Disconnect and disable the endpoint of the current connection,
681+
// and ensure the client connects to the other node.
682+
// 4) Black-list the second connection and ensure it still connects
683+
// to the ordering service node because it's the last one remaining.
687684

688685
defer ensureNoGoroutineLeak(t)()
689-
os := mocks.NewOrderer(5613, t)
690-
os.SetNextExpectedSeek(5)
686+
os1 := mocks.NewOrderer(5613, t)
687+
os1.SetNextExpectedSeek(5)
688+
os2 := mocks.NewOrderer(5614, t)
689+
os2.SetNextExpectedSeek(5)
691690

692-
defer os.Shutdown()
691+
defer os1.Shutdown()
692+
defer os2.Shutdown()
693693

694694
orgEndpointDisableInterval := comm.EndpointDisableInterval
695695
comm.EndpointDisableInterval = time.Millisecond * 1500
@@ -698,7 +698,7 @@ func TestDisconnectAndDisableEndpoint(t *testing.T) {
698698
connFact := func(endpoint string) (*grpc.ClientConn, error) {
699699
return grpc.Dial(endpoint, grpc.WithInsecure(), grpc.WithBlock())
700700
}
701-
prod := comm.NewConnectionProducer(connFact, []string{"localhost:5613"})
701+
prod := comm.NewConnectionProducer(connFact, []string{"localhost:5613", "localhost:5614"})
702702
clFact := func(cc *grpc.ClientConn) orderer.AtomicBroadcastClient {
703703
return orderer.NewAtomicBroadcastClient(cc)
704704
}
@@ -711,45 +711,41 @@ func TestDisconnectAndDisableEndpoint(t *testing.T) {
711711
}
712712

713713
cl := NewBroadcastClient(prod, clFact, onConnect, retryPol)
714+
defer cl.Close()
714715

715716
// First connect to orderer
716717
go func() {
717718
cl.Recv()
718719
}()
719720

720721
assert.True(t, waitForWithTimeout(time.Millisecond*100, func() bool {
721-
return os.ConnCount() == 1
722+
return os1.ConnCount() == 1 || os2.ConnCount() == 1
722723
}), "Didn't get connection to orderer")
723724

725+
connectedToOS1 := os1.ConnCount() == 1
726+
724727
// Disconnect and disable endpoint
725728
cl.Disconnect(true)
726729

730+
// Ensure we reconnected to the other node
727731
assert.True(t, waitForWithTimeout(time.Millisecond*100, func() bool {
728-
return os.ConnCount() == 0
729-
}), "Didn't disconnect from orderer")
730-
731-
// Try to reconnect while endpoint still disabled
732-
go func() {
733-
assert.False(t, waitForWithTimeout(time.Millisecond*100, func() bool {
734-
return os.ConnCount() == 1
735-
}), "Recreated connection to orderer, although shouldn't")
736-
}()
737-
738-
_, err := cl.Recv()
739-
assert.Error(t, err, "Connection shouldn't have been established because all endpoints have been disabled")
732+
if connectedToOS1 {
733+
return os1.ConnCount() == 0 && os2.ConnCount() == 1
734+
}
735+
return os2.ConnCount() == 0 && os1.ConnCount() == 1
736+
}), "Didn't disconnect from orderer, or reconnected to a black-listed node")
740737

741-
//Wait until endpoint disable expires and reconnect again
742-
time.Sleep(time.Millisecond * 1500)
738+
// Disconnect from the node we are currently connected to, and attempt to black-list it
739+
cl.Disconnect(true)
743740

744741
go func() {
745742
cl.Recv()
746743
}()
747744

745+
// Ensure we are still connected to some orderer, even though both endpoints are now black-listed
748746
assert.True(t, waitForWithTimeout(time.Millisecond*100, func() bool {
749-
return os.ConnCount() == 1
750-
}), "Didn't got connection to orderer after endpoint disable expired")
751-
752-
cl.Close()
747+
return os1.ConnCount() == 1 || os2.ConnCount() == 1
748+
}), "Didn't got connection to orderer")
753749
}
754750

755751
func waitForWithTimeout(timeout time.Duration, f func() bool) bool {

0 commit comments

Comments
 (0)