Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decentralize relayer fixes #1296

Merged
merged 19 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/release-relayer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ name: release-relayer

on:
push:
paths:
- "relayer/**"
branches:
- main
workflow_dispatch:
Expand Down Expand Up @@ -69,6 +71,7 @@ jobs:
id: new_version
run: |
# Get the most recent tag in the format relayer-<branchname>-<version>
current_tag=$(git tag --list "relayer-${{ steps.branch_name.outputs.branch }}-*" --sort=-v:refname | head -n 1
current_version=$(git tag --list "relayer-${{ steps.branch_name.outputs.branch }}-*" --sort=-v:refname | head -n 1 | sed -E 's/relayer-${{ steps.branch_name.outputs.branch }}-//')
echo "Current version: $current_version"

Expand All @@ -81,6 +84,7 @@ jobs:

echo "New version: $new_version"
echo "version=$new_version" >> $GITHUB_OUTPUT
echo "from_tag=$current_tag" >> $GITHUB_OUTPUT

- name: Create new tag
id: create_tag
Expand All @@ -97,6 +101,9 @@ jobs:
- name: "Build Changelog"
id: build_changelog
uses: mikepenz/release-changelog-builder-action@v4
with:
fromTag: ${{ steps.new_version.outputs.from_tag }}
toTag: ${{ steps.create_tag.outputs.tag }}

- name: Create a GitHub Release
id: create_release
Expand Down
2 changes: 1 addition & 1 deletion relayer/relays/beacon/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (p ParachainConfig) Validate() error {
return errors.New("[maxWatchedExtrinsics] is not set")
}
if p.HeaderRedundancy == 0 {
return errors.New("[HeaderRedundancy] is not set")
return errors.New("[headerRedundancy] is not set")
}
return nil
}
29 changes: 29 additions & 0 deletions relayer/relays/beacon/header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,35 @@ func (h *Header) FetchExecutionProof(blockRoot common.Hash, instantVerification

}

func (h *Header) CheckHeaderFinalized(blockRoot common.Hash, instantVerification bool) error {
header, err := h.syncer.Client.GetHeaderByBlockRoot(blockRoot)
if err != nil {
return fmt.Errorf("get beacon header by blockRoot: %w", err)
}
lastFinalizedHeaderState, err := h.writer.GetLastFinalizedHeaderState()
if err != nil {
return fmt.Errorf("fetch last finalized header state: %w", err)
}

// The latest finalized header on-chain is older than the header containing the message, so we need to sync the
// finalized header with the message.
finalizedHeader, err := h.syncer.GetFinalizedHeader()
if err != nil {
return err
}

// If the header is not finalized yet, we can't do anything further.
if header.Slot > uint64(finalizedHeader.Slot) {
return fmt.Errorf("chain not finalized yet: %w", ErrBeaconHeaderNotFinalized)
}

if header.Slot > lastFinalizedHeaderState.BeaconSlot && !instantVerification {
return fmt.Errorf("on-chain header not recent enough and instantVerification is off: %w", ErrBeaconHeaderNotFinalized)
}

return nil
}

func (h *Header) isInitialSyncPeriod() bool {
initialPeriod := h.protocol.ComputeSyncPeriodAtSlot(h.cache.InitialCheckpointSlot)
lastFinalizedPeriod := h.protocol.ComputeSyncPeriodAtSlot(h.cache.Finalized.LastSyncedSlot)
Expand Down
74 changes: 63 additions & 11 deletions relayer/relays/execution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
)
r.beaconHeader = &beaconHeader

log.Info("Current relay's ID:", r.config.Schedule.ID)
log.WithFields(log.Fields{
"relayerId": r.config.Schedule.ID,
"relayerCount": r.config.Schedule.TotalRelayerCount,
"sleepInterval": r.config.Schedule.SleepInterval,
}).Info("decentralization config")

for {
select {
Expand Down Expand Up @@ -146,9 +150,12 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
}

for _, ev := range events {
err = r.waitAndSend(ctx, ev)
if err != nil {
return fmt.Errorf("submit message: %w", err)
err := r.waitAndSend(ctx, ev)
if errors.Is(err, header.ErrBeaconHeaderNotFinalized) {
log.WithField("nonce", ev.Nonce).Info("beacon header not finalized yet")
continue
} else if err != nil {
return fmt.Errorf("submit event: %w", err)
}
}
}
Expand Down Expand Up @@ -351,23 +358,32 @@ func (r *Relay) makeInboundMessage(
}

func (r *Relay) waitAndSend(ctx context.Context, ev *contracts.GatewayOutboundMessageAccepted) (err error) {
var paraNonce uint64
ethNonce := ev.Nonce
waitingPeriod := (ethNonce + r.config.Schedule.TotalRelayerCount - r.config.Schedule.ID) % r.config.Schedule.TotalRelayerCount
log.WithFields(logrus.Fields{
"waitingPeriod": waitingPeriod,
}).Info("relayer waiting period")

var cnt uint64
for {
paraNonce, err = r.fetchLatestParachainNonce()
// Check the nonce again in case another relayer processed the message while this relayer downloading beacon state
isProcessed, err := r.isMessageProcessed(ev.Nonce)
if err != nil {
return fmt.Errorf("fetch latest parachain nonce: %w", err)
return fmt.Errorf("is message procssed: %w", err)
}
if ethNonce <= paraNonce {
log.Info(fmt.Sprintf("nonce %d picked up by another relayer, just skip", paraNonce))
// If the message is already processed we shouldn't submit it again
if isProcessed {
return nil
}
// Check if the beacon header is finalized
if r.isInFinalizedBlock(ctx, ev) != nil {
return err
}
if cnt == waitingPeriod {
break
}
log.Info(fmt.Sprintf("sleeping for %d seconds.", time.Duration(r.config.Schedule.SleepInterval)))

time.Sleep(time.Duration(r.config.Schedule.SleepInterval) * time.Second)
cnt++
}
Expand Down Expand Up @@ -406,13 +422,22 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa
// ParentBeaconRoot in https://eips.ethereum.org/EIPS/eip-4788 from Deneb onward
proof, err := r.beaconHeader.FetchExecutionProof(*blockHeader.ParentBeaconRoot, r.config.InstantVerification)
if errors.Is(err, header.ErrBeaconHeaderNotFinalized) {
logger.Warn("beacon header not finalized, just skipped")
return nil
return err
}
if err != nil {
return fmt.Errorf("fetch execution header proof: %w", err)
}

// Check the nonce again in case another relayer processed the message while this relayer downloading beacon state
isProcessed, err := r.isMessageProcessed(ev.Nonce)
if err != nil {
return fmt.Errorf("is message processed: %w", err)
}
// If the message is already processed we shouldn't submit it again
if isProcessed {
return nil
}

err = r.writeToParachain(ctx, proof, inboundMsg)
if err != nil {
return fmt.Errorf("write to parachain: %w", err)
Expand All @@ -429,3 +454,30 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa

return nil
}

// isMessageProcessed checks if the provided event nonce has already been processed on-chain.
func (r *Relay) isMessageProcessed(eventNonce uint64) (bool, error) {
paraNonce, err := r.fetchLatestParachainNonce()
if err != nil {
return false, fmt.Errorf("fetch latest parachain nonce: %w", err)
}
// Check the nonce again in case another relayer processed the message while this relayer downloading beacon state
if eventNonce <= paraNonce {
log.WithField("nonce", paraNonce).Info("message picked up by another relayer, skipped")
return true, nil
}

return false, nil
}

// isInFinalizedBlock checks if the block containing the event is a finalized block.
func (r *Relay) isInFinalizedBlock(ctx context.Context, event *contracts.GatewayOutboundMessageAccepted) error {
nextBlockNumber := new(big.Int).SetUint64(event.Raw.BlockNumber + 1)

blockHeader, err := r.ethconn.Client().HeaderByNumber(ctx, nextBlockNumber)
if err != nil {
return fmt.Errorf("get block header: %w", err)
}

return r.beaconHeader.CheckHeaderFinalized(*blockHeader.ParentBeaconRoot, r.config.InstantVerification)
}
Loading