Skip to content

Commit

Permalink
refactor blob expiration key
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Aug 15, 2024
1 parent 12f1a6f commit d35f068
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 22 deletions.
25 changes: 8 additions & 17 deletions node/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,17 +458,6 @@ func (s *Store) StoreBlobs(ctx context.Context, blobs []*core.BlobMessage, blobs
values := make([][]byte, 0)

expirationTime := s.expirationTime()
expirationKey := EncodeBlobExpirationKey(expirationTime)
// expirationValue is a list of blob header hashes that are expired.
expirationValue := make([]byte, 0)
var err error
// If there is already an expiration key in the store, we need to get the value and append to it.
if s.HasKey(ctx, expirationKey) {
expirationValue, err = s.db.Get(expirationKey)
if err != nil {
return nil, fmt.Errorf("failed to get the expiration value: %w", err)
}
}

// Generate key/value pairs for all blob headers and blob chunks .
size := int64(0)
Expand All @@ -479,11 +468,17 @@ func (s *Store) StoreBlobs(ctx context.Context, blobs []*core.BlobMessage, blobs
return nil, fmt.Errorf("internal error: the number of bundles in parsed blob (%d) must be the same as in raw blob (%d)", len(rawBlob.GetBundles()), len(blob.Bundles))
}

// blob header
blobHeaderHash, err := blob.BlobHeader.GetBlobHeaderHash()
if err != nil {
return nil, fmt.Errorf("failed to get blob header hash: %w", err)
}

// expiration key
expirationKey := EncodeBlobExpirationKey(expirationTime, blobHeaderHash)
keys = append(keys, expirationKey)
values = append(values, blobHeaderHash[:])

// blob header
blobHeaderKey := EncodeBlobHeaderKeyByHash(blobHeaderHash)
if s.HasKey(ctx, blobHeaderKey) {
s.logger.Warn("Blob already exists", "blobHeaderHash", hexutil.Encode(blobHeaderHash[:]))
Expand All @@ -496,7 +491,6 @@ func (s *Store) StoreBlobs(ctx context.Context, blobs []*core.BlobMessage, blobs
}
keys = append(keys, blobHeaderKey)
values = append(values, blobHeaderBytes)
expirationValue = append(expirationValue, blobHeaderHash[:]...)

// Get raw chunks
start := time.Now()
Expand Down Expand Up @@ -559,12 +553,9 @@ func (s *Store) StoreBlobs(ctx context.Context, blobs []*core.BlobMessage, blobs
encodingDuration += time.Since(start)
}

keys = append(keys, expirationKey)
values = append(values, expirationValue)

start := time.Now()
// Write all the key/value pairs to the local database atomically.
err = s.db.WriteBatch(keys, values)
err := s.db.WriteBatch(keys, values)
if err != nil {
return nil, fmt.Errorf("failed to write the batch into local database: %w", err)
}
Expand Down
12 changes: 7 additions & 5 deletions node/store_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func EncodeBatchMappingExpirationKeyPrefix() []byte {
return []byte(batchMappingExpirationPrefix)
}

// Returns an encoded key for expration time.
// EncodeBatchExpirationKey returns an encoded key for expration time.
// Note: the encoded key will preserve the order of expiration time, that is,
// expirationTime1 < expirationTime2 <=>
// EncodeBatchExpirationKey(expirationTime1) < EncodeBatchExpirationKey(expirationTime2)
Expand All @@ -130,14 +130,16 @@ func EncodeBatchExpirationKey(expirationTime int64) []byte {
}

// EncodeBlobExpirationKey returns an encoded key for expration time for blob header hashes.
// Encodes the expiration time and the blob header hash into a single key.
// Note: the encoded key will preserve the order of expiration time, that is,
// expirationTime1 < expirationTime2 <=>
// EncodeBlobExpirationKey(expirationTime1) < EncodeBlobExpirationKey(expirationTime2)
func EncodeBlobExpirationKey(expirationTime int64) []byte {
func EncodeBlobExpirationKey(expirationTime int64, blobHeaderHash [32]byte) []byte {
prefix := []byte(blobExpirationPrefix)
ts := make([]byte, 8)
binary.BigEndian.PutUint64(ts[0:8], uint64(expirationTime))
buf := bytes.NewBuffer(append(prefix, ts[:]...))
buf.Write(blobHeaderHash[:])
return buf.Bytes()
}

Expand All @@ -164,12 +166,12 @@ func DecodeBatchExpirationKey(key []byte) (int64, error) {
return ts, nil
}

// Returns the expiration timestamp encoded in the key.
// DecodeBlobExpirationKey returns the expiration timestamp encoded in the key.
func DecodeBlobExpirationKey(key []byte) (int64, error) {
if len(key) != len(blobExpirationPrefix)+8 {
if len(key) != len(blobExpirationPrefix)+8+32 {
return 0, errors.New("the expiration key is invalid")
}
ts := int64(binary.BigEndian.Uint64(key[len(key)-8:]))
ts := int64(binary.BigEndian.Uint64(key[len(key)-8-32 : len(key)-32]))
return ts, nil
}

Expand Down

0 comments on commit d35f068

Please sign in to comment.