From d35f0684561c9a2b3c0305121cd5b5b4e3c41cf4 Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Fri, 9 Aug 2024 16:09:41 -0700 Subject: [PATCH] refactor blob expiration key --- node/store.go | 25 ++++++++----------------- node/store_utils.go | 12 +++++++----- 2 files changed, 15 insertions(+), 22 deletions(-) diff --git a/node/store.go b/node/store.go index 73fb7ae72f..7f4539429d 100644 --- a/node/store.go +++ b/node/store.go @@ -458,17 +458,6 @@ func (s *Store) StoreBlobs(ctx context.Context, blobs []*core.BlobMessage, blobs values := make([][]byte, 0) expirationTime := s.expirationTime() - expirationKey := EncodeBlobExpirationKey(expirationTime) - // expirationValue is a list of blob header hashes that are expired. - expirationValue := make([]byte, 0) - var err error - // If there is already an expiration key in the store, we need to get the value and append to it. - if s.HasKey(ctx, expirationKey) { - expirationValue, err = s.db.Get(expirationKey) - if err != nil { - return nil, fmt.Errorf("failed to get the expiration value: %w", err) - } - } // Generate key/value pairs for all blob headers and blob chunks . size := int64(0) @@ -479,11 +468,17 @@ func (s *Store) StoreBlobs(ctx context.Context, blobs []*core.BlobMessage, blobs return nil, fmt.Errorf("internal error: the number of bundles in parsed blob (%d) must be the same as in raw blob (%d)", len(rawBlob.GetBundles()), len(blob.Bundles)) } - // blob header blobHeaderHash, err := blob.BlobHeader.GetBlobHeaderHash() if err != nil { return nil, fmt.Errorf("failed to get blob header hash: %w", err) } + + // expiration key + expirationKey := EncodeBlobExpirationKey(expirationTime, blobHeaderHash) + keys = append(keys, expirationKey) + values = append(values, blobHeaderHash[:]) + + // blob header blobHeaderKey := EncodeBlobHeaderKeyByHash(blobHeaderHash) if s.HasKey(ctx, blobHeaderKey) { s.logger.Warn("Blob already exists", "blobHeaderHash", hexutil.Encode(blobHeaderHash[:])) @@ -496,7 +491,6 @@ func (s *Store) StoreBlobs(ctx context.Context, blobs []*core.BlobMessage, blobs } keys = append(keys, blobHeaderKey) values = append(values, blobHeaderBytes) - expirationValue = append(expirationValue, blobHeaderHash[:]...) // Get raw chunks start := time.Now() @@ -559,12 +553,9 @@ func (s *Store) StoreBlobs(ctx context.Context, blobs []*core.BlobMessage, blobs encodingDuration += time.Since(start) } - keys = append(keys, expirationKey) - values = append(values, expirationValue) - start := time.Now() // Write all the key/value pairs to the local database atomically. - err = s.db.WriteBatch(keys, values) + err := s.db.WriteBatch(keys, values) if err != nil { return nil, fmt.Errorf("failed to write the batch into local database: %w", err) } diff --git a/node/store_utils.go b/node/store_utils.go index c6aa764e38..431aafdad2 100644 --- a/node/store_utils.go +++ b/node/store_utils.go @@ -117,7 +117,7 @@ func EncodeBatchMappingExpirationKeyPrefix() []byte { return []byte(batchMappingExpirationPrefix) } -// Returns an encoded key for expration time. +// EncodeBatchExpirationKey returns an encoded key for expration time. // Note: the encoded key will preserve the order of expiration time, that is, // expirationTime1 < expirationTime2 <=> // EncodeBatchExpirationKey(expirationTime1) < EncodeBatchExpirationKey(expirationTime2) @@ -130,14 +130,16 @@ func EncodeBatchExpirationKey(expirationTime int64) []byte { } // EncodeBlobExpirationKey returns an encoded key for expration time for blob header hashes. +// Encodes the expiration time and the blob header hash into a single key. // Note: the encoded key will preserve the order of expiration time, that is, // expirationTime1 < expirationTime2 <=> // EncodeBlobExpirationKey(expirationTime1) < EncodeBlobExpirationKey(expirationTime2) -func EncodeBlobExpirationKey(expirationTime int64) []byte { +func EncodeBlobExpirationKey(expirationTime int64, blobHeaderHash [32]byte) []byte { prefix := []byte(blobExpirationPrefix) ts := make([]byte, 8) binary.BigEndian.PutUint64(ts[0:8], uint64(expirationTime)) buf := bytes.NewBuffer(append(prefix, ts[:]...)) + buf.Write(blobHeaderHash[:]) return buf.Bytes() } @@ -164,12 +166,12 @@ func DecodeBatchExpirationKey(key []byte) (int64, error) { return ts, nil } -// Returns the expiration timestamp encoded in the key. +// DecodeBlobExpirationKey returns the expiration timestamp encoded in the key. func DecodeBlobExpirationKey(key []byte) (int64, error) { - if len(key) != len(blobExpirationPrefix)+8 { + if len(key) != len(blobExpirationPrefix)+8+32 { return 0, errors.New("the expiration key is invalid") } - ts := int64(binary.BigEndian.Uint64(key[len(key)-8:])) + ts := int64(binary.BigEndian.Uint64(key[len(key)-8-32 : len(key)-32])) return ts, nil }