Skip to content

Commit

Permalink
Decentralize relayer fixes (#1296)
Browse files Browse the repository at this point in the history
* add some logs

* more logs

* more logs

* more logs

* adds extra check before sending message

* comment

* improvement

* improvements

* logs

* more logs

* fix

* fixes

* adds nonce to log

* comments

* adds tags for release notes

* specify relayer path

* Update relayer/relays/execution/main.go

Co-authored-by: Vincent Geddes <117534+vgeddes@users.noreply.github.com>

---------

Co-authored-by: Vincent Geddes <117534+vgeddes@users.noreply.github.com>
  • Loading branch information
claravanstaden and vgeddes authored Oct 4, 2024
1 parent 7d558a1 commit 2c520a0
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 12 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/release-relayer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ name: release-relayer

on:
push:
paths:
- "relayer/**"
branches:
- main
workflow_dispatch:
Expand Down Expand Up @@ -69,6 +71,7 @@ jobs:
id: new_version
run: |
# Get the most recent tag in the format relayer-<branchname>-<version>
current_tag=$(git tag --list "relayer-${{ steps.branch_name.outputs.branch }}-*" --sort=-v:refname | head -n 1
current_version=$(git tag --list "relayer-${{ steps.branch_name.outputs.branch }}-*" --sort=-v:refname | head -n 1 | sed -E 's/relayer-${{ steps.branch_name.outputs.branch }}-//')
echo "Current version: $current_version"
Expand All @@ -81,6 +84,7 @@ jobs:
echo "New version: $new_version"
echo "version=$new_version" >> $GITHUB_OUTPUT
echo "from_tag=$current_tag" >> $GITHUB_OUTPUT
- name: Create new tag
id: create_tag
Expand All @@ -97,6 +101,9 @@ jobs:
- name: "Build Changelog"
id: build_changelog
uses: mikepenz/release-changelog-builder-action@v4
with:
fromTag: ${{ steps.new_version.outputs.from_tag }}
toTag: ${{ steps.create_tag.outputs.tag }}

- name: Create a GitHub Release
id: create_release
Expand Down
2 changes: 1 addition & 1 deletion relayer/relays/beacon/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (p ParachainConfig) Validate() error {
return errors.New("[maxWatchedExtrinsics] is not set")
}
if p.HeaderRedundancy == 0 {
return errors.New("[HeaderRedundancy] is not set")
return errors.New("[headerRedundancy] is not set")
}
return nil
}
29 changes: 29 additions & 0 deletions relayer/relays/beacon/header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,35 @@ func (h *Header) FetchExecutionProof(blockRoot common.Hash, instantVerification

}

func (h *Header) CheckHeaderFinalized(blockRoot common.Hash, instantVerification bool) error {
header, err := h.syncer.Client.GetHeaderByBlockRoot(blockRoot)
if err != nil {
return fmt.Errorf("get beacon header by blockRoot: %w", err)
}
lastFinalizedHeaderState, err := h.writer.GetLastFinalizedHeaderState()
if err != nil {
return fmt.Errorf("fetch last finalized header state: %w", err)
}

// The latest finalized header on-chain is older than the header containing the message, so we need to sync the
// finalized header with the message.
finalizedHeader, err := h.syncer.GetFinalizedHeader()
if err != nil {
return err
}

// If the header is not finalized yet, we can't do anything further.
if header.Slot > uint64(finalizedHeader.Slot) {
return fmt.Errorf("chain not finalized yet: %w", ErrBeaconHeaderNotFinalized)
}

if header.Slot > lastFinalizedHeaderState.BeaconSlot && !instantVerification {
return fmt.Errorf("on-chain header not recent enough and instantVerification is off: %w", ErrBeaconHeaderNotFinalized)
}

return nil
}

func (h *Header) isInitialSyncPeriod() bool {
initialPeriod := h.protocol.ComputeSyncPeriodAtSlot(h.cache.InitialCheckpointSlot)
lastFinalizedPeriod := h.protocol.ComputeSyncPeriodAtSlot(h.cache.Finalized.LastSyncedSlot)
Expand Down
74 changes: 63 additions & 11 deletions relayer/relays/execution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
)
r.beaconHeader = &beaconHeader

log.Info("Current relay's ID:", r.config.Schedule.ID)
log.WithFields(log.Fields{
"relayerId": r.config.Schedule.ID,
"relayerCount": r.config.Schedule.TotalRelayerCount,
"sleepInterval": r.config.Schedule.SleepInterval,
}).Info("decentralization config")

for {
select {
Expand Down Expand Up @@ -146,9 +150,12 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
}

for _, ev := range events {
err = r.waitAndSend(ctx, ev)
if err != nil {
return fmt.Errorf("submit message: %w", err)
err := r.waitAndSend(ctx, ev)
if errors.Is(err, header.ErrBeaconHeaderNotFinalized) {
log.WithField("nonce", ev.Nonce).Info("beacon header not finalized yet")
continue
} else if err != nil {
return fmt.Errorf("submit event: %w", err)
}
}
}
Expand Down Expand Up @@ -351,23 +358,32 @@ func (r *Relay) makeInboundMessage(
}

func (r *Relay) waitAndSend(ctx context.Context, ev *contracts.GatewayOutboundMessageAccepted) (err error) {
var paraNonce uint64
ethNonce := ev.Nonce
waitingPeriod := (ethNonce + r.config.Schedule.TotalRelayerCount - r.config.Schedule.ID) % r.config.Schedule.TotalRelayerCount
log.WithFields(logrus.Fields{
"waitingPeriod": waitingPeriod,
}).Info("relayer waiting period")

var cnt uint64
for {
paraNonce, err = r.fetchLatestParachainNonce()
// Check the nonce again in case another relayer processed the message while this relayer downloading beacon state
isProcessed, err := r.isMessageProcessed(ev.Nonce)
if err != nil {
return fmt.Errorf("fetch latest parachain nonce: %w", err)
return fmt.Errorf("is message procssed: %w", err)
}
if ethNonce <= paraNonce {
log.Info(fmt.Sprintf("nonce %d picked up by another relayer, just skip", paraNonce))
// If the message is already processed we shouldn't submit it again
if isProcessed {
return nil
}
// Check if the beacon header is finalized
if r.isInFinalizedBlock(ctx, ev) != nil {
return err
}
if cnt == waitingPeriod {
break
}
log.Info(fmt.Sprintf("sleeping for %d seconds.", time.Duration(r.config.Schedule.SleepInterval)))

time.Sleep(time.Duration(r.config.Schedule.SleepInterval) * time.Second)
cnt++
}
Expand Down Expand Up @@ -406,13 +422,22 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa
// ParentBeaconRoot in https://eips.ethereum.org/EIPS/eip-4788 from Deneb onward
proof, err := r.beaconHeader.FetchExecutionProof(*blockHeader.ParentBeaconRoot, r.config.InstantVerification)
if errors.Is(err, header.ErrBeaconHeaderNotFinalized) {
logger.Warn("beacon header not finalized, just skipped")
return nil
return err
}
if err != nil {
return fmt.Errorf("fetch execution header proof: %w", err)
}

// Check the nonce again in case another relayer processed the message while this relayer downloading beacon state
isProcessed, err := r.isMessageProcessed(ev.Nonce)
if err != nil {
return fmt.Errorf("is message processed: %w", err)
}
// If the message is already processed we shouldn't submit it again
if isProcessed {
return nil
}

err = r.writeToParachain(ctx, proof, inboundMsg)
if err != nil {
return fmt.Errorf("write to parachain: %w", err)
Expand All @@ -429,3 +454,30 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa

return nil
}

// isMessageProcessed checks if the provided event nonce has already been processed on-chain.
func (r *Relay) isMessageProcessed(eventNonce uint64) (bool, error) {
paraNonce, err := r.fetchLatestParachainNonce()
if err != nil {
return false, fmt.Errorf("fetch latest parachain nonce: %w", err)
}
// Check the nonce again in case another relayer processed the message while this relayer downloading beacon state
if eventNonce <= paraNonce {
log.WithField("nonce", paraNonce).Info("message picked up by another relayer, skipped")
return true, nil
}

return false, nil
}

// isInFinalizedBlock checks if the block containing the event is a finalized block.
func (r *Relay) isInFinalizedBlock(ctx context.Context, event *contracts.GatewayOutboundMessageAccepted) error {
nextBlockNumber := new(big.Int).SetUint64(event.Raw.BlockNumber + 1)

blockHeader, err := r.ethconn.Client().HeaderByNumber(ctx, nextBlockNumber)
if err != nil {
return fmt.Errorf("get block header: %w", err)
}

return r.beaconHeader.CheckHeaderFinalized(*blockHeader.ParentBeaconRoot, r.config.InstantVerification)
}

0 comments on commit 2c520a0

Please sign in to comment.