Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 23 additions & 68 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1355,28 +1355,28 @@ func (app *App) DeliverTxWithResult(ctx sdk.Context, tx []byte, typedTx sdk.Tx)
}
}

func (app *App) ProcessTxsSynchronousV2(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx, absoluteTxIndices []int) []*abci.ExecTxResult {
func (app *App) ProcessTxsSynchronousV2(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx) []*abci.ExecTxResult {
defer metrics.BlockProcessLatency(time.Now(), metrics.SYNCHRONOUS)

txResults := make([]*abci.ExecTxResult, 0, len(txs))
for i, tx := range txs {
ctx = ctx.WithTxIndex(absoluteTxIndices[i])
ctx = ctx.WithTxIndex(i)
res := app.DeliverTxWithResult(ctx, tx, typedTxs[i])
txResults = append(txResults, res)
metrics.IncrTxProcessTypeCounter(metrics.SYNCHRONOUS)
}
return txResults
}

func (app *App) ProcessTxsSynchronousGiga(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx, absoluteTxIndices []int) []*abci.ExecTxResult {
func (app *App) ProcessTxsSynchronousGiga(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx) []*abci.ExecTxResult {
defer metrics.BlockProcessLatency(time.Now(), metrics.SYNCHRONOUS)

ms := ctx.MultiStore().CacheMultiStore()
defer ms.Write()
ctx = ctx.WithMultiStore(ms)
txResults := make([]*abci.ExecTxResult, len(txs))
for i, tx := range txs {
ctx = ctx.WithTxIndex(absoluteTxIndices[i])
ctx = ctx.WithTxIndex(i)
evmMsg := app.GetEVMMsg(typedTxs[i])
// If not an EVM tx, fall back to v2 processing
if evmMsg == nil {
Expand Down Expand Up @@ -1424,62 +1424,35 @@ func (app *App) CacheContext(ctx sdk.Context) (sdk.Context, sdk.CacheMultiStore)
return ctx.WithMultiStore(msCache), msCache
}

func (app *App) PartitionPrioritizedTxs(_ sdk.Context, txs [][]byte, typedTxs []sdk.Tx) (
prioritizedTxs, otherTxs [][]byte,
prioritizedTypedTxs, otherTypedTxs []sdk.Tx,
prioritizedIndices, otherIndices []int,
) {
for idx, tx := range txs {
if typedTxs[idx] == nil {
otherTxs = append(otherTxs, tx)
otherTypedTxs = append(otherTypedTxs, nil)
otherIndices = append(otherIndices, idx)
continue
}

if utils.IsTxPrioritized(typedTxs[idx]) {
prioritizedTxs = append(prioritizedTxs, tx)
prioritizedTypedTxs = append(prioritizedTypedTxs, typedTxs[idx])
prioritizedIndices = append(prioritizedIndices, idx)
} else {
otherTxs = append(otherTxs, tx)
otherTypedTxs = append(otherTypedTxs, typedTxs[idx])
otherIndices = append(otherIndices, idx)
}

}
return prioritizedTxs, otherTxs, prioritizedTypedTxs, otherTypedTxs, prioritizedIndices, otherIndices
}

// ExecuteTxsConcurrently calls the appropriate function for processing transacitons
func (app *App) ExecuteTxsConcurrently(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx, absoluteTxIndices []int) ([]*abci.ExecTxResult, sdk.Context) {
func (app *App) ExecuteTxsConcurrently(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx) ([]*abci.ExecTxResult, sdk.Context) {
// Giga only supports synchronous execution for now
if app.GigaExecutorEnabled && app.GigaOCCEnabled {
return app.ProcessTXsWithOCCGiga(ctx, txs, typedTxs, absoluteTxIndices)
return app.ProcessTXsWithOCCGiga(ctx, txs, typedTxs)
} else if app.GigaExecutorEnabled {
return app.ProcessTxsSynchronousGiga(ctx, txs, typedTxs, absoluteTxIndices), ctx
return app.ProcessTxsSynchronousGiga(ctx, txs, typedTxs), ctx
} else if !ctx.IsOCCEnabled() {
return app.ProcessTxsSynchronousV2(ctx, txs, typedTxs, absoluteTxIndices), ctx
return app.ProcessTxsSynchronousV2(ctx, txs, typedTxs), ctx
}

return app.ProcessTXsWithOCCV2(ctx, txs, typedTxs, absoluteTxIndices)
return app.ProcessTXsWithOCCV2(ctx, txs, typedTxs)
}

func (app *App) GetDeliverTxEntry(ctx sdk.Context, txIndex int, absoluateIndex int, bz []byte, tx sdk.Tx) (res *sdk.DeliverTxEntry) {
func (app *App) GetDeliverTxEntry(ctx sdk.Context, txIndex int, bz []byte, tx sdk.Tx) (res *sdk.DeliverTxEntry) {
res = &sdk.DeliverTxEntry{
Request: abci.RequestDeliverTxV2{Tx: bz},
SdkTx: tx,
Checksum: sha256.Sum256(bz),
AbsoluteIndex: absoluateIndex,
AbsoluteIndex: txIndex,
}
return
}

// ProcessTXsWithOCCV2 runs the transactions concurrently via OCC, using the V2 executor
func (app *App) ProcessTXsWithOCCV2(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx, absoluteTxIndices []int) ([]*abci.ExecTxResult, sdk.Context) {
func (app *App) ProcessTXsWithOCCV2(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx) ([]*abci.ExecTxResult, sdk.Context) {
entries := make([]*sdk.DeliverTxEntry, len(txs))
for txIndex, tx := range txs {
entries[txIndex] = app.GetDeliverTxEntry(ctx, txIndex, absoluteTxIndices[txIndex], tx, typedTxs[txIndex])
entries[txIndex] = app.GetDeliverTxEntry(ctx, txIndex, tx, typedTxs[txIndex])
}

batchResult := app.DeliverTxBatch(ctx, sdk.DeliverTxBatchRequest{TxEntries: entries})
Expand Down Expand Up @@ -1534,14 +1507,14 @@ func (app *App) ProcessTXsWithOCCV2(ctx sdk.Context, txs [][]byte, typedTxs []sd
}

// ProcessTXsWithOCCGiga runs the transactions concurrently via OCC, using the Giga executor
func (app *App) ProcessTXsWithOCCGiga(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx, absoluteTxIndices []int) ([]*abci.ExecTxResult, sdk.Context) {
func (app *App) ProcessTXsWithOCCGiga(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx) ([]*abci.ExecTxResult, sdk.Context) {
evmEntries := make([]*sdk.DeliverTxEntry, 0, len(txs))
v2Entries := make([]*sdk.DeliverTxEntry, 0, len(txs))
for txIndex, tx := range txs {
if app.GetEVMMsg(typedTxs[txIndex]) != nil {
evmEntries = append(evmEntries, app.GetDeliverTxEntry(ctx, txIndex, absoluteTxIndices[txIndex], tx, typedTxs[txIndex]))
evmEntries = append(evmEntries, app.GetDeliverTxEntry(ctx, txIndex, tx, typedTxs[txIndex]))
} else {
v2Entries = append(v2Entries, app.GetDeliverTxEntry(ctx, txIndex, absoluteTxIndices[txIndex], tx, typedTxs[txIndex]))
v2Entries = append(v2Entries, app.GetDeliverTxEntry(ctx, txIndex, tx, typedTxs[txIndex]))
}
}

Expand Down Expand Up @@ -1576,7 +1549,7 @@ func (app *App) ProcessTXsWithOCCGiga(ctx sdk.Context, txs [][]byte, typedTxs []
evmBatchResult = nil
v2Entries = make([]*sdk.DeliverTxEntry, len(txs))
for txIndex, tx := range txs {
v2Entries[txIndex] = app.GetDeliverTxEntry(ctx, txIndex, absoluteTxIndices[txIndex], tx, typedTxs[txIndex])
v2Entries[txIndex] = app.GetDeliverTxEntry(ctx, txIndex, tx, typedTxs[txIndex])
}
} else {
// Commit EVM cache to main store before processing non-EVM txs.
Expand Down Expand Up @@ -1667,38 +1640,20 @@ func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequ
events = append(events, beginBlockResp.Events...)

evmTxs := make([]*evmtypes.MsgEVMTransaction, len(txs)) // nil for non-EVM txs
txResults = make([]*abci.ExecTxResult, len(txs))
typedTxs := app.DecodeTransactionsConcurrently(ctx, txs)

prioritizedTxs, otherTxs, prioritizedTypedTxs, otherTypedTxs, prioritizedIndices, otherIndices := app.PartitionPrioritizedTxs(ctx, txs, typedTxs)

// run the prioritized txs
prioritizedResults, ctx := app.ExecuteTxsConcurrently(ctx, prioritizedTxs, prioritizedTypedTxs, prioritizedIndices)
for relativePrioritizedIndex, originalIndex := range prioritizedIndices {
txResults[originalIndex] = prioritizedResults[relativePrioritizedIndex]
evmTxs[originalIndex] = app.GetEVMMsg(prioritizedTypedTxs[relativePrioritizedIndex])
for i := range txs {
evmTxs[i] = app.GetEVMMsg(typedTxs[i])
}

// Flush giga stores so WriteDeferredBalances (which uses the standard BankKeeper)
// can see balance changes made by the giga executor via GigaBankKeeper.
if app.GigaExecutorEnabled {
ctx.GigaMultiStore().WriteGiga()
}

// Finalize all Bank Module Transfers here so that events are included for prioritiezd txs
deferredWriteEvents := app.BankKeeper.WriteDeferredBalances(ctx)
events = append(events, deferredWriteEvents...)
// Execute all transactions
txResults, ctx = app.ExecuteTxsConcurrently(ctx, txs, typedTxs)

midBlockEvents := app.MidBlock(ctx, req.GetHeight())
events = append(events, midBlockEvents...)

otherResults, ctx := app.ExecuteTxsConcurrently(ctx, otherTxs, otherTypedTxs, otherIndices)
for relativeOtherIndex, originalIndex := range otherIndices {
txResults[originalIndex] = otherResults[relativeOtherIndex]
evmTxs[originalIndex] = app.GetEVMMsg(otherTypedTxs[relativeOtherIndex])
}

// Flush giga stores after second round (same reason as above)
// Flush giga stores so WriteDeferredBalances (which uses the standard BankKeeper)
// can see balance changes made by the giga executor via GigaBankKeeper.
if app.GigaExecutorEnabled {
ctx.GigaMultiStore().WriteGiga()
}
Expand Down
86 changes: 2 additions & 84 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/sei-protocol/sei-chain/sei-cosmos/crypto/keys/secp256k1"
sdk "github.com/sei-protocol/sei-chain/sei-cosmos/types"
banktypes "github.com/sei-protocol/sei-chain/sei-cosmos/x/bank/types"
stakingtypes "github.com/sei-protocol/sei-chain/sei-cosmos/x/staking/types"
abci "github.com/sei-protocol/sei-chain/sei-tendermint/abci/types"
"github.com/sei-protocol/sei-chain/sei-tendermint/proto/tendermint/types"
testkeeper "github.com/sei-protocol/sei-chain/testutil/keeper"
Expand Down Expand Up @@ -54,87 +53,6 @@ func TestEmptyBlockIdempotency(t *testing.T) {
}
}

func TestPartitionPrioritizedTxs(t *testing.T) {
tm := time.Now().UTC()
valPub := secp256k1.GenPrivKey().PubKey()

testWrapper := app.NewTestWrapper(t, tm, valPub, false)

account := sdk.AccAddress(valPub.Address()).String()
validator := sdk.ValAddress(valPub.Address()).String()

oracleMsg := &oracletypes.MsgAggregateExchangeRateVote{
ExchangeRates: "1.2uatom",
Feeder: account,
Validator: validator,
}

otherMsg := &stakingtypes.MsgDelegate{
DelegatorAddress: account,
ValidatorAddress: validator,
Amount: sdk.NewCoin("usei", sdk.NewInt(1)),
}

txEncoder := app.MakeEncodingConfig().TxConfig.TxEncoder()
oracleTxBuilder := app.MakeEncodingConfig().TxConfig.NewTxBuilder()
otherTxBuilder := app.MakeEncodingConfig().TxConfig.NewTxBuilder()
mixedTxBuilder := app.MakeEncodingConfig().TxConfig.NewTxBuilder()

err := oracleTxBuilder.SetMsgs(oracleMsg)
require.NoError(t, err)
oracleTx, err := txEncoder(oracleTxBuilder.GetTx())
require.NoError(t, err)

err = otherTxBuilder.SetMsgs(otherMsg)
require.NoError(t, err)
otherTx, err := txEncoder(otherTxBuilder.GetTx())
require.NoError(t, err)

// this should be treated as non-oracle vote
err = mixedTxBuilder.SetMsgs([]sdk.Msg{oracleMsg, otherMsg}...)
require.NoError(t, err)
mixedTx, err := txEncoder(mixedTxBuilder.GetTx())
require.NoError(t, err)

txs := [][]byte{
oracleTx,
otherTx,
mixedTx,
}
typedTxs := []sdk.Tx{
oracleTxBuilder.GetTx(),
otherTxBuilder.GetTx(),
mixedTxBuilder.GetTx(),
}

prioritizedTxs, otherTxs, prioritizedTypedTxs, otherTypedTxs, prioIdxs, otherIdxs := testWrapper.App.PartitionPrioritizedTxs(testWrapper.Ctx, txs, typedTxs)
require.Equal(t, [][]byte{oracleTx}, prioritizedTxs)
require.Equal(t, [][]byte{otherTx, mixedTx}, otherTxs)
require.Equal(t, []int{0}, prioIdxs)
require.Equal(t, []int{1, 2}, otherIdxs)
require.Equal(t, 1, len(prioritizedTypedTxs))
require.Equal(t, 2, len(otherTypedTxs))

diffOrderTxs := [][]byte{
otherTx,
oracleTx,
mixedTx,
}
differOrderTypedTxs := []sdk.Tx{
otherTxBuilder.GetTx(),
oracleTxBuilder.GetTx(),
mixedTxBuilder.GetTx(),
}

prioritizedTxs, otherTxs, prioritizedTypedTxs, otherTypedTxs, prioIdxs, otherIdxs = testWrapper.App.PartitionPrioritizedTxs(testWrapper.Ctx, diffOrderTxs, differOrderTypedTxs)
require.Equal(t, [][]byte{oracleTx}, prioritizedTxs)
require.Equal(t, [][]byte{otherTx, mixedTx}, otherTxs)
require.Equal(t, []int{1}, prioIdxs)
require.Equal(t, []int{0, 2}, otherIdxs)
require.Equal(t, 1, len(prioritizedTypedTxs))
require.Equal(t, 2, len(otherTypedTxs))
}

func TestProcessOracleAndOtherTxsSuccess(t *testing.T) {
tm := time.Now().UTC()
valPub := secp256k1.GenPrivKey().PubKey()
Expand Down Expand Up @@ -589,9 +507,9 @@ func TestGetDeliverTxEntry(t *testing.T) {
tx := emptyTxBuilder.GetTx()
bz, _ := txEncoder(tx)

require.NotNil(t, ap.GetDeliverTxEntry(ctx, 0, 0, bz, tx))
require.NotNil(t, ap.GetDeliverTxEntry(ctx, 0, bz, tx))

require.NotNil(t, ap.GetDeliverTxEntry(ctx, 0, 0, bz, nil))
require.NotNil(t, ap.GetDeliverTxEntry(ctx, 0, bz, nil))
}

func isSwaggerRouteAdded(router *mux.Router) bool {
Expand Down
Loading