Skip to content

Commit

Permalink
fix: combined server
Browse files Browse the repository at this point in the history
  • Loading branch information
MiniFrenchBread committed Feb 28, 2024
1 parent 5100fa6 commit a2c8243
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 34 deletions.
35 changes: 33 additions & 2 deletions disperser/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
clean:
rm -rf ./bin

build: build_server build_batcher build_encoder
build: build_server build_batcher build_encoder build_combined

build_batcher:
go build -o ./bin/batcher ./cmd/batcher
Expand All @@ -12,6 +12,9 @@ build_server:
build_encoder:
go build -o ./bin/encoder ./cmd/encoder

build_combined: build_server build_batcher
go build -o ./bin/combined ./cmd/combined_server

run_batcher: build_batcher
./bin/batcher \
--batcher.pull-interval 5s \
Expand Down Expand Up @@ -60,4 +63,32 @@ run_encoder: build_encoder
--kzg.srs-order 300000 \
--kzg.num-workers 12 \
--disperser-encoder.log.level-std trace \
--disperser-encoder.log.level-file trace
--disperser-encoder.log.level-file trace

run_combined: build_combined
./bin/combined \
--chain.rpc ETH_RPC_ENDPOINT \
--chain.private-key YOUR_PRIVATE_KEY \
--chain.receipt-wait-rounds 180 \
--chain.receipt-wait-interval 1s \
--chain.gas-limit 2000000 \
--combined-server.use-memory-db \
--combined-server.storage.node-url http://0.0.0.0:5678 \
--combined-server.storage.node-url http://0.0.0.0:6789 \
--combined-server.storage.kv-url http://0.0.0.0:7890 \
--combined-server.storage.kv-stream-id 000000000000000000000000000000000000000000000000000000000000f2bd \
--combined-server.storage.flow-contract FLOW_CONTRACT_ADDR \
--disperser-server.s3-bucket-name test-zgda-blobstore \
--disperser-server.dynamodb-table-name test-BlobMetadata \
--disperser-server.grpc-port 51001 \
--batcher.s3-bucket-name test-zgda-blobstore \
--batcher.dynamodb-table-name test-BlobMetadata \
--batcher.pull-interval 5s \
--batcher.finalizer-interval 20s \
--batcher.confirmer-num 3 \
--encoder-socket 0.0.0.0:34000 \
--batcher.batch-size-limit 50 \
--batcher.srs-order 300000 \
--encoding-timeout 10s \
--chain-read-timeout 12s \
--chain-write-timeout 13s
54 changes: 52 additions & 2 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"time"

eth_common "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/openweb3/web3go/types"
"github.com/prometheus/client_golang/prometheus"
pb "github.com/zero-gravity-labs/zerog-data-avail/api/grpc/disperser"
"github.com/zero-gravity-labs/zerog-data-avail/common"
Expand All @@ -29,7 +31,7 @@ const maxBlobSize = 1024 * 3968 // 3968 KB

type DispersalServer struct {
pb.UnimplementedDisperserServer
mu *sync.Mutex
mu *sync.RWMutex

config disperser.ServerConfig

Expand All @@ -44,6 +46,9 @@ type DispersalServer struct {
KVNode *kv.Client
StreamId eth_common.Hash

rpcClient *rpc.Client
latestFinalizedBlock uint32

logger common.Logger
}

Expand All @@ -60,6 +65,7 @@ func NewDispersalServer(
metadataHashAsBlobKey bool,
kvClient *kv.Client,
streamId eth_common.Hash,
rpcClient *rpc.Client,
) *DispersalServer {
return &DispersalServer{
config: config,
Expand All @@ -68,10 +74,11 @@ func NewDispersalServer(
logger: logger,
ratelimiter: ratelimiter,
rateConfig: rateConfig,
mu: &sync.Mutex{},
mu: &sync.RWMutex{},
metadataHashAsBlobKey: metadataHashAsBlobKey,
KVNode: kvClient,
StreamId: streamId,
rpcClient: rpcClient,
}
}

Expand Down Expand Up @@ -255,6 +262,16 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR
}
if metadataInKV != nil {
metadata = metadataInKV
s.mu.RLock()
defer s.mu.RUnlock()
if metadata.ConfirmationInfo.ConfirmationBlockNumber <= s.latestFinalizedBlock {
metadata.BlobStatus = disperser.Finalized
}
} else {
// behavior align with aws dynamodb
metadata = &disperser.BlobMetadata{
BlobStatus: disperser.Processing,
}
}
}

Expand Down Expand Up @@ -364,10 +381,43 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
}, nil
}

func (s *DispersalServer) UpdateLatestFinalizedBlock(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()

ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

var header = types.Header{}
err := s.rpcClient.CallContext(ctxWithTimeout, &header, "eth_getBlockByNumber", "finalized", false)
if err != nil {
return err
}
if uint32(header.Number.Uint64()) > s.latestFinalizedBlock {
s.latestFinalizedBlock = uint32(header.Number.Uint64())
}
return nil
}

func (s *DispersalServer) Start(ctx context.Context) error {
s.logger.Trace("Entering Start function...")
defer s.logger.Trace("Exiting Start function...")

// fetch latest finalized block number
if s.metadataHashAsBlobKey {
go func() {
for {
err := s.UpdateLatestFinalizedBlock(ctx)
if err != nil {
s.logger.Warn("fetch latest finalized block number failed", "error", err)
} else {
s.logger.Info("latest finalized block number updated", "number", s.latestFinalizedBlock)
}
time.Sleep(time.Second * 5)
}
}()
}

// Serve grpc requests
addr := fmt.Sprintf("%s:%s", disperser.Localhost, s.config.GrpcPort)
listener, err := net.Listen("tcp", addr)
Expand Down
8 changes: 7 additions & 1 deletion disperser/cmd/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/zero-gravity-labs/zerog-storage-client/kv"
"github.com/zero-gravity-labs/zerog-storage-client/node"

"github.com/ethereum/go-ethereum/rpc"
"github.com/urfave/cli"
"github.com/zero-gravity-labs/zerog-data-avail/common/aws/dynamodb"
"github.com/zero-gravity-labs/zerog-data-avail/common/aws/s3"
Expand Down Expand Up @@ -98,11 +99,16 @@ func RunDisperserServer(ctx *cli.Context) error {
metrics := disperser.NewMetrics(config.MetricsConfig.HTTPPort, logger)

var kvClient *kv.Client
var rpcClient *rpc.Client

if config.BlobstoreConfig.MetadataHashAsBlobKey {
kvClient = kv.NewClient(node.MustNewClient(config.StorageNodeConfig.KVNodeURL), nil)
rpcClient, err = rpc.Dial(config.EthClientConfig.RPCURL)
if err != nil {
return err
}
}
server := apiserver.NewDispersalServer(config.ServerConfig, blobStore, logger, metrics, ratelimiter, config.RateConfig, config.BlobstoreConfig.MetadataHashAsBlobKey, kvClient, config.StorageNodeConfig.KVStreamId)
server := apiserver.NewDispersalServer(config.ServerConfig, blobStore, logger, metrics, ratelimiter, config.RateConfig, config.BlobstoreConfig.MetadataHashAsBlobKey, kvClient, config.StorageNodeConfig.KVStreamId, rpcClient)

// Enable Metrics Block
if config.MetricsConfig.EnableMetrics {
Expand Down
46 changes: 21 additions & 25 deletions disperser/cmd/combined_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,17 @@ func RunDisperserServer(config Config, blobStore disperser.BlobStore, logger com
metrics := disperser.NewMetrics(config.MetricsConfig.HTTPPort, logger)

var kvClient *kv.Client
var rpcClient *rpc.Client

if config.BlobstoreConfig.MetadataHashAsBlobKey || config.BlobstoreConfig.InMemory {
if config.BlobstoreConfig.MetadataHashAsBlobKey {
kvClient = kv.NewClient(node.MustNewClient(config.StorageNodeConfig.KVNodeURL), nil)
var err error
rpcClient, err = rpc.Dial(config.EthClientConfig.RPCURL)
if err != nil {
return err
}
}
server := apiserver.NewDispersalServer(config.ServerConfig, blobStore, logger, metrics, ratelimiter, config.RateConfig, config.BlobstoreConfig.MetadataHashAsBlobKey, kvClient, config.StorageNodeConfig.KVStreamId)
server := apiserver.NewDispersalServer(config.ServerConfig, blobStore, logger, metrics, ratelimiter, config.RateConfig, config.BlobstoreConfig.MetadataHashAsBlobKey, kvClient, config.StorageNodeConfig.KVStreamId, rpcClient)

// Enable Metrics Block
if config.MetricsConfig.EnableMetrics {
Expand All @@ -94,7 +100,7 @@ func RunDisperserServer(config Config, blobStore disperser.BlobStore, logger com
return server.Start(context.Background())
}

func RunBatcher(config Config, blobStore disperser.BlobStore, logger common.Logger) error {
func RunBatcher(config Config, queue disperser.BlobStore, logger common.Logger) error {
// transactor
transactor := transactor.NewTransactor(logger)
// dispatcher
Expand All @@ -119,23 +125,6 @@ func RunBatcher(config Config, blobStore disperser.BlobStore, logger common.Logg
return err
}

// blob store
var queue disperser.BlobStore

bucketName := config.BlobstoreConfig.BucketName
s3Client, err := s3.NewClient(context.Background(), config.AwsClientConfig, logger)
if err != nil {
return err
}
logger.Info("Initialized S3 client", "bucket", bucketName)

dynamoClient, err := dynamodb.NewClient(config.AwsClientConfig, logger)
if err != nil {
return err
}
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, 0)
queue = blobstore.NewSharedStorage(bucketName, s3Client, config.BlobstoreConfig.MetadataHashAsBlobKey, blobMetadataStore, logger)

metrics := batcher.NewMetrics(config.MetricsConfig.HTTPPort, logger)

// encoder
Expand Down Expand Up @@ -201,11 +190,18 @@ func RunCombinedServer(ctx *cli.Context) error {
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, 0)
blobStore = blobstore.NewSharedStorage(bucketName, s3Client, config.BlobstoreConfig.MetadataHashAsBlobKey, blobMetadataStore, logger)
} else {
config.BlobstoreConfig.MetadataHashAsBlobKey = true
blobStore = memorydb.NewBlobStore()
}
err = RunDisperserServer(config, blobStore, logger)
if err != nil {
return err
}
return RunBatcher(config, blobStore, logger)
errChan := make(chan error)
go func() {
err := RunDisperserServer(config, blobStore, logger)
errChan <- err
}()
go func() {
err := RunBatcher(config, blobStore, logger)
errChan <- err
}()
err = <-errChan
return err
}
2 changes: 1 addition & 1 deletion disperser/common/memorydb/memorydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (q *SharedBlobStore) GetBlobsByMetadata(ctx context.Context, metadata []*di
defer q.mu.RUnlock()
blobs := make(map[disperser.BlobKey]*core.Blob)
for _, meta := range metadata {
if holder, ok := q.Blobs[meta.BlobHash]; ok {
if holder, ok := q.Blobs[meta.MetadataHash]; ok {
blobs[meta.GetBlobKey()] = &core.Blob{
RequestHeader: meta.RequestMetadata.BlobRequestHeader,
Data: holder.Data,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/urfave/cli/v2 v2.25.7
github.com/wealdtech/go-merkletree v1.0.1-0.20230205101955-ec7a95ea11ca
github.com/zero-gravity-labs/zerog-data-avail/api v0.0.0
github.com/zero-gravity-labs/zerog-storage-client v0.1.12
github.com/zero-gravity-labs/zerog-storage-client v0.1.13
go.uber.org/automaxprocs v1.5.2
google.golang.org/grpc v1.59.0
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -805,8 +805,8 @@ github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZ
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/zero-gravity-labs/zerog-storage-client v0.1.12 h1:jIHUfhAw3KREyqMuksJ/2K9pLl/9uN9c317mJuHadKY=
github.com/zero-gravity-labs/zerog-storage-client v0.1.12/go.mod h1:X0u50IkiCmB9hchf8mEiIWyk6bLlo0d2xAxrPPK8IZ4=
github.com/zero-gravity-labs/zerog-storage-client v0.1.13 h1:JKTHNOWygF1xqM88qp9aYmwnN6w06Lrh+KZjzHPEVaE=
github.com/zero-gravity-labs/zerog-storage-client v0.1.13/go.mod h1:X0u50IkiCmB9hchf8mEiIWyk6bLlo0d2xAxrPPK8IZ4=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
Expand Down

0 comments on commit a2c8243

Please sign in to comment.