Skip to content

Commit

Permalink
refactor: remove base app from store (cosmos#14417)
Browse files Browse the repository at this point in the history
## Description

This pr removes base app from the store package. This is un order to decouple store from the sdk. There are a few things we can clean up but the telemetry package may be difficult with how it's done. 

---

### Author Checklist

*All items are required. Please add a note to the item if the item is not applicable and
please add links to any relevant follow up issues.*

I have...

- [ ] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] added `!` to the type prefix if API or client breaking change
- [ ] targeted the correct branch (see [PR Targeting](https://github.com/cosmos/cosmos-sdk/blob/main/CONTRIBUTING.md#pr-targeting))
- [ ] provided a link to the relevant issue or specification
- [ ] followed the guidelines for [building modules](https://github.com/cosmos/cosmos-sdk/blob/main/docs/docs/building-modules)
- [ ] included the necessary unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/main/CONTRIBUTING.md#testing)
- [ ] added a changelog entry to `CHANGELOG.md`
- [ ] included comments for [documenting Go code](https://blog.golang.org/godoc)
- [ ] updated the relevant documentation or specification
- [ ] reviewed "Files changed" and left comments if necessary
- [ ] confirmed all CI checks have passed

### Reviewers Checklist

*All items are required. Please add a note if the item is not applicable and please add
your handle next to the items reviewed if you only reviewed selected items.*

I have...

- [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] confirmed `!` in the type prefix if API or client breaking change
- [ ] confirmed all author checklist items have been addressed 
- [ ] reviewed state machine logic
- [ ] reviewed API design and naming
- [ ] reviewed documentation is accurate
- [ ] reviewed tests and test coverage
- [ ] manually tested (if applicable)
  • Loading branch information
tac0turtle authored Dec 27, 2022
1 parent c1580c9 commit 1fab762
Show file tree
Hide file tree
Showing 16 changed files with 121 additions and 39 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [#13881](https://github.com/cosmos/cosmos-sdk/pull/13881) Optimize iteration on nested cached KV stores and other operations in general.
* (x/gov) [#14347](https://github.com/cosmos/cosmos-sdk/pull/14347) Support `v1.Proposal` message in `v1beta1.Proposal.Content`.
* (x/gov) [#14390](https://github.com/cosmos/cosmos-sdk/pull/14390) Add title, proposer and summary to proposal struct
* (baseapp) [#14417](https://github.com/cosmos/cosmos-sdk/pull/14417) `SetStreamingService` accepts appOptions, AppCodec and Storekeys needed to set streamers.
* Store pacakge no longer has a dependency on baseapp.

### State Machine Breaking

Expand Down
2 changes: 1 addition & 1 deletion baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type BaseApp struct { //nolint: maligned

// abciListeners for hooking into the ABCI message processing of the BaseApp
// and exposing the requests and responses to external consumers
abciListeners []ABCIListener
abciListeners []storetypes.ABCIListener
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down
25 changes: 19 additions & 6 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/codec/types"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store"
pruningtypes "github.com/cosmos/cosmos-sdk/store/pruning/types"
"github.com/cosmos/cosmos-sdk/store/snapshots"
snapshottypes "github.com/cosmos/cosmos-sdk/store/snapshots/types"
"github.com/cosmos/cosmos-sdk/store/streaming"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/mempool"
)
Expand Down Expand Up @@ -232,14 +235,24 @@ func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) {
}

// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
func (app *BaseApp) SetStreamingService(s StreamingService) {
func (app *BaseApp) SetStreamingService(
appOpts servertypes.AppOptions,
appCodec storetypes.Codec,
keys map[string]*storetypes.KVStoreKey) error {
streamers, _, err := streaming.LoadStreamingServices(appOpts, appCodec, app.logger, keys)
if err != nil {
return err
}
// add the listeners for each StoreKey
for key, lis := range s.Listeners() {
app.cms.AddListeners(key, lis)
for _, streamer := range streamers {
for key, lis := range streamer.Listeners() {
app.cms.AddListeners(key, lis)
}
// register the StreamingService within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
app.abciListeners = append(app.abciListeners, streamer)
}
// register the StreamingService within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
app.abciListeners = append(app.abciListeners, s)
return nil
}

// SetTxDecoder sets the TxDecoder if it wasn't provided in the BaseApp constructor.
Expand Down
5 changes: 2 additions & 3 deletions simapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/cosmos/cosmos-sdk/server/config"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/std"
"github.com/cosmos/cosmos-sdk/store/streaming"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/testutil/testdata_pulsar"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -262,8 +261,8 @@ func NewSimApp(
// not include this key.
memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey")

// load state streaming if enabled
if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, logger, keys); err != nil {
// register the streaming service with the BaseApp
if err := bApp.SetStreamingService(appOpts, appCodec, keys); err != nil {
logger.Error("failed to load state streaming", "err", err)
os.Exit(1)
}
Expand Down
4 changes: 1 addition & 3 deletions simapp/app_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cosmos/cosmos-sdk/server/api"
"github.com/cosmos/cosmos-sdk/server/config"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/testutil/testdata_pulsar"
"github.com/cosmos/cosmos-sdk/types/module"
Expand Down Expand Up @@ -245,8 +244,7 @@ func NewSimApp(

app.App = appBuilder.Build(logger, db, traceStore, baseAppOptions...)

// load state streaming if enabled
if _, _, err := streaming.LoadStreamingServices(app.App.BaseApp, appOpts, app.appCodec, logger, app.kvStoreKeys()); err != nil {
if err := app.App.BaseApp.SetStreamingService(appOpts, app.appCodec, app.kvStoreKeys()); err != nil {
logger.Error("failed to load state streaming", "err", err)
os.Exit(1)
}
Expand Down
5 changes: 2 additions & 3 deletions store/cachekv/internal/btree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"testing"

"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -195,9 +194,9 @@ func verifyIterator(t *testing.T, itr types.Iterator, expected []int64, msg stri
}

func int642Bytes(i int64) []byte {
return sdk.Uint64ToBigEndian(uint64(i))
return types.Uint64ToBigEndian(uint64(i))
}

func bytes2Int64(buf []byte) int64 {
return int64(sdk.BigEndianToUint64(buf))
return int64(types.BigEndianToUint64(buf))
}
2 changes: 1 addition & 1 deletion store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"github.com/tendermint/tendermint/libs/math"
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/internal/conv"
"github.com/cosmos/cosmos-sdk/store/cachekv/internal"
"github.com/cosmos/cosmos-sdk/store/internal/conv"
"github.com/cosmos/cosmos-sdk/store/internal/kv"
"github.com/cosmos/cosmos-sdk/store/tracekv"
"github.com/cosmos/cosmos-sdk/store/types"
Expand Down
2 changes: 2 additions & 0 deletions store/internal/conv/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package conv provides internal functions for convertions and data manipulation
package conv
26 changes: 26 additions & 0 deletions store/internal/conv/string.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package conv

import (
"reflect"
"unsafe"
)

// UnsafeStrToBytes uses unsafe to convert string into byte array. Returned bytes
// must not be altered after this function is called as it will cause a segmentation fault.
func UnsafeStrToBytes(s string) []byte {
var buf []byte
sHdr := (*reflect.StringHeader)(unsafe.Pointer(&s))
bufHdr := (*reflect.SliceHeader)(unsafe.Pointer(&buf))
bufHdr.Data = sHdr.Data
bufHdr.Cap = sHdr.Len
bufHdr.Len = sHdr.Len
return buf
}

// UnsafeBytesToStr is meant to make a zero allocation conversion
// from []byte -> string to speed up operations, it is not meant
// to be used generally, but for a specific pattern to delete keys
// from a map.
func UnsafeBytesToStr(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}
54 changes: 54 additions & 0 deletions store/internal/conv/string_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package conv

import (
"runtime"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/suite"
)

func TestStringSuite(t *testing.T) {
suite.Run(t, new(StringSuite))
}

type StringSuite struct{ suite.Suite }

func unsafeConvertStr() []byte {
return UnsafeStrToBytes("abc")
}

func (s *StringSuite) TestUnsafeStrToBytes() {
// we convert in other function to trigger GC. We want to check that
// the underlying array in []bytes is accessible after GC will finish swapping.
for i := 0; i < 5; i++ {
b := unsafeConvertStr()
runtime.GC()
<-time.NewTimer(2 * time.Millisecond).C
b2 := append(b, 'd') //nolint:gocritic // append is fine here
s.Equal("abc", string(b))
s.Equal("abcd", string(b2))
}
}

func unsafeConvertBytes() string {
return UnsafeBytesToStr([]byte("abc"))
}

func (s *StringSuite) TestUnsafeBytesToStr() {
// we convert in other function to trigger GC. We want to check that
// the underlying array in []bytes is accessible after GC will finish swapping.
for i := 0; i < 5; i++ {
str := unsafeConvertBytes()
runtime.GC()
<-time.NewTimer(2 * time.Millisecond).C
s.Equal("abc", str)
}
}

func BenchmarkUnsafeStrToBytes(b *testing.B) {
for i := 0; i < b.N; i++ {
UnsafeStrToBytes(strconv.Itoa(i))
}
}
2 changes: 1 addition & 1 deletion store/streaming/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
This package contains the constructors for the `StreamingService`s used to write
state changes out from individual KVStores to a file or stream, as described in
[ADR-038](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-038-state-listening.md)
and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/baseapp/streaming.go).
and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/store/types/streaming.go).
The child directories contain the implementations for specific output destinations.

Currently, a `StreamingService` implementation that writes state changes out to
Expand Down
13 changes: 4 additions & 9 deletions store/streaming/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strings"
"sync"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/client/flags"
serverTypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming/file"
Expand All @@ -18,7 +17,7 @@ import (
)

// ServiceConstructor is used to construct a streaming service
type ServiceConstructor func(serverTypes.AppOptions, []types.StoreKey, types.Codec, log.Logger) (baseapp.StreamingService, error)
type ServiceConstructor func(serverTypes.AppOptions, []types.StoreKey, types.Codec, log.Logger) (types.StreamingService, error)

// ServiceType enum for specifying the type of StreamingService
type ServiceType int
Expand Down Expand Up @@ -90,7 +89,7 @@ func NewFileStreamingService(
keys []types.StoreKey,
marshaller types.Codec,
logger log.Logger,
) (baseapp.StreamingService, error) {
) (types.StreamingService, error) {
homePath := cast.ToString(opts.Get(flags.FlagHome))
filePrefix := cast.ToString(opts.Get(OptStreamersFilePrefix))
fileDir := cast.ToString(opts.Get(OptStreamersFileWriteDir))
Expand Down Expand Up @@ -118,18 +117,17 @@ func NewFileStreamingService(
// WaitGroup and quit channel used to synchronize with the streaming services
// and any error that occurs during the setup.
func LoadStreamingServices(
bApp *baseapp.BaseApp,
appOpts serverTypes.AppOptions,
appCodec types.Codec,
logger log.Logger,
keys map[string]*types.KVStoreKey,
) ([]baseapp.StreamingService, *sync.WaitGroup, error) {
) ([]types.StreamingService, *sync.WaitGroup, error) {
// waitgroup and quit channel for optional shutdown coordination of the streaming service(s)
wg := new(sync.WaitGroup)

// configure state listening capabilities using AppOptions
streamers := cast.ToStringSlice(appOpts.Get(OptStoreStreamers))
activeStreamers := make([]baseapp.StreamingService, 0, len(streamers))
activeStreamers := make([]types.StreamingService, 0, len(streamers))

for _, streamerName := range streamers {
var exposeStoreKeys []types.StoreKey
Expand Down Expand Up @@ -180,9 +178,6 @@ func LoadStreamingServices(
return nil, nil, err
}

// register the streaming service with the BaseApp
bApp.SetStreamingService(streamingService)

// kick off the background streaming service loop
streamingService.Stream(wg)

Expand Down
7 changes: 2 additions & 5 deletions store/streaming/constructor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/baseapp"
serverTypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming"
"github.com/cosmos/cosmos-sdk/store/streaming/file"
Expand Down Expand Up @@ -49,10 +47,9 @@ func TestStreamingServiceConstructor(t *testing.T) {
}

func TestLoadStreamingServices(t *testing.T) {
db := dbm.NewMemDB()

encCdc := types.NewTestCodec()
keys := types.NewKVStoreKeys("mockKey1", "mockKey2")
bApp := baseapp.NewBaseApp("appName", log.NewNopLogger(), db, nil)

testCases := map[string]struct {
appOpts serverTypes.AppOptions
Expand All @@ -76,7 +73,7 @@ func TestLoadStreamingServices(t *testing.T) {

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
activeStreamers, _, err := streaming.LoadStreamingServices(bApp, tc.appOpts, encCdc, log.NewNopLogger(), keys)
activeStreamers, _, err := streaming.LoadStreamingServices(tc.appOpts, encCdc, log.NewNopLogger(), keys)
require.NoError(t, err)
require.Equal(t, tc.activeStreamersLen, len(activeStreamers))
})
Expand Down
2 changes: 1 addition & 1 deletion store/streaming/file/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# File Streaming Service

This pkg contains an implementation of the [StreamingService](../../../baseapp/streaming.go) that writes
This pkg contains an implementation of the [StreamingService](../../types/streaming.go) that writes
the data stream out to files on the local filesystem. This process is performed synchronously with the message processing
of the state machine.

Expand Down
3 changes: 1 addition & 2 deletions store/streaming/file/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/store/types"
)

var _ baseapp.StreamingService = &StreamingService{}
var _ types.StreamingService = &StreamingService{}

// StreamingService is a concrete implementation of StreamingService that writes
// state changes out to files.
Expand Down
6 changes: 2 additions & 4 deletions baseapp/streaming.go → store/types/streaming.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package baseapp
package types

import (
"context"
"io"
"sync"

abci "github.com/tendermint/tendermint/abci/types"

store "github.com/cosmos/cosmos-sdk/store/types"
)

// ABCIListener interface used to hook into the ABCI message processing of the BaseApp.
Expand All @@ -29,7 +27,7 @@ type StreamingService interface {
// Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file
Stream(wg *sync.WaitGroup) error
// Listeners returns the streaming service's listeners for the BaseApp to register
Listeners() map[store.StoreKey][]store.WriteListener
Listeners() map[StoreKey][]WriteListener
// ABCIListener interface for hooking into the ABCI messages from inside the BaseApp
ABCIListener
// Closer interface
Expand Down

0 comments on commit 1fab762

Please sign in to comment.