ADR 038: State Listening #8012

Merged · 10 commits · Feb 5, 2021
review updates part 6: update StoreKVPair to differentiate between Set and Deletes on nil byte values; some minor adjustments
i-norden committed Feb 4, 2021
commit f82bbfecd3e7ed40f2246eba376872cd6a706739
41 changes: 23 additions & 18 deletions docs/architecture/adr-038-state-listening.md
@@ -32,7 +32,8 @@ In a new file, `store/types/listening.go`, we will create a `WriteListener` interface
type WriteListener interface {
// if value is nil then it was deleted
Contributor

It would be good to separate writing []byte{} and deleting.
It is technically possible to write an empty value, and it does remain in the iavl store (e.g. via iterators).

Member

Hmm... I'm not sure this is true - I've tried that and gotten an error.

// storeKey indicates the source KVStore, to facilitate using the same WriteListener across separate KVStores
-OnWrite(storeKey types.StoreKey, key []byte, value []byte)
+// set bool indicates if it was a set; true: set, false: delete
+OnWrite(storeKey types.StoreKey, set bool, key []byte, value []byte)
}
```
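
A minimal sketch of a listener written against this interface, for illustration only (the type and counter fields below are hypothetical, not part of the ADR):

```go
// countingListener is a hypothetical WriteListener that tallies operations,
// relying on the set flag rather than a nil value to detect deletes.
type countingListener struct {
	sets, deletes int
}

func (l *countingListener) OnWrite(storeKey types.StoreKey, set bool, key []byte, value []byte) {
	if set {
		l.sets++ // a write of an empty value ([]byte{}) still counts as a set
	} else {
		l.deletes++
	}
}
```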

@@ -47,9 +48,10 @@ and determine the source of each KV pair.

```protobuf
message StoreKVPair {
-optional string store_key = 1;
-required bytes key = 2;
-required bytes value = 3;
+optional string store_key = 1; // the store key for the KVStore this pair originates from
+required bool set = 2; // true indicates a set operation, false indicates a delete operation
+required bytes key = 3;
+required bytes value = 4;
}
```

@@ -58,24 +60,25 @@ message StoreKVPair {
// protobuf encoded StoreKVPairs to an underlying io.Writer
type StoreKVPairWriteListener struct {
writer io.Writer
-marshaler codec.BinaryMarshaler
+marshaller codec.BinaryMarshaler
}

// NewStoreKVPairWriteListener creates a StoreKVPairWriteListener with a provided io.Writer and codec.BinaryMarshaler
func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryMarshaler) *StoreKVPairWriteListener {
return &StoreKVPairWriteListener{
writer: w,
-marshaler: m,
+marshaller: m,
}
}

// OnWrite satisfies the WriteListener interface by writing length-prefixed protobuf encoded StoreKVPairs
-func (wl *StoreKVPairWriteListener) OnWrite(storeKey types.StoreKey, key []byte, value []byte) {
+func (wl *StoreKVPairWriteListener) OnWrite(storeKey types.StoreKey, set bool, key []byte, value []byte) {
kvPair := new(types.StoreKVPair)
kvPair.StoreKey = storeKey.Name()
+kvPair.Set = set
kvPair.Key = key
kvPair.Value = value
-if by, err := wl.marshaler.MarshalBinaryLengthPrefixed(kvPair); err == nil {
+if by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair); err == nil {
wl.writer.Write(by)
}
}
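
As a sketch of how this length-prefixed encoding round-trips, assuming a `codec.BinaryMarshaler` named `marshaller` (e.g. a `codec.ProtoCodec`) and a `storeKey` already in scope (both names illustrative):

```go
// Hypothetical round trip: encode one pair through the listener into a
// buffer, then decode it back with the same length-prefixed codec.
var buf bytes.Buffer
listener := NewStoreKVPairWriteListener(&buf, marshaller)
listener.OnWrite(storeKey, true, []byte("key"), []byte{}) // an explicit empty-value set, not a delete

pair := new(types.StoreKVPair)
if err := marshaller.UnmarshalBinaryLengthPrefixed(buf.Bytes(), pair); err == nil {
	// pair.Set == true distinguishes this write from a deletion of "key"
}
```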
@@ -107,20 +110,20 @@ func NewStore(parent types.KVStore, psk types.StoreKey, listeners []types.WriteListener)
func (s *Store) Set(key []byte, value []byte) {
types.AssertValidKey(key)
s.parent.Set(key, value)
-s.onWrite(key, value)
+s.onWrite(true, key, value)
}

// Delete implements the KVStore interface. It traces a write operation and
// delegates the Delete call to the parent KVStore.
func (s *Store) Delete(key []byte) {
s.parent.Delete(key)
-s.onWrite(key, nil)
+s.onWrite(false, key, nil)
}

// onWrite writes a KVStore operation to all of the WriteListeners
-func (s *Store) onWrite(key, value []byte) {
+func (s *Store) onWrite(set bool, key, value []byte) {
for _, l := range s.listeners {
-l.OnWrite(s.parentStoreKey, key, value)
+l.OnWrite(s.parentStoreKey, set, key, value)
}
}
```
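
A hedged wiring sketch using the constructor shown above; `file`, `marshaller`, `parentStore`, and `storeKey` are assumed to be in scope and are illustrative names:

```go
// Hypothetical wiring: wrap an existing KVStore so that every Set/Delete is
// forwarded to a file-backed listener as well as the underlying store.
listener := NewStoreKVPairWriteListener(file, marshaller)
store := NewStore(parentStore, storeKey, []types.WriteListener{listener})

store.Set([]byte("k"), []byte("v")) // emits a StoreKVPair with set=true
store.Delete([]byte("k"))           // emits a StoreKVPair with set=false and a nil value
```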
@@ -234,11 +237,11 @@ subsequent state changes are written out to this file until the next `BeginBlock`
// FileStreamingService is a concrete implementation of StreamingService that writes state changes out to a file
type FileStreamingService struct {
listeners map[sdk.StoreKey][]storeTypes.WriteListener // the listeners that will be initialized with BaseApp
-srcChan <-chan []byte // the channel that all of the WriteListeners write their out to
+srcChan <-chan []byte // the channel that all of the WriteListeners write their data out to
filePrefix string // optional prefix for each of the generated files
writeDir string // directory to write files into
dstFile *os.File // the current write output file
-marshaler codec.BinaryMarshaler // marshaler used for re-marshalling the ABCI messages to write them out to the destination files
+marshaller codec.BinaryMarshaler // marshaller used for re-marshalling the ABCI messages to write them out to the destination files
stateCache [][]byte // cache the protobuf binary encoded StoreKVPairs in the order they are received
}
```
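
Downstream consumers will need to reverse this length-prefixed framing when reading the generated files back. A minimal sketch, assuming the uvarint prefix written by `MarshalBinaryLengthPrefixed` and standard `encoding/binary` and `errors` imports (the helper name is illustrative):

```go
// readStateChanges is a hypothetical helper that decodes a buffer of
// length-prefixed StoreKVPairs in the order they were written.
func readStateChanges(data []byte, m codec.BinaryMarshaler) ([]types.StoreKVPair, error) {
	var pairs []types.StoreKVPair
	for len(data) > 0 {
		size, n := binary.Uvarint(data) // length prefix written by MarshalBinaryLengthPrefixed
		if n <= 0 || uint64(len(data)-n) < size {
			return nil, errors.New("malformed length prefix")
		}
		var pair types.StoreKVPair
		if err := m.UnmarshalBinaryBare(data[n:n+int(size)], &pair); err != nil {
			return nil, err
		}
		pairs = append(pairs, pair)
		data = data[n+int(size):]
	}
	return pairs, nil
}
```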
@@ -284,13 +287,12 @@ func NewFileStreamingService(writeDir, filePrefix string, storeKeys []sdk.StoreKey
return nil, err
}
return &FileStreamingService{
-listeners: listener
+listeners: listeners,
srcChan: listenChan,
filePrefix: filePrefix,
writeDir: writeDir,
-marshaler: m,
+marshaller: m,
stateCache: make([][]byte, 0),
-cacheLock: new(sync.Mutex),
}, nil
}

@@ -300,6 +302,7 @@ func (fss *FileStreamingService) Listeners() map[sdk.StoreKey][]storeTypes.WriteListener {
}

func (fss *FileStreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) {
+// NOTE: this could either be done synchronously or asynchronously
// create a new file with the req info according to naming schema
// write req to file
// write all state changes cached for this stage to file
@@ -309,6 +312,7 @@ func (fss *FileStreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) {
}

func (fss *FileStreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) {
+// NOTE: this could either be done synchronously or asynchronously
// create a new file with the req info according to naming schema
// write req to file
// write all state changes cached for this stage to file
@@ -318,6 +322,7 @@ func (fss *FileStreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) {
}

func (fss *FileStreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) {
+// NOTE: this could either be done synchronously or asynchronously
// create a new file with the req info according to naming schema
// NOTE: if the tx failed, handle accordingly
// write req to file
@@ -352,7 +357,7 @@ or an auxiliary streaming service can read from the files and serve the data over

We will create a separate standalone process that reads and internally queues the state as it is written out to these files
and serves the data over a gRPC API. This API will allow filtering of requested data, e.g. by block number, block/tx hash, ABCI message type,
Member

Is this for clients or caching to a database? Feels like the cache process should just read the files and talk to the DB and skip gRPC. Or we should use an actual message queue instead of files + gRPC.

Contributor Author

I think in the case of downstream IPLD-ization in Postgres we would just consume the files directly, as we know we want to consume everything, as quickly as possible, and will run that process in the same environment with direct access to the files. But if we want to be more selective about what we consume and consume it remotely, then that's where this auxiliary streaming service comes into play, as I see it. @robert-zaremba may be able to back me up here.

Contributor Author

To actually answer your question: this would be useful primarily for clients, but also potentially for database caching in cases where the caching process wants to leverage this more selective interface and/or perform the caching remotely.

Collaborator

Yes, it's for clients, e.g. data warehousing / ETL. We were discussing queues but didn't make a decision. Such a service will only need to expose a network port, without exposing the filesystem.

-whether a DeliverTx message failed or succeeded, etc.
+whether a DeliverTx message failed or succeeded, etc. In addition to unary RPC endpoints, this service will expose `stream` RPC endpoints for realtime subscriptions.
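
As a sketch of what such an API could look like, with hypothetical service and message names (the ADR does not pin these down):

```protobuf
// Hypothetical service definition; names and filter fields are illustrative only.
service StateListeningService {
  // unary endpoint: fetch the state changes recorded at a given block height
  rpc StateChangesAt(StateChangeRequest) returns (StateChangeResponse);
  // stream endpoint: subscribe to state changes in realtime
  rpc StreamStateChanges(StateChangeRequest) returns (stream StoreKVPair);
}

message StateChangeRequest {
  optional string store_key = 1;   // filter by source KVStore
  optional int64 block_height = 2; // filter by block number
}

message StateChangeResponse {
  repeated StoreKVPair pairs = 1;
}
```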

#### File pruning
