Skip to content

Bugfix mempool cache #1141

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion common/storage/mempoolStorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type (
ArrivalTimestamp int64
FeePerByte int64
TransactionByteSize uint32
BlockHeight uint32
}
MempoolMap map[int64]MempoolCacheObject
)
Expand Down Expand Up @@ -95,7 +96,7 @@ func (m *MempoolCacheStorage) RemoveItem(keys interface{}) error {
func (m *MempoolCacheStorage) GetSize() int64 {
var size int64
for _, memObj := range m.mempoolMap {
size += 8 * 3 // key + feePerByte + arrivalTimestamp
size += 8 * 3 // key + feePerByte + arrivalTimestamp + blockHeight
size += int64(memObj.TransactionByteSize)
}
return size
Expand Down
49 changes: 26 additions & 23 deletions core/service/blockMainService.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ func (bs *BlockService) PushBlock(previousBlock, block *model.Block, broadcast,
var (
blocksmithIndex *int64
err error
mempoolMap storage.MempoolMap
)

if !coreUtil.IsGenesis(previousBlock.GetID(), block) {
Expand Down Expand Up @@ -455,6 +456,13 @@ func (bs *BlockService) PushBlock(previousBlock, block *model.Block, broadcast,
return err
}
var transactionIDs = make([]int64, len(block.GetTransactions()))
mempoolMap, err = bs.MempoolService.GetMempoolTransactions()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

if err != nil {
if rollbackErr := bs.QueryExecutor.RollbackTx(); rollbackErr != nil {
bs.Logger.Error(rollbackErr.Error())
}
return err
}
// apply transactions and remove them from mempool
for index, tx := range block.GetTransactions() {
// assign block id and block height to tx
Expand All @@ -463,35 +471,24 @@ func (bs *BlockService) PushBlock(previousBlock, block *model.Block, broadcast,
tx.TransactionIndex = uint32(index) + 1
transactionIDs[index] = tx.GetID()
// validate tx here
// check if is in mempool : if yes, undo unconfirmed
rows, err := bs.QueryExecutor.ExecuteSelect(bs.MempoolQuery.GetMempoolTransaction(), false, tx.ID)
if err != nil {
rows.Close()
if rollbackErr := bs.QueryExecutor.RollbackTx(); rollbackErr != nil {
bs.Logger.Error(rollbackErr.Error())
}
return err
}
txType, err := bs.ActionTypeSwitcher.GetTransactionType(tx)
if err != nil {
rows.Close()
if rollbackErr := bs.QueryExecutor.RollbackTx(); rollbackErr != nil {
bs.Logger.Error(rollbackErr.Error())
}
return err
}

if rows.Next() {
// check if is in mempool : if yes, undo unconfirmed
if _, ok := mempoolMap[tx.ID]; ok {
err = bs.TransactionCoreService.UndoApplyUnconfirmedTransaction(txType)
if err != nil {
rows.Close()
if rollbackErr := bs.QueryExecutor.RollbackTx(); rollbackErr != nil {
bs.Logger.Error(rollbackErr.Error())
}
return err
}
}
rows.Close()

if block.Height > 0 {
err = bs.TransactionCoreService.ValidateTransaction(txType, true)
if err != nil {
Expand Down Expand Up @@ -519,14 +516,7 @@ func (bs *BlockService) PushBlock(previousBlock, block *model.Block, broadcast,
return err
}
}
if block.Height != 0 {
if errRemoveMempool := bs.MempoolService.RemoveMempoolTransactions(block.GetTransactions()); errRemoveMempool != nil {
if rollbackErr := bs.QueryExecutor.RollbackTx(); rollbackErr != nil {
bs.Logger.Error(rollbackErr.Error())
}
return errRemoveMempool
}
}

linkedCount, err := bs.PublishedReceiptService.ProcessPublishedReceipts(block)
if err != nil {
if rollbackErr := bs.QueryExecutor.RollbackTx(); rollbackErr != nil {
Expand Down Expand Up @@ -688,6 +678,7 @@ func (bs *BlockService) PushBlock(previousBlock, block *model.Block, broadcast,
if err != nil {
return result, err
}
defer rows.Close()
queryResult, err = bs.FeeVoteRevealVoteQuery.BuildModel(queryResult, rows)
if err != nil {
return result, err
Expand Down Expand Up @@ -732,7 +723,19 @@ func (bs *BlockService) PushBlock(previousBlock, block *model.Block, broadcast,
}
}
}

if !coreUtil.IsGenesis(previousBlock.GetID(), block) {
if errRemoveMempool := bs.MempoolService.RemoveMempoolTransactions(block.GetTransactions()); errRemoveMempool != nil {
if rollbackErr := bs.QueryExecutor.RollbackTx(); rollbackErr != nil {
bs.Logger.Error(rollbackErr.Error())
}
// reset mempool cache
initMempoolErr := bs.MempoolService.InitMempoolTransaction()
if initMempoolErr != nil {
bs.Logger.Errorf(initMempoolErr.Error())
}
return err
}
}
err = bs.QueryExecutor.CommitTx()
if err != nil { // commit automatically unlock executor and close tx
return err
Expand Down
35 changes: 28 additions & 7 deletions core/service/blockMainService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,22 @@ var (
Version: 1,
TransactionIndex: 1,
}
mockTransactionExpired = &model.Transaction{
ID: 1,
BlockID: 1,
Height: 12,
SenderAccountAddress: "BCZ",
RecipientAccountAddress: "ZCB",
TransactionType: 1,
Fee: 10,
Timestamp: 1000,
TransactionHash: []byte{},
TransactionBodyLength: 8,
TransactionBodyBytes: mockSendMoneyTxBodyBytes,
Signature: []byte{1, 2, 3, 4, 5, 6, 7, 8},
Version: 1,
TransactionIndex: 1,
}

mockAccountBalance = &model.AccountBalance{
AccountAddress: mockTransaction.GetSenderAccountAddress(),
Expand Down Expand Up @@ -1029,6 +1045,10 @@ func (*mockMempoolServiceRemoveTransactionsSuccess) RemoveMempoolTransactions(me
return nil
}

func (*mockMempoolServiceRemoveTransactionsSuccess) GetMempoolTransactions() (storage.MempoolMap, error) {
return make(storage.MempoolMap), nil
}

func TestBlockService_PushBlock(t *testing.T) {
type fields struct {
Chaintype chaintype.ChainType
Expand Down Expand Up @@ -1695,6 +1715,11 @@ func (*mockMempoolServiceSelectFail) SelectTransactionsFromMempool(blockTimestam
return nil, errors.New("want error on select")
}

// mockMempoolServiceSelectFail
func (*mockMempoolServiceSelectFail) GetMempoolTransactions() (storage.MempoolMap, error) {
return make(storage.MempoolMap), nil
}

// mockMempoolServiceSelectSuccess
func (*mockMempoolServiceSelectWrongTransactionBytes) SelectTransactionsFromMempool(
blockTimestamp int64,
Expand Down Expand Up @@ -3939,19 +3964,15 @@ func (*mockNodeRegistrationServiceBlockPopSuccess) UpdateNextNodeAdmissionCache(
return nil
}

func (*mockMempoolServiceBlockPopSuccess) GetMempoolTransactionsWantToBackup(
height uint32,
) ([]*model.MempoolTransaction, error) {
return make([]*model.MempoolTransaction, 0), nil
func (*mockMempoolServiceBlockPopSuccess) GetMempoolTransactionsWantToBackup(height uint32) ([]*model.Transaction, error) {
return make([]*model.Transaction, 0), nil
}

func (*mockMempoolServiceBlockPopSuccess) BackupMempools(commonBlock *model.Block) error {
return nil
}

func (*mockMempoolServiceBlockPopFail) GetMempoolTransactionsWantToBackup(
height uint32,
) ([]*model.MempoolTransaction, error) {
func (*mockMempoolServiceBlockPopFail) GetMempoolTransactionsWantToBackup(height uint32) ([]*model.Transaction, error) {
return nil, errors.New("mockedError")
}

Expand Down
10 changes: 3 additions & 7 deletions core/service/blockSpineService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3229,15 +3229,11 @@ func (*mockSpineExecutorBlockPopGetLastBlockFail) ExecuteSelectRow(qStr string,
return db.QueryRow(qStr), nil
}

func (*mockSpineMempoolServiceBlockPopSuccess) GetMempoolTransactionsWantToBackup(
height uint32,
) ([]*model.MempoolTransaction, error) {
return make([]*model.MempoolTransaction, 0), nil
func (*mockSpineMempoolServiceBlockPopSuccess) GetMempoolTransactionsWantToBackup(height uint32) ([]*model.Transaction, error) {
return make([]*model.Transaction, 0), nil
}

func (*mockSpineMempoolServiceBlockPopFail) GetMempoolTransactionsWantToBackup(
height uint32,
) ([]*model.MempoolTransaction, error) {
func (*mockSpineMempoolServiceBlockPopFail) GetMempoolTransactionsWantToBackup(height uint32) ([]*model.Transaction, error) {
return nil, errors.New("mockSpineedError")
}

Expand Down
81 changes: 52 additions & 29 deletions core/service/mempoolCoreService.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type (
nodeSecretPhrase string,
) ([]*model.BatchReceipt, error)
DeleteExpiredMempoolTransactions() error
GetMempoolTransactionsWantToBackup(height uint32) ([]*model.MempoolTransaction, error)
GetMempoolTransactionsWantToBackup(height uint32) ([]*model.Transaction, error)
BackupMempools(commonBlock *model.Block) error
}

Expand Down Expand Up @@ -119,6 +119,11 @@ func (mps *MempoolService) InitMempoolTransaction() error {
err error
mempools []*model.MempoolTransaction
)
// clearing cache before initialize
err = mps.MempoolCacheStorage.ClearCache()
if err != nil {
return err
}
mpQuery := mps.MempoolQuery.GetMempoolTransactions()
rows, err := mps.QueryExecutor.ExecuteSelect(mpQuery, false)
if err != nil {
Expand All @@ -139,6 +144,7 @@ func (mps *MempoolService) InitMempoolTransaction() error {
ArrivalTimestamp: mempool.ArrivalTimestamp,
FeePerByte: mempool.FeePerByte,
TransactionByteSize: uint32(len(mempool.TransactionBytes)),
BlockHeight: mempool.BlockHeight,
})
if err != nil {
return err
Expand Down Expand Up @@ -233,6 +239,7 @@ func (mps *MempoolService) AddMempoolTransaction(tx *model.Transaction, txBytes
ArrivalTimestamp: time.Now().UTC().Unix(),
FeePerByte: mpTx.FeePerByte,
TransactionByteSize: uint32(len(txBytes)),
BlockHeight: mpTx.BlockHeight,
})
if err != nil {
return err
Expand Down Expand Up @@ -527,10 +534,11 @@ func sortFeePerByteThenTimestampThenID(memTxs []storage.MempoolCacheObject) {
// which is the mempool transaction has been hit expiration time
func (mps *MempoolService) DeleteExpiredMempoolTransactions() error {
var (
qStr string
expirationTime = time.Now().Add(-constant.MempoolExpiration).Unix()
err error
cachedTxs = make(storage.MempoolMap)
qStr string
expirationTime = time.Now().Add(-constant.MempoolExpiration).Unix()
err error
cachedTxs = make(storage.MempoolMap)
expiredMempoolIDs []int64
)
err = mps.MempoolCacheStorage.GetAllItems(cachedTxs)
if err != nil {
Expand Down Expand Up @@ -562,6 +570,7 @@ func (mps *MempoolService) DeleteExpiredMempoolTransactions() error {
}
return err
}
expiredMempoolIDs = append(expiredMempoolIDs, memObj.Tx.ID)
}

qStr = mps.MempoolQuery.DeleteExpiredMempoolTransactions(expirationTime)
Expand All @@ -572,38 +581,45 @@ func (mps *MempoolService) DeleteExpiredMempoolTransactions() error {
}
return err
}
err = mps.MempoolCacheStorage.RemoveItem(expiredMempoolIDs)
if err != nil {
initMempoolErr := mps.InitMempoolTransaction()
if initMempoolErr != nil {
mps.Logger.Warnf("BackupMempoolsErr - InitMempoolErr - %v", initMempoolErr)
}
}
err = mps.QueryExecutor.CommitTx()
if err != nil {
return err
}
return nil
}

func (mps *MempoolService) GetMempoolTransactionsWantToBackup(height uint32) ([]*model.MempoolTransaction, error) {
func (mps *MempoolService) GetMempoolTransactionsWantToBackup(height uint32) ([]*model.Transaction, error) {
var (
mempools []*model.MempoolTransaction
rows *sql.Rows
err error
txs = make([]*model.Transaction, 0)
err error
)

rows, err = mps.QueryExecutor.ExecuteSelect(mps.MempoolQuery.GetMempoolTransactionsWantToByHeight(height), false)
mempoolMap, err := mps.GetMempoolTransactions()
if err != nil {
return nil, err
return txs, err
}
defer rows.Close()
mempools, err = mps.MempoolQuery.BuildModel(mempools, rows)
if err != nil {
return nil, err
for _, memObj := range mempoolMap {
if memObj.BlockHeight > height {
txs = append(txs, &memObj.Tx)
}
}

return mempools, nil
return txs, nil
}

func (mps *MempoolService) BackupMempools(commonBlock *model.Block) error {

var (
mempoolsBackupBytes *bytes.Buffer
mempoolsBackup []*model.MempoolTransaction
mempoolsBackup []*model.Transaction
mempoolsBackupIDs []int64
err error
)

Expand All @@ -619,20 +635,20 @@ func (mps *MempoolService) BackupMempools(commonBlock *model.Block) error {
}

mempoolsBackupBytes = bytes.NewBuffer([]byte{})
for _, mempool := range mempoolsBackup {
for _, mempoolTx := range mempoolsBackup {
var (
tx *model.Transaction
txType transaction.TypeAction
)
tx, err := mps.TransactionUtil.ParseTransactionBytes(mempool.GetTransactionBytes(), true)
txType, err = mps.ActionTypeSwitcher.GetTransactionType(mempoolTx)
if err != nil {
rollbackErr := mps.QueryExecutor.RollbackTx()
if rollbackErr != nil {
mps.Logger.Warnf("rollbackErr:BackupMempools - %v", rollbackErr)
}
return err
}
txType, err = mps.ActionTypeSwitcher.GetTransactionType(tx)

err = mps.TransactionCoreService.UndoApplyUnconfirmedTransaction(txType)
if err != nil {
rollbackErr := mps.QueryExecutor.RollbackTx()
if rollbackErr != nil {
Expand All @@ -641,22 +657,22 @@ func (mps *MempoolService) BackupMempools(commonBlock *model.Block) error {
return err
}

err = mps.TransactionCoreService.UndoApplyUnconfirmedTransaction(txType)
/*
mempoolsBackupBytes format is
[...{4}byteSize,{bytesSize}transactionBytes]
*/
mempoolByte, err := mps.TransactionUtil.GetTransactionBytes(mempoolTx, true)
if err != nil {
rollbackErr := mps.QueryExecutor.RollbackTx()
if rollbackErr != nil {
mps.Logger.Warnf("rollbackErr:BackupMempools - %v", rollbackErr)
}
return err
}

/*
mempoolsBackupBytes format is
[...{4}byteSize,{bytesSize}transactionBytes]
*/
sizeMempool := uint32(len(mempool.GetTransactionBytes()))
sizeMempool := uint32(len(mempoolByte))
mempoolsBackupBytes.Write(commonUtils.ConvertUint32ToBytes(sizeMempool))
mempoolsBackupBytes.Write(mempool.GetTransactionBytes())
mempoolsBackupBytes.Write(mempoolByte)
mempoolsBackupIDs = append(mempoolsBackupIDs, mempoolTx.GetID())
}

for _, dQuery := range derivedQueries {
Expand All @@ -670,6 +686,13 @@ func (mps *MempoolService) BackupMempools(commonBlock *model.Block) error {
return err
}
}
err = mps.MempoolCacheStorage.RemoveItem(mempoolsBackupIDs)
if err != nil {
initMempoolErr := mps.InitMempoolTransaction()
if initMempoolErr != nil {
mps.Logger.Warnf("BackupMempoolsErr - InitMempoolErr - %v", initMempoolErr)
}
}
err = mps.QueryExecutor.CommitTx()
if err != nil {
return err
Expand Down
Loading