diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go index 7fba0f0e26..5e19e5de5b 100644 --- a/common/aws/dynamodb/client.go +++ b/common/aws/dynamodb/client.go @@ -165,6 +165,8 @@ func (c *Client) GetItem(ctx context.Context, tableName string, key Key) (Item, return resp.Item, nil } +// GetItems returns the items for the given keys +// Note: ordering of items is not guaranteed func (c *Client) GetItems(ctx context.Context, tableName string, keys []Key) ([]Item, error) { items, err := c.readItems(ctx, tableName, keys) if err != nil { diff --git a/disperser/common/blobstore/blob_metadata_store.go b/disperser/common/blobstore/blob_metadata_store.go index 033814bb79..e5301d51f4 100644 --- a/disperser/common/blobstore/blob_metadata_store.go +++ b/disperser/common/blobstore/blob_metadata_store.go @@ -64,18 +64,18 @@ func (s *BlobMetadataStore) QueueNewBlobMetadata(ctx context.Context, blobMetada return s.dynamoDBClient.PutItem(ctx, s.tableName, item) } -func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, metadataKey disperser.BlobKey) (*disperser.BlobMetadata, error) { +func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey disperser.BlobKey) (*disperser.BlobMetadata, error) { item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ "BlobHash": &types.AttributeValueMemberS{ - Value: metadataKey.BlobHash, + Value: blobKey.BlobHash, }, "MetadataHash": &types.AttributeValueMemberS{ - Value: metadataKey.MetadataHash, + Value: blobKey.MetadataHash, }, }) if item == nil { - return nil, fmt.Errorf("%w: metadata not found for key %s", disperser.ErrMetadataNotFound, metadataKey) + return nil, fmt.Errorf("%w: metadata not found for key %s", disperser.ErrMetadataNotFound, blobKey) } if err != nil { @@ -90,6 +90,32 @@ func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, metadataKey dis return metadata, nil } +// GetBulkBlobMetadata returns the metadata for the given blob keys +// Note: ordering of items is not guaranteed +func (s *BlobMetadataStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []disperser.BlobKey) ([]*disperser.BlobMetadata, error) { + keys := make([]map[string]types.AttributeValue, len(blobKeys)) + for i := 0; i < len(blobKeys); i += 1 { + keys[i] = map[string]types.AttributeValue{ + "BlobHash": &types.AttributeValueMemberS{Value: blobKeys[i].BlobHash}, + "MetadataHash": &types.AttributeValueMemberS{Value: blobKeys[i].MetadataHash}, + } + } + items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys) + if err != nil { + return nil, err + } + + metadata := make([]*disperser.BlobMetadata, len(items)) + for i, item := range items { + metadata[i], err = UnmarshalBlobMetadata(item) + if err != nil { + return nil, err + } + } + + return metadata, nil +} + // GetBlobMetadataByStatus returns all the metadata with the given status // Because this function scans the entire index, it should only be used for status with a limited number of items. // It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented. diff --git a/disperser/common/blobstore/blob_metadata_store_test.go b/disperser/common/blobstore/blob_metadata_store_test.go index ab00338b31..ca80b5159e 100644 --- a/disperser/common/blobstore/blob_metadata_store_test.go +++ b/disperser/common/blobstore/blob_metadata_store_test.go @@ -63,6 +63,11 @@ func TestBlobMetadataStoreOperations(t *testing.T) { assert.NoError(t, err) assert.Equal(t, metadata2, fetchedMetadata) + fetchBulk, err := blobMetadataStore.GetBulkBlobMetadata(ctx, []disperser.BlobKey{blobKey1, blobKey2}) + assert.NoError(t, err) + assert.Equal(t, metadata1, fetchBulk[0]) + assert.Equal(t, metadata2, fetchBulk[1]) + processing, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Processing) assert.NoError(t, err) assert.Len(t, processing, 1) diff --git a/disperser/common/blobstore/shared_storage.go b/disperser/common/blobstore/shared_storage.go index 456818a64b..1f1eed25f8 100644 --- a/disperser/common/blobstore/shared_storage.go +++ b/disperser/common/blobstore/shared_storage.go @@ -251,6 +251,10 @@ func (s *SharedBlobStore) GetBlobMetadata(ctx context.Context, metadataKey dispe return s.blobMetadataStore.GetBlobMetadata(ctx, metadataKey) } +func (s *SharedBlobStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []disperser.BlobKey) ([]*disperser.BlobMetadata, error) { + return s.blobMetadataStore.GetBulkBlobMetadata(ctx, blobKeys) +} + func (s *SharedBlobStore) HandleBlobFailure(ctx context.Context, metadata *disperser.BlobMetadata, maxRetry uint) (bool, error) { if metadata.NumRetries < maxRetry { if err := s.MarkBlobProcessing(ctx, metadata.GetBlobKey()); err != nil { diff --git a/disperser/common/inmem/store.go b/disperser/common/inmem/store.go index 5142c3f7eb..e672599fe4 100644 --- a/disperser/common/inmem/store.go +++ b/disperser/common/inmem/store.go @@ -328,6 +328,18 @@ func (q *BlobStore) GetBlobMetadata(ctx context.Context, blobKey disperser.BlobK return nil, disperser.ErrBlobNotFound } +func (q *BlobStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []disperser.BlobKey) ([]*disperser.BlobMetadata, error) { + q.mu.RLock() + defer q.mu.RUnlock() + metas := make([]*disperser.BlobMetadata, len(blobKeys)) + for i, key := range blobKeys { + if meta, ok := q.Metadata[key]; ok { + metas[i] = meta + } + } + return metas, nil +} + func (q *BlobStore) HandleBlobFailure(ctx context.Context, metadata *disperser.BlobMetadata, maxRetry uint) (bool, error) { if metadata.NumRetries < maxRetry { if err := q.MarkBlobProcessing(ctx, metadata.GetBlobKey()); err != nil { diff --git a/disperser/disperser.go b/disperser/disperser.go index f69f7bfc89..6ee63d1db4 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -180,6 +180,8 @@ type BlobStore interface { GetAllBlobMetadataByBatchWithPagination(ctx context.Context, batchHeaderHash [32]byte, limit int32, exclusiveStartKey *BatchIndexExclusiveStartKey) ([]*BlobMetadata, *BatchIndexExclusiveStartKey, error) // GetBlobMetadata returns a blob metadata given a metadata key GetBlobMetadata(ctx context.Context, blobKey BlobKey) (*BlobMetadata, error) + // GetBulkBlobMetadata returns a list of blob metadata given a list of blob keys + GetBulkBlobMetadata(ctx context.Context, blobKeys []BlobKey) ([]*BlobMetadata, error) // HandleBlobFailure handles a blob failure by either incrementing the retry count or marking the blob as failed // Returns a boolean indicating whether the blob should be retried and an error HandleBlobFailure(ctx context.Context, metadata *BlobMetadata, maxRetry uint) (bool, error)