-
Notifications
You must be signed in to change notification settings - Fork 195
[Storing] Refactor Storing Collections #7736
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: leo/add-block-view-index
Are you sure you want to change the base?
[Storing] Refactor Storing Collections #7736
Conversation
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
if err != nil { | ||
// ignore collection if already seen | ||
if errors.Is(err, storage.ErrAlreadyExists) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StoreAndIndexByTransaction no longer return this error
return err | ||
} | ||
|
||
// now store each of the transaction body | ||
for _, tx := range collection.Transactions { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StoreAndIndexByTransaction
will store the transaction internally, so that txs and collection can be saved in the same batch update.
// TODO(7355): lockctx | ||
indexingByTx *sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replaced with the lock manager
// produce multiple finalized collections (aka guaranteed collections) containing the same | ||
// transaction repeadely. | ||
// TODO: For now we log a warning, but eventually we need to handle Byzantine clusters | ||
err = operation.RemoveTransaction(rw.Writer(), txID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replaced by transactions.RemoveBatch, so that the cache in transactions is also updated.
} | ||
} | ||
|
||
return nil | ||
// Store individual transactions | ||
for _, tx := range collection.Transactions { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Storing transactions used to be done outside of BatchStoreAndIndexByTransaction
, because we want to reuse the light collection, the caused the logic to be duplicated.
Now this is refactored by moving storing txs inside of BatchStore, and the light
collection can be returned for outer logic to use without being computed again.
@@ -2153,6 +2154,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { | |||
notNil(builder.collections), | |||
notNil(builder.transactions), | |||
lastFullBlockHeight, | |||
storage.NewTestingLockManager(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
storage.NewTestingLockManager(), | |
node.StorageLockMgr, |
@@ -1451,6 +1451,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS | |||
builder.RootChainID.Chain(), | |||
indexerDerivedChainData, | |||
collectionExecutedMetric, | |||
storage.NewTestingLockManager(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again here using the testing-only lock manager in production code. We need to make sure this is a very hard mistake to make going forward. Given that it has come up already, I think we need to revisit the protections we put around misuse of the testing lock manager constructor
@@ -785,7 +786,13 @@ func (suite *Suite) TestGetSealedTransaction() { | |||
// 3. Request engine is used to request missing collection | |||
suite.request.On("EntityByID", collection.ID(), mock.Anything).Return() | |||
// 4. Indexer IndexCollection receives the requested collection and all the execution receipts | |||
err = indexer.IndexCollection(collection, collections, transactions, suite.log, collectionExecutedMetric) | |||
// Create a lock context for indexing | |||
indexLctx := storage.NewTestingLockManager().NewContext() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should re-use the lock manager already created in this test.
I think your idea of adding some utilities will help with this, like RunWithLockCtx()
. I'm also wondering making a version of RunWithBadgerDB
that instantiates the lock manager will help to avoid accidentally creating multiple lock managers within test cases.
@@ -990,7 +998,13 @@ func (suite *Suite) TestGetTransactionResult() { | |||
ingestEng.OnFinalizedBlock(mb) | |||
|
|||
// Indexer IndexCollection receives the requested collection and all the execution receipts | |||
err = indexer.IndexCollection(collection, collections, transactions, suite.log, collectionExecutedMetric) | |||
// Create a lock context for indexing | |||
indexLctx := storage.NewTestingLockManager().NewContext() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First lock manager for this test case created here: https://github.com/onflow/flow-go/pull/7736/files#diff-7b89d8add60bc70809a38bbadd3a6ab68d0982630735a24aba8597c7de8bc958R960. Should only have one per test
err := indexer.IndexCollection(collection, s.collections, s.transactions, s.logger, s.collectionExecutedMetric) | ||
// Create a lock context for indexing | ||
lctx := s.lockManager.NewContext() | ||
err := lctx.AcquireLock(storage.LockInsertCollection) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AcquireLock
can return either ErrPolicyViolation
or UnknownLockError
. Both of these should be considered critical at this point, so we should consider this as an exception, not logging and continuing.
I see that this component isn't structured to throw irrecoverable errors, but I think it's better to log.Fatal()
and add a TODO noting that this component should be using irrecoverable.Context
.
// transaction IDs) and adds a transaction id index for each of the | ||
// transactions within the collection (transaction_id->collection_id). | ||
// | ||
// StoreLightAndIndexByTransaction stores a light collection and indexes it by transaction ID. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// StoreLightAndIndexByTransaction stores a light collection and indexes it by transaction ID. | |
// StoreAndIndexByTransaction stores a light collection and indexes it by transaction ID. |
@@ -199,7 +207,7 @@ func (c *Collections) batchStoreLightAndIndexByTransaction(collection *flow.Ligh | |||
// already exists. | |||
// | |||
// No errors are expected during normal operation. | |||
func (c *Collections) StoreLightAndIndexByTransaction(collection *flow.LightCollection) error { | |||
func (c *Collections) StoreAndIndexByTransaction(lctx lockctx.Proof, collection *flow.Collection) (flow.LightCollection, error) { | |||
// - This lock is to ensure there is no race condition when indexing collection by transaction ID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this whole block of comments is outdated (lines 212-222)
lockManager := storage.NewTestingLockManager() | ||
lctx := lockManager.NewContext() | ||
err := lctx.AcquireLock(storage.LockInsertCollection) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unittest.LockManagerWithContext
saves us a few lines in these kinds of usages
func (c *Collections) StoreAndIndexByTransaction(_ lockctx.Proof, collection *flow.Collection) (flow.LightCollection, error) { | ||
c.lock.Lock() | ||
defer c.lock.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[no action required in this PR]
This component is pretty confusing now, concurrency-wise:
- it's in a package called
unsynchronized
- but it has and uses a mutex
- and it accepts a lock context (because we want to implement the
storage
interface for these components) - but the lock context is ignored
unsynchronized.Collections
is essentially a mempool -- it's not clear to me why we're having it implement the storage
interface. I took a look through the usages, and it seems like in all cases, we are referencing it by its concrete type rather than as an abstract implementation of the interface. My best guess is that kind of usage was the original reason for building it in this way. But given that we are only referencing it as a concrete type, maybe we should consider adopting the existing mempool for this purpose.
This doesn't really have anything to do with Pebble or this PR, so I'll add it as an item to #7682 and we can ask @peterargue when he's back.
// Create a no-op lock context for testing | ||
noOpLockCtx := &noOpLockContext{} | ||
_, err := collections.StoreAndIndexByTransaction(noOpLockCtx, &collection) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Create a no-op lock context for testing | |
noOpLockCtx := &noOpLockContext{} | |
_, err := collections.StoreAndIndexByTransaction(noOpLockCtx, &collection) | |
_, err := collections.StoreAndIndexByTransaction(nil, &collection) // lock context should be ignored by non-DB storage backend |
Since we expect the implementation to completely ignore the argument, I'd pass in nil instead. That way if it isn't ignoring it, we'll notice.
This PR refactors storing collections to remove duplicated logic of storing transactions as well as refactor with lockctx manager.
See comments for the highlighted changes.