Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Added support for fromBlock when generating eventservice cache keys #184

Merged
merged 8 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/client/channel/chclientrun_std.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !pprof
// +build !pprof

/*
Expand Down
5 changes: 5 additions & 0 deletions pkg/client/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ type Client struct {
permitBlockEvents bool
fromBlock uint64
seekType seek.Type
chaincodeID string
eventConsumerTimeout *time.Duration
}

// New returns a Client instance. Client receives events such as block, filtered block,
// chaincode, and transaction status events.
// nolint: gocyclo
func New(channelProvider context.ChannelProvider, opts ...ClientOption) (*Client, error) {

channelContext, err := channelProvider()
Expand Down Expand Up @@ -68,6 +70,9 @@ func New(channelProvider context.ChannelProvider, opts ...ClientOption) (*Client
opts = append(opts, deliverclient.WithBlockNum(eventClient.fromBlock))
}
}
if eventClient.chaincodeID != "" {
opts = append(opts, deliverclient.WithChaincodeID(eventClient.chaincodeID))
}
if eventClient.eventConsumerTimeout != nil {
opts = append(opts, dispatcher.WithEventConsumerTimeout(*eventClient.eventConsumerTimeout))
}
Expand Down
20 changes: 18 additions & 2 deletions pkg/client/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

pb "github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher"
servicemocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks"
pb "github.com/hyperledger/fabric-protos-go/peer"
)

var (
Expand All @@ -43,7 +43,7 @@ func TestNewEventClient(t *testing.T) {
t.Fatalf("Failed to create new event client: %s", err)
}

_, err = New(ctx, WithBlockEvents(), WithSeekType(seek.Newest), WithBlockNum(math.MaxUint64), WithEventConsumerTimeout(500 * time.Millisecond))
_, err = New(ctx, WithBlockEvents(), WithSeekType(seek.Newest), WithBlockNum(math.MaxUint64), WithEventConsumerTimeout(500*time.Millisecond), WithChaincodeID("testChaincode"))
if err != nil {
t.Fatalf("Failed to create new event client: %s", err)
}
Expand All @@ -55,6 +55,22 @@ func TestNewEventClient(t *testing.T) {
}
}

func TestNewEventClientWithFromBlock(t *testing.T) {

fabCtx := setupCustomTestContext(t, nil)
ctx := createChannelContext(fabCtx, channelID)

_, err := New(ctx)
if err != nil {
t.Fatalf("Failed to create new event client: %s", err)
}

_, err = New(ctx, WithBlockEvents(), WithSeekType(seek.FromBlock), WithBlockNum(100), WithChaincodeID("testChaincode"))
if err != nil {
t.Fatalf("Failed to create new event client: %s", err)
}
}

func TestBlockEvents(t *testing.T) {

eventService, eventProducer, err := newServiceWithMockProducer(defaultOpts, withBlockLedger(sourceURL))
Expand Down
19 changes: 19 additions & 0 deletions pkg/client/event/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,25 @@ func ExampleClient_RegisterChaincodeEvent() {

}

func ExampleClient_RegisterChaincodeEvent_NewService() {

ec, err := New(mockChannelProvider("mychannel"), WithChaincodeID("examplecc"))
if err != nil {
fmt.Println("failed to create client")
}

registration, _, err := ec.RegisterChaincodeEvent("examplecc", "event123")
if err != nil {
fmt.Println("failed to register chaincode event")
}
defer ec.Unregister(registration)

fmt.Println("chaincode event registered successfully")

// Output: chaincode event registered successfully

}

func ExampleClient_RegisterChaincodeEvent_withPayload() {

// If you require payload for chaincode events you have to use WithBlockEvents() option
Expand Down
9 changes: 9 additions & 0 deletions pkg/client/event/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ func WithSeekType(seek seek.Type) ClientOption {
}
}

// WithChaincodeID indicates the target chaincode
// Only deliverclient supports this
func WithChaincodeID(id string) ClientOption {
return func(c *Client) error {
c.chaincodeID = id
return nil
}
}

// WithEventConsumerTimeout is the timeout when sending events to a registered consumer.
// If < 0, if buffer full, unblocks immediately and does not send.
// If 0, if buffer full, will block and guarantee the event will be sent out.
Expand Down
1 change: 1 addition & 0 deletions pkg/fab/events/deliverclient/deliverclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func TestClientConnect(t *testing.T) {
),
WithSeekType(seek.FromBlock),
WithBlockNum(0),
WithChaincodeID("testChaincode"),
client.WithResponseTimeout(3*time.Second),
)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions pkg/fab/events/deliverclient/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type params struct {
connProvider api.ConnectionProvider
seekType seek.Type
fromBlock uint64
chaincodeID string
respTimeout time.Duration
}

Expand Down Expand Up @@ -48,6 +49,15 @@ func WithBlockNum(value uint64) options.Opt {
}
}

// WithChaincodeID specifies the chaincode from which events are to be received.
func WithChaincodeID(value string) options.Opt {
return func(p options.Params) {
if setter, ok := p.(chaincodeIDSetter); ok {
setter.SetChaincodeID(value)
}
}
}

type seekTypeSetter interface {
SetSeekType(value seek.Type)
}
Expand All @@ -56,6 +66,10 @@ type fromBlockSetter interface {
SetFromBlock(value uint64)
}

type chaincodeIDSetter interface {
SetChaincodeID(value string)
}

func (p *params) PermitBlockEvents() {
logger.Debug("PermitBlockEvents")
p.connProvider = deliverProvider
Expand All @@ -79,6 +93,11 @@ func (p *params) SetSeekType(value seek.Type) {
}
}

func (p *params) SetChaincodeID(value string) {
logger.Debugf("ChaincodId: %d", value)
p.chaincodeID = value
}

func (p *params) SetResponseTimeout(value time.Duration) {
logger.Debugf("ResponseTimeout: %s", value)
p.respTimeout = value
Expand Down
1 change: 1 addition & 0 deletions pkg/fabsdk/fabsdk_std.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !pprof
// +build !pprof

/*
Expand Down
24 changes: 22 additions & 2 deletions pkg/fabsdk/provider/chpvdr/cachekey.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ package chpvdr

import (
"crypto/sha256"
"strconv"
"fmt"

"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek"
)

// ctxtCacheKey is a lazy cache key for the context cache
Expand Down Expand Up @@ -99,6 +100,9 @@ func (k *eventCacheKey) String() string {

type params struct {
permitBlockEvents bool
seekType seek.Type
fromBlock uint64
chaincodeID string
}

func defaultParams() *params {
Expand All @@ -109,8 +113,24 @@ func (p *params) PermitBlockEvents() {
p.permitBlockEvents = true
}

func (p *params) SetFromBlock(value uint64) {
p.fromBlock = value
}

func (p *params) SetSeekType(value seek.Type) {
if value != "" {
p.seekType = value
}
}

func (p *params) SetChaincodeID(value string) {
if value != "" {
p.chaincodeID = value
}
}

func (p *params) getOptKey() string {
// Construct opts portion
optKey := "blockEvents:" + strconv.FormatBool(p.permitBlockEvents)
optKey := fmt.Sprintf("blockEvents:%t,seekType:%s,fromBlock:%d,chaincodeId:%s", p.permitBlockEvents, p.seekType, p.fromBlock, p.chaincodeID)
return optKey
}
5 changes: 4 additions & 1 deletion pkg/fabsdk/provider/chpvdr/chprovider_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build testing
// +build testing

/*
Expand All @@ -24,6 +25,8 @@ import (
"github.com/hyperledger/fabric-sdk-go/pkg/fab/chconfig"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery"
discmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery/mocks"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp"
"github.com/pkg/errors"
Expand Down Expand Up @@ -83,7 +86,7 @@ func TestBasicValidChannel(t *testing.T) {
assert.NotNil(t, channelConfig)
assert.NotEmptyf(t, channelConfig.ID(), "Got empty channel ID from channel config")

eventService, err := channelService.EventService()
eventService, err := channelService.EventService(client.WithBlockEvents(), deliverclient.WithSeekType("from"), deliverclient.WithBlockNum(10), deliverclient.WithChaincodeID("testChaincode"))
require.NoError(t, err)
require.NotNil(t, eventService)

Expand Down