Skip to content

Commit

Permalink
only write to listener's app relayers
Browse files Browse the repository at this point in the history
  • Loading branch information
cam-schultz committed Aug 20, 2024
1 parent 5e4e2e9 commit e3d750e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
7 changes: 6 additions & 1 deletion relayer/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,12 @@ func (lstnr *Listener) processLogs(ctx context.Context) error {
return fmt.Errorf("failed to catch up on historical blocks")
}
case blockHeader := <-lstnr.Subscriber.Headers():
go lstnr.messageCoordinator.ProcessBlock(blockHeader, lstnr.ethClient, errChan)
go lstnr.messageCoordinator.ProcessBlock(
blockHeader,
lstnr.sourceBlockchain.GetBlockchainID(),
lstnr.ethClient,
errChan,
)
case err := <-lstnr.Subscriber.Err():
lstnr.healthStatus.Store(false)
lstnr.logger.Error(
Expand Down
4 changes: 4 additions & 0 deletions relayer/message_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func (mc *MessageCoordinator) ProcessMessageID(
// Meant to be ran asynchronously. Errors should be sent to errChan.
func (mc *MessageCoordinator) ProcessBlock(
blockHeader *types.Header,
blockchainID ids.ID,
ethClient ethclient.Client,
errChan chan error,
) {
Expand Down Expand Up @@ -256,6 +257,9 @@ func (mc *MessageCoordinator) ProcessBlock(
}
// Initiate message relay of all registered messages
for _, appRelayer := range mc.applicationRelayers {
if appRelayer.sourceBlockchain.GetBlockchainID() != blockchainID {
continue
}
// Dispatch all messages in the block to the appropriate application relayer.
// An empty slice is still a valid argument to ProcessHeight; in this case the height is immediately committed.
handlers := messageHandlers[appRelayer.relayerID.ID]
Expand Down

0 comments on commit e3d750e

Please sign in to comment.