Skip to content

feat: add da blob client aws s3 #1209

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ var (
utils.DABlockNativeAPIEndpointFlag,
utils.DABlobScanAPIEndpointFlag,
utils.DABeaconNodeAPIEndpointFlag,
utils.DAAwsS3BlobAPIEndpointFlag,
utils.DARecoveryModeFlag,
utils.DARecoveryInitialL1BlockFlag,
utils.DARecoveryInitialBatchFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.DABlobScanAPIEndpointFlag,
utils.DABlockNativeAPIEndpointFlag,
utils.DABeaconNodeAPIEndpointFlag,
utils.DAAwsS3BlobAPIEndpointFlag,
utils.DARecoveryModeFlag,
utils.DARecoveryInitialL1BlockFlag,
utils.DARecoveryInitialBatchFlag,
Expand Down
7 changes: 7 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,10 @@ var (
Name: "da.blob.beaconnode",
Usage: "Beacon node API endpoint",
}
DAAwsS3BlobAPIEndpointFlag = cli.StringFlag{
Name: "da.blob.awss3",
Usage: "AWS S3 blob API endpoint",
}
DARecoveryModeFlag = cli.BoolFlag{
Name: "da.recovery",
Usage: "Enable recovery mode for DA syncing",
Expand Down Expand Up @@ -1699,6 +1703,9 @@ func setDA(ctx *cli.Context, cfg *ethconfig.Config) {
if ctx.IsSet(DABeaconNodeAPIEndpointFlag.Name) {
cfg.DA.BeaconNodeAPIEndpoint = ctx.String(DABeaconNodeAPIEndpointFlag.Name)
}
if ctx.IsSet(DAAwsS3BlobAPIEndpointFlag.Name) {
cfg.DA.AwsS3BlobAPIEndpoint = ctx.String(DAAwsS3BlobAPIEndpointFlag.Name)
}
if ctx.IsSet(DARecoveryModeFlag.Name) {
cfg.DA.RecoveryMode = ctx.Bool(DARecoveryModeFlag.Name)
}
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
VersionMajor = 5 // Major version component of the current release
VersionMinor = 8 // Minor version component of the current release
VersionPatch = 57 // Patch version component of the current release
VersionPatch = 58 // Patch version component of the current release
VersionMeta = "mainnet" // Version metadata to append to the version string
)

Expand Down
80 changes: 80 additions & 0 deletions rollup/da_syncer/blob_client/aws_s3_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package blob_client

import (
"context"
"crypto/sha256"
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/common/hexutil"
"github.com/scroll-tech/go-ethereum/crypto/kzg4844"
)

const (
AwsS3DefaultTimeout = 15 * time.Second
)

type AwsS3Client struct {
client *http.Client
apiEndpoint string
}

func NewAwsS3Client(apiEndpoint string) *AwsS3Client {
return &AwsS3Client{
apiEndpoint: apiEndpoint,
client: &http.Client{Timeout: AwsS3DefaultTimeout},
}
}

func (c *AwsS3Client) GetBlobByVersionedHashAndBlockTime(ctx context.Context, versionedHash common.Hash, blockTime uint64) (*kzg4844.Blob, error) {
// Scroll mainnet blob data AWS S3 endpoint: https://scroll-mainnet-blob-data.s3.us-west-2.amazonaws.com/
// Scroll sepolia blob data AWS S3 endpoint: https://scroll-sepolia-blob-data.s3.us-west-2.amazonaws.com/
path, err := url.JoinPath(c.apiEndpoint, versionedHash.String())
if err != nil {
return nil, fmt.Errorf("failed to join path, err: %w", err)
}
req, err := http.NewRequestWithContext(ctx, "GET", path, nil)
if err != nil {
return nil, fmt.Errorf("cannot create request, err: %w", err)
}
resp, err := c.client.Do(req)
if err != nil {
return nil, fmt.Errorf("cannot do request, err: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("aws s3 request failed with status: %s: could not read response body: %w", resp.Status, err)
}
bodyStr := string(body)
return nil, fmt.Errorf("aws s3 request failed, status: %s, body: %s", resp.Status, bodyStr)
}

var blob kzg4844.Blob
buf := blob[:]
if n, err := io.ReadFull(resp.Body, buf); err != nil {
if err == io.ErrUnexpectedEOF || err == io.EOF {
return nil, fmt.Errorf("blob data too short: got %d bytes", n)
}
return nil, fmt.Errorf("failed to read blob data: %w", err)
}

// sanity check that retrieved blob matches versioned hash
commitment, err := kzg4844.BlobToCommitment(&blob)
if err != nil {
return nil, fmt.Errorf("failed to convert blob to commitment, err: %w", err)
}

blobVersionedHash := kzg4844.CalcBlobHashV1(sha256.New(), &commitment)
if blobVersionedHash != versionedHash {
return nil, fmt.Errorf("blob versioned hash mismatch, expected: %s, got: %s", versionedHash.String(), hexutil.Encode(blobVersionedHash[:]))
}

return &blob, nil
}
4 changes: 4 additions & 0 deletions rollup/da_syncer/syncing_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Config struct {
BlobScanAPIEndpoint string // BlobScan blob api endpoint
BlockNativeAPIEndpoint string // BlockNative blob api endpoint
BeaconNodeAPIEndpoint string // Beacon node api endpoint
AwsS3BlobAPIEndpoint string // AWS S3 blob data api endpoint

RecoveryMode bool // Recovery mode is used to override existing blocks with the blocks read from the pipeline and start from a specific L1 block and batch
InitialL1Block uint64 // L1 block in which the InitialBatch was committed (or any earlier L1 block but requires more RPC requests)
Expand Down Expand Up @@ -74,6 +75,9 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi
if config.BlockNativeAPIEndpoint != "" {
blobClientList.AddBlobClient(blob_client.NewBlockNativeClient(config.BlockNativeAPIEndpoint))
}
if config.AwsS3BlobAPIEndpoint != "" {
blobClientList.AddBlobClient(blob_client.NewAwsS3Client(config.AwsS3BlobAPIEndpoint))
}
if blobClientList.Size() == 0 {
return nil, errors.New("DA syncing is enabled but no blob client is configured. Please provide at least one blob client via command line flag")
}
Expand Down
3 changes: 3 additions & 0 deletions rollup/rollup_sync_service/rollup_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig
if config.BlockNativeAPIEndpoint != "" {
blobClientList.AddBlobClient(blob_client.NewBlockNativeClient(config.BlockNativeAPIEndpoint))
}
if config.AwsS3BlobAPIEndpoint != "" {
blobClientList.AddBlobClient(blob_client.NewAwsS3Client(config.AwsS3BlobAPIEndpoint))
}
if blobClientList.Size() == 0 {
return nil, errors.New("no blob client is configured for rollup verifier. Please provide at least one blob client via command line flag")
}
Expand Down
Loading