[KS-365] Batch identical trigger events
If multiple workflows need to receive the same trigger event, send it only once.
bolekk committed Sep 16, 2024
1 parent 0187f18 commit 3da90ab
Showing 4 changed files with 249 additions and 87 deletions.
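The commit message above summarizes the change: when several workflows are registered for the same trigger and the resulting event payloads are byte-identical, the publisher now sends one message carrying all of their workflow IDs instead of one message per workflow. As a rough orientation before the diff, here is a minimal, self-contained Go sketch of that grouping idea, keyed by hash(callerDonId, triggerEventID, marshaled response) as in the new enqueueForBatching below. The names (batchKey, batch) and the sample payloads are illustrative only, not the actual types from this commit.

package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// batchKey groups trigger events that are byte-identical and destined for the
// same caller DON, mirroring the hash(callerDonId, triggerEventID, response)
// key used by enqueueForBatching in the diff below.
func batchKey(callerDonID uint32, triggerEventID string, rawResponse []byte) [32]byte {
	combined := make([]byte, 4)
	binary.LittleEndian.PutUint32(combined, callerDonID)
	combined = append(combined, []byte(triggerEventID)...)
	combined = append(combined, rawResponse...)
	return sha256.Sum256(combined)
}

// batch is a simplified stand-in for the batchedResponse struct in the diff:
// one payload, many interested workflows.
type batch struct {
	rawResponse []byte
	workflowIDs []string
}

func main() {
	queue := map[[32]byte]*batch{}

	// Three workflows registered for the same trigger; two receive an
	// identical event payload, the third receives a different one.
	events := []struct {
		workflowID string
		payload    []byte
	}{
		{"workflow-A", []byte(`{"price":100}`)},
		{"workflow-B", []byte(`{"price":100}`)},
		{"workflow-C", []byte(`{"price":101}`)},
	}

	for _, ev := range events {
		k := batchKey(1, "event-123", ev.payload)
		if existing, ok := queue[k]; ok {
			existing.workflowIDs = append(existing.workflowIDs, ev.workflowID)
		} else {
			queue[k] = &batch{rawResponse: ev.payload, workflowIDs: []string{ev.workflowID}}
		}
	}

	// Only two messages go out: one shared by A and B, one for C.
	for _, b := range queue {
		fmt.Printf("send once to workflows %v: %s\n", b.workflowIDs, b.rawResponse)
	}
}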
11 changes: 3 additions & 8 deletions core/capabilities/launcher.go
@@ -394,7 +394,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee

switch capability.CapabilityType {
case capabilities.CapabilityTypeTrigger:
newTriggerPublisher := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (receiverService, error) {
newTriggerPublisher := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
publisher := remote.NewTriggerPublisher(
capabilityConfig.RemoteTriggerConfig,
capability.(capabilities.TriggerCapability),
@@ -416,7 +416,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee
case capabilities.CapabilityTypeConsensus:
w.lggr.Warn("no remote client configured for capability type consensus, skipping configuration")
case capabilities.CapabilityTypeTarget:
newTargetServer := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (receiverService, error) {
newTargetServer := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
return target.NewServer(
capabilityConfig.RemoteTargetConfig,
myPeerID,
@@ -441,12 +441,7 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee
return nil
}

type receiverService interface {
services.Service
remotetypes.Receiver
}

func (w *launcher) addReceiver(ctx context.Context, capability registrysyncer.Capability, don registrysyncer.DON, newReceiverFn func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (receiverService, error)) error {
func (w *launcher) addReceiver(ctx context.Context, capability registrysyncer.Capability, don registrysyncer.DON, newReceiverFn func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error)) error {
capID := capability.ID
info, err := capabilities.NewRemoteCapabilityInfo(
capID,
181 changes: 133 additions & 48 deletions core/capabilities/remote/trigger_publisher.go
@@ -2,10 +2,11 @@ package remote

import (
"context"
"crypto/sha256"
"encoding/binary"
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/services"
@@ -22,19 +23,22 @@ import (
//
// TriggerPublisher communicates with corresponding TriggerSubscribers on remote nodes.
type triggerPublisher struct {
config *capabilities.RemoteTriggerConfig
underlying commoncap.TriggerCapability
capInfo commoncap.CapabilityInfo
capDonInfo commoncap.DON
workflowDONs map[uint32]commoncap.DON
membersCache map[uint32]map[p2ptypes.PeerID]bool
dispatcher types.Dispatcher
messageCache *messageCache[registrationKey, p2ptypes.PeerID]
registrations map[registrationKey]*pubRegState
mu sync.RWMutex // protects messageCache and registrations
stopCh services.StopChan
wg sync.WaitGroup
lggr logger.Logger
config *commoncap.RemoteTriggerConfig
underlying commoncap.TriggerCapability
capInfo commoncap.CapabilityInfo
capDonInfo commoncap.DON
workflowDONs map[uint32]commoncap.DON
membersCache map[uint32]map[p2ptypes.PeerID]bool
dispatcher types.Dispatcher
messageCache *messageCache[registrationKey, p2ptypes.PeerID]
registrations map[registrationKey]*pubRegState
mu sync.RWMutex // protects messageCache and registrations
batchingQueue map[[32]byte]*batchedResponse
batchingEnabled bool
bqMu sync.Mutex // protects batchingQueue
stopCh services.StopChan
wg sync.WaitGroup
lggr logger.Logger
}

type registrationKey struct {
@@ -47,13 +51,21 @@ type pubRegState struct {
request commoncap.TriggerRegistrationRequest
}

var _ types.Receiver = &triggerPublisher{}
var _ services.Service = &triggerPublisher{}
type batchedResponse struct {
rawResponse []byte
callerDonID uint32
triggerEventID string
workflowIDs []string
}

var _ types.ReceiverService = &triggerPublisher{}

func NewTriggerPublisher(config *capabilities.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher {
const minAllowedBatchCollectionPeriod = 10 * time.Millisecond

func NewTriggerPublisher(config *commoncap.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher {
if config == nil {
lggr.Info("no config provided, using default values")
config = &capabilities.RemoteTriggerConfig{}
config = &commoncap.RemoteTriggerConfig{}
}
config.ApplyDefaults()
membersCache := make(map[uint32]map[p2ptypes.PeerID]bool)
@@ -65,23 +77,29 @@ func NewTriggerPublisher(config *capabilities.RemoteTriggerConfig, underlying co
membersCache[id] = cache
}
return &triggerPublisher{
config: config,
underlying: underlying,
capInfo: capInfo,
capDonInfo: capDonInfo,
workflowDONs: workflowDONs,
membersCache: membersCache,
dispatcher: dispatcher,
messageCache: NewMessageCache[registrationKey, p2ptypes.PeerID](),
registrations: make(map[registrationKey]*pubRegState),
stopCh: make(services.StopChan),
lggr: lggr.Named("TriggerPublisher"),
config: config,
underlying: underlying,
capInfo: capInfo,
capDonInfo: capDonInfo,
workflowDONs: workflowDONs,
membersCache: membersCache,
dispatcher: dispatcher,
messageCache: NewMessageCache[registrationKey, p2ptypes.PeerID](),
registrations: make(map[registrationKey]*pubRegState),
batchingQueue: make(map[[32]byte]*batchedResponse),
batchingEnabled: config.MaxBatchSize > 1 && config.BatchCollectionPeriod >= minAllowedBatchCollectionPeriod,
stopCh: make(services.StopChan),
lggr: lggr.Named("TriggerPublisher"),
}
}

func (p *triggerPublisher) Start(ctx context.Context) error {
p.wg.Add(1)
go p.registrationCleanupLoop()
if p.batchingEnabled {
p.wg.Add(1)
go p.batchingLoop()
}
p.lggr.Info("TriggerPublisher started")
return nil
}
@@ -202,31 +220,98 @@ func (p *triggerPublisher) triggerEventLoop(callbackCh <-chan commoncap.TriggerR
}
triggerEvent := response.Event
p.lggr.Debugw("received trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId, "triggerEventID", triggerEvent.ID)
marshaled, err := pb.MarshalTriggerResponse(response)
marshaledResponse, err := pb.MarshalTriggerResponse(response)
if err != nil {
p.lggr.Debugw("can't marshal trigger event", "err", err)
break
}
msg := &types.MessageBody{
CapabilityId: p.capInfo.ID,
CapabilityDonId: p.capDonInfo.ID,
CallerDonId: key.callerDonId,
Method: types.MethodTriggerEvent,
Payload: marshaled,
Metadata: &types.MessageBody_TriggerEventMetadata{
TriggerEventMetadata: &types.TriggerEventMetadata{
// NOTE: optionally introduce batching across workflows as an optimization
WorkflowIds: []string{key.workflowId},
TriggerEventId: triggerEvent.ID,
},

if p.batchingEnabled {
p.enqueueForBatching(marshaledResponse, key, triggerEvent.ID)
} else {
// a single-element "batch"
p.sendBatch(&batchedResponse{
rawResponse: marshaledResponse,
callerDonID: key.callerDonId,
triggerEventID: triggerEvent.ID,
workflowIDs: []string{key.workflowId},
})
}
}
}
}

func (p *triggerPublisher) enqueueForBatching(rawResponse []byte, key registrationKey, triggerEventID string) {
// put in batching queue, group by hash(callerDonId, triggerEventID, response)
combined := make([]byte, 4)
binary.LittleEndian.PutUint32(combined, key.callerDonId)
combined = append(combined, []byte(triggerEventID)...)
combined = append(combined, rawResponse...)
sha := sha256.Sum256(combined)
p.bqMu.Lock()
elem, exists := p.batchingQueue[sha]
if !exists {
elem = &batchedResponse{
rawResponse: rawResponse,
callerDonID: key.callerDonId,
triggerEventID: triggerEventID,
workflowIDs: []string{key.workflowId},
}
p.batchingQueue[sha] = elem
} else {
elem.workflowIDs = append(elem.workflowIDs, key.workflowId)
}
p.bqMu.Unlock()
}

func (p *triggerPublisher) sendBatch(resp *batchedResponse) {
for len(resp.workflowIDs) > 0 {
idBatch := resp.workflowIDs
if p.batchingEnabled && int64(len(idBatch)) > int64(p.config.MaxBatchSize) {
idBatch = idBatch[:p.config.MaxBatchSize]
resp.workflowIDs = resp.workflowIDs[p.config.MaxBatchSize:]
} else {
resp.workflowIDs = nil
}
msg := &types.MessageBody{
CapabilityId: p.capInfo.ID,
CapabilityDonId: p.capDonInfo.ID,
CallerDonId: resp.callerDonID,
Method: types.MethodTriggerEvent,
Payload: resp.rawResponse,
Metadata: &types.MessageBody_TriggerEventMetadata{
TriggerEventMetadata: &types.TriggerEventMetadata{
WorkflowIds: idBatch,
TriggerEventId: resp.triggerEventID,
},
},
}
// NOTE: send to all nodes by default, introduce different strategies later (KS-76)
for _, peerID := range p.workflowDONs[resp.callerDonID].Members {
err := p.dispatcher.Send(peerID, msg)
if err != nil {
p.lggr.Errorw("failed to send trigger event", "capabilityId", p.capInfo.ID, "peerID", peerID, "err", err)
}
// NOTE: send to all nodes by default, introduce different strategies later (KS-76)
for _, peerID := range p.workflowDONs[key.callerDonId].Members {
err = p.dispatcher.Send(peerID, msg)
if err != nil {
p.lggr.Errorw("failed to send trigger event", "capabilityId", p.capInfo.ID, "peerID", peerID, "err", err)
}
}
}
}

func (p *triggerPublisher) batchingLoop() {
defer p.wg.Done()
ticker := time.NewTicker(p.config.BatchCollectionPeriod)
defer ticker.Stop()
for {
select {
case <-p.stopCh:
return
case <-ticker.C:
p.bqMu.Lock()
queue := p.batchingQueue
p.batchingQueue = make(map[[32]byte]*batchedResponse)
p.bqMu.Unlock()

for _, elem := range queue {
p.sendBatch(elem)
}
}
}

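One behavioral detail from trigger_publisher.go worth noting: batching only activates when MaxBatchSize > 1 and BatchCollectionPeriod is at least minAllowedBatchCollectionPeriod (10ms), and the new batchingLoop drains the queue by swapping the whole map out under bqMu before doing any sends, so dispatching never happens while the lock is held. The sketch below restates that drain pattern in isolation; the flusher type, field names, and timings are simplified placeholders, not the code from this commit.

package main

import (
	"fmt"
	"sync"
	"time"
)

// batch is a simplified stand-in for batchedResponse.
type batch struct {
	workflowIDs []string
	rawResponse []byte
}

// flusher restates the queue-swap pattern from batchingLoop: hold the lock only
// long enough to exchange the full map for an empty one, then send the drained
// batches without holding the lock.
type flusher struct {
	mu    sync.Mutex
	queue map[[32]byte]*batch
}

func (f *flusher) run(stopCh <-chan struct{}, period time.Duration, send func(*batch)) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return
		case <-ticker.C:
			f.mu.Lock()
			drained := f.queue
			f.queue = make(map[[32]byte]*batch) // producers keep writing into the fresh map
			f.mu.Unlock()

			for _, b := range drained {
				send(b) // sends happen outside the critical section
			}
		}
	}
}

func main() {
	f := &flusher{queue: make(map[[32]byte]*batch)}
	stop := make(chan struct{})

	// Enqueue one fake batch, let a tick flush it, then stop.
	f.mu.Lock()
	f.queue[[32]byte{1}] = &batch{workflowIDs: []string{"workflow-A"}, rawResponse: []byte("payload")}
	f.mu.Unlock()

	go func() {
		time.Sleep(50 * time.Millisecond)
		close(stop)
	}()
	f.run(stop, 10*time.Millisecond, func(b *batch) {
		fmt.Printf("flushed %v\n", b.workflowIDs)
	})
}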