[Storing] Refactor Storing Collections #7736

Open
wants to merge 22 commits into
base: leo/add-block-view-index
from

Conversation

zhangchiqing
Member

@zhangchiqing zhangchiqing commented Aug 14, 2025

This PR refactors collection storage to remove the duplicated logic for storing transactions, and switches the locking over to the lockctx lock manager.

See comments for the highlighted changes.

@zhangchiqing zhangchiqing changed the base branch from master to leo/add-block-view-index August 15, 2025 16:44
if err != nil {
// ignore collection if already seen
if errors.Is(err, storage.ErrAlreadyExists) {
Member Author

StoreAndIndexByTransaction no longer returns this error.

return err
}

// now store each of the transaction body
for _, tx := range collection.Transactions {
Member Author

StoreAndIndexByTransaction will store the transaction internally, so that txs and collection can be saved in the same batch update.

@zhangchiqing zhangchiqing marked this pull request as ready for review August 18, 2025 20:31
@zhangchiqing zhangchiqing requested a review from a team as a code owner August 18, 2025 20:31
Comment on lines -19 to -20
// TODO(7355): lockctx
indexingByTx *sync.Mutex
Member Author

Replaced with the lock manager

// produce multiple finalized collections (aka guaranteed collections) containing the same
// transaction repeatedly.
// TODO: For now we log a warning, but eventually we need to handle Byzantine clusters
err = operation.RemoveTransaction(rw.Writer(), txID)
Member Author

Replaced by transactions.RemoveBatch, so that the cache in transactions is also updated.
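
To illustrate why removal should go through the store rather than a raw database operation, here is a minimal sketch, not code from this PR. The `batch` type, its `OnCommit` hook, and this simplified `Transactions` type are assumptions made up for the example; only the name RemoveBatch comes from the comment above.

```go
package main

import "fmt"

// batch is a hypothetical write batch: deletes are buffered, and callbacks
// run only after the buffered deletes have been applied.
type batch struct {
	deletes  []string
	onCommit []func()
}

func (b *batch) Delete(key string)  { b.deletes = append(b.deletes, key) }
func (b *batch) OnCommit(fn func()) { b.onCommit = append(b.onCommit, fn) }

// Commit applies the buffered deletes to db, then runs the callbacks.
func (b *batch) Commit(db map[string]string) {
	for _, k := range b.deletes {
		delete(db, k)
	}
	for _, fn := range b.onCommit {
		fn()
	}
}

// Transactions is a simplified cached store (an assumption, not the flow-go type).
type Transactions struct {
	db    map[string]string
	cache map[string]string
}

// ByID reads through the cache and falls back to the database.
func (t *Transactions) ByID(txID string) (string, bool) {
	if v, ok := t.cache[txID]; ok {
		return v, true
	}
	v, ok := t.db[txID]
	return v, ok
}

// RemoveBatch queues the delete and evicts the cache entry once committed.
func (t *Transactions) RemoveBatch(b *batch, txID string) {
	b.Delete(txID)
	b.OnCommit(func() { delete(t.cache, txID) })
}

func main() {
	txs := &Transactions{
		db:    map[string]string{"t1": "body"},
		cache: map[string]string{"t1": "body"},
	}
	b := &batch{}
	txs.RemoveBatch(b, "t1")
	b.Commit(txs.db)
	_, found := txs.ByID("t1")
	fmt.Println("found after removal:", found)
}
```

Deleting the key directly from the database would leave the cached copy behind, so a later ByID would still return the removed transaction.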

}
}

return nil
// Store individual transactions
for _, tx := range collection.Transactions {
Member Author

Storing transactions used to be done outside of BatchStoreAndIndexByTransaction because we wanted to reuse the light collection, which caused the logic to be duplicated.

Now this is refactored by moving the storing of txs inside BatchStore, so the light collection can be returned for the outer logic to use without being computed again.
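
A rough sketch of the shape described above, under simplified assumptions: the `Batch` type, the key layout, and the `batchStoreAndIndex` name are invented for illustration and are not the flow-go implementation. It shows the transaction bodies, the transaction-to-collection index, and the light collection being queued in one batch, with the light collection handed back to the caller.

```go
package main

import (
	"errors"
	"fmt"
)

// Transaction and Collection are simplified stand-ins for the flow types.
type Transaction struct{ ID string }

type Collection struct {
	ID           string
	Transactions []Transaction
}

// LightCollection holds only the transaction IDs of a collection.
type LightCollection struct {
	ID             string
	TransactionIDs []string
}

// Batch buffers writes and applies them together on Commit.
type Batch struct{ pending map[string]string }

func NewBatch() *Batch { return &Batch{pending: map[string]string{}} }

func (b *Batch) Set(key, value string) { b.pending[key] = value }

// Commit applies every buffered write to db.
func (b *Batch) Commit(db map[string]string) {
	for k, v := range b.pending {
		db[k] = v
	}
}

// batchStoreAndIndex queues the transaction bodies, the tx->collection index,
// and the light collection in the same batch, then returns the light
// collection so the caller does not have to recompute it.
func batchStoreAndIndex(b *Batch, c Collection) (LightCollection, error) {
	if c.ID == "" {
		return LightCollection{}, errors.New("collection has no ID")
	}
	light := LightCollection{ID: c.ID}
	for _, tx := range c.Transactions {
		b.Set("tx/"+tx.ID, "body")
		b.Set("index/"+tx.ID, c.ID)
		light.TransactionIDs = append(light.TransactionIDs, tx.ID)
	}
	b.Set("collection/"+c.ID, fmt.Sprint(light.TransactionIDs))
	return light, nil
}

func main() {
	db := map[string]string{}
	b := NewBatch()
	col := Collection{ID: "c1", Transactions: []Transaction{{ID: "t1"}, {ID: "t2"}}}
	light, err := batchStoreAndIndex(b, col)
	if err != nil {
		panic(err)
	}
	b.Commit(db)
	fmt.Println("light collection:", light.TransactionIDs, "keys written:", len(db))
}
```

Because nothing reaches the database until Commit, a failure partway through leaves neither a collection without its transactions nor the reverse.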

@@ -2153,6 +2154,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
notNil(builder.collections),
notNil(builder.transactions),
lastFullBlockHeight,
storage.NewTestingLockManager(),
Member

Suggested change
- storage.NewTestingLockManager(),
+ node.StorageLockMgr,

⚠️ Yikes, we're using the testing-only lock manager in production code here! Maybe we need to give it a more scary name.

@@ -1451,6 +1451,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
builder.RootChainID.Chain(),
indexerDerivedChainData,
collectionExecutedMetric,
storage.NewTestingLockManager(),
Member

Again, the testing-only lock manager is being used in production code here. We need to make this a very hard mistake to make going forward. Given that it has already come up, I think we need to revisit the protections we put around misuse of the testing lock manager constructor.
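
One possible protection, sketched here purely as an assumption (it is not what storage.NewTestingLockManager currently does, and the `TestingT` interface and `LockManager` placeholder are hypothetical), is to make the constructor require a test handle, so a production call site has nothing sensible to pass.

```go
package main

import "fmt"

// TestingT is the subset of *testing.T this sketch relies on.
type TestingT interface {
	Helper()
}

// LockManager is a placeholder for the real lock manager type.
type LockManager struct{ testOnly bool }

// NewTestingLockManager requires a test handle, which production code
// does not normally have. A nil handle is rejected outright.
func NewTestingLockManager(t TestingT) *LockManager {
	if t == nil {
		panic("NewTestingLockManager must be called from a test")
	}
	t.Helper()
	return &LockManager{testOnly: true}
}

// fakeT stands in for *testing.T so the sketch runs outside `go test`.
type fakeT struct{ helperCalls int }

func (f *fakeT) Helper() { f.helperCalls++ }

func main() {
	m := NewTestingLockManager(&fakeT{})
	fmt.Println("test-only manager:", m.testOnly)
}
```

This raises the bar rather than making misuse impossible, since anyone can implement the interface, but it makes the mistake visible at the call site during review.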

@@ -785,7 +786,13 @@ func (suite *Suite) TestGetSealedTransaction() {
// 3. Request engine is used to request missing collection
suite.request.On("EntityByID", collection.ID(), mock.Anything).Return()
// 4. Indexer IndexCollection receives the requested collection and all the execution receipts
err = indexer.IndexCollection(collection, collections, transactions, suite.log, collectionExecutedMetric)
// Create a lock context for indexing
indexLctx := storage.NewTestingLockManager().NewContext()
Member

We should re-use the lock manager already created in this test.

I think your idea of adding some utilities will help with this, like RunWithLockCtx(). I'm also wondering whether a version of RunWithBadgerDB that instantiates the lock manager would help avoid accidentally creating multiple lock managers within test cases.
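
A helper along these lines might look like the sketch below. Only the name RunWithLockCtx comes from the comment above; its signature, and the `Manager`, `Context`, and `Proof` types, are simplified stand-ins written for this example rather than the lockctx API.

```go
package main

import (
	"fmt"
	"sync"
)

// Proof reports whether a given lock is currently held.
type Proof interface {
	HoldsLock(lockID string) bool
}

// Manager is a simplified stand-in for a lock manager: one mutex per known lock ID.
type Manager struct {
	locks map[string]*sync.Mutex
}

func NewManager(lockIDs ...string) *Manager {
	m := &Manager{locks: make(map[string]*sync.Mutex)}
	for _, id := range lockIDs {
		m.locks[id] = &sync.Mutex{}
	}
	return m
}

// Context records the locks acquired through it so they can be released together.
type Context struct {
	mgr  *Manager
	held []string
}

func (m *Manager) NewContext() *Context { return &Context{mgr: m} }

// AcquireLock blocks until the lock is free; unknown lock IDs are an error.
func (c *Context) AcquireLock(lockID string) error {
	l, ok := c.mgr.locks[lockID]
	if !ok {
		return fmt.Errorf("unknown lock %q", lockID)
	}
	l.Lock()
	c.held = append(c.held, lockID)
	return nil
}

func (c *Context) HoldsLock(lockID string) bool {
	for _, id := range c.held {
		if id == lockID {
			return true
		}
	}
	return false
}

// Release unlocks everything this context acquired.
func (c *Context) Release() {
	for _, id := range c.held {
		c.mgr.locks[id].Unlock()
	}
	c.held = nil
}

// RunWithLockCtx (hypothetical signature) acquires lockID on the shared
// manager, runs fn with the resulting proof, and always releases afterwards.
func RunWithLockCtx(m *Manager, lockID string, fn func(Proof) error) error {
	lctx := m.NewContext()
	defer lctx.Release()
	if err := lctx.AcquireLock(lockID); err != nil {
		return err
	}
	return fn(lctx)
}

func main() {
	m := NewManager("lock_insert_collection")
	err := RunWithLockCtx(m, "lock_insert_collection", func(p Proof) error {
		fmt.Println("holds lock:", p.HoldsLock("lock_insert_collection"))
		return nil
	})
	fmt.Println("err:", err)
}
```

Taking the manager as a parameter nudges each test toward a single shared instance instead of constructing a new one at every call site.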

@@ -990,7 +998,13 @@ func (suite *Suite) TestGetTransactionResult() {
ingestEng.OnFinalizedBlock(mb)

// Indexer IndexCollection receives the requested collection and all the execution receipts
err = indexer.IndexCollection(collection, collections, transactions, suite.log, collectionExecutedMetric)
// Create a lock context for indexing
indexLctx := storage.NewTestingLockManager().NewContext()
Member

The first lock manager for this test case is created here: https://github.com/onflow/flow-go/pull/7736/files#diff-7b89d8add60bc70809a38bbadd3a6ab68d0982630735a24aba8597c7de8bc958R960. There should be only one per test.

err := indexer.IndexCollection(collection, s.collections, s.transactions, s.logger, s.collectionExecutedMetric)
// Create a lock context for indexing
lctx := s.lockManager.NewContext()
err := lctx.AcquireLock(storage.LockInsertCollection)
Member

AcquireLock can return either ErrPolicyViolation or UnknownLockError. Both should be considered critical at this point, so we should treat this as an exception rather than logging and continuing.

I see that this component isn't structured to throw irrecoverable errors, but I think it's better to log.Fatal() and add a TODO noting that this component should be using irrecoverable.Context.
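
As a sketch of that suggestion, with the fatal handler injected so the example can run: the `lockContext` interface, `rejectingContext`, `indexWithLock`, and the `errPolicyViolation` value are all stand-ins invented here, not flow-go or lockctx definitions.

```go
package main

import (
	"errors"
	"fmt"
)

// errPolicyViolation is a stand-in for the lock manager's policy error.
var errPolicyViolation = errors.New("lock policy violation")

// lockContext is the minimal surface this sketch needs.
type lockContext interface {
	AcquireLock(lockID string) error
	Release()
}

// rejectingContext always fails to acquire, to exercise the error path.
type rejectingContext struct{}

func (rejectingContext) AcquireLock(string) error { return errPolicyViolation }
func (rejectingContext) Release()                 {}

// indexWithLock runs index while holding lockID. A failed acquisition is
// handed to fatal instead of being logged and skipped.
func indexWithLock(lctx lockContext, lockID string, fatal func(error), index func() error) error {
	defer lctx.Release()
	if err := lctx.AcquireLock(lockID); err != nil {
		// TODO: report through irrecoverable.Context once the component supports it.
		wrapped := fmt.Errorf("could not acquire lock %s: %w", lockID, err)
		fatal(wrapped)
		return wrapped
	}
	return index()
}

func main() {
	// In production, fatal could be func(err error) { log.Fatal(err) }.
	fatal := func(err error) { fmt.Println("fatal:", err) }
	err := indexWithLock(rejectingContext{}, "lock_insert_collection", fatal, func() error { return nil })
	fmt.Println("policy violation:", errors.Is(err, errPolicyViolation))
}
```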

// transaction IDs) and adds a transaction id index for each of the
// transactions within the collection (transaction_id->collection_id).
//
// StoreLightAndIndexByTransaction stores a light collection and indexes it by transaction ID.
Member

Suggested change
- // StoreLightAndIndexByTransaction stores a light collection and indexes it by transaction ID.
+ // StoreAndIndexByTransaction stores a light collection and indexes it by transaction ID.

@@ -199,7 +207,7 @@ func (c *Collections) batchStoreLightAndIndexByTransaction(collection *flow.Ligh
// already exists.
//
// No errors are expected during normal operation.
func (c *Collections) StoreLightAndIndexByTransaction(collection *flow.LightCollection) error {
func (c *Collections) StoreAndIndexByTransaction(lctx lockctx.Proof, collection *flow.Collection) (flow.LightCollection, error) {
// - This lock is to ensure there is no race condition when indexing collection by transaction ID
Member

I think this whole block of comments is outdated (lines 212-222)

Comment on lines +146 to +148
lockManager := storage.NewTestingLockManager()
lctx := lockManager.NewContext()
err := lctx.AcquireLock(storage.LockInsertCollection)
Member

unittest.LockManagerWithContext saves us a few lines in these kinds of usages

Comment on lines +110 to 112
func (c *Collections) StoreAndIndexByTransaction(_ lockctx.Proof, collection *flow.Collection) (flow.LightCollection, error) {
c.lock.Lock()
defer c.lock.Unlock()
Member

[no action required in this PR]

This component is pretty confusing now, concurrency-wise:

  • it's in a package called unsynchronized
  • but it has and uses a mutex
  • and it accepts a lock context (because we want to implement the storage interface for these components)
  • but the lock context is ignored

unsynchronized.Collections is essentially a mempool -- it's not clear to me why we're having it implement the storage interface. I took a look through the usages, and it seems like in all cases, we are referencing it by its concrete type rather than as an abstract implementation of the interface. My best guess is that kind of usage was the original reason for building it in this way. But given that we are only referencing it as a concrete type, maybe we should consider adopting the existing mempool for this purpose.

This doesn't really have anything to do with Pebble or this PR, so I'll add it as an item to #7682 and we can ask @peterargue when he's back.

Comment on lines +57 to +59
// Create a no-op lock context for testing
noOpLockCtx := &noOpLockContext{}
_, err := collections.StoreAndIndexByTransaction(noOpLockCtx, &collection)
Member

Suggested change
- // Create a no-op lock context for testing
- noOpLockCtx := &noOpLockContext{}
- _, err := collections.StoreAndIndexByTransaction(noOpLockCtx, &collection)
+ _, err := collections.StoreAndIndexByTransaction(nil, &collection) // lock context should be ignored by non-DB storage backend

Since we expect the implementation to completely ignore the argument, I'd pass in nil instead. That way if it isn't ignoring it, we'll notice.
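
The reasoning can be seen in a small sketch. The types below are simplified stand-ins (not the unsynchronized.Collections implementation), and `storeChecked` is a hypothetical counter-example added only to show what happens when the proof is actually consulted.

```go
package main

import (
	"fmt"
	"sync"
)

// Proof stands in for a lock proof interface.
type Proof interface {
	HoldsLock(lockID string) bool
}

type Collection struct {
	ID    string
	TxIDs []string
}

// Collections is an in-memory store guarded by its own mutex.
type Collections struct {
	lock sync.Mutex
	byID map[string]Collection
	byTx map[string]string
}

func NewCollections() *Collections {
	return &Collections{byID: map[string]Collection{}, byTx: map[string]string{}}
}

// StoreAndIndexByTransaction ignores the proof; the internal mutex is
// the only synchronization this in-memory backend uses.
func (c *Collections) StoreAndIndexByTransaction(_ Proof, col Collection) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.byID[col.ID] = col
	for _, txID := range col.TxIDs {
		c.byTx[txID] = col.ID
	}
	return nil
}

// storeChecked is a counter-example: it consults the proof, so a nil
// proof panics and the mistake surfaces immediately in a test.
func (c *Collections) storeChecked(p Proof, col Collection) error {
	if !p.HoldsLock("lock_insert_collection") {
		return fmt.Errorf("missing lock")
	}
	return c.StoreAndIndexByTransaction(p, col)
}

func main() {
	c := NewCollections()
	err := c.StoreAndIndexByTransaction(nil, Collection{ID: "c1", TxIDs: []string{"t1"}})
	fmt.Println("store with nil proof, err:", err)

	defer func() {
		fmt.Println("checked variant panicked:", recover() != nil)
	}()
	_ = c.storeChecked(nil, Collection{ID: "c2"})
}
```

A no-op lock context would let both variants pass silently, which is why nil is the more informative argument in a test.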

3 participants