@@ -34,6 +34,7 @@ import (
3434 "google.golang.org/grpc/grpclog"
3535 "google.golang.org/grpc/metadata"
3636
37+ "github.com/golang/protobuf/ptypes/timestamp"
3738 "github.com/hyperledger/fabric/common/ledger/testutil"
3839 mmsp "github.com/hyperledger/fabric/common/mocks/msp"
3940 "github.com/hyperledger/fabric/common/util"
@@ -250,7 +251,8 @@ func TestReceiveCCWildcard(t *testing.T) {
250251 var err error
251252
252253 adapter .count = 1
253- obcEHClient .RegisterAsync ([]* ehpb.Interest {& ehpb.Interest {EventType : ehpb .EventType_CHAINCODE , RegInfo : & ehpb.Interest_ChaincodeRegInfo {ChaincodeRegInfo : & ehpb.ChaincodeReg {ChaincodeId : "0xffffffff" , EventName : "" }}}})
254+ config := & consumer.RegistrationConfig {InterestedEvents : []* ehpb.Interest {& ehpb.Interest {EventType : ehpb .EventType_CHAINCODE , RegInfo : & ehpb.Interest_ChaincodeRegInfo {ChaincodeRegInfo : & ehpb.ChaincodeReg {ChaincodeId : "0xffffffff" , EventName : "" }}}}, Timestamp : util .CreateUtcTimestamp ()}
255+ obcEHClient .RegisterAsync (config )
254256
255257 select {
256258 case <- adapter .notfy :
@@ -303,17 +305,18 @@ func TestFailReceive(t *testing.T) {
303305
304306func TestUnregister (t * testing.T ) {
305307 var err error
306- obcEHClient .RegisterAsync ([]* ehpb.Interest {& ehpb.Interest {EventType : ehpb .EventType_CHAINCODE , RegInfo : & ehpb.Interest_ChaincodeRegInfo {ChaincodeRegInfo : & ehpb.ChaincodeReg {ChaincodeId : "0xffffffff" , EventName : "event10" }}}})
308+ config := & consumer.RegistrationConfig {InterestedEvents : []* ehpb.Interest {& ehpb.Interest {EventType : ehpb .EventType_CHAINCODE , RegInfo : & ehpb.Interest_ChaincodeRegInfo {ChaincodeRegInfo : & ehpb.ChaincodeReg {ChaincodeId : "0xffffffff" , EventName : "event11" }}}}, Timestamp : util .CreateUtcTimestamp ()}
309+ obcEHClient .RegisterAsync (config )
307310
308311 adapter .count = 1
309312 select {
310313 case <- adapter .notfy :
311314 case <- time .After (2 * time .Second ):
312- t .Fail ()
315+ t .FailNow ()
313316 t .Logf ("timed out on message" )
314317 }
315318
316- emsg := createTestChaincodeEvent ("0xffffffff" , "event10 " )
319+ emsg := createTestChaincodeEvent ("0xffffffff" , "event11 " )
317320 if err = Send (emsg ); err != nil {
318321 t .Fail ()
319322 t .Logf ("Error sending message %s" , err )
@@ -326,7 +329,7 @@ func TestUnregister(t *testing.T) {
326329 t .Fail ()
327330 t .Logf ("timed out on message" )
328331 }
329- obcEHClient .UnregisterAsync ([]* ehpb.Interest {& ehpb.Interest {EventType : ehpb .EventType_CHAINCODE , RegInfo : & ehpb.Interest_ChaincodeRegInfo {ChaincodeRegInfo : & ehpb.ChaincodeReg {ChaincodeId : "0xffffffff" , EventName : "event10 " }}}})
332+ obcEHClient .UnregisterAsync ([]* ehpb.Interest {& ehpb.Interest {EventType : ehpb .EventType_CHAINCODE , RegInfo : & ehpb.Interest_ChaincodeRegInfo {ChaincodeRegInfo : & ehpb.ChaincodeReg {ChaincodeId : "0xffffffff" , EventName : "event11 " }}}})
330333 adapter .count = 1
331334 select {
332335 case <- adapter .notfy :
@@ -336,7 +339,7 @@ func TestUnregister(t *testing.T) {
336339 }
337340
338341 adapter .count = 1
339- emsg = createTestChaincodeEvent ("0xffffffff" , "event10 " )
342+ emsg = createTestChaincodeEvent ("0xffffffff" , "event11 " )
340343 if err = Send (emsg ); err != nil {
341344 t .Fail ()
342345 t .Logf ("Error sending message %s" , err )
@@ -345,17 +348,30 @@ func TestUnregister(t *testing.T) {
345348 select {
346349 case <- adapter .notfy :
347350 t .Fail ()
348- t .Logf ("should NOT have received event1 " )
351+ t .Logf ("should NOT have received event11 " )
349352 case <- time .After (5 * time .Second ):
350353 }
351354
352355}
353356
357+ func TestRegister_outOfTimeWindow (t * testing.T ) {
358+ interestedEvent := []* ehpb.Interest {& ehpb.Interest {EventType : ehpb .EventType_CHAINCODE , RegInfo : & ehpb.Interest_ChaincodeRegInfo {ChaincodeRegInfo : & ehpb.ChaincodeReg {ChaincodeId : "0xffffffff" , EventName : "event10" }}}}
359+ config := & consumer.RegistrationConfig {InterestedEvents : interestedEvent , Timestamp : & timestamp.Timestamp {Seconds : 0 }}
360+ obcEHClient .RegisterAsync (config )
361+
362+ adapter .count = 0
363+ select {
364+ case <- adapter .notfy :
365+ t .Fail ()
366+ t .Logf ("register with out of range timestamp should fail" )
367+ case <- time .After (2 * time .Second ):
368+ }
369+ }
370+
354371func TestNewEventsServer (t * testing.T ) {
372+ config := & EventsServerConfig {BufferSize : uint (viper .GetInt ("peer.events.buffersize" )), Timeout : viper .GetDuration ("peer.events.timeout" ), TimeWindow : viper .GetDuration ("peer.events.timewindow" )}
355373 doubleCreation := func () {
356- NewEventsServer (
357- uint (viper .GetInt ("peer.events.buffersize" )),
358- viper .GetDuration ("peer.events.timeout" ))
374+ NewEventsServer (config )
359375 }
360376 assert .Panics (t , doubleCreation )
361377
@@ -474,10 +490,11 @@ func TestMain(m *testing.M) {
474490 // use a buffer of 100 and blocking timeout
475491 viper .Set ("peer.events.buffersize" , 100 )
476492 viper .Set ("peer.events.timeout" , 0 )
493+ timeWindow , _ := time .ParseDuration ("1m" )
494+ viper .Set ("peer.events.timewindow" , timeWindow )
477495
478- ehServer = NewEventsServer (
479- uint (viper .GetInt ("peer.events.buffersize" )),
480- viper .GetDuration ("peer.events.timeout" ))
496+ ehConfig := & EventsServerConfig {BufferSize : uint (viper .GetInt ("peer.events.buffersize" )), Timeout : viper .GetDuration ("peer.events.timeout" ), TimeWindow : viper .GetDuration ("peer.events.timewindow" )}
497+ ehServer = NewEventsServer (ehConfig )
481498 ehpb .RegisterEventsServer (grpcServer , ehServer )
482499
483500 go grpcServer .Serve (lis )
0 commit comments