Skip to content

Commit

Permalink
Fixes for parallel TX and metrics (#272)
Browse files Browse the repository at this point in the history
* Fixes for parallel TX and metrics

* newline

* Update epoch.go

* bump

* fixo

* lint

* add dag metrics

* lint
  • Loading branch information
BrandonWeng authored and udpatil committed Nov 1, 2022
1 parent da7c070 commit 30207e7
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 12 deletions.
3 changes: 3 additions & 0 deletions app/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package app

import (
"context"
"time"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/sei-protocol/sei-chain/utils/metrics"
abci "github.com/tendermint/tendermint/abci/types"
"go.opentelemetry.io/otel/attribute"
)
Expand Down Expand Up @@ -31,6 +33,7 @@ func (app *App) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abci.Re
}

func (app *App) DeliverTx(ctx sdk.Context, req abci.RequestDeliverTx) abci.ResponseDeliverTx {
defer metrics.MeasureDeliverTxDuration(time.Now())
// tracectx, span := (*app.tracingInfo.Tracer).Start(app.tracingInfo.TracerContext, "DeliverTx")
// oldCtx := app.tracingInfo.TracerContext
// app.tracingInfo.TracerContext = tracectx
Expand Down
16 changes: 11 additions & 5 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ import (
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
dbm "github.com/tendermint/tm-db"

"github.com/sei-protocol/sei-chain/utils/metrics"
"github.com/sei-protocol/sei-chain/utils/tracing"

dexmodule "github.com/sei-protocol/sei-chain/x/dex"
Expand Down Expand Up @@ -139,8 +140,6 @@ import (
wasmtypes "github.com/CosmWasm/wasmd/x/wasm/types"

"go.opentelemetry.io/otel"

"github.com/sei-protocol/sei-chain/utils/metrics"
)

// this line is used by starport scaffolding # stargate/wasm/app/enabledProposals
Expand Down Expand Up @@ -990,6 +989,7 @@ func (app *App) ProcessBlockSynchronous(ctx sdk.Context, txs [][]byte) []*abci.E
for _, tx := range txs {
txResults = append(txResults, app.DeliverTxWithResult(ctx, tx))
}
metrics.IncrTxProcessTypeCounter(metrics.SYNCHRONOUS)
return txResults
}

Expand Down Expand Up @@ -1029,6 +1029,7 @@ func (app *App) ProcessTxConcurrent(

// Deliver the transaction and store the result in the channel
resultChan <- ChannelResult{txIndex, app.DeliverTxWithResult(ctx, txBytes)}
metrics.IncrTxProcessTypeCounter(metrics.CONCURRENT)
}

func (app *App) ProcessBlockConcurrent(
Expand Down Expand Up @@ -1060,10 +1061,15 @@ func (app *App) ProcessBlockConcurrent(
)
}

// Waits for all the transactions to complete
waitGroup.Wait()
// Do not call waitGroup.Wait() synchronously as it blocks on channel reads
// until all the messages are read. This closes the channel once
// results are all read and prevent any further writes.
go func() {
waitGroup.Wait()
close(resultChan)
}()

// Gather Results and store it based on txIndex
// Gather Results and store it based on txIndex and read results from channel
// Concurrent results may be in different order than the original txIndex
txResultsMap := map[int]*abci.ExecTxResult{}
for result := range resultChan {
Expand Down
4 changes: 4 additions & 0 deletions app/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package app

import (
"fmt"
"time"

acltypes "github.com/cosmos/cosmos-sdk/types/accesscontrol"
mapset "github.com/deckarep/golang-set"
"github.com/sei-protocol/sei-chain/utils/metrics"
)

type DagNodeID int
Expand Down Expand Up @@ -133,6 +135,7 @@ func (dag *Dag) AddEdge(fromIndex DagNodeID, toIndex DagNodeID) *DagEdge {
//
// It will also register the new node with AccessOpsMap so that future nodes that amy be dependent on this one can properly identify the dependency.
func (dag *Dag) AddNodeBuildDependency(messageIndex int, txIndex int, accessOp acltypes.AccessOperation) {
defer metrics.MeasureBuildDagDuration(time.Now(), "AddNodeBuildDependency")
dagNode := dag.AddNode(messageIndex, txIndex, accessOp)
// update tx index map
dag.TxIndexMap[txIndex] = dagNode.NodeID
Expand Down Expand Up @@ -283,6 +286,7 @@ func (dag *Dag) BuildCompletionSignalMaps() (
completionSignalingMap map[int]MessageCompletionSignalMapping,
blockingSignalsMap map[int]MessageCompletionSignalMapping,
) {
defer metrics.MeasureBuildDagDuration(time.Now(), "BuildCompletionSignalMaps")
completionSignalingMap = make(map[int]MessageCompletionSignalMapping)
blockingSignalsMap = make(map[int]MessageCompletionSignalMapping)
// go through every node
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ require (
)

replace (
github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.73
github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.1.76
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4
github.com/tendermint/tendermint => github.com/sei-protocol/sei-tendermint v0.1.59
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1098,8 +1098,8 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg
github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo=
github.com/securego/gosec/v2 v2.11.0/go.mod h1:SX8bptShuG8reGC0XS09+a4H2BoWSJi+fscA+Pulbpo=
github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
github.com/sei-protocol/sei-cosmos v0.1.73 h1:yFG32zV8BGCbsS/AQYUNc2ZFWWZ2lXv505Gkf6vauKE=
github.com/sei-protocol/sei-cosmos v0.1.73/go.mod h1:Oaj7toqHCkwEEb+sDIWxtfTkPZxOpMXBXDMvIIqUjpw=
github.com/sei-protocol/sei-cosmos v0.1.76 h1:y75Pn8hTc1SWndq/oMGzVu4mkF9ma6kVaA36AdFKxD8=
github.com/sei-protocol/sei-cosmos v0.1.76/go.mod h1:Oaj7toqHCkwEEb+sDIWxtfTkPZxOpMXBXDMvIIqUjpw=
github.com/sei-protocol/sei-tendermint v0.1.59 h1:POGL60PumMQHF4EzAHzvkGfDnodQJLHpl65LuiwSO/Y=
github.com/sei-protocol/sei-tendermint v0.1.59/go.mod h1:Olwbjyagrpoxj5DAUhHxMTWDVEfQ3FYdpypaJ3+6Hs8=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
Expand Down
6 changes: 3 additions & 3 deletions loadtest/contracts/deploy_ten_contracts.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ echo
cd $seihome/loadtest/contracts/mars && cargo build && docker run --rm -v "$(pwd)":/code \
--mount type=volume,source="$(basename "$(pwd)")_cache",target=/code/target \
--mount type=volume,source=registry_cache,target=/usr/local/cargo/registry \
cosmwasm/rust-optimizer:0.12.5
cosmwasm/rust-optimizer:0.12.6

cd $seihome/loadtest/contracts/saturn && cargo build && docker run --rm -v "$(pwd)":/code \
--mount type=volume,source="$(basename "$(pwd)")_cache",target=/code/target \
--mount type=volume,source=registry_cache,target=/usr/local/cargo/registry \
cosmwasm/rust-optimizer:0.12.5
cosmwasm/rust-optimizer:0.12.6

cd $seihome/loadtest/contracts/venus && cargo build && docker run --rm -v "$(pwd)":/code \
--mount type=volume,source="$(basename "$(pwd)")_cache",target=/code/target \
--mount type=volume,source=registry_cache,target=/usr/local/cargo/registry \
cosmwasm/rust-optimizer:0.12.5
cosmwasm/rust-optimizer:0.12.6

# Deploy all contracts
echo "Deploying contracts..."
Expand Down
6 changes: 6 additions & 0 deletions utils/metrics/labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package metrics

const (
CONCURRENT = "concurrent"
SYNCHRONOUS = "synchronous"
)
36 changes: 36 additions & 0 deletions utils/metrics/metrics_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,39 @@ func GaugeSeidVersionAndCommit(version string, commit string) {
[]metrics.Label{telemetry.NewLabel("seid_version", version), telemetry.NewLabel("commit", commit)},
)
}

// sei_tx_process_type_count
func IncrTxProcessTypeCounter(processType string) {
metrics.IncrCounterWithLabels(
[]string{"sei", "tx", "process", "type"},
1,
[]metrics.Label{telemetry.NewLabel("type", processType)},
)
}

// Measures the time taken to execute a sudo msg
// Metric Names:
//
// sei_deliver_tx_duration_miliseconds
// sei_deliver_tx_duration_miliseconds_count
// sei_deliver_tx_duration_miliseconds_sum
func MeasureDeliverTxDuration(start time.Time) {
metrics.MeasureSince(
[]string{"sei", "deliver", "tx", "milliseconds"},
start.UTC(),
)
}

// Measures the time taken to execute a sudo msg
// Metric Names:
//
// sei_dag_build_duration_miliseconds
// sei_dag_build_duration_miliseconds_count
// sei_dag_build_duration_miliseconds_sum
func MeasureBuildDagDuration(start time.Time, method string) {
metrics.MeasureSinceWithLabels(
[]string{"sei", "dag", "build", "milliseconds"},
start.UTC(),
[]metrics.Label{telemetry.NewLabel("method", method)},
)
}
2 changes: 1 addition & 1 deletion x/dex/keeper/epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ func (k Keeper) SetEpoch(ctx sdk.Context, epoch uint64) {
bz := make([]byte, 8)
binary.BigEndian.PutUint64(bz, epoch)
store.Set([]byte(EpochKey), bz)
ctx.Logger().Info(fmt.Sprintf("Current epoch %d", epoch))
}

func (k Keeper) IsNewEpoch(ctx sdk.Context) (bool, uint64) {
store := ctx.KVStore(k.storeKey)
b := store.Get([]byte(EpochKey))
lastEpoch := binary.BigEndian.Uint64(b)
currentEpoch := k.EpochKeeper.GetEpoch(ctx).CurrentEpoch
ctx.Logger().Info(fmt.Sprintf("Current epoch %d", currentEpoch))
return currentEpoch > lastEpoch, currentEpoch
}

0 comments on commit 30207e7

Please sign in to comment.