Skip to content

Commit

Permalink
fix: delete blobs
Browse files Browse the repository at this point in the history
  • Loading branch information
MiniFrenchBread committed Feb 27, 2024
1 parent 7810bc8 commit b3f5b94
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 12 deletions.
13 changes: 6 additions & 7 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,19 +243,18 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR
return nil, err
}

s.logger.Debug("metadataKey", "metadataKey", metadataKey.String())
metadata, err := s.blobStore.GetBlobMetadata(ctx, metadataKey)
if err != nil {
if !s.metadataHashAsBlobKey && s.KVNode != nil {
return nil, err
}
return nil, err
}
if metadata.GetBlobKey().String() != string(requestID) && s.metadataHashAsBlobKey {
// check on kv
metadata, err = s.getMetadataFromKv(requestID)
metadataInKV, err := s.getMetadataFromKv(requestID)
if err != nil {
s.logger.Warn("get metadata from kv", err)
}
if metadata == nil {
return nil, fmt.Errorf("request not found")
if metadataInKV != nil {
metadata = metadataInKV
}
}

Expand Down
4 changes: 3 additions & 1 deletion disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ func (b *Batcher) Start(ctx context.Context) error {
b.confirmer.EncodingStreamer = b.EncodingStreamer
b.confirmer.Start(ctx)
// finalizer
b.finalizer.Start(ctx)
if !b.Queue.MetadataHashAsBlobKey() {
b.finalizer.Start(ctx)
}

go func() {
ticker := time.NewTicker(b.PullInterval)
Expand Down
13 changes: 11 additions & 2 deletions disperser/batcher/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,15 @@ func (c *Confirmer) PersistConfirmedBlobs(ctx context.Context, metadatas []*disp
if err != nil {
return errors.WithMessage(err, "failed to confirm metadata onchain")
}
c.logger.Info("[confirmer] removing confirmed blobs")
for _, metadata := range metadatas {
c.Queue.RemoveBlob(ctx, metadata)
c.logger.Info("[confirmer] removing blob", "blob key", metadata.GetBlobKey().String())
err := c.Queue.RemoveBlob(ctx, metadata)
if err != nil {
c.logger.Warn("[confirmer] failed to remove blob", "error", err)
}
}
c.logger.Info("[confirmer] confirmed blobs removed")
return nil
}

Expand Down Expand Up @@ -295,10 +301,13 @@ func (c *Confirmer) ConfirmBatch(ctx context.Context, batchInfo *BatchInfo) erro

// remove blobs
if c.Queue.MetadataHashAsBlobKey() {
stageTimer = time.Now()
c.logger.Info("[confirmer] Uploading confirmed metadata on chain")
err := c.PersistConfirmedBlobs(ctx, confirmedMetadatas)
if err != nil {
c.logger.Error("failed to upload metadata on chain: %v", err)
c.logger.Error("[confirmer] Failed to upload metadata on chain: %v", err)
}
c.logger.Info("[confirmer] Uploaded confirmed metadata on chain", "duration", time.Since(stageTimer))
}

batchSize := int64(0)
Expand Down
2 changes: 1 addition & 1 deletion disperser/batcher/transactor/transactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type Transactor struct {
mu *sync.Mutex
mu sync.Mutex

logger common.Logger
}
Expand Down
2 changes: 2 additions & 0 deletions disperser/cmd/apiserver/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/urfave/cli"
"github.com/zero-gravity-labs/zerog-data-avail/common"
"github.com/zero-gravity-labs/zerog-data-avail/common/aws"
"github.com/zero-gravity-labs/zerog-data-avail/common/geth"
"github.com/zero-gravity-labs/zerog-data-avail/common/logging"
"github.com/zero-gravity-labs/zerog-data-avail/common/ratelimit"
"github.com/zero-gravity-labs/zerog-data-avail/common/storage_node"
Expand Down Expand Up @@ -96,5 +97,6 @@ func init() {
Flags = append(Flags, logging.CLIFlags(envVarPrefix, FlagPrefix)...)
Flags = append(Flags, ratelimit.RatelimiterCLIFlags(envVarPrefix, FlagPrefix)...)
Flags = append(Flags, aws.ClientFlags(envVarPrefix, FlagPrefix)...)
Flags = append(Flags, geth.EthClientFlags(envVarPrefix)...)
Flags = append(Flags, storage_node.ClientFlags(envVarPrefix, FlagPrefix)...)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ require (
github.com/go-stack/stack v1.8.1 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.1
github.com/google/uuid v1.3.1 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
Expand Down

0 comments on commit b3f5b94

Please sign in to comment.