fix: state listener observe writes at wrong time #13516

Merged
merged 30 commits into from
Dec 2, 2022
ac86866
fix: state listener observe writes at wrong time
yihuang Oct 12, 2022
cfc62ff
synchronous abci call, and format doc
yihuang Nov 27, 2022
faa1f62
fix comment
yihuang Nov 27, 2022
473a066
update file streamer readme and fix typos
yihuang Nov 27, 2022
3e78836
typo
yihuang Nov 27, 2022
94591f3
fix: state listener observe writes at wrong time
yihuang Oct 12, 2022
9d4b776
improve UX of file streamer, make it immediately usable after enabled
yihuang Nov 28, 2022
1ee67d8
Merge remote-tracking branch 'fork/fix-listener-main3' into fix-liste…
yihuang Nov 28, 2022
25ce3af
get homePage from opts
yihuang Nov 28, 2022
2ca7afc
Merge branch 'main' into fix-listener-main3
tac0turtle Nov 28, 2022
135aaaf
Merge branch 'main' into fix-listener-main3
yihuang Nov 29, 2022
eff8761
Merge remote-tracking branch 'origin/main' into fix-listener-main3
yihuang Nov 29, 2022
b168d6f
fix merge
yihuang Nov 29, 2022
d310a03
use fmt.Errorf
yihuang Nov 29, 2022
c752120
Update CHANGELOG.md
yihuang Nov 29, 2022
4a2f933
Update CHANGELOG.md
yihuang Nov 29, 2022
43ae889
Merge branch 'main' into fix-listener-main3
yihuang Nov 29, 2022
ce17fe6
Merge branch 'main' into fix-listener-main3
yihuang Nov 30, 2022
b7e5c4a
Merge branch 'main' into fix-listener-main3
tac0turtle Nov 30, 2022
33d5ca9
Merge branch 'main' into fix-listener-main3
tac0turtle Nov 30, 2022
f583c69
Merge branch 'main' into fix-listener-main3
yihuang Nov 30, 2022
932680b
fix unit test
yihuang Nov 30, 2022
4fa2fbb
type assertion -> type conversion
yihuang Dec 1, 2022
e312225
update changelog about error propogation
yihuang Dec 1, 2022
23e3f9b
Update CHANGELOG.md
yihuang Dec 1, 2022
c5dbfd5
Merge branch 'main' into fix-listener-main3
yihuang Dec 1, 2022
d14c4ae
Merge branch 'main' into fix-listener-main3
tac0turtle Dec 2, 2022
c5d5408
Merge branch 'main' into fix-listener-main3
tac0turtle Dec 2, 2022
20aa3f7
Merge branch 'main' into fix-listener-main3
tac0turtle Dec 2, 2022
3559e7d
Merge branch 'main' into fix-listener-main3
tac0turtle Dec 2, 2022
fix: state listener observe writes at wrong time
Closes: #13457

Currently the state listener is notified when the cache store writes, which happens only during the commit event; this breaks the current design.
The solution (as discussed in the issue) is to listen for state writes on the rootmulti store only.

It also changes the file streamer to output a single data file for all writes in the block, since we can't distinguish writes from different stages of ABCI events.

It adds new config items for the file streamer:
- streamers.file.output-metadata
- streamers.file.stop-node-on-error
- streamers.file.fsync
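
As a rough illustration of what two of these options might control, here is a minimal sketch. The semantics are inferred from the option names only; `fileStreamerOptions`, `writeBlockFile`, `writeFile`, and the file name pattern are hypothetical and are not taken from the SDK. `output-metadata` is not covered.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// fileStreamerOptions is a hypothetical stand-in for the file streamer settings.
type fileStreamerOptions struct {
	Fsync           bool // assumed: flush the block file to disk before returning
	StopNodeOnError bool // assumed: return write failures instead of swallowing them
}

// writeBlockFile writes all state changes of one block into a single file.
func writeBlockFile(dir string, height int64, data []byte, opts fileStreamerOptions) error {
	path := filepath.Join(dir, fmt.Sprintf("block-%d-data", height))
	err := writeFile(path, data, opts.Fsync)
	if err != nil && !opts.StopNodeOnError {
		// Assumed behaviour: report the failure and carry on.
		fmt.Fprintln(os.Stderr, "file streamer write failed:", err)
		return nil
	}
	return err
}

// writeFile creates the file, writes data, and optionally calls Sync.
func writeFile(path string, data []byte, fsync bool) (err error) {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer func() {
		// Keep the first error, but do not lose a failing Close.
		if cerr := f.Close(); err == nil {
			err = cerr
		}
	}()
	if _, err = f.Write(data); err != nil {
		return err
	}
	if fsync {
		return f.Sync()
	}
	return nil
}

func main() {
	dir, err := os.MkdirTemp("", "streamer")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	opts := fileStreamerOptions{Fsync: true, StopNodeOnError: true}
	if err := writeBlockFile(dir, 1, []byte("kv-pairs"), opts); err != nil {
		panic(err)
	}
	fmt.Println("block file written")
}
```

With `StopNodeOnError` unset, the sketch logs the failure and returns `nil`, so the caller never sees the error.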

synchronous abci call, and format doc

fix comment

update file streamer readme and fix typos

typo
yihuang committed Nov 27, 2022
commit 94591f35eb7082f1ca0c4d96f63e6ec4c134fe22
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -176,6 +176,7 @@ extension interfaces. `module.Manager.Modules` is now of type `map[string]interf
`cosmossdk.io/core/appmodule.AppModule` API.
* (x/group) [#13876](https://github.com/cosmos/cosmos-sdk/pull/13876) Add `GetMinExecutionPeriod` method on DecisionPolicy interface.
* (x/auth)[#13780](https://github.com/cosmos/cosmos-sdk/pull/13780) Querying with `id` (type of int64) in `AccountAddressByID` grpc query now throws error, use account-id(type of uint64) instead.
+ * (store) [#13516](https://github.com/cosmos/cosmos-sdk/pull/13516) Add method `ListenCommit` to `ABCIListener`, move `ListeningEnabled` `AddListener` methods to `CommitMultiStore`, remove `CacheWrapWithListeners` from `CacheWrap` and `CacheWrapper` interfaces, remove listening apis from caching layer, should only listen to the `rootmulti.Store`, add three new options to file streaming service constructor.

### CLI Breaking Changes

@@ -206,6 +207,7 @@ extension interfaces. `module.Manager.Modules` is now of type `map[string]interf
* (server) [#13778](https://github.com/cosmos/cosmos-sdk/pull/13778) Set Cosmos SDK default endpoints to localhost to avoid unknown exposure of endpoints.
* (x/auth) [#13877](https://github.com/cosmos/cosmos-sdk/pull/13877) Handle missing account numbers during `InitGenesis`.
* (x/gov) [#13918](https://github.com/cosmos/cosmos-sdk/pull/13918) Fix propagation of message errors when executing a proposal.
+ * (store) [#13516](https://github.com/cosmos/cosmos-sdk/pull/13516) fix state listener observe writes at wrong time.

### Deprecated

1,670 changes: 1,639 additions & 31 deletions api/cosmos/base/store/v1beta1/listening.pulsar.go

Large diffs are not rendered by default.

26 changes: 18 additions & 8 deletions baseapp/abci.go
@@ -202,7 +202,7 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
// call the hooks with the BeginBlock messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
- app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
+ panic(sdkerrors.Wrapf(err, "BeginBlock listening hook failed, height: %d", req.Header.Height))
}
}

@@ -227,7 +227,7 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
// call the streaming service hooks with the EndBlock messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
- app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
+ panic(sdkerrors.Wrapf(err, "EndBlock listening hook failed, height: %d", req.Height))

Contributor

ditto

Collaborator Author

fixed

}
}

@@ -330,7 +330,7 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliv
defer func() {
for _, streamingListener := range app.abciListeners {

This isn't your fault @yihuang, but app.abciListeners is both read from and written to without synchronization, which is a memory model violation. Reads occur throughout baseapp/abci.go and writes occur in SetStreamingService in baseapp/options.go.

Collaborator Author
@yihuang, Dec 1, 2022

The ABCI methods are not called concurrently, and SetStreamingService is supposed to be called during initialization only, so I believe we don't need to be thread-safe here.

Basically, abciListeners is supposed to be initialized on startup and read-only afterwards.
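
A minimal sketch of the "initialize on startup, read-only afterwards" assumption from the reply above. The `registry` type and its `add`, `seal`, and `notify` methods are hypothetical and are not part of BaseApp. The sketch adds no synchronization; it only makes a late registration fail loudly, and it is safe only if `seal` runs before any goroutine that calls `notify` is started.

```go
package main

import "fmt"

// listener is a stand-in for an ABCI listener hook.
type listener func(height int64) error

// registry holds listeners that may only be registered during start-up.
type registry struct {
	listeners []listener
	sealed    bool
}

// add registers a listener; it panics once the registry has been sealed.
func (r *registry) add(l listener) {
	if r.sealed {
		panic("listeners must be registered before the app starts serving")
	}
	r.listeners = append(r.listeners, l)
}

// seal marks the end of initialization; after it, the slice is only read.
func (r *registry) seal() { r.sealed = true }

// notify calls every listener in order and returns the first error.
func (r *registry) notify(height int64) error {
	for _, l := range r.listeners {
		if err := l(height); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var r registry
	seen := 0
	r.add(func(int64) error { seen++; return nil })
	r.seal()

	if err := r.notify(1); err != nil {
		panic(err)
	}
	fmt.Println("listeners notified:", seen)
}
```

If registration could ever overlap with ABCI calls, the slice would need a mutex or a similar guard, which is the reviewer's concern.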

if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
- app.logger.Error("DeliverTx listening hook failed", "err", err)
+ panic(sdkerrors.Wrap(err, "DeliverTx listening hook failed"))

Contributor

ditto

Collaborator Author

fixed

}
}
}()
@@ -364,7 +364,7 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliv
// defined in config, Commit will execute a deferred function call to check
// against that height and gracefully halt if it matches the latest committed
// height.
- func (app *BaseApp) Commit() (res abci.ResponseCommit) {
+ func (app *BaseApp) Commit() abci.ResponseCommit {
header := app.deliverState.ctx.BlockHeader()
retainHeight := app.GetBlockRetentionHeight(header.Height)

@@ -373,6 +373,19 @@ func (app *BaseApp) Commit() (res abci.ResponseCommit) {
// MultiStore (app.cms) so when Commit() is called is persists those values.
app.deliverState.ms.Write()
commitID := app.cms.Commit()

+ res := abci.ResponseCommit{
+ Data: commitID.Hash,
+ RetainHeight: retainHeight,
+ }
+
+ // call the hooks with the Commit message
+ for _, streamingListener := range app.abciListeners {
+ if err := streamingListener.ListenCommit(app.deliverState.ctx, res); err != nil {

Contributor

Note, ListenCommit should take a native Go context.Context.

Collaborator Author

fixed

+ panic(sdkerrors.Wrapf(err, "Commit listening hook failed, height: %d", header.Height))
+ }
+ }

app.logger.Info("commit synced", "commit", fmt.Sprintf("%X", commitID))

// Reset the Check state to the latest committed.
@@ -406,10 +419,7 @@ func (app *BaseApp) Commit() (res abci.ResponseCommit) {

go app.snapshotManager.SnapshotIfApplicable(header.Height)

- return abci.ResponseCommit{
- Data: commitID.Hash,
- RetainHeight: retainHeight,
- }
+ return res
}
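
The shape of the change to `Commit` (build the response first, pass it to every listener, and panic if a hook fails) can be sketched in isolation. This is a simplified stand-in, not the BaseApp code: `responseCommit`, `commitListener`, `failing`, and `commit` are hypothetical names, and the SDK error wrapping is replaced by `fmt.Errorf`.

```go
package main

import (
	"errors"
	"fmt"
)

// responseCommit is a stand-in for abci.ResponseCommit.
type responseCommit struct {
	Data         []byte
	RetainHeight int64
}

// commitListener is a stand-in for the ListenCommit part of ABCIListener.
type commitListener interface {
	ListenCommit(res responseCommit) error
}

// failing is a listener whose hook always returns an error.
type failing struct{}

func (failing) ListenCommit(responseCommit) error { return errors.New("disk full") }

// commit builds the response before calling the hooks, so every listener
// sees the same value that is eventually returned. A hook error aborts
// the commit with a panic, as in the diff.
func commit(height int64, hash []byte, listeners []commitListener) responseCommit {
	res := responseCommit{Data: hash}
	for _, l := range listeners {
		if err := l.ListenCommit(res); err != nil {
			panic(fmt.Errorf("commit listening hook failed, height %d: %w", height, err))
		}
	}
	return res
}

func main() {
	defer func() {
		// Recover only to show the failure in this demo.
		fmt.Println("recovered:", recover())
	}()
	commit(10, []byte{0xAB}, []commitListener{failing{}})
}
```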

// halt attempts to gracefully shutdown the node via SIGINT and SIGTERM falling
6 changes: 5 additions & 1 deletion baseapp/streaming.go
@@ -10,14 +10,18 @@ import (
"github.com/cosmos/cosmos-sdk/types"
)

- // ABCIListener interface used to hook into the ABCI message processing of the BaseApp
+ // ABCIListener interface used to hook into the ABCI message processing of the BaseApp.
+ // the error results are propagated to consensus state machine,
+ // if you don't want to affect consensus, handle the errors internally and always return `nil` in these APIs.
type ABCIListener interface {
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
// ListenEndBlock updates the steaming service with the latest EndBlock messages
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
+ // ListenCommit updates the steaming service with the latest Commit event
+ ListenCommit(ctx types.Context, res abci.ResponseCommit) error

Member

why does this require types.Context? I would think a clean context.Context would be cleaner here?

Collaborator Author
@yihuang, Nov 29, 2022

Yeah, I guess it could work, you mean change all methods right?

}
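
The new doc comment says listener errors propagate to the consensus state machine, and that a listener which must not affect consensus should handle errors internally and return `nil`. One way to do that is a small wrapper, sketched below. The interface is reduced to a single hook with a simplified signature; `blockListener`, `nonFatal`, and `flaky` are hypothetical names, not SDK types.

```go
package main

import (
	"errors"
	"fmt"
)

// blockListener is a reduced stand-in for ABCIListener with one hook.
type blockListener interface {
	ListenCommit(height int64) error
}

// nonFatal wraps a listener, reports its errors through onErr,
// and always returns nil so the caller never halts because of it.
type nonFatal struct {
	inner blockListener
	onErr func(error)
}

func (n nonFatal) ListenCommit(height int64) error {
	if err := n.inner.ListenCommit(height); err != nil {
		n.onErr(err)
	}
	return nil
}

// flaky is a listener whose hook always fails.
type flaky struct{}

func (flaky) ListenCommit(int64) error { return errors.New("sink unavailable") }

func main() {
	l := nonFatal{
		inner: flaky{},
		onErr: func(err error) { fmt.Println("listener error ignored:", err) },
	}
	fmt.Println("returned:", l.ListenCommit(1))
}
```

The trade-off is that a swallowed error can leave the external sink behind the chain state, so `onErr` should at least log or count the failure.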

// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks