Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: concurrent recheckTx (#52) #155

Merged
merged 3 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,17 +210,22 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
// internal CheckTx state if the AnteHandler passes. Otherwise, the ResponseCheckTx
// will contain releveant error information. Regardless of tx execution outcome,
// the ResponseCheckTx will contain relevant gas execution context.
func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
func (app *BaseApp) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx {
defer telemetry.MeasureSince(time.Now(), "abci", "check_tx")

tx, err := app.txDecoder(req.Tx)
if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck {
panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type))
}

tx, err := app.preCheckTx(req.Tx)
if err != nil {
return sdkerrors.ResponseCheckTx(err, 0, 0, app.trace)
}

if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck {
panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type))
}
waits, signals := app.checkAccountWGs.Register(tx)

app.checkAccountWGs.Wait(waits)
defer app.checkAccountWGs.Done(signals)

gInfo, err := app.checkTx(req.Tx, tx, req.Type == abci.CheckTxType_Recheck)
if err != nil {
Expand All @@ -233,6 +238,22 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
}
}

func (app *BaseApp) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) {
if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck {
panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type))
}

reqCheckTx := &RequestCheckTxAsync{
txBytes: req.Tx,
recheck: req.Type == abci.CheckTxType_Recheck,
callback: callback,
prepare: waitGroup1(),
}
app.chCheckTx <- reqCheckTx

go app.prepareCheckTx(reqCheckTx)
}

// BeginRecheckTx implements the ABCI interface and set the check state based on the given header
func (app *BaseApp) BeginRecheckTx(req abci.RequestBeginRecheckTx) abci.ResponseBeginRecheckTx {
// NOTE: This is safe because Ostracon holds a lock on the mempool for Rechecking.
Expand Down
88 changes: 0 additions & 88 deletions baseapp/accountlock.go

This file was deleted.

85 changes: 85 additions & 0 deletions baseapp/accountwgs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package baseapp

import (
"sync"

sdk "github.com/line/lbm-sdk/v2/types"
)

type AccountWGs struct {
mtx sync.Mutex
wgs map[string]*sync.WaitGroup
}

func NewAccountWGs() *AccountWGs {
return &AccountWGs{
wgs: make(map[string]*sync.WaitGroup),
}
}

func (aw *AccountWGs) Register(tx sdk.Tx) (waits []*sync.WaitGroup, signals []*AccountWG) {
signers := getUniqSigners(tx)

aw.mtx.Lock()
defer aw.mtx.Unlock()
for _, signer := range signers {
if wg := aw.wgs[signer]; wg != nil {
waits = append(waits, wg)
}
sig := waitGroup1()
aw.wgs[signer] = sig
signals = append(signals, NewAccountWG(signer, sig))
}

return waits, signals
}

func (aw *AccountWGs) Wait(waits []*sync.WaitGroup) {
for _, wait := range waits {
wait.Wait()
}
}

func (aw *AccountWGs) Done(signals []*AccountWG) {
aw.mtx.Lock()
defer aw.mtx.Unlock()

for _, signal := range signals {
signal.wg.Done()
if aw.wgs[signal.acc] == signal.wg {
delete(aw.wgs, signal.acc)
}
}
}

func getUniqSigners(tx sdk.Tx) []string {
seen := map[string]bool{}
var signers []string
for _, msg := range tx.GetMsgs() {
for _, addr := range msg.GetSigners() {
if !seen[addr.String()] {
signers = append(signers, string(addr))
seen[addr.String()] = true
}
}
}
return signers
}

type AccountWG struct {
acc string
wg *sync.WaitGroup
}

func NewAccountWG(acc string, wg *sync.WaitGroup) *AccountWG {
return &AccountWG{
acc: acc,
wg: wg,
}
}

func waitGroup1() (wg *sync.WaitGroup) {
wg = &sync.WaitGroup{}
wg.Add(1)
return wg
}
60 changes: 22 additions & 38 deletions baseapp/accountlock_test.go → baseapp/accountwgs_test.go
Original file line number Diff line number Diff line change
@@ -1,76 +1,66 @@
package baseapp

import (
"reflect"
"sort"
"sync"
"testing"

"github.com/stretchr/testify/require"

ostproto "github.com/line/ostracon/proto/ostracon/types"

"github.com/line/lbm-sdk/v2/crypto/keys/secp256k1"
"github.com/line/lbm-sdk/v2/testutil/testdata"
sdk "github.com/line/lbm-sdk/v2/types"
)

func TestAccountLock(t *testing.T) {
func TestConvertByteSliceToString(t *testing.T) {
b := []byte{65, 66, 67, 0, 65, 66, 67}
s := string(b)
require.Equal(t, len(b), len(s))
require.Equal(t, uint8(0), s[3])
}

func TestRegister(t *testing.T) {
app := setupBaseApp(t)
ctx := app.NewContext(true, ostproto.Header{})

privs := newTestPrivKeys(3)
tx := newTestTx(privs)

accKeys := app.accountLock.Lock(ctx, tx)
waits, signals := app.checkAccountWGs.Register(tx)

for _, accKey := range accKeys {
require.True(t, isMutexLock(&app.accountLock.accMtx[accKey]))
}
require.Equal(t, 0, len(waits))
require.Equal(t, 3, len(signals))

app.accountLock.Unlock(accKeys)

for _, accKey := range accKeys {
require.False(t, isMutexLock(&app.accountLock.accMtx[accKey]))
for _, signal := range signals {
require.Equal(t, app.checkAccountWGs.wgs[signal.acc], signal.wg)
}
}

func TestUnlockDoNothingWithNil(t *testing.T) {
func TestDontPanicWithNil(t *testing.T) {
app := setupBaseApp(t)
require.NotPanics(t, func() { app.accountLock.Unlock(nil) })
}

func TestGetSigner(t *testing.T) {
privs := newTestPrivKeys(3)
tx := newTestTx(privs)
signers := getSigners(tx)

require.Equal(t, getAddrs(privs), signers)
require.NotPanics(t, func() { app.checkAccountWGs.Wait(nil) })
require.NotPanics(t, func() { app.checkAccountWGs.Done(nil) })
}

func TestGetUniqSortedAddressKey(t *testing.T) {
func TestGetUniqSigners(t *testing.T) {
privs := newTestPrivKeys(3)

addrs := getAddrs(privs)
addrs = append(addrs, addrs[1], addrs[0])
require.Equal(t, 5, len(addrs))

accKeys := getUniqSortedAddressKey(addrs)
tx := newTestTx(privs)
signers := getUniqSigners(tx)

// length should be reduced because `duplicated` is removed
require.Less(t, len(accKeys), len(addrs))
require.Less(t, len(signers), len(addrs))

// check uniqueness
for i, iv := range accKeys {
for j, jv := range accKeys {
for i, iv := range signers {
for j, jv := range signers {
if i != j {
require.True(t, iv != jv)
}
}
}

// should be sorted
require.True(t, sort.IsSorted(uint32Slice(accKeys)))
}

type AccountLockTestTx struct {
Expand Down Expand Up @@ -111,9 +101,3 @@ func newTestTx(privs []*secp256k1.PrivKey) sdk.Tx {
}
return AccountLockTestTx{Msgs: msgs}
}

// Hack (too slow)
func isMutexLock(mtx *sync.Mutex) bool {
state := reflect.ValueOf(mtx).Elem().FieldByName("state")
return state.Int() == 1
}
Loading