Skip to content

(follower_node)support beacon node client as blob provider #988

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ var (
utils.DASnapshotFileFlag,
utils.DABlockNativeAPIEndpointFlag,
utils.DABlobScanAPIEndpointFlag,
utils.DABeaconNodeAPIEndpointFlag,
}

rpcFlags = []cli.Flag{
Expand Down
14 changes: 10 additions & 4 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,16 +879,19 @@ var (
}
DASnapshotFileFlag = cli.StringFlag{
Name: "da.snapshot.file",
Usage: "Snapshot file to sync from da",
Usage: "Snapshot file to sync from DA",
}
DABlobScanAPIEndpointFlag = cli.StringFlag{
Name: "da.blob.blobscan",
Usage: "BlobScan blob api endpoint",
Value: ethconfig.Defaults.DA.BlobScanAPIEndpoint,
Usage: "BlobScan blob API endpoint",
}
DABlockNativeAPIEndpointFlag = cli.StringFlag{
Name: "da.blob.blocknative",
Usage: "BlockNative blob api endpoint",
Usage: "BlockNative blob API endpoint",
}
DABeaconNodeAPIEndpointFlag = cli.StringFlag{
Name: "da.blob.beaconnode",
Usage: "Beacon node API endpoint",
}
)

Expand Down Expand Up @@ -1625,6 +1628,9 @@ func setDA(ctx *cli.Context, cfg *ethconfig.Config) {
if ctx.GlobalIsSet(DABlockNativeAPIEndpointFlag.Name) {
cfg.DA.BlockNativeAPIEndpoint = ctx.GlobalString(DABlockNativeAPIEndpointFlag.Name)
}
if ctx.GlobalIsSet(DABeaconNodeAPIEndpointFlag.Name) {
cfg.DA.BeaconNodeAPIEndpoint = ctx.GlobalString(DABeaconNodeAPIEndpointFlag.Name)
}
}
}

Expand Down
183 changes: 183 additions & 0 deletions rollup/da_syncer/blob_client/beacon_node_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package blob_client

import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/crypto/kzg4844"
"github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service"
)

type BeaconNodeClient struct {
apiEndpoint string
l1Client *rollup_sync_service.L1Client
genesisTime uint64
secondsPerSlot uint64
}

var (
beaconNodeGenesisEndpoint = "/eth/v1/beacon/genesis"
beaconNodeSpecEndpoint = "/eth/v1/config/spec"
beaconNodeBlobEndpoint = "/eth/v1/beacon/blob_sidecars"
)

func NewBeaconNodeClient(apiEndpoint string, l1Client *rollup_sync_service.L1Client) (*BeaconNodeClient, error) {
// get genesis time
genesisPath, err := url.JoinPath(apiEndpoint, beaconNodeGenesisEndpoint)
if err != nil {
return nil, fmt.Errorf("failed to join path, err: %w", err)
}
resp, err := http.Get(genesisPath)
if err != nil {
return nil, fmt.Errorf("cannot do request, err: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
bodyStr := string(body)
return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr)
}

var genesisResp GenesisResp
err = json.NewDecoder(resp.Body).Decode(&genesisResp)
if err != nil {
return nil, fmt.Errorf("failed to decode result into struct, err: %w", err)
}
genesisTime, err := strconv.ParseUint(genesisResp.Data.GenesisTime, 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to decode genesis time %s, err: %w", genesisResp.Data.GenesisTime, err)
}

// get seconds per slot from spec
specPath, err := url.JoinPath(apiEndpoint, beaconNodeSpecEndpoint)
if err != nil {
return nil, fmt.Errorf("failed to join path, err: %w", err)
}
resp, err = http.Get(specPath)
if err != nil {
return nil, fmt.Errorf("cannot do request, err: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
bodyStr := string(body)
return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr)
}

var specResp SpecResp
err = json.NewDecoder(resp.Body).Decode(&specResp)
if err != nil {
return nil, fmt.Errorf("failed to decode result into struct, err: %w", err)
}
secondsPerSlot, err := strconv.ParseUint(specResp.Data.SecondsPerSlot, 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to decode seconds per slot %s, err: %w", specResp.Data.SecondsPerSlot, err)
}
if secondsPerSlot == 0 {
return nil, fmt.Errorf("failed to make new BeaconNodeClient, secondsPerSlot is 0")
}

return &BeaconNodeClient{
apiEndpoint: apiEndpoint,
l1Client: l1Client,
genesisTime: genesisTime,
secondsPerSlot: secondsPerSlot,
}, nil
}

func (c *BeaconNodeClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
// get block timestamp to calculate slot
header, err := c.l1Client.GetHeaderByNumber(blockNumber)
if err != nil {
return nil, fmt.Errorf("failed to get header by number, err: %w", err)
}
slot := (header.Time - c.genesisTime) / c.secondsPerSlot

// get blob sidecar for slot
blobSidecarPath, err := url.JoinPath(c.apiEndpoint, beaconNodeBlobEndpoint, fmt.Sprintf("%d", slot))
if err != nil {
return nil, fmt.Errorf("failed to join path, err: %w", err)
}
resp, err := http.Get(blobSidecarPath)
if err != nil {
return nil, fmt.Errorf("cannot do request, err: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
bodyStr := string(body)
return nil, fmt.Errorf("beacon node request failed, status: %s, body: %s", resp.Status, bodyStr)
}

var blobSidecarResp BlobSidecarResp
err = json.NewDecoder(resp.Body).Decode(&blobSidecarResp)
if err != nil {
return nil, fmt.Errorf("failed to decode result into struct, err: %w", err)
}

// find blob with desired versionedHash
for _, blob := range blobSidecarResp.Data {
// calculate blob hash from commitment and check it with desired
commitmentBytes := common.FromHex(blob.KzgCommitment)
if len(commitmentBytes) != lenKZGCommitment {
return nil, fmt.Errorf("len of kzg commitment is not correct, expected: %d, got: %d", lenKZGCommitment, len(commitmentBytes))
}
commitment := kzg4844.Commitment(commitmentBytes)
blobVersionedHash := kzg4844.CalcBlobHashV1(sha256.New(), &commitment)

if blobVersionedHash == versionedHash {
// found desired blob
blobBytes := common.FromHex(blob.Blob)
if len(blobBytes) != lenBlobBytes {
return nil, fmt.Errorf("len of blob data is not correct, expected: %d, got: %d", lenBlobBytes, len(blobBytes))
}

b := kzg4844.Blob(blobBytes)
return &b, nil
}
}

return nil, fmt.Errorf("missing blob %v in slot %d, block number %d", versionedHash, slot, blockNumber)
}

type GenesisResp struct {
Data struct {
GenesisTime string `json:"genesis_time"`
} `json:"data"`
}

type SpecResp struct {
Data struct {
SecondsPerSlot string `json:"SECONDS_PER_SLOT"`
} `json:"data"`
}

type BlobSidecarResp struct {
Data []struct {
Index string `json:"index"`
Blob string `json:"blob"`
KzgCommitment string `json:"kzg_commitment"`
KzgProof string `json:"kzg_proof"`
SignedBlockHeader struct {
Message struct {
Slot string `json:"slot"`
ProposerIndex string `json:"proposer_index"`
ParentRoot string `json:"parent_root"`
StateRoot string `json:"state_root"`
BodyRoot string `json:"body_root"`
} `json:"message"`
Signature string `json:"signature"`
} `json:"signed_block_header"`
KzgCommitmentInclusionProof []string `json:"kzg_commitment_inclusion_proof"`
} `json:"data"`
}
6 changes: 3 additions & 3 deletions rollup/da_syncer/blob_client/blob_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
)

const (
okStatusCode int = 200
lenBlobBytes int = 131072
lenBlobBytes int = 131072
lenKZGCommitment int = 48
)

type BlobClient interface {
GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error)
GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error)
}
9 changes: 4 additions & 5 deletions rollup/da_syncer/blob_client/blob_client_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ func NewBlobClientList(blobClients ...BlobClient) *BlobClientList {
}
}

func (c *BlobClientList) GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error) {
func (c *BlobClientList) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
if len(c.list) == 0 {
return nil, fmt.Errorf("BlobClientList.GetBlobByVersionedHash: list of BlobClients is empty")
}

for i := 0; i < len(c.list); i++ {
blob, err := c.list[c.nextPos()].GetBlobByVersionedHash(ctx, versionedHash)
blob, err := c.list[c.curPos].GetBlobByVersionedHashAndBlockNumber(ctx, versionedHash, blockNumber)
if err == nil {
return blob, nil
}

c.nextPos()
// there was an error, try the next blob client in following iteration
log.Warn("BlobClientList: failed to get blob by versioned hash from BlobClient", "err", err, "blob client pos in BlobClientList", c.curPos)
}
Expand All @@ -42,9 +42,8 @@ func (c *BlobClientList) GetBlobByVersionedHash(ctx context.Context, versionedHa
return nil, serrors.NewTemporaryError(errors.New("BlobClientList.GetBlobByVersionedHash: failed to get blob by versioned hash from all BlobClients"))
}

func (c *BlobClientList) nextPos() int {
func (c *BlobClientList) nextPos() {
c.curPos = (c.curPos + 1) % len(c.list)
return c.curPos
}

func (c *BlobClientList) AddBlobClient(blobClient BlobClient) {
Expand Down
42 changes: 4 additions & 38 deletions rollup/da_syncer/blob_client/blob_scan_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewBlobScanClient(apiEndpoint string) *BlobScanClient {
}
}

func (c *BlobScanClient) GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error) {
func (c *BlobScanClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
// blobscan api docs https://api.blobscan.com/#/blobs/blob-getByBlobId
path, err := url.JoinPath(c.apiEndpoint, versionedHash.String())
if err != nil {
Expand All @@ -40,8 +40,8 @@ func (c *BlobScanClient) GetBlobByVersionedHash(ctx context.Context, versionedHa
return nil, fmt.Errorf("cannot do request, err: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != okStatusCode {
if resp.StatusCode == 404 {
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
return nil, fmt.Errorf("no blob with versioned hash : %s", versionedHash.String())
}
var res ErrorRespBlobScan
Expand Down Expand Up @@ -69,44 +69,10 @@ func (c *BlobScanClient) GetBlobByVersionedHash(ctx context.Context, versionedHa
}

type BlobRespBlobScan struct {
Commitment string `json:"commitment"`
Proof string `json:"proof"`
Size int `json:"size"`
VersionedHash string `json:"versionedHash"`
Data string `json:"data"`
DataStorageReferences []struct {
BlobStorage string `json:"blobStorage"`
DataReference string `json:"dataReference"`
} `json:"dataStorageReferences"`
Transactions []struct {
Hash string `json:"hash"`
Index int `json:"index"`
Block struct {
Number int `json:"number"`
BlobGasUsed string `json:"blobGasUsed"`
BlobAsCalldataGasUsed string `json:"blobAsCalldataGasUsed"`
BlobGasPrice string `json:"blobGasPrice"`
ExcessBlobGas string `json:"excessBlobGas"`
Hash string `json:"hash"`
Timestamp string `json:"timestamp"`
Slot int `json:"slot"`
} `json:"block"`
From string `json:"from"`
To string `json:"to"`
MaxFeePerBlobGas string `json:"maxFeePerBlobGas"`
BlobAsCalldataGasUsed string `json:"blobAsCalldataGasUsed"`
Rollup string `json:"rollup"`
BlobAsCalldataGasFee string `json:"blobAsCalldataGasFee"`
BlobGasBaseFee string `json:"blobGasBaseFee"`
BlobGasMaxFee string `json:"blobGasMaxFee"`
BlobGasUsed string `json:"blobGasUsed"`
} `json:"transactions"`
Data string `json:"data"`
}

type ErrorRespBlobScan struct {
Message string `json:"message"`
Code string `json:"code"`
Issues []struct {
Message string `json:"message"`
} `json:"issues"`
}
11 changes: 3 additions & 8 deletions rollup/da_syncer/blob_client/block_native_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func NewBlockNativeClient(apiEndpoint string) *BlockNativeClient {
}
}

func (c *BlockNativeClient) GetBlobByVersionedHash(ctx context.Context, versionedHash common.Hash) (*kzg4844.Blob, error) {
func (c *BlockNativeClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
// blocknative api docs https://docs.blocknative.com/blocknative-data-archive/blob-archive
path, err := url.JoinPath(c.apiEndpoint, versionedHash.String())
if err != nil {
Expand All @@ -33,7 +33,7 @@ func (c *BlockNativeClient) GetBlobByVersionedHash(ctx context.Context, versione
return nil, fmt.Errorf("cannot do request, err: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != okStatusCode {
if resp.StatusCode != http.StatusOK {
var res ErrorRespBlockNative
err = json.NewDecoder(resp.Body).Decode(&res)
if err != nil {
Expand All @@ -59,12 +59,7 @@ func (c *BlockNativeClient) GetBlobByVersionedHash(ctx context.Context, versione

type BlobRespBlockNative struct {
Blob struct {
VersionedHash string `json:"versionedHash"`
Commitment string `json:"commitment"`
Proof string `json:"proof"`
ZeroBytes int `json:"zeroBytes"`
NonZeroBytes int `json:"nonZeroBytes"`
Data string `json:"data"`
Data string `json:"data"`
} `json:"blob"`
}

Expand Down
2 changes: 1 addition & 1 deletion rollup/da_syncer/da/commitV1.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewCommitBatchDAV1WithBlobDecodeFunc(ctx context.Context, db ethdb.Database
return nil, fmt.Errorf("failed to fetch blob hash, err: %w", err)
}

blob, err := blobClient.GetBlobByVersionedHash(ctx, versionedHash)
blob, err := blobClient.GetBlobByVersionedHashAndBlockNumber(ctx, versionedHash, vLog.BlockNumber)
if err != nil {
return nil, fmt.Errorf("failed to fetch blob from blob client, err: %w", err)
}
Expand Down
Loading
Loading