Skip to content

Commit cccca94

Browse files
committed
[FAB-6075] Add timestamp and timewindow to event reg
This CR adds a timestamp and timewindow to be used for event registration. The peer will check the timestamp of the event (as set by the client) to ensure it is within the duration specified by the timewindow of the peer's time. Change-Id: I3b506bdd76a21e65f52debc0e08b8088ea837885 Signed-off-by: Will Lahti <wtlahti@us.ibm.com>
1 parent f754f40 commit cccca94

File tree

11 files changed

+153
-85
lines changed

11 files changed

+153
-85
lines changed

events/consumer/consumer.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ import (
2626
"golang.org/x/net/context"
2727
"google.golang.org/grpc"
2828

29+
"github.com/golang/protobuf/ptypes/timestamp"
2930
"github.com/hyperledger/fabric/common/flogging"
31+
"github.com/hyperledger/fabric/common/util"
3032
"github.com/hyperledger/fabric/core/comm"
3133
mspmgmt "github.com/hyperledger/fabric/msp/mgmt"
3234
ehpb "github.com/hyperledger/fabric/protos/peer"
@@ -44,6 +46,13 @@ type EventsClient struct {
4446
adapter EventAdapter
4547
}
4648

49+
// RegistrationConfig holds the information to be used when registering for
50+
// events from the eventhub
51+
type RegistrationConfig struct {
52+
InterestedEvents []*ehpb.Interest
53+
Timestamp *timestamp.Timestamp
54+
}
55+
4756
//NewEventsClient Returns a new grpc.ClientConn to the configured local PEER.
4857
func NewEventsClient(peerAddress string, regTimeout time.Duration, adapter EventAdapter) (*EventsClient, error) {
4958
var err error
@@ -96,12 +105,12 @@ func (ec *EventsClient) send(emsg *ehpb.Event) error {
96105
}
97106

98107
// RegisterAsync - registers interest in a event and doesn't wait for a response
99-
func (ec *EventsClient) RegisterAsync(ies []*ehpb.Interest) error {
108+
func (ec *EventsClient) RegisterAsync(config *RegistrationConfig) error {
100109
creator, err := getCreatorFromLocalMSP()
101110
if err != nil {
102111
return fmt.Errorf("error getting creator from MSP: %s", err)
103112
}
104-
emsg := &ehpb.Event{Event: &ehpb.Event_Register{Register: &ehpb.Register{Events: ies}}, Creator: creator}
113+
emsg := &ehpb.Event{Event: &ehpb.Event_Register{Register: &ehpb.Register{Events: config.InterestedEvents}}, Creator: creator, Timestamp: config.Timestamp}
105114

106115
if err = ec.send(emsg); err != nil {
107116
consumerLogger.Errorf("error on Register send %s\n", err)
@@ -110,9 +119,9 @@ func (ec *EventsClient) RegisterAsync(ies []*ehpb.Interest) error {
110119
}
111120

112121
// register - registers interest in a event
113-
func (ec *EventsClient) register(ies []*ehpb.Interest) error {
122+
func (ec *EventsClient) register(config *RegistrationConfig) error {
114123
var err error
115-
if err = ec.RegisterAsync(ies); err != nil {
124+
if err = ec.RegisterAsync(config); err != nil {
116125
return err
117126
}
118127

@@ -221,7 +230,8 @@ func (ec *EventsClient) Start() error {
221230
return fmt.Errorf("could not create client conn to %s:%s", ec.peerAddress, err)
222231
}
223232

224-
if err = ec.register(ies); err != nil {
233+
regConfig := &RegistrationConfig{InterestedEvents: ies, Timestamp: util.CreateUtcTimestamp()}
234+
if err = ec.register(regConfig); err != nil {
225235
return err
226236
}
227237

events/consumer/consumer_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"testing"
1515
"time"
1616

17+
"github.com/hyperledger/fabric/common/util"
1718
coreutil "github.com/hyperledger/fabric/core/testutil"
1819
"github.com/hyperledger/fabric/events/producer"
1920
"github.com/hyperledger/fabric/msp/mgmt/testtools"
@@ -159,7 +160,8 @@ func TestUnregisterAsync(t *testing.T) {
159160
t.Fail()
160161
}
161162

162-
obcEHClient.RegisterAsync(ies)
163+
regConfig := &RegistrationConfig{InterestedEvents: ies, Timestamp: util.CreateUtcTimestamp()}
164+
obcEHClient.RegisterAsync(regConfig)
163165
err = obcEHClient.UnregisterAsync(ies)
164166
assert.NoError(t, err)
165167

@@ -254,9 +256,8 @@ func TestMain(m *testing.M) {
254256
return
255257
}
256258

257-
ehServer := producer.NewEventsServer(
258-
uint(viper.GetInt("peer.events.buffersize")),
259-
viper.GetDuration("peer.events.timeout"))
259+
ehConfig := &producer.EventsServerConfig{BufferSize: uint(viper.GetInt("peer.events.buffersize")), Timeout: viper.GetDuration("peer.events.timeout"), TimeWindow: viper.GetDuration("peer.events.timewindow")}
260+
ehServer := producer.NewEventsServer(ehConfig)
260261
ehpb.RegisterEventsServer(grpcServer, ehServer)
261262

262263
go grpcServer.Serve(lis)

events/producer/events.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,10 @@ type eventProcessor struct {
210210
//if 0, if buffer full, will block and guarantee the event will be sent out
211211
//if > 0, if buffer full, blocks till timeout
212212
timeout time.Duration
213+
214+
//time difference from peer time where registration events can be considered
215+
//valid
216+
timeWindow time.Duration
213217
}
214218

215219
//global eventProcessor singleton created by initializeEvents. Openchain producers
@@ -243,12 +247,12 @@ func (ep *eventProcessor) start() {
243247
}
244248

245249
//initialize and start
246-
func initializeEvents(bufferSize uint, tout time.Duration) {
250+
func initializeEvents(config *EventsServerConfig) {
247251
if gEventProcessor != nil {
248252
panic("should not be called twice")
249253
}
250254

251-
gEventProcessor = &eventProcessor{eventConsumers: make(map[pb.EventType]handlerList), eventChannel: make(chan *pb.Event, bufferSize), timeout: tout}
255+
gEventProcessor = &eventProcessor{eventConsumers: make(map[pb.EventType]handlerList), eventChannel: make(chan *pb.Event, config.BufferSize), timeout: config.Timeout, timeWindow: config.TimeWindow}
252256

253257
addInternalEventTypes()
254258

events/producer/events_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func TestDeRegister(t *testing.T) {
121121
assert.Error(t, deRegisterHandler(&peer.Interest{EventType: peer.EventType_BLOCK}, nil))
122122
}
123123

124-
func TestRegister(t *testing.T) {
124+
func TestRegisterHandler(t *testing.T) {
125125
f := func() {
126126
registerHandler(nil, nil)
127127
}
@@ -157,10 +157,9 @@ func TestProcessEvents(t *testing.T) {
157157
}
158158

159159
func TestInitializeEvents_twice(t *testing.T) {
160+
config := &EventsServerConfig{BufferSize: uint(viper.GetInt("peer.events.buffersize")), Timeout: viper.GetDuration("peer.events.timeout"), TimeWindow: viper.GetDuration("peer.events.timewindow")}
160161
initializeEventsTwice := func() {
161-
initializeEvents(
162-
uint(viper.GetInt("peer.events.buffersize")),
163-
viper.GetDuration("peer.events.timeout"))
162+
initializeEvents(config)
164163
}
165164
assert.Panics(t, initializeEventsTwice)
166165
}

events/producer/handler.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ package producer
1818

1919
import (
2020
"fmt"
21+
"math"
2122
"strconv"
23+
"time"
2224

2325
"github.com/golang/protobuf/proto"
2426

@@ -103,7 +105,7 @@ func (d *handler) deregisterAll() {
103105
func (d *handler) HandleMessage(msg *pb.SignedEvent) error {
104106
evt, err := validateEventMessage(msg)
105107
if err != nil {
106-
return fmt.Errorf("event message must be properly signed by an identity from the same organization as the peer: [%s]", err)
108+
return fmt.Errorf("event message validation failed: [%s]", err)
107109
}
108110

109111
switch evt.Event.(type) {
@@ -160,6 +162,16 @@ func validateEventMessage(signedEvt *pb.SignedEvent) (*pb.Event, error) {
160162
return nil, fmt.Errorf("error unmarshaling the event bytes in the SignedEvent: %s", err)
161163
}
162164

165+
if evt.GetTimestamp() != nil {
166+
evtTime := time.Unix(evt.GetTimestamp().Seconds, int64(evt.GetTimestamp().Nanos)).UTC().UnixNano()
167+
peerTime := time.Now().UnixNano()
168+
169+
if math.Abs(float64(peerTime-evtTime)) > float64(gEventProcessor.timeWindow.Nanoseconds()) {
170+
logger.Warningf("event timestamp %s is more than the %s `peer.events.timewindow` difference above/below peer time %s. either the peer and client clocks are out of sync or a replay attack has been attempted", evtTime, gEventProcessor.timeWindow, peerTime)
171+
return nil, fmt.Errorf("event timestamp out of acceptable range. must be within %s above/below peer time", gEventProcessor.timeWindow)
172+
}
173+
}
174+
163175
localMSP := mgmt.GetLocalMSP()
164176
principalGetter := mgmt.NewLocalMSPPrincipalGetter()
165177

events/producer/producer.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,23 @@ var logger = flogging.MustGetLogger("eventhub_producer")
3131
type EventsServer struct {
3232
}
3333

34+
// EventsServerConfig contains the setup config for the events server
35+
type EventsServerConfig struct {
36+
BufferSize uint
37+
Timeout time.Duration
38+
TimeWindow time.Duration
39+
}
40+
3441
//singleton - if we want to create multiple servers, we need to subsume events.gEventConsumers into EventsServer
3542
var globalEventsServer *EventsServer
3643

3744
// NewEventsServer returns a EventsServer
38-
func NewEventsServer(bufferSize uint, timeout time.Duration) *EventsServer {
45+
func NewEventsServer(config *EventsServerConfig) *EventsServer {
3946
if globalEventsServer != nil {
4047
panic("Cannot create multiple event hub servers")
4148
}
4249
globalEventsServer = new(EventsServer)
43-
initializeEvents(bufferSize, timeout)
50+
initializeEvents(config)
4451
//initializeCCEventProcessor(bufferSize, timeout)
4552
return globalEventsServer
4653
}

events/producer/producer_test.go

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"google.golang.org/grpc/grpclog"
3535
"google.golang.org/grpc/metadata"
3636

37+
"github.com/golang/protobuf/ptypes/timestamp"
3738
"github.com/hyperledger/fabric/common/ledger/testutil"
3839
mmsp "github.com/hyperledger/fabric/common/mocks/msp"
3940
"github.com/hyperledger/fabric/common/util"
@@ -250,7 +251,8 @@ func TestReceiveCCWildcard(t *testing.T) {
250251
var err error
251252

252253
adapter.count = 1
253-
obcEHClient.RegisterAsync([]*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: ""}}}})
254+
config := &consumer.RegistrationConfig{InterestedEvents: []*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: ""}}}}, Timestamp: util.CreateUtcTimestamp()}
255+
obcEHClient.RegisterAsync(config)
254256

255257
select {
256258
case <-adapter.notfy:
@@ -303,17 +305,18 @@ func TestFailReceive(t *testing.T) {
303305

304306
func TestUnregister(t *testing.T) {
305307
var err error
306-
obcEHClient.RegisterAsync([]*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event10"}}}})
308+
config := &consumer.RegistrationConfig{InterestedEvents: []*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event11"}}}}, Timestamp: util.CreateUtcTimestamp()}
309+
obcEHClient.RegisterAsync(config)
307310

308311
adapter.count = 1
309312
select {
310313
case <-adapter.notfy:
311314
case <-time.After(2 * time.Second):
312-
t.Fail()
315+
t.FailNow()
313316
t.Logf("timed out on message")
314317
}
315318

316-
emsg := createTestChaincodeEvent("0xffffffff", "event10")
319+
emsg := createTestChaincodeEvent("0xffffffff", "event11")
317320
if err = Send(emsg); err != nil {
318321
t.Fail()
319322
t.Logf("Error sending message %s", err)
@@ -326,7 +329,7 @@ func TestUnregister(t *testing.T) {
326329
t.Fail()
327330
t.Logf("timed out on message")
328331
}
329-
obcEHClient.UnregisterAsync([]*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event10"}}}})
332+
obcEHClient.UnregisterAsync([]*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event11"}}}})
330333
adapter.count = 1
331334
select {
332335
case <-adapter.notfy:
@@ -336,7 +339,7 @@ func TestUnregister(t *testing.T) {
336339
}
337340

338341
adapter.count = 1
339-
emsg = createTestChaincodeEvent("0xffffffff", "event10")
342+
emsg = createTestChaincodeEvent("0xffffffff", "event11")
340343
if err = Send(emsg); err != nil {
341344
t.Fail()
342345
t.Logf("Error sending message %s", err)
@@ -345,17 +348,30 @@ func TestUnregister(t *testing.T) {
345348
select {
346349
case <-adapter.notfy:
347350
t.Fail()
348-
t.Logf("should NOT have received event1")
351+
t.Logf("should NOT have received event11")
349352
case <-time.After(5 * time.Second):
350353
}
351354

352355
}
353356

357+
func TestRegister_outOfTimeWindow(t *testing.T) {
358+
interestedEvent := []*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event10"}}}}
359+
config := &consumer.RegistrationConfig{InterestedEvents: interestedEvent, Timestamp: &timestamp.Timestamp{Seconds: 0}}
360+
obcEHClient.RegisterAsync(config)
361+
362+
adapter.count = 0
363+
select {
364+
case <-adapter.notfy:
365+
t.Fail()
366+
t.Logf("register with out of range timestamp should fail")
367+
case <-time.After(2 * time.Second):
368+
}
369+
}
370+
354371
func TestNewEventsServer(t *testing.T) {
372+
config := &EventsServerConfig{BufferSize: uint(viper.GetInt("peer.events.buffersize")), Timeout: viper.GetDuration("peer.events.timeout"), TimeWindow: viper.GetDuration("peer.events.timewindow")}
355373
doubleCreation := func() {
356-
NewEventsServer(
357-
uint(viper.GetInt("peer.events.buffersize")),
358-
viper.GetDuration("peer.events.timeout"))
374+
NewEventsServer(config)
359375
}
360376
assert.Panics(t, doubleCreation)
361377

@@ -474,10 +490,11 @@ func TestMain(m *testing.M) {
474490
// use a buffer of 100 and blocking timeout
475491
viper.Set("peer.events.buffersize", 100)
476492
viper.Set("peer.events.timeout", 0)
493+
timeWindow, _ := time.ParseDuration("1m")
494+
viper.Set("peer.events.timewindow", timeWindow)
477495

478-
ehServer = NewEventsServer(
479-
uint(viper.GetInt("peer.events.buffersize")),
480-
viper.GetDuration("peer.events.timeout"))
496+
ehConfig := &EventsServerConfig{BufferSize: uint(viper.GetInt("peer.events.buffersize")), Timeout: viper.GetDuration("peer.events.timeout"), TimeWindow: viper.GetDuration("peer.events.timewindow")}
497+
ehServer = NewEventsServer(ehConfig)
481498
ehpb.RegisterEventsServer(grpcServer, ehServer)
482499

483500
go grpcServer.Serve(lis)

peer/node/start.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -385,11 +385,11 @@ func createEventHubServer(secureConfig comm.SecureServerConfig) (comm.GRPCServer
385385
logger.Errorf("Failed to return new GRPC server: %s", err)
386386
return nil, err
387387
}
388-
ehServer := producer.NewEventsServer(
389-
uint(viper.GetInt("peer.events.buffersize")),
390-
viper.GetDuration("peer.events.timeout"))
391388

389+
ehConfig := &producer.EventsServerConfig{BufferSize: uint(viper.GetInt("peer.events.buffersize")), Timeout: viper.GetDuration("peer.events.timeout"), TimeWindow: viper.GetDuration("peer.events.timewindow")}
390+
ehServer := producer.NewEventsServer(ehConfig)
392391
pb.RegisterEventsServer(grpcServer.Server(), ehServer)
392+
393393
return grpcServer, nil
394394
}
395395

0 commit comments

Comments
 (0)