Skip to content

Commit a444aa6

Browse files
committed
[FAB-10949] fix race in TestResubmission
The test was beginning to marshal an object that had not been fully unmarshaled from the wire. Change-Id: I6f6b019405197d556459f26e1637cfa03e2dc893 Signed-off-by: Matthew Sykes <sykesmat@us.ibm.com>
1 parent 635bce5 commit a444aa6

File tree

1 file changed

+27
-6
lines changed

1 file changed

+27
-6
lines changed

orderer/consensus/kafka/chain_test.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/hyperledger/fabric/protos/utils"
2626
"github.com/stretchr/testify/assert"
2727
"github.com/stretchr/testify/mock"
28+
"github.com/stretchr/testify/require"
2829
)
2930

3031
var (
@@ -2464,9 +2465,12 @@ func TestResubmission(t *testing.T) {
24642465
}
24652466
defer close(mockSupport.BlockCutterVal.Block)
24662467

2467-
expectedKafkaMsg := &ab.KafkaMessage{}
2468+
expectedKafkaMsgCh := make(chan *ab.KafkaMessage, 1)
24682469
producer := mocks.NewSyncProducer(t, mockBrokerConfig)
24692470
producer.ExpectSendMessageWithCheckerFunctionAndSucceed(func(val []byte) error {
2471+
defer close(expectedKafkaMsgCh)
2472+
2473+
expectedKafkaMsg := &ab.KafkaMessage{}
24702474
if err := proto.Unmarshal(val, expectedKafkaMsg); err != nil {
24712475
return err
24722476
}
@@ -2484,6 +2488,7 @@ func TestResubmission(t *testing.T) {
24842488
return fmt.Errorf("Expect Original Offset to be non-zero if resubmission")
24852489
}
24862490

2491+
expectedKafkaMsgCh <- expectedKafkaMsg
24872492
return nil
24882493
})
24892494

@@ -2537,8 +2542,14 @@ func TestResubmission(t *testing.T) {
25372542
}
25382543

25392544
// Emits the kafka message produced by consenter
2540-
mpc.YieldMessage(newMockConsumerMessage(expectedKafkaMsg))
2541-
mockSupport.BlockCutterVal.Block <- struct{}{}
2545+
select {
2546+
case expectedKafkaMsg := <-expectedKafkaMsgCh:
2547+
require.NotNil(t, expectedKafkaMsg)
2548+
mpc.YieldMessage(newMockConsumerMessage(expectedKafkaMsg))
2549+
mockSupport.BlockCutterVal.Block <- struct{}{}
2550+
case <-time.After(shortTimeout):
2551+
t.Fatalf("Expected to receive kafka message")
2552+
}
25422553

25432554
select {
25442555
case <-mockSupport.Blocks:
@@ -2922,9 +2933,12 @@ func TestResubmission(t *testing.T) {
29222933
}
29232934
defer close(mockSupport.BlockCutterVal.Block)
29242935

2925-
expectedKafkaMsg := &ab.KafkaMessage{}
2936+
expectedKafkaMsgCh := make(chan *ab.KafkaMessage, 1)
29262937
producer := mocks.NewSyncProducer(t, mockBrokerConfig)
29272938
producer.ExpectSendMessageWithCheckerFunctionAndSucceed(func(val []byte) error {
2939+
defer close(expectedKafkaMsgCh)
2940+
2941+
expectedKafkaMsg := &ab.KafkaMessage{}
29282942
if err := proto.Unmarshal(val, expectedKafkaMsg); err != nil {
29292943
return err
29302944
}
@@ -2942,6 +2956,7 @@ func TestResubmission(t *testing.T) {
29422956
return fmt.Errorf("Expect Original Offset to be non-zero if resubmission")
29432957
}
29442958

2959+
expectedKafkaMsgCh <- expectedKafkaMsg
29452960
return nil
29462961
})
29472962

@@ -2986,8 +3001,14 @@ func TestResubmission(t *testing.T) {
29863001
// check that WaitReady is actually blocked because of in-flight reprocessed messages
29873002
blockIngressMsg(t, true, bareMinimumChain.WaitReady)
29883003

2989-
// Emits the kafka message produced by consenter
2990-
mpc.YieldMessage(newMockConsumerMessage(expectedKafkaMsg))
3004+
select {
3005+
case expectedKafkaMsg := <-expectedKafkaMsgCh:
3006+
require.NotNil(t, expectedKafkaMsg)
3007+
// Emits the kafka message produced by consenter
3008+
mpc.YieldMessage(newMockConsumerMessage(expectedKafkaMsg))
3009+
case <-time.After(shortTimeout):
3010+
t.Fatalf("Expected to receive kafka message")
3011+
}
29913012

29923013
select {
29933014
case <-mockSupport.Blocks:

0 commit comments

Comments
 (0)