
feat: retry options
MiniFrenchBread committed Feb 2, 2024
1 parent a79773d commit d38c13a
Showing 7 changed files with 66 additions and 21 deletions.
22 changes: 22 additions & 0 deletions common/aws/s3/client.go
@@ -4,9 +4,11 @@ import (
    "bytes"
    "context"
    "errors"
+   "net/http"
    "sync"

    "github.com/aws/aws-sdk-go-v2/aws"
+   awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/credentials"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
@@ -95,8 +97,28 @@ func (s *client) DownloadObject(ctx context.Context, bucket string, key string)
    return buffer.Bytes(), nil
}

+func (s *client) keyExists(ctx context.Context, bucket string, key string) (bool, error) {
+   _, err := s.s3Client.HeadObject(ctx, &s3.HeadObjectInput{
+       Bucket: aws.String(bucket),
+       Key:    aws.String(key),
+   })
+   if err != nil {
+       var responseError *awshttp.ResponseError
+       if errors.As(err, &responseError) && responseError.ResponseError.HTTPStatusCode() == http.StatusNotFound {
+           return false, nil
+       }
+       return false, err
+   }
+   return true, nil
+}
+
func (s *client) UploadObject(ctx context.Context, bucket string, key string, data []byte) error {
    var partMiBs int64 = 10
+   uploaded, _ := s.keyExists(ctx, bucket, key)
+   if uploaded {
+       s.logger.Info("object already uploaded, skip", "key", key)
+       return nil
+   }
    uploader := manager.NewUploader(s.s3Client, func(u *manager.Uploader) {
        u.PartSize = partMiBs * 1024 * 1024 // 10MB per part
        u.Concurrency = 3 //The number of goroutines to spin up in parallel per call to Upload when sending parts
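The keyExists guard above makes UploadObject idempotent: a retried upload first issues a HeadObject request and returns early when the key is already present, so only missing objects are re-sent. Below is a minimal standalone sketch of the same existence check written directly against the AWS SDK for Go v2, outside this repository's client wrapper; the bucket and key names are placeholders.

package main

import (
    "context"
    "errors"
    "fmt"
    "log"
    "net/http"

    "github.com/aws/aws-sdk-go-v2/aws"
    awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

// keyExists reports whether bucket/key already holds an object, treating a
// 404 from HeadObject as "not found" rather than as an error.
func keyExists(ctx context.Context, c *s3.Client, bucket, key string) (bool, error) {
    _, err := c.HeadObject(ctx, &s3.HeadObjectInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(key),
    })
    if err != nil {
        var responseError *awshttp.ResponseError
        if errors.As(err, &responseError) && responseError.ResponseError.HTTPStatusCode() == http.StatusNotFound {
            return false, nil
        }
        return false, err
    }
    return true, nil
}

func main() {
    ctx := context.Background()
    cfg, err := config.LoadDefaultConfig(ctx) // default credential/region chain
    if err != nil {
        log.Fatal(err)
    }
    exists, err := keyExists(ctx, s3.NewFromConfig(cfg), "example-bucket", "example-key") // placeholder names
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("already uploaded:", exists)
}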
38 changes: 30 additions & 8 deletions common/geth/cli.go
@@ -1,22 +1,28 @@
package geth

import (
+   "time"
+
    "github.com/urfave/cli"
    "github.com/zero-gravity-labs/zerog-data-avail/common"
)

var (
-   rpcUrlFlagName           = "chain.rpc"
-   privateKeyFlagName       = "chain.private-key"
-   numConfirmationsFlagName = "chain.num-confirmations"
-   txGasLimitFlagName       = "chain.gas-limit"
+   rpcUrlFlagName                 = "chain.rpc"
+   privateKeyFlagName             = "chain.private-key"
+   numConfirmationsFlagName       = "chain.num-confirmations"
+   txGasLimitFlagName             = "chain.gas-limit"
+   receiptPollingRoundsFlagName   = "chain.receipt-wait-rounds"
+   receiptPollingIntervalFlagName = "chain.receipt-wait-interval"
)

type EthClientConfig struct {
-   RPCURL           string
-   PrivateKeyString string
-   NumConfirmations int
-   TxGasLimit       int
+   RPCURL                 string
+   PrivateKeyString       string
+   NumConfirmations       int
+   TxGasLimit             int
+   ReceiptPollingRounds   uint
+   ReceiptPollingInterval time.Duration
}

func EthClientFlags(envPrefix string) []cli.Flag {
@@ -47,6 +53,20 @@ func EthClientFlags(envPrefix string) []cli.Flag {
            Value:    0,
            EnvVar:   common.PrefixEnvVar(envPrefix, "TX_GAS_LIMIT"),
        },
+       cli.UintFlag{
+           Name:     receiptPollingRoundsFlagName,
+           Usage:    "Rounds of receipt polling",
+           Required: false,
+           Value:    60,
+           EnvVar:   common.PrefixEnvVar(envPrefix, "RECEIPT_POLLING_ROUNDS"),
+       },
+       cli.DurationFlag{
+           Name:     receiptPollingIntervalFlagName,
+           Usage:    "Interval of receipt polling",
+           Required: false,
+           Value:    time.Second,
+           EnvVar:   common.PrefixEnvVar(envPrefix, "RECEIPT_POLLING_INTERVAL"),
+       },
    }
}

@@ -56,6 +76,8 @@ func ReadEthClientConfig(ctx *cli.Context) EthClientConfig {
    cfg.PrivateKeyString = ctx.GlobalString(privateKeyFlagName)
    cfg.NumConfirmations = ctx.GlobalInt(numConfirmationsFlagName)
    cfg.TxGasLimit = ctx.GlobalInt(txGasLimitFlagName)
+   cfg.ReceiptPollingRounds = ctx.GlobalUint(receiptPollingRoundsFlagName)
+   cfg.ReceiptPollingInterval = ctx.GlobalDuration(receiptPollingIntervalFlagName)
    return cfg
}

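Taken together, the geth package now exposes two receipt-polling knobs: chain.receipt-wait-rounds (default 60) and chain.receipt-wait-interval (default 1s), surfaced on EthClientConfig as ReceiptPollingRounds and ReceiptPollingInterval. A hypothetical wiring sketch follows, assuming the module's common/geth package path; the app name and the "ZGDA" environment prefix are placeholders, not taken from this commit.

package main

import (
    "fmt"
    "log"
    "os"

    "github.com/urfave/cli"
    "github.com/zero-gravity-labs/zerog-data-avail/common/geth"
)

func main() {
    app := cli.NewApp()
    app.Name = "retry-options-demo" // placeholder binary name
    // Registers chain.receipt-wait-rounds and chain.receipt-wait-interval alongside
    // the existing chain.* flags; each can also be set via the environment variable
    // built by common.PrefixEnvVar(envPrefix, ...).
    app.Flags = geth.EthClientFlags("ZGDA")
    app.Action = func(ctx *cli.Context) error {
        cfg := geth.ReadEthClientConfig(ctx)
        // Defaults are 60 rounds at 1s; override with e.g.
        //   --chain.receipt-wait-rounds 120 --chain.receipt-wait-interval 500ms
        fmt.Println("receipt polling:", cfg.ReceiptPollingRounds, "rounds every", cfg.ReceiptPollingInterval)
        return nil
    }
    if err := app.Run(os.Args); err != nil {
        log.Fatal(err)
    }
}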
4 changes: 2 additions & 2 deletions disperser/batcher/batcher.go
@@ -140,7 +140,7 @@ func (b *Batcher) Start(ctx context.Context) error {
            if ts, err := b.HandleSingleBatch(ctx); err != nil {
                b.EncodingStreamer.RemoveBatchingStatus(ts)
                if errors.Is(err, errNoEncodedResults) {
-                   b.logger.Warn("no encoded results to make a batch with")
+                   b.logger.Debug("no encoded results to make a batch with")
                } else {
                    b.logger.Error("failed to process a batch", "err", err)
                }
@@ -150,7 +150,7 @@ func (b *Batcher) Start(ctx context.Context) error {
            if ts, err := b.HandleSingleBatch(ctx); err != nil {
                b.EncodingStreamer.RemoveBatchingStatus(ts)
                if errors.Is(err, errNoEncodedResults) {
-                   b.logger.Warn("no encoded results to make a batch with(Notified)")
+                   b.logger.Debug("no encoded results to make a batch with(Notified)")
                } else {
                    b.logger.Error("failed to process a batch(Notified)", "err", err)
                }
12 changes: 9 additions & 3 deletions disperser/batcher/confirmer.go
@@ -33,6 +33,8 @@ type Confirmer struct {

    routines uint

+   retryOption blockchain.RetryOption
+
    logger  common.Logger
    Metrics *Metrics
}
@@ -62,8 +64,12 @@ func NewConfirmer(ethConfig geth.EthClientConfig, storageNodeConfig storage_node
        ConfirmChan:    make(chan *BatchInfo),
        pendingBatches: make([]*BatchInfo, 0),
        routines:       routines,
-       logger:         logger,
-       Metrics:        metrics,
+       retryOption: blockchain.RetryOption{
+           Rounds:   ethConfig.ReceiptPollingRounds,
+           Interval: ethConfig.ReceiptPollingInterval,
+       },
+       logger:  logger,
+       Metrics: metrics,
    }, nil
}

@@ -145,7 +151,7 @@ func (c *Confirmer) waitForReceipt(txHash eth_common.Hash) (uint32, uint32, erro
    }
    c.logger.Info("[confirmer] Waiting batch be confirmed", "transaction hash", txHash)
    // data is not duplicate, there is a new transaction
-   receipt, err := c.Flow.WaitForReceipt(txHash, true)
+   receipt, err := c.Flow.WaitForReceipt(txHash, true, c.retryOption)
    if err != nil {
        return 0, 0, err
    }
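The Confirmer is the consumer of the new configuration: it packs ReceiptPollingRounds and ReceiptPollingInterval into a blockchain.RetryOption and hands it to Flow.WaitForReceipt, so receipt polling is bounded instead of open-ended. The sketch below only illustrates the semantics such an option typically encodes (poll up to Rounds times, sleeping Interval between attempts); it is not the zerog-storage-client implementation, and the helper names are hypothetical.

package main

import (
    "errors"
    "fmt"
    "time"
)

// retryOption mirrors the two fields the Confirmer fills from EthClientConfig
// (ReceiptPollingRounds, ReceiptPollingInterval).
type retryOption struct {
    Rounds   uint
    Interval time.Duration
}

var errReceiptNotFound = errors.New("receipt not yet available")

// waitForReceipt polls fetch until it succeeds or Rounds attempts are
// exhausted, sleeping Interval between attempts.
func waitForReceipt(opt retryOption, fetch func() (string, error)) (string, error) {
    var lastErr error
    for i := uint(0); i < opt.Rounds; i++ {
        receipt, err := fetch()
        if err == nil {
            return receipt, nil
        }
        lastErr = err
        time.Sleep(opt.Interval)
    }
    return "", fmt.Errorf("receipt not found after %d rounds: %w", opt.Rounds, lastErr)
}

func main() {
    attempts := 0
    receipt, err := waitForReceipt(retryOption{Rounds: 60, Interval: time.Second}, func() (string, error) {
        attempts++
        if attempts < 3 {
            return "", errReceiptNotFound // simulate a pending transaction
        }
        return "0xreceipt", nil
    })
    fmt.Println(receipt, err)
}

With the defaults from cli.go (60 rounds at 1s), this gives a transaction roughly a minute to produce a receipt before polling gives up, assuming the option bounds the poll loop as sketched.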
5 changes: 0 additions & 5 deletions disperser/batcher/encoding_streamer.go
@@ -189,11 +189,6 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan

    e.logger.Trace("[encodingstreamer] new metadatas to encode", "numMetadata", len(metadatas), "duration", time.Since(stageTimer))

-   metadataByKey := make(map[disperser.BlobKey]*disperser.BlobMetadata, 0)
-   for _, metadata := range metadatas {
-       metadataByKey[metadata.GetBlobKey()] = metadata
-   }
-
    stageTimer = time.Now()
    blobs, err := e.blobStore.GetBlobsByMetadata(ctx, metadatas)
    if err != nil {
2 changes: 1 addition & 1 deletion go.mod
@@ -25,7 +25,7 @@ require (
    github.com/urfave/cli/v2 v2.25.7
    github.com/wealdtech/go-merkletree v1.0.1-0.20230205101955-ec7a95ea11ca
    github.com/zero-gravity-labs/zerog-data-avail/api v0.0.0
-   github.com/zero-gravity-labs/zerog-storage-client v0.1.6
+   github.com/zero-gravity-labs/zerog-storage-client v0.1.8
    go.uber.org/automaxprocs v1.5.2
    go.uber.org/goleak v1.2.0
    google.golang.org/grpc v1.59.0
4 changes: 2 additions & 2 deletions go.sum
@@ -901,8 +901,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
-github.com/zero-gravity-labs/zerog-storage-client v0.1.6 h1:Xb+96CwjjVMm0SJSqlgDAg0haaurXouBwLx9cJbQVGo=
-github.com/zero-gravity-labs/zerog-storage-client v0.1.6/go.mod h1:X0u50IkiCmB9hchf8mEiIWyk6bLlo0d2xAxrPPK8IZ4=
+github.com/zero-gravity-labs/zerog-storage-client v0.1.8 h1:ceUxq7alFqUsaPIuk9/RxzjfMh+q5AZNaf1LhqGeAMk=
+github.com/zero-gravity-labs/zerog-storage-client v0.1.8/go.mod h1:X0u50IkiCmB9hchf8mEiIWyk6bLlo0d2xAxrPPK8IZ4=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
