Skip to content

Commit

Permalink
Merge pull request #18322 from rjl493456442/reomit-log-events
Browse files Browse the repository at this point in the history
core: re-emit new log event when logs rebirth
  • Loading branch information
karalabe authored Apr 4, 2019
2 parents a8dd1f9 + 43631aa commit 7dd3194
Show file tree
Hide file tree
Showing 2 changed files with 271 additions and 42 deletions.
104 changes: 63 additions & 41 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1465,75 +1465,86 @@ func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (i
return 0, nil, nil, nil
}

// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
// to be part of the new canonical chain and accumulates potential missing transactions and post an
// event about them
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
// blocks and inserts them to be part of the new canonical chain and accumulates
// potential missing transactions and post an event about them.
func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
var (
newChain types.Blocks
oldChain types.Blocks
commonBlock *types.Block
deletedTxs types.Transactions

deletedTxs types.Transactions
addedTxs types.Transactions

deletedLogs []*types.Log
rebirthLogs []*types.Log

// collectLogs collects the logs that were generated during the
// processing of the block that corresponds with the given hash.
// These logs are later announced as deleted.
collectLogs = func(hash common.Hash) {
// Coalesce logs and set 'Removed'.
// These logs are later announced as deleted or reborn
collectLogs = func(hash common.Hash, removed bool) {
number := bc.hc.GetBlockNumber(hash)
if number == nil {
return
}
receipts := rawdb.ReadReceipts(bc.db, hash, *number)
for _, receipt := range receipts {
for _, log := range receipt.Logs {
del := *log
del.Removed = true
deletedLogs = append(deletedLogs, &del)
l := *log
if removed {
l.Removed = true
deletedLogs = append(deletedLogs, &l)
} else {
rebirthLogs = append(rebirthLogs, &l)
}
}
}
}
)

// first reduce whoever is higher bound
// Reduce the longer chain to the same number as the shorter one
if oldBlock.NumberU64() > newBlock.NumberU64() {
// reduce old chain
// Old chain is longer, gather all transactions and logs as deleted ones
for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
oldChain = append(oldChain, oldBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)

collectLogs(oldBlock.Hash())
collectLogs(oldBlock.Hash(), true)
}
} else {
// reduce new chain and append new chain blocks for inserting later on
// New chain is longer, stash all blocks away for subsequent insertion
for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
newChain = append(newChain, newBlock)
}
}
if oldBlock == nil {
return fmt.Errorf("Invalid old chain")
return fmt.Errorf("invalid old chain")
}
if newBlock == nil {
return fmt.Errorf("Invalid new chain")
return fmt.Errorf("invalid new chain")
}

// Both sides of the reorg are at the same number, reduce both until the common
// ancestor is found
for {
// If the common ancestor was found, bail out
if oldBlock.Hash() == newBlock.Hash() {
commonBlock = oldBlock
break
}

// Remove an old block as well as stash away a new block
oldChain = append(oldChain, oldBlock)
newChain = append(newChain, newBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
collectLogs(oldBlock.Hash())
collectLogs(oldBlock.Hash(), true)

newChain = append(newChain, newBlock)

oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
// Step back with both chains
oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1)
if oldBlock == nil {
return fmt.Errorf("Invalid old chain")
return fmt.Errorf("invalid old chain")
}
newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
if newBlock == nil {
return fmt.Errorf("Invalid new chain")
return fmt.Errorf("invalid new chain")
}
}
// Ensure the user sees large reorgs
Expand All @@ -1548,35 +1559,46 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash())
}
// Insert the new chain, taking care of the proper incremental order
var addedTxs types.Transactions
for i := len(newChain) - 1; i >= 0; i-- {
// insert the block in the canonical way, re-writing history
// Insert the block in the canonical way, re-writing history
bc.insert(newChain[i])
// write lookup entries for hash based transaction/receipt searches

// Collect reborn logs due to chain reorg (except head block (reverse order))
if i != 0 {
collectLogs(newChain[i].Hash(), false)
}
// Write lookup entries for hash based transaction/receipt searches
rawdb.WriteTxLookupEntries(bc.db, newChain[i])
addedTxs = append(addedTxs, newChain[i].Transactions()...)
}
// calculate the difference between deleted and added transactions
diff := types.TxDifference(deletedTxs, addedTxs)
// When transactions get deleted from the database that means the
// receipts that were created in the fork must also be deleted
// When transactions get deleted from the database, the receipts that were
// created in the fork must also be deleted
batch := bc.db.NewBatch()
for _, tx := range diff {
for _, tx := range types.TxDifference(deletedTxs, addedTxs) {
rawdb.DeleteTxLookupEntry(batch, tx.Hash())
}
batch.Write()

if len(deletedLogs) > 0 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
if len(oldChain) > 0 {
go func() {
// If any logs need to be fired, do it now. In theory we could avoid creating
// this goroutine if there are no events to fire, but realistcally that only
// ever happens if we're reorging empty blocks, which will only happen on idle
// networks where performance is not an issue either way.
//
// TODO(karalabe): Can we get rid of the goroutine somehow to guarantee correct
// event ordering?
go func() {
if len(deletedLogs) > 0 {
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
if len(rebirthLogs) > 0 {
bc.logsFeed.Send(rebirthLogs)
}
if len(oldChain) > 0 {
for _, block := range oldChain {
bc.chainSideFeed.Send(ChainSideEvent{Block: block})
}
}()
}

}
}()
return nil
}

Expand Down
Loading

0 comments on commit 7dd3194

Please sign in to comment.