@@ -25,6 +25,7 @@ import (
25
25
"github.com/hyperledger/fabric/protos/utils"
26
26
"github.com/stretchr/testify/assert"
27
27
"github.com/stretchr/testify/mock"
28
+ "github.com/stretchr/testify/require"
28
29
)
29
30
30
31
var (
@@ -2464,9 +2465,12 @@ func TestResubmission(t *testing.T) {
2464
2465
}
2465
2466
defer close (mockSupport .BlockCutterVal .Block )
2466
2467
2467
- expectedKafkaMsg := & ab.KafkaMessage {}
2468
+ expectedKafkaMsgCh := make ( chan * ab.KafkaMessage , 1 )
2468
2469
producer := mocks .NewSyncProducer (t , mockBrokerConfig )
2469
2470
producer .ExpectSendMessageWithCheckerFunctionAndSucceed (func (val []byte ) error {
2471
+ defer close (expectedKafkaMsgCh )
2472
+
2473
+ expectedKafkaMsg := & ab.KafkaMessage {}
2470
2474
if err := proto .Unmarshal (val , expectedKafkaMsg ); err != nil {
2471
2475
return err
2472
2476
}
@@ -2484,6 +2488,7 @@ func TestResubmission(t *testing.T) {
2484
2488
return fmt .Errorf ("Expect Original Offset to be non-zero if resubmission" )
2485
2489
}
2486
2490
2491
+ expectedKafkaMsgCh <- expectedKafkaMsg
2487
2492
return nil
2488
2493
})
2489
2494
@@ -2537,8 +2542,14 @@ func TestResubmission(t *testing.T) {
2537
2542
}
2538
2543
2539
2544
// Emits the kafka message produced by consenter
2540
- mpc .YieldMessage (newMockConsumerMessage (expectedKafkaMsg ))
2541
- mockSupport .BlockCutterVal .Block <- struct {}{}
2545
+ select {
2546
+ case expectedKafkaMsg := <- expectedKafkaMsgCh :
2547
+ require .NotNil (t , expectedKafkaMsg )
2548
+ mpc .YieldMessage (newMockConsumerMessage (expectedKafkaMsg ))
2549
+ mockSupport .BlockCutterVal .Block <- struct {}{}
2550
+ case <- time .After (shortTimeout ):
2551
+ t .Fatalf ("Expected to receive kafka message" )
2552
+ }
2542
2553
2543
2554
select {
2544
2555
case <- mockSupport .Blocks :
@@ -2922,9 +2933,12 @@ func TestResubmission(t *testing.T) {
2922
2933
}
2923
2934
defer close (mockSupport .BlockCutterVal .Block )
2924
2935
2925
- expectedKafkaMsg := & ab.KafkaMessage {}
2936
+ expectedKafkaMsgCh := make ( chan * ab.KafkaMessage , 1 )
2926
2937
producer := mocks .NewSyncProducer (t , mockBrokerConfig )
2927
2938
producer .ExpectSendMessageWithCheckerFunctionAndSucceed (func (val []byte ) error {
2939
+ defer close (expectedKafkaMsgCh )
2940
+
2941
+ expectedKafkaMsg := & ab.KafkaMessage {}
2928
2942
if err := proto .Unmarshal (val , expectedKafkaMsg ); err != nil {
2929
2943
return err
2930
2944
}
@@ -2942,6 +2956,7 @@ func TestResubmission(t *testing.T) {
2942
2956
return fmt .Errorf ("Expect Original Offset to be non-zero if resubmission" )
2943
2957
}
2944
2958
2959
+ expectedKafkaMsgCh <- expectedKafkaMsg
2945
2960
return nil
2946
2961
})
2947
2962
@@ -2986,8 +3001,14 @@ func TestResubmission(t *testing.T) {
2986
3001
// check that WaitReady is actually blocked because of in-flight reprocessed messages
2987
3002
blockIngressMsg (t , true , bareMinimumChain .WaitReady )
2988
3003
2989
- // Emits the kafka message produced by consenter
2990
- mpc .YieldMessage (newMockConsumerMessage (expectedKafkaMsg ))
3004
+ select {
3005
+ case expectedKafkaMsg := <- expectedKafkaMsgCh :
3006
+ require .NotNil (t , expectedKafkaMsg )
3007
+ // Emits the kafka message produced by consenter
3008
+ mpc .YieldMessage (newMockConsumerMessage (expectedKafkaMsg ))
3009
+ case <- time .After (shortTimeout ):
3010
+ t .Fatalf ("Expected to receive kafka message" )
3011
+ }
2991
3012
2992
3013
select {
2993
3014
case <- mockSupport .Blocks :
0 commit comments