Skip to content

Commit

Permalink
feat: commit metadata on kv
Browse files Browse the repository at this point in the history
  • Loading branch information
MiniFrenchBread committed Feb 26, 2024
1 parent 4af6f61 commit 7810bc8
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 35 deletions.
3 changes: 2 additions & 1 deletion common/geth/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func EthClientFlags(envPrefix string) []cli.Flag {
cli.StringFlag{
Name: privateKeyFlagName,
Usage: "Ethereum private key for disperser",
Required: true,
Required: false,
Value: "0000000000000000000000000000000000000000000000000000000000000000",
EnvVar: common.PrefixEnvVar(envPrefix, "PRIVATE_KEY"),
},
cli.IntFlag{
Expand Down
12 changes: 8 additions & 4 deletions common/storage_node/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,29 @@ func ClientFlags(envPrefix string, flagPrefix string) []cli.Flag {
cli.StringSliceFlag{
Name: common.PrefixFlag(flagPrefix, StorageNodeURLsFlagName),
Usage: "storage node urls",
Required: true,
Required: false,
Value: nil,
EnvVar: common.PrefixEnvVar(envPrefix, "STORAGE_NODE_URLS"),
},
cli.StringFlag{
Name: common.PrefixFlag(flagPrefix, FlowContractAddressFlagName),
Usage: "flow contract address",
Required: true,
Required: false,
Value: "0x0000000000000000000000000000000000000000",
EnvVar: common.PrefixEnvVar(envPrefix, "STORAGE_NODE_URLS"),
},
cli.StringFlag{
Name: common.PrefixFlag(flagPrefix, KVNodeURLFlagName),
Usage: "kv node url",
Required: true,
Required: false,
Value: "",
EnvVar: common.PrefixEnvVar(envPrefix, "KV_NODE_URL"),
},
cli.StringFlag{
Name: common.PrefixFlag(flagPrefix, KVStreamIDFlagName),
Usage: "kv stream id",
Required: true,
Required: false,
Value: "0000000000000000000000000000000000000000000000000000000000000000",
EnvVar: common.PrefixEnvVar(envPrefix, "KV_NODE_URL"),
},
cli.UintFlag{
Expand Down
28 changes: 14 additions & 14 deletions core/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,38 +289,38 @@ func (h *BlobHeader) Encode() ([]byte, error) {
}

func (h *BatchHeader) Serialize() ([]byte, error) {
return encode(h)
return Encode(h)
}

func (h *BatchHeader) Deserialize(data []byte) (*BatchHeader, error) {
err := decode(data, h)
err := Decode(data, h)
return h, err
}

func (h *BlobHeader) Serialize() ([]byte, error) {
return encode(h)
return Encode(h)
}

func (h *BlobHeader) Deserialize(data []byte) (*BlobHeader, error) {
err := decode(data, h)
err := Decode(data, h)
return h, err
}

func (c *Chunk) Serialize() ([]byte, error) {
return encode(c)
return Encode(c)
}

func (c *Chunk) Deserialize(data []byte) (*Chunk, error) {
err := decode(data, c)
err := Decode(data, c)
return c, err
}

func (c Commitment) Serialize() ([]byte, error) {
return encode(c)
return Encode(c)
}

func (c *Commitment) Deserialize(data []byte) (*Commitment, error) {
err := decode(data, c)
err := Decode(data, c)
return c, err
}

Expand Down Expand Up @@ -357,24 +357,24 @@ func (h *KVBlobInfoKey) FromBytes(data []byte) (*KVBlobInfoKey, error) {
}

func (h *KVBlobInfo) Serialize() ([]byte, error) {
return encode(h)
return Encode(h)
}

func (h *KVBlobInfo) Deserialize(data []byte) (*KVBlobInfo, error) {
err := decode(data, h)
err := Decode(data, h)
return h, err
}

func (h *KVBatchInfo) Serialize() ([]byte, error) {
return encode(h)
return Encode(h)
}

func (h *KVBatchInfo) Deserialize(data []byte) (*KVBatchInfo, error) {
err := decode(data, h)
err := Decode(data, h)
return h, err
}

func encode(obj any) ([]byte, error) {
func Encode(obj any) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(obj)
Expand All @@ -384,7 +384,7 @@ func encode(obj any) ([]byte, error) {
return buf.Bytes(), nil
}

func decode(data []byte, obj any) error {
func Decode(data []byte, obj any) error {
buf := bytes.NewBuffer(data)
dec := gob.NewDecoder(buf)
err := dec.Decode(obj)
Expand Down
53 changes: 45 additions & 8 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"sync"
"time"

eth_common "github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
pb "github.com/zero-gravity-labs/zerog-data-avail/api/grpc/disperser"
"github.com/zero-gravity-labs/zerog-data-avail/common"
healthcheck "github.com/zero-gravity-labs/zerog-data-avail/common/healthcheck"
"github.com/zero-gravity-labs/zerog-data-avail/core"
"github.com/zero-gravity-labs/zerog-data-avail/disperser"
"github.com/zero-gravity-labs/zerog-storage-client/kv"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
Expand All @@ -38,6 +40,10 @@ type DispersalServer struct {

metrics *disperser.Metrics

metadataHashAsBlobKey bool
KVNode *kv.Client
StreamId eth_common.Hash

logger common.Logger
}

Expand All @@ -51,15 +57,21 @@ func NewDispersalServer(
metrics *disperser.Metrics,
ratelimiter common.RateLimiter,
rateConfig RateConfig,
metadataHashAsBlobKey bool,
kvClient *kv.Client,
streamId eth_common.Hash,
) *DispersalServer {
return &DispersalServer{
config: config,
blobStore: store,
metrics: metrics,
logger: logger,
ratelimiter: ratelimiter,
rateConfig: rateConfig,
mu: &sync.Mutex{},
config: config,
blobStore: store,
metrics: metrics,
logger: logger,
ratelimiter: ratelimiter,
rateConfig: rateConfig,
mu: &sync.Mutex{},
metadataHashAsBlobKey: metadataHashAsBlobKey,
KVNode: kvClient,
StreamId: streamId,
}
}

Expand Down Expand Up @@ -199,6 +211,21 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *

}

func (s *DispersalServer) getMetadataFromKv(key []byte) (*disperser.BlobMetadata, error) {
val, err := s.KVNode.GetValue(s.StreamId, key)
if err != nil {
return nil, fmt.Errorf("failed to get blob metadata from kv node: %v", err)
}
if len(val.Data) == 0 {
return nil, nil
}
metadata, err := new(disperser.BlobMetadata).Deserialize(val.Data)
if err != nil {
return nil, fmt.Errorf("failed to deserialize blob metadata: %v", err)
}
return metadata, nil
}

func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusRequest) (*pb.BlobStatusReply, error) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("GetBlobStatus", f*1000) // make milliseconds
Expand All @@ -219,7 +246,17 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR
s.logger.Debug("metadataKey", "metadataKey", metadataKey.String())
metadata, err := s.blobStore.GetBlobMetadata(ctx, metadataKey)
if err != nil {
return nil, err
if !s.metadataHashAsBlobKey && s.KVNode != nil {
return nil, err
}
// check on kv
metadata, err = s.getMetadataFromKv(requestID)
if err != nil {
s.logger.Warn("get metadata from kv", err)
}
if metadata == nil {
return nil, fmt.Errorf("request not found")
}
}

isConfirmed, err := metadata.IsConfirmed()
Expand Down
71 changes: 64 additions & 7 deletions disperser/batcher/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
"github.com/zero-gravity-labs/zerog-data-avail/disperser/batcher/transactor"
"github.com/zero-gravity-labs/zerog-storage-client/common/blockchain"
"github.com/zero-gravity-labs/zerog-storage-client/contract"
zg_core "github.com/zero-gravity-labs/zerog-storage-client/core"
"github.com/zero-gravity-labs/zerog-storage-client/kv"
"github.com/zero-gravity-labs/zerog-storage-client/node"
"github.com/zero-gravity-labs/zerog-storage-client/transfer"
)

Expand All @@ -36,7 +39,11 @@ type Confirmer struct {

retryOption blockchain.RetryOption

transactor *transactor.Transactor
Nodes []*node.Client
KVNode *kv.Client
StreamId eth_common.Hash
UploadTaskSize uint
transactor *transactor.Transactor

logger common.Logger
Metrics *Metrics
Expand Down Expand Up @@ -71,9 +78,13 @@ func NewConfirmer(ethConfig geth.EthClientConfig, storageNodeConfig storage_node
Rounds: ethConfig.ReceiptPollingRounds,
Interval: ethConfig.ReceiptPollingInterval,
},
logger: logger,
Metrics: metrics,
transactor: transactor,
logger: logger,
Metrics: metrics,
Nodes: node.MustNewClients(storageNodeConfig.StorageNodeURLs),
KVNode: kv.NewClient(node.MustNewClient(storageNodeConfig.KVNodeURL), nil),
StreamId: storageNodeConfig.KVStreamId,
UploadTaskSize: storageNodeConfig.UploadTaskSize,
transactor: transactor,
}, nil
}

Expand Down Expand Up @@ -177,8 +188,51 @@ func (c *Confirmer) waitForReceipt(txHash eth_common.Hash) (uint32, uint32, erro
return uint32(submission.SubmissionIndex.Uint64()), uint32(receipt.BlockNumber), nil
}

func (c *Confirmer) PersistConfirmedBlobs(metadatas []*disperser.BlobMetadata) {

func (c *Confirmer) PersistConfirmedBlobs(ctx context.Context, metadatas []*disperser.BlobMetadata) error {
uploader := transfer.NewUploader(c.Flow, c.Nodes)
batcher := c.KVNode.Batcher()
for _, metadata := range metadatas {
blobKey := metadata.GetBlobKey()
key := []byte(blobKey.String())
value, err := metadata.Serialize()
if err != nil {
return errors.WithMessage(err, "Failed to serialize blob metadata")
}
batcher.Set(c.StreamId, key, value)
}
streamData, err := batcher.Build()
if err != nil {
return errors.WithMessage(err, "Failed to build stream data")
}
rawKVData, err := streamData.Encode()
if err != nil {
return errors.WithMessage(err, "Failed to encode stream data")
}
kvData, err := zg_core.NewDataInMemory(rawKVData)
if err != nil {
return errors.WithMessage(err, "failed to build kv data")
}
// upload
txHash, _, err := c.transactor.BatchUpload(uploader, []zg_core.IterableData{kvData}, []transfer.UploadOption{
// kv options
{
Tags: batcher.BuildTags(),
Force: true,
Disperse: false,
TaskSize: c.UploadTaskSize,
}})
if err != nil {
return errors.WithMessage(err, "failed to upload file")
}
// wait for receipt
_, _, err = c.waitForReceipt(txHash)
if err != nil {
return errors.WithMessage(err, "failed to confirm metadata onchain")
}
for _, metadata := range metadatas {
c.Queue.RemoveBlob(ctx, metadata)
}
return nil
}

func (c *Confirmer) ConfirmBatch(ctx context.Context, batchInfo *BatchInfo) error {
Expand Down Expand Up @@ -241,7 +295,10 @@ func (c *Confirmer) ConfirmBatch(ctx context.Context, batchInfo *BatchInfo) erro

// remove blobs
if c.Queue.MetadataHashAsBlobKey() {
c.PersistConfirmedBlobs(confirmedMetadatas)
err := c.PersistConfirmedBlobs(ctx, confirmedMetadatas)
if err != nil {
c.logger.Error("failed to upload metadata on chain: %v", err)
}
}

batchSize := int64(0)
Expand Down
6 changes: 6 additions & 0 deletions disperser/cmd/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package main
import (
"github.com/urfave/cli"
"github.com/zero-gravity-labs/zerog-data-avail/common/aws"
"github.com/zero-gravity-labs/zerog-data-avail/common/geth"
"github.com/zero-gravity-labs/zerog-data-avail/common/logging"
"github.com/zero-gravity-labs/zerog-data-avail/common/ratelimit"
"github.com/zero-gravity-labs/zerog-data-avail/common/storage_node"
"github.com/zero-gravity-labs/zerog-data-avail/disperser"
"github.com/zero-gravity-labs/zerog-data-avail/disperser/apiserver"
"github.com/zero-gravity-labs/zerog-data-avail/disperser/cmd/apiserver/flags"
Expand All @@ -19,6 +21,8 @@ type Config struct {
MetricsConfig disperser.MetricsConfig
RatelimiterConfig ratelimit.Config
RateConfig apiserver.RateConfig
StorageNodeConfig storage_node.ClientConfig
EthClientConfig geth.EthClientConfig
EnableRatelimiter bool
BucketTableName string
BucketStoreSize int
Expand All @@ -41,6 +45,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
ServerConfig: disperser.ServerConfig{
GrpcPort: ctx.GlobalString(flags.GrpcPortFlag.Name),
},
EthClientConfig: geth.ReadEthClientConfig(ctx),
BlobstoreConfig: blobstore.Config{
BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name),
TableName: ctx.GlobalString(flags.DynamoDBTableNameFlag.Name),
Expand All @@ -56,6 +61,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
EnableRatelimiter: ctx.GlobalBool(flags.EnableRatelimiter.Name),
BucketTableName: ctx.GlobalString(flags.BucketTableName.Name),
BucketStoreSize: ctx.GlobalInt(flags.BucketStoreSize.Name),
StorageNodeConfig: storage_node.ReadClientConfig(ctx, flags.FlagPrefix),
}
return config, nil
}
2 changes: 2 additions & 0 deletions disperser/cmd/apiserver/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/zero-gravity-labs/zerog-data-avail/common/aws"
"github.com/zero-gravity-labs/zerog-data-avail/common/logging"
"github.com/zero-gravity-labs/zerog-data-avail/common/ratelimit"
"github.com/zero-gravity-labs/zerog-data-avail/common/storage_node"
)

const (
Expand Down Expand Up @@ -95,4 +96,5 @@ func init() {
Flags = append(Flags, logging.CLIFlags(envVarPrefix, FlagPrefix)...)
Flags = append(Flags, ratelimit.RatelimiterCLIFlags(envVarPrefix, FlagPrefix)...)
Flags = append(Flags, aws.ClientFlags(envVarPrefix, FlagPrefix)...)
Flags = append(Flags, storage_node.ClientFlags(envVarPrefix, FlagPrefix)...)
}
Loading

0 comments on commit 7810bc8

Please sign in to comment.