Skip to content

Commit

Permalink
[FAB-6075] Add timestamp and timewindow to event reg
Browse files Browse the repository at this point in the history
This CR adds a timestamp and timewindow to be used for event
registration. The peer will check the timestamp of the event (as set
by the client) to ensure it is within the duration specified
by the timewindow of the peer's time.

Change-Id: I3b506bdd76a21e65f52debc0e08b8088ea837885
Signed-off-by: Will Lahti <wtlahti@us.ibm.com>
  • Loading branch information
wlahti committed Oct 19, 2017
1 parent f754f40 commit cccca94
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 85 deletions.
20 changes: 15 additions & 5 deletions events/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"

"github.com/golang/protobuf/ptypes/timestamp"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/comm"
mspmgmt "github.com/hyperledger/fabric/msp/mgmt"
ehpb "github.com/hyperledger/fabric/protos/peer"
Expand All @@ -44,6 +46,13 @@ type EventsClient struct {
adapter EventAdapter
}

// RegistrationConfig holds the information to be used when registering for
// events from the eventhub
type RegistrationConfig struct {
InterestedEvents []*ehpb.Interest
Timestamp *timestamp.Timestamp
}

//NewEventsClient Returns a new grpc.ClientConn to the configured local PEER.
func NewEventsClient(peerAddress string, regTimeout time.Duration, adapter EventAdapter) (*EventsClient, error) {
var err error
Expand Down Expand Up @@ -96,12 +105,12 @@ func (ec *EventsClient) send(emsg *ehpb.Event) error {
}

// RegisterAsync - registers interest in a event and doesn't wait for a response
func (ec *EventsClient) RegisterAsync(ies []*ehpb.Interest) error {
func (ec *EventsClient) RegisterAsync(config *RegistrationConfig) error {
creator, err := getCreatorFromLocalMSP()
if err != nil {
return fmt.Errorf("error getting creator from MSP: %s", err)
}
emsg := &ehpb.Event{Event: &ehpb.Event_Register{Register: &ehpb.Register{Events: ies}}, Creator: creator}
emsg := &ehpb.Event{Event: &ehpb.Event_Register{Register: &ehpb.Register{Events: config.InterestedEvents}}, Creator: creator, Timestamp: config.Timestamp}

if err = ec.send(emsg); err != nil {
consumerLogger.Errorf("error on Register send %s\n", err)
Expand All @@ -110,9 +119,9 @@ func (ec *EventsClient) RegisterAsync(ies []*ehpb.Interest) error {
}

// register - registers interest in a event
func (ec *EventsClient) register(ies []*ehpb.Interest) error {
func (ec *EventsClient) register(config *RegistrationConfig) error {
var err error
if err = ec.RegisterAsync(ies); err != nil {
if err = ec.RegisterAsync(config); err != nil {
return err
}

Expand Down Expand Up @@ -221,7 +230,8 @@ func (ec *EventsClient) Start() error {
return fmt.Errorf("could not create client conn to %s:%s", ec.peerAddress, err)
}

if err = ec.register(ies); err != nil {
regConfig := &RegistrationConfig{InterestedEvents: ies, Timestamp: util.CreateUtcTimestamp()}
if err = ec.register(regConfig); err != nil {
return err
}

Expand Down
9 changes: 5 additions & 4 deletions events/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"testing"
"time"

"github.com/hyperledger/fabric/common/util"
coreutil "github.com/hyperledger/fabric/core/testutil"
"github.com/hyperledger/fabric/events/producer"
"github.com/hyperledger/fabric/msp/mgmt/testtools"
Expand Down Expand Up @@ -159,7 +160,8 @@ func TestUnregisterAsync(t *testing.T) {
t.Fail()
}

obcEHClient.RegisterAsync(ies)
regConfig := &RegistrationConfig{InterestedEvents: ies, Timestamp: util.CreateUtcTimestamp()}
obcEHClient.RegisterAsync(regConfig)
err = obcEHClient.UnregisterAsync(ies)
assert.NoError(t, err)

Expand Down Expand Up @@ -254,9 +256,8 @@ func TestMain(m *testing.M) {
return
}

ehServer := producer.NewEventsServer(
uint(viper.GetInt("peer.events.buffersize")),
viper.GetDuration("peer.events.timeout"))
ehConfig := &producer.EventsServerConfig{BufferSize: uint(viper.GetInt("peer.events.buffersize")), Timeout: viper.GetDuration("peer.events.timeout"), TimeWindow: viper.GetDuration("peer.events.timewindow")}
ehServer := producer.NewEventsServer(ehConfig)
ehpb.RegisterEventsServer(grpcServer, ehServer)

go grpcServer.Serve(lis)
Expand Down
8 changes: 6 additions & 2 deletions events/producer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ type eventProcessor struct {
//if 0, if buffer full, will block and guarantee the event will be sent out
//if > 0, if buffer full, blocks till timeout
timeout time.Duration

//time difference from peer time where registration events can be considered
//valid
timeWindow time.Duration
}

//global eventProcessor singleton created by initializeEvents. Openchain producers
Expand Down Expand Up @@ -243,12 +247,12 @@ func (ep *eventProcessor) start() {
}

//initialize and start
func initializeEvents(bufferSize uint, tout time.Duration) {
func initializeEvents(config *EventsServerConfig) {
if gEventProcessor != nil {
panic("should not be called twice")
}

gEventProcessor = &eventProcessor{eventConsumers: make(map[pb.EventType]handlerList), eventChannel: make(chan *pb.Event, bufferSize), timeout: tout}
gEventProcessor = &eventProcessor{eventConsumers: make(map[pb.EventType]handlerList), eventChannel: make(chan *pb.Event, config.BufferSize), timeout: config.Timeout, timeWindow: config.TimeWindow}

addInternalEventTypes()

Expand Down
7 changes: 3 additions & 4 deletions events/producer/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestDeRegister(t *testing.T) {
assert.Error(t, deRegisterHandler(&peer.Interest{EventType: peer.EventType_BLOCK}, nil))
}

func TestRegister(t *testing.T) {
func TestRegisterHandler(t *testing.T) {
f := func() {
registerHandler(nil, nil)
}
Expand Down Expand Up @@ -157,10 +157,9 @@ func TestProcessEvents(t *testing.T) {
}

func TestInitializeEvents_twice(t *testing.T) {
config := &EventsServerConfig{BufferSize: uint(viper.GetInt("peer.events.buffersize")), Timeout: viper.GetDuration("peer.events.timeout"), TimeWindow: viper.GetDuration("peer.events.timewindow")}
initializeEventsTwice := func() {
initializeEvents(
uint(viper.GetInt("peer.events.buffersize")),
viper.GetDuration("peer.events.timeout"))
initializeEvents(config)
}
assert.Panics(t, initializeEventsTwice)
}
Expand Down
14 changes: 13 additions & 1 deletion events/producer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package producer

import (
"fmt"
"math"
"strconv"
"time"

"github.com/golang/protobuf/proto"

Expand Down Expand Up @@ -103,7 +105,7 @@ func (d *handler) deregisterAll() {
func (d *handler) HandleMessage(msg *pb.SignedEvent) error {
evt, err := validateEventMessage(msg)
if err != nil {
return fmt.Errorf("event message must be properly signed by an identity from the same organization as the peer: [%s]", err)
return fmt.Errorf("event message validation failed: [%s]", err)
}

switch evt.Event.(type) {
Expand Down Expand Up @@ -160,6 +162,16 @@ func validateEventMessage(signedEvt *pb.SignedEvent) (*pb.Event, error) {
return nil, fmt.Errorf("error unmarshaling the event bytes in the SignedEvent: %s", err)
}

if evt.GetTimestamp() != nil {
evtTime := time.Unix(evt.GetTimestamp().Seconds, int64(evt.GetTimestamp().Nanos)).UTC().UnixNano()
peerTime := time.Now().UnixNano()

if math.Abs(float64(peerTime-evtTime)) > float64(gEventProcessor.timeWindow.Nanoseconds()) {
logger.Warningf("event timestamp %s is more than the %s `peer.events.timewindow` difference above/below peer time %s. either the peer and client clocks are out of sync or a replay attack has been attempted", evtTime, gEventProcessor.timeWindow, peerTime)
return nil, fmt.Errorf("event timestamp out of acceptable range. must be within %s above/below peer time", gEventProcessor.timeWindow)
}
}

localMSP := mgmt.GetLocalMSP()
principalGetter := mgmt.NewLocalMSPPrincipalGetter()

Expand Down
11 changes: 9 additions & 2 deletions events/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,23 @@ var logger = flogging.MustGetLogger("eventhub_producer")
type EventsServer struct {
}

// EventsServerConfig contains the setup config for the events server
type EventsServerConfig struct {
BufferSize uint
Timeout time.Duration
TimeWindow time.Duration
}

//singleton - if we want to create multiple servers, we need to subsume events.gEventConsumers into EventsServer
var globalEventsServer *EventsServer

// NewEventsServer returns a EventsServer
func NewEventsServer(bufferSize uint, timeout time.Duration) *EventsServer {
func NewEventsServer(config *EventsServerConfig) *EventsServer {
if globalEventsServer != nil {
panic("Cannot create multiple event hub servers")
}
globalEventsServer = new(EventsServer)
initializeEvents(bufferSize, timeout)
initializeEvents(config)
//initializeCCEventProcessor(bufferSize, timeout)
return globalEventsServer
}
Expand Down
43 changes: 30 additions & 13 deletions events/producer/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"

"github.com/golang/protobuf/ptypes/timestamp"
"github.com/hyperledger/fabric/common/ledger/testutil"
mmsp "github.com/hyperledger/fabric/common/mocks/msp"
"github.com/hyperledger/fabric/common/util"
Expand Down Expand Up @@ -250,7 +251,8 @@ func TestReceiveCCWildcard(t *testing.T) {
var err error

adapter.count = 1
obcEHClient.RegisterAsync([]*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: ""}}}})
config := &consumer.RegistrationConfig{InterestedEvents: []*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: ""}}}}, Timestamp: util.CreateUtcTimestamp()}
obcEHClient.RegisterAsync(config)

select {
case <-adapter.notfy:
Expand Down Expand Up @@ -303,17 +305,18 @@ func TestFailReceive(t *testing.T) {

func TestUnregister(t *testing.T) {
var err error
obcEHClient.RegisterAsync([]*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event10"}}}})
config := &consumer.RegistrationConfig{InterestedEvents: []*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event11"}}}}, Timestamp: util.CreateUtcTimestamp()}
obcEHClient.RegisterAsync(config)

adapter.count = 1
select {
case <-adapter.notfy:
case <-time.After(2 * time.Second):
t.Fail()
t.FailNow()
t.Logf("timed out on message")
}

emsg := createTestChaincodeEvent("0xffffffff", "event10")
emsg := createTestChaincodeEvent("0xffffffff", "event11")
if err = Send(emsg); err != nil {
t.Fail()
t.Logf("Error sending message %s", err)
Expand All @@ -326,7 +329,7 @@ func TestUnregister(t *testing.T) {
t.Fail()
t.Logf("timed out on message")
}
obcEHClient.UnregisterAsync([]*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event10"}}}})
obcEHClient.UnregisterAsync([]*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event11"}}}})
adapter.count = 1
select {
case <-adapter.notfy:
Expand All @@ -336,7 +339,7 @@ func TestUnregister(t *testing.T) {
}

adapter.count = 1
emsg = createTestChaincodeEvent("0xffffffff", "event10")
emsg = createTestChaincodeEvent("0xffffffff", "event11")
if err = Send(emsg); err != nil {
t.Fail()
t.Logf("Error sending message %s", err)
Expand All @@ -345,17 +348,30 @@ func TestUnregister(t *testing.T) {
select {
case <-adapter.notfy:
t.Fail()
t.Logf("should NOT have received event1")
t.Logf("should NOT have received event11")
case <-time.After(5 * time.Second):
}

}

func TestRegister_outOfTimeWindow(t *testing.T) {
interestedEvent := []*ehpb.Interest{&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event10"}}}}
config := &consumer.RegistrationConfig{InterestedEvents: interestedEvent, Timestamp: &timestamp.Timestamp{Seconds: 0}}
obcEHClient.RegisterAsync(config)

adapter.count = 0
select {
case <-adapter.notfy:
t.Fail()
t.Logf("register with out of range timestamp should fail")
case <-time.After(2 * time.Second):
}
}

func TestNewEventsServer(t *testing.T) {
config := &EventsServerConfig{BufferSize: uint(viper.GetInt("peer.events.buffersize")), Timeout: viper.GetDuration("peer.events.timeout"), TimeWindow: viper.GetDuration("peer.events.timewindow")}
doubleCreation := func() {
NewEventsServer(
uint(viper.GetInt("peer.events.buffersize")),
viper.GetDuration("peer.events.timeout"))
NewEventsServer(config)
}
assert.Panics(t, doubleCreation)

Expand Down Expand Up @@ -474,10 +490,11 @@ func TestMain(m *testing.M) {
// use a buffer of 100 and blocking timeout
viper.Set("peer.events.buffersize", 100)
viper.Set("peer.events.timeout", 0)
timeWindow, _ := time.ParseDuration("1m")
viper.Set("peer.events.timewindow", timeWindow)

ehServer = NewEventsServer(
uint(viper.GetInt("peer.events.buffersize")),
viper.GetDuration("peer.events.timeout"))
ehConfig := &EventsServerConfig{BufferSize: uint(viper.GetInt("peer.events.buffersize")), Timeout: viper.GetDuration("peer.events.timeout"), TimeWindow: viper.GetDuration("peer.events.timewindow")}
ehServer = NewEventsServer(ehConfig)
ehpb.RegisterEventsServer(grpcServer, ehServer)

go grpcServer.Serve(lis)
Expand Down
6 changes: 3 additions & 3 deletions peer/node/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,11 @@ func createEventHubServer(secureConfig comm.SecureServerConfig) (comm.GRPCServer
logger.Errorf("Failed to return new GRPC server: %s", err)
return nil, err
}
ehServer := producer.NewEventsServer(
uint(viper.GetInt("peer.events.buffersize")),
viper.GetDuration("peer.events.timeout"))

ehConfig := &producer.EventsServerConfig{BufferSize: uint(viper.GetInt("peer.events.buffersize")), Timeout: viper.GetDuration("peer.events.timeout"), TimeWindow: viper.GetDuration("peer.events.timewindow")}
ehServer := producer.NewEventsServer(ehConfig)
pb.RegisterEventsServer(grpcServer.Server(), ehServer)

return grpcServer, nil
}

Expand Down
Loading

0 comments on commit cccca94

Please sign in to comment.